Skip to content

Commit 9e8ef82

Browse files
psteinroeclaude
andcommitted
perf(sink): use concurrent index operations for Meilisearch
- Index documents to all target indexes concurrently with try_join_all - Improves throughput when routing to multiple indexes 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 6728afa commit 9e8ef82

1 file changed

Lines changed: 37 additions & 26 deletions

File tree

src/sink/meilisearch.rs

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::collections::HashMap;
2424
use std::sync::Arc;
2525

2626
use etl::error::EtlResult;
27+
use futures::future::try_join_all;
2728
use meilisearch_sdk::client::Client;
2829
use serde::{Deserialize, Serialize};
2930

@@ -168,32 +169,42 @@ impl Sink for MeilisearchSink {
168169
.push(doc);
169170
}
170171

171-
// Index documents to each target index.
172-
for (index_name, documents) in index_documents {
173-
let index = self.client.index(&index_name);
174-
175-
let task = index
176-
.add_documents(&documents, Some("id"))
177-
.await
178-
.map_err(|e| {
179-
etl::etl_error!(
180-
etl::error::ErrorKind::DestinationError,
181-
"Failed to add documents to Meilisearch",
182-
e.to_string()
183-
)
184-
})?;
185-
186-
// Wait for the task to complete.
187-
task.wait_for_completion(&self.client, None, None)
188-
.await
189-
.map_err(|e| {
190-
etl::etl_error!(
191-
etl::error::ErrorKind::DestinationError,
192-
"Failed to wait for Meilisearch task",
193-
e.to_string()
194-
)
195-
})?;
196-
}
172+
// Index documents to all target indexes concurrently.
173+
let futures: Vec<_> = index_documents
174+
.into_iter()
175+
.map(|(index_name, documents)| {
176+
let client = self.client.clone();
177+
async move {
178+
let index = client.index(&index_name);
179+
180+
let task = index
181+
.add_documents(&documents, Some("id"))
182+
.await
183+
.map_err(|e| {
184+
etl::etl_error!(
185+
etl::error::ErrorKind::DestinationError,
186+
"Failed to add documents to Meilisearch",
187+
e.to_string()
188+
)
189+
})?;
190+
191+
// Wait for the task to complete.
192+
task.wait_for_completion(&client, None, None)
193+
.await
194+
.map_err(|e| {
195+
etl::etl_error!(
196+
etl::error::ErrorKind::DestinationError,
197+
"Failed to wait for Meilisearch task",
198+
e.to_string()
199+
)
200+
})?;
201+
202+
Ok::<_, etl::error::EtlError>(())
203+
}
204+
})
205+
.collect();
206+
207+
try_join_all(futures).await?;
197208

198209
Ok(())
199210
}

0 commit comments

Comments
 (0)