diff --git a/apps/labrinth/.env.docker-compose b/apps/labrinth/.env.docker-compose index a270a77041..1f8ab58d97 100644 --- a/apps/labrinth/.env.docker-compose +++ b/apps/labrinth/.env.docker-compose @@ -12,7 +12,8 @@ DATABASE_URL=postgresql://labrinth:labrinth@labrinth-postgres/labrinth DATABASE_MIN_CONNECTIONS=0 DATABASE_MAX_CONNECTIONS=16 -MEILISEARCH_ADDR=http://labrinth-meilisearch:7700 +MEILISEARCH_READ_ADDR=http://localhost:7700 +MEILISEARCH_WRITE_ADDRS=http://localhost:7700 MEILISEARCH_KEY=modrinth REDIS_URL=redis://labrinth-redis diff --git a/apps/labrinth/.env.local b/apps/labrinth/.env.local index 340ffb6117..10d1d25143 100644 --- a/apps/labrinth/.env.local +++ b/apps/labrinth/.env.local @@ -13,7 +13,13 @@ DATABASE_URL=postgresql://labrinth:labrinth@localhost/labrinth DATABASE_MIN_CONNECTIONS=0 DATABASE_MAX_CONNECTIONS=16 -MEILISEARCH_ADDR=http://localhost:7700 +MEILISEARCH_READ_ADDR=http://localhost:7700 +MEILISEARCH_WRITE_ADDRS=http://localhost:7700 + +# # For a sharded Meilisearch setup (sharded-meilisearch docker compose profile) +# MEILISEARCH_READ_ADDR=http://localhost:7710 +# MEILISEARCH_WRITE_ADDRS=http://localhost:7700,http://localhost:7701 + MEILISEARCH_KEY=modrinth REDIS_URL=redis://localhost diff --git a/apps/labrinth/nginx/meili-lb.conf b/apps/labrinth/nginx/meili-lb.conf new file mode 100644 index 0000000000..855998baab --- /dev/null +++ b/apps/labrinth/nginx/meili-lb.conf @@ -0,0 +1,14 @@ +upstream meilisearch_upstream { + server meilisearch0:7700; + server meilisearch1:7700; +} + +server { + listen 80; + + location / { + proxy_pass http://meilisearch_upstream; + proxy_set_header Host $host; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + } +} \ No newline at end of file diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index e42cc3f81d..ae6a5e900a 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -19,6 +19,7 @@ use crate::background_task::update_versions; use 
crate::database::ReadOnlyPgPool; use crate::queue::billing::{index_billing, index_subscriptions}; use crate::queue::moderation::AutomatedModerationQueue; +use crate::search::MeilisearchReadClient; use crate::util::anrok; use crate::util::archon::ArchonClient; use crate::util::env::{parse_strings_from_var, parse_var}; @@ -68,6 +69,7 @@ pub struct LabrinthConfig { pub email_queue: web::Data, pub archon_client: web::Data, pub gotenberg_client: GotenbergClient, + pub search_read_client: web::Data, } #[allow(clippy::too_many_arguments)] @@ -274,6 +276,11 @@ pub fn app_setup( file_host, scheduler: Arc::new(scheduler), ip_salt, + search_read_client: web::Data::new( + search_config.make_loadbalanced_read_client().expect( + "Failed to make Meilisearch client for read operations", + ), + ), search_config, session_queue, payouts_queue: web::Data::new(PayoutsQueue::new()), @@ -325,6 +332,7 @@ pub fn app_config( .app_data(labrinth_config.archon_client.clone()) .app_data(web::Data::new(labrinth_config.stripe_client.clone())) .app_data(web::Data::new(labrinth_config.anrok_client.clone())) + .app_data(labrinth_config.search_read_client.clone()) .app_data(labrinth_config.rate_limiter.clone()) .configure({ #[cfg(target_os = "linux")] @@ -373,7 +381,8 @@ pub fn check_env_vars() -> bool { failed |= check_var::("LABRINTH_EXTERNAL_NOTIFICATION_KEY"); failed |= check_var::("RATE_LIMIT_IGNORE_KEY"); failed |= check_var::("DATABASE_URL"); - failed |= check_var::("MEILISEARCH_ADDR"); + failed |= check_var::("MEILISEARCH_READ_ADDR"); + failed |= check_var::("MEILISEARCH_WRITE_ADDRS"); failed |= check_var::("MEILISEARCH_KEY"); failed |= check_var::("REDIS_URL"); failed |= check_var::("BIND_ADDR"); diff --git a/apps/labrinth/src/routes/v2/projects.rs b/apps/labrinth/src/routes/v2/projects.rs index 234c458635..664148cc2e 100644 --- a/apps/labrinth/src/routes/v2/projects.rs +++ b/apps/labrinth/src/routes/v2/projects.rs @@ -13,7 +13,9 @@ use crate::queue::moderation::AutomatedModerationQueue; use 
crate::queue::session::AuthQueue; use crate::routes::v3::projects::ProjectIds; use crate::routes::{ApiError, v2_reroute, v3}; -use crate::search::{SearchConfig, SearchError, search_for_project}; +use crate::search::{ + MeilisearchReadClient, SearchConfig, SearchError, search_for_project, +}; use actix_web::{HttpRequest, HttpResponse, delete, get, patch, post, web}; use serde::{Deserialize, Serialize}; use sqlx::PgPool; @@ -54,6 +56,7 @@ pub fn config(cfg: &mut web::ServiceConfig) { pub async fn project_search( web::Query(info): web::Query, config: web::Data, + read_client: web::Data, ) -> Result { // Search now uses loader_fields instead of explicit 'client_side' and 'server_side' fields // While the backend for this has changed, it doesnt affect much @@ -99,7 +102,7 @@ pub async fn project_search( ..info }; - let results = search_for_project(&info, &config).await?; + let results = search_for_project(&info, &config, &read_client).await?; let results = LegacySearchResults::from(results); diff --git a/apps/labrinth/src/routes/v3/projects.rs b/apps/labrinth/src/routes/v3/projects.rs index 0b93d9896d..e35c40b6d0 100644 --- a/apps/labrinth/src/routes/v3/projects.rs +++ b/apps/labrinth/src/routes/v3/projects.rs @@ -25,7 +25,9 @@ use crate::queue::moderation::AutomatedModerationQueue; use crate::queue::session::AuthQueue; use crate::routes::ApiError; use crate::search::indexing::remove_documents; -use crate::search::{SearchConfig, SearchError, search_for_project}; +use crate::search::{ + MeilisearchReadClient, SearchConfig, SearchError, search_for_project, +}; use crate::util::img; use crate::util::img::{delete_old_images, upload_image_optimized}; use crate::util::routes::read_limited_from_payload; @@ -1018,8 +1020,9 @@ pub async fn edit_project_categories( pub async fn project_search( web::Query(info): web::Query, config: web::Data, + read_client: web::Data, ) -> Result { - let results = search_for_project(&info, &config).await?; + let results = search_for_project(&info, 
&config, &read_client).await?; // TODO: add this back // let results = ReturnSearchResults { diff --git a/apps/labrinth/src/search/indexing/mod.rs b/apps/labrinth/src/search/indexing/mod.rs index 89f98d2fa0..52c8c5d909 100644 --- a/apps/labrinth/src/search/indexing/mod.rs +++ b/apps/labrinth/src/search/indexing/mod.rs @@ -7,14 +7,14 @@ use crate::database::redis::RedisPool; use crate::search::{SearchConfig, UploadSearchProject}; use ariadne::ids::base62_impl::to_base62; use futures::StreamExt; -use futures::stream::FuturesUnordered; +use futures::stream::FuturesOrdered; use local_import::index_local; use meilisearch_sdk::client::{Client, SwapIndexes}; use meilisearch_sdk::indexes::Index; use meilisearch_sdk::settings::{PaginationSetting, Settings}; use sqlx::postgres::PgPool; use thiserror::Error; -use tracing::info; +use tracing::{info, trace}; #[derive(Error, Debug)] pub enum IndexingError { @@ -43,30 +43,37 @@ pub async fn remove_documents( config: &SearchConfig, ) -> Result<(), meilisearch_sdk::errors::Error> { let mut indexes = get_indexes_for_indexing(config, false).await?; - let mut indexes_next = get_indexes_for_indexing(config, true).await?; - indexes.append(&mut indexes_next); + let indexes_next = get_indexes_for_indexing(config, true).await?; - let client = config.make_client()?; + // Pair each client's current index list with that SAME client's 'next' list; + assert_eq!(indexes.len(), indexes_next.len()); + for (list, alt_list) in indexes.iter_mut().zip(&indexes_next) { + list.extend(alt_list.iter().cloned()); + } + + let client = config.make_batch_client()?; let client = &client; - let mut deletion_tasks = FuturesUnordered::new(); - for index in &indexes { - deletion_tasks.push(async move { - // After being successfully submitted, Meilisearch tasks are executed - // asynchronously, so wait some time for them to complete - index - .delete_documents( - &ids.iter().map(|x| to_base62(x.0)).collect::>(), - ) - .await? 
- .wait_for_completion( - client, - None, - Some(Duration::from_secs(15)), - ) - .await - }); - } + let ids_base62 = ids.iter().map(|x| to_base62(x.0)).collect::>(); + let mut deletion_tasks = FuturesOrdered::new(); + + client.across_all(indexes, |index_list, client| { + for index in index_list { + let owned_client = client.clone(); + let ids_base62_ref = &ids_base62; + deletion_tasks.push_back(async move { + index + .delete_documents(ids_base62_ref) + .await? + .wait_for_completion( + &owned_client, + None, + Some(Duration::from_secs(15)), + ) + .await + }); + } + }); while let Some(result) = deletion_tasks.next().await { result?; @@ -82,14 +89,20 @@ pub async fn index_projects( ) -> Result<(), IndexingError> { info!("Indexing projects."); + trace!("Ensuring current indexes exists"); // First, ensure current index exists (so no error happens- current index should be worst-case empty, not missing) get_indexes_for_indexing(config, false).await?; + trace!("Deleting surplus indexes"); // Then, delete the next index if it still exists let indices = get_indexes_for_indexing(config, true).await?; - for index in indices { - index.delete().await?; + for client_indices in indices { + for index in client_indices { + index.delete().await?; + } } + + trace!("Recreating next index"); // Recreate the next index for indexing let indices = get_indexes_for_indexing(config, true).await?; @@ -103,15 +116,24 @@ pub async fn index_projects( .collect::>(); let uploads = index_local(&pool).await?; - add_projects(&indices, uploads, all_loader_fields.clone(), config).await?; + + add_projects_batch_client( + &indices, + uploads, + all_loader_fields.clone(), + config, + ) + .await?; // Swap the index swap_index(config, "projects").await?; swap_index(config, "projects_filtered").await?; // Delete the now-old index - for index in indices { - index.delete().await?; + for index_list in indices { + for index in index_list { + index.delete().await?; + } } info!("Done adding projects."); @@ -122,17 
+144,24 @@ pub async fn swap_index( config: &SearchConfig, index_name: &str, ) -> Result<(), IndexingError> { - let client = config.make_client()?; + let client = config.make_batch_client()?; let index_name_next = config.get_index_name(index_name, true); let index_name = config.get_index_name(index_name, false); let swap_indices = SwapIndexes { indexes: (index_name_next, index_name), rename: None, }; + + let swap_indices_ref = &swap_indices; + client - .swap_indexes([&swap_indices]) - .await? - .wait_for_completion(&client, None, Some(TIMEOUT)) + .with_all_clients("swap_indexes", |client| async move { + client + .swap_indexes([swap_indices_ref]) + .await? + .wait_for_completion(client, None, Some(TIMEOUT)) + .await + }) .await?; Ok(()) @@ -141,41 +170,52 @@ pub async fn swap_index( pub async fn get_indexes_for_indexing( config: &SearchConfig, next: bool, // Get the 'next' one -) -> Result, meilisearch_sdk::errors::Error> { - let client = config.make_client()?; +) -> Result>, meilisearch_sdk::errors::Error> { + let client = config.make_batch_client()?; let project_name = config.get_index_name("projects", next); let project_filtered_name = config.get_index_name("projects_filtered", next); - let projects_index = create_or_update_index( - &client, - &project_name, - Some(&[ - "words", - "typo", - "proximity", - "attribute", - "exactness", - "sort", - ]), - ) - .await?; - let projects_filtered_index = create_or_update_index( - &client, - &project_filtered_name, - Some(&[ - "sort", - "words", - "typo", - "proximity", - "attribute", - "exactness", - ]), - ) - .await?; - Ok(vec![projects_index, projects_filtered_index]) + let project_name_ref = &project_name; + let project_filtered_name_ref = &project_filtered_name; + + let results = client + .with_all_clients("get_indexes_for_indexing", |client| async move { + let projects_index = create_or_update_index( + client, + project_name_ref, + Some(&[ + "words", + "typo", + "proximity", + "attribute", + "exactness", + "sort", + 
]), + ) + .await?; + let projects_filtered_index = create_or_update_index( + client, + project_filtered_name_ref, + Some(&[ + "sort", + "words", + "typo", + "proximity", + "attribute", + "exactness", + ]), + ) + .await?; + + Ok(vec![projects_index, projects_filtered_index]) + }) + .await?; + + Ok(results) } +#[tracing::instrument(skip_all, fields(%name))] async fn create_or_update_index( client: &Client, name: &str, @@ -302,16 +342,40 @@ async fn update_and_add_to_index( Ok(()) } -pub async fn add_projects( - indices: &[Index], +pub async fn add_projects_batch_client( + indices: &[Vec], projects: Vec, additional_fields: Vec, config: &SearchConfig, ) -> Result<(), IndexingError> { - let client = config.make_client()?; - for index in indices { - update_and_add_to_index(&client, index, &projects, &additional_fields) - .await?; + let client = config.make_batch_client()?; + + let index_references = indices + .iter() + .map(|x| x.iter().collect()) + .collect::>>(); + + let mut tasks = FuturesOrdered::new(); + + client.across_all(index_references, |index_list, client| { + for index in index_list { + let owned_client = client.clone(); + let projects_ref = &projects; + let additional_fields_ref = &additional_fields; + tasks.push_back(async move { + update_and_add_to_index( + &owned_client, + index, + projects_ref, + additional_fields_ref, + ) + .await + }); + } + }); + + while let Some(result) = tasks.next().await { + result?; } Ok(()) diff --git a/apps/labrinth/src/search/mod.rs b/apps/labrinth/src/search/mod.rs index 78e9941ea7..2b9f1ba74a 100644 --- a/apps/labrinth/src/search/mod.rs +++ b/apps/labrinth/src/search/mod.rs @@ -3,6 +3,8 @@ use crate::models::projects::SearchRequest; use actix_web::HttpResponse; use actix_web::http::StatusCode; use chrono::{DateTime, Utc}; +use futures::TryStreamExt; +use futures::stream::FuturesOrdered; use itertools::Itertools; use meilisearch_sdk::client::Client; use serde::{Deserialize, Serialize}; @@ -11,6 +13,7 @@ use std::borrow::Cow; 
use std::collections::HashMap; use std::fmt::Write; use thiserror::Error; +use tracing::{Instrument, info_span}; pub mod indexing; @@ -58,9 +61,71 @@ impl actix_web::ResponseError for SearchError { } } +#[derive(Debug, Clone)] +pub struct MeilisearchReadClient { + pub client: Client, +} + +impl std::ops::Deref for MeilisearchReadClient { + type Target = Client; + + fn deref(&self) -> &Self::Target { + &self.client + } +} + +pub struct BatchClient { + pub clients: Vec, +} + +impl BatchClient { + pub fn new(clients: Vec) -> Self { + Self { clients } + } + + pub async fn with_all_clients<'a, T, G, Fut>( + &'a self, + task_name: &str, + generator: G, + ) -> Result, meilisearch_sdk::errors::Error> + where + G: Fn(&'a Client) -> Fut, + Fut: Future> + 'a, + { + let mut tasks = FuturesOrdered::new(); + for (idx, client) in self.clients.iter().enumerate() { + tasks.push_back(generator(client).instrument(info_span!( + "client_task", + task.name = task_name, + client.idx = idx, + ))); + } + + let results = tasks.try_collect::>().await?; + Ok(results) + } + + pub fn across_all(&self, data: Vec, mut predicate: F) -> Vec + where + F: FnMut(T, &Client) -> R, + { + assert_eq!( + data.len(), + self.clients.len(), + "mismatch between data len and meilisearch client count" + ); + self.clients + .iter() + .zip(data) + .map(|(client, item)| predicate(item, client)) + .collect() + } +} + #[derive(Debug, Clone)] pub struct SearchConfig { - pub address: String, + pub addresses: Vec, + pub read_lb_address: String, pub key: String, pub meta_namespace: String, } @@ -69,22 +134,48 @@ impl SearchConfig { // Panics if the environment variables are not set, // but these are already checked for on startup. 
pub fn new(meta_namespace: Option) -> Self { - let address = - dotenvy::var("MEILISEARCH_ADDR").expect("MEILISEARCH_ADDR not set"); + let address_many = dotenvy::var("MEILISEARCH_WRITE_ADDRS") + .expect("MEILISEARCH_WRITE_ADDRS not set"); + + let read_lb_address = dotenvy::var("MEILISEARCH_READ_ADDR") + .expect("MEILISEARCH_READ_ADDR not set"); + + let addresses = address_many + .split(',') + .filter(|s| !s.trim().is_empty()) + .map(|s| s.trim().to_string()) + .collect::>(); + let key = dotenvy::var("MEILISEARCH_KEY").expect("MEILISEARCH_KEY not set"); Self { - address, + addresses, key, meta_namespace: meta_namespace.unwrap_or_default(), + read_lb_address, } } - pub fn make_client( + pub fn make_loadbalanced_read_client( + &self, + ) -> Result { + Ok(MeilisearchReadClient { + client: Client::new(&self.read_lb_address, Some(&self.key))?, + }) + } + + pub fn make_batch_client( &self, - ) -> Result { - Client::new(self.address.as_str(), Some(self.key.as_str())) + ) -> Result { + Ok(BatchClient::new( + self.addresses + .iter() + .map(|address| { + Client::new(address.as_str(), Some(self.key.as_str())) + }) + .collect::, _>>()?, + )) } // Next: true if we want the next index (we are preparing the next swap), false if we want the current index (searching) @@ -192,9 +283,8 @@ pub fn get_sort_index( pub async fn search_for_project( info: &SearchRequest, config: &SearchConfig, + client: &MeilisearchReadClient, ) -> Result { - let client = Client::new(&*config.address, Some(&*config.key))?; - let offset: usize = info.offset.as_deref().unwrap_or("0").parse()?; let index = info.index.as_deref().unwrap_or("relevance"); let limit = info diff --git a/docker-compose.yml b/docker-compose.yml index 8ec3603919..235f794ae7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,13 +13,15 @@ services: POSTGRES_PASSWORD: labrinth POSTGRES_HOST_AUTH_METHOD: trust healthcheck: - test: ['CMD', 'pg_isready', '-U', 'labrinth'] + test: [ 'CMD', 'pg_isready', '-U', 'labrinth' ] interval: 3s 
timeout: 5s retries: 3 - meilisearch: + meilisearch0: image: getmeili/meilisearch:v1.12.0 - container_name: labrinth-meilisearch + container_name: labrinth-meilisearch0 + networks: + - meilisearch-mesh restart: on-failure ports: - '127.0.0.1:7700:7700' @@ -30,7 +32,7 @@ services: MEILI_HTTP_PAYLOAD_SIZE_LIMIT: 107374182400 MEILI_LOG_LEVEL: warn healthcheck: - test: ['CMD', 'curl', '--fail', 'http://localhost:7700/health'] + test: [ 'CMD', 'curl', '--fail', 'http://localhost:7700/health' ] interval: 3s timeout: 5s retries: 3 @@ -43,7 +45,7 @@ services: volumes: - redis-data:/data healthcheck: - test: ['CMD', 'redis-cli', 'PING'] + test: [ 'CMD', 'redis-cli', 'PING' ] interval: 3s timeout: 5s retries: 3 @@ -56,7 +58,7 @@ services: CLICKHOUSE_USER: default CLICKHOUSE_PASSWORD: default healthcheck: - test: ['CMD-SHELL', 'clickhouse-client --query "SELECT 1"'] + test: [ 'CMD-SHELL', 'clickhouse-client --query "SELECT 1"' ] interval: 3s timeout: 5s retries: 3 @@ -69,14 +71,7 @@ services: environment: MP_ENABLE_SPAMASSASSIN: postmark healthcheck: - test: - [ - 'CMD', - 'wget', - '-q', - '-O/dev/null', - 'http://localhost:8025/api/v1/info', - ] + test: [ 'CMD', 'wget', '-q', '-O/dev/null', 'http://localhost:8025/api/v1/info' ] interval: 3s timeout: 5s retries: 3 @@ -127,8 +122,7 @@ services: LABRINTH_ENDPOINT: http://host.docker.internal:8000/_internal/delphi/ingest LABRINTH_ADMIN_KEY: feedbeef healthcheck: - test: - ['CMD', 'wget', '-q', '-O/dev/null', 'http://localhost:59999/health'] + test: [ 'CMD', 'wget', '-q', '-O/dev/null', 'http://localhost:59999/health' ] interval: 3s timeout: 5s retries: 3 @@ -140,8 +134,52 @@ services: # Delphi must send a message on a webhook to our backend, # so it must have access to our local network - 'host.docker.internal:host-gateway' + + # Sharded Meilisearch + meilisearch1: + profiles: + - sharded-meilisearch + image: getmeili/meilisearch:v1.12.0 + container_name: labrinth-meilisearch1 + restart: on-failure + networks: + - 
meilisearch-mesh + ports: + - '127.0.0.1:7701:7700' + volumes: + - meilisearch1-data:/data.ms + environment: + MEILI_MASTER_KEY: modrinth + MEILI_HTTP_PAYLOAD_SIZE_LIMIT: 107374182400 + MEILI_LOG_LEVEL: warn + healthcheck: + test: [ 'CMD', 'curl', '--fail', 'http://localhost:7700/health' ] + interval: 3s + timeout: 5s + retries: 3 + + nginx-meilisearch-lb: + profiles: + - sharded-meilisearch + image: nginx:alpine + container_name: labrinth-meili-lb + networks: + - meilisearch-mesh + depends_on: + meilisearch0: + condition: service_healthy + meilisearch1: + condition: service_healthy + ports: + - '127.0.0.1:7710:80' + volumes: + - ./apps/labrinth/nginx/meili-lb.conf:/etc/nginx/conf.d/default.conf:ro +networks: + meilisearch-mesh: + driver: bridge volumes: meilisearch-data: + meilisearch1-data: db-data: redis-data: labrinth-cdn-data: