Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions rust/worker/src/compactor/compaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,15 @@ impl CompactionManager {
let collections = match self.context.sysdb.get_collections(options).await {
Ok(collections) => collections,
Err(e) => {
// TODO(tanujnay112): Propagate error up and then handle it there.
tracing::error!("Failed to get collections in rebuild: {}", e);
tracing::error!(
"Rebuild failed at sysdb.get_collections for collection IDs {:?}: {}",
collection_ids,
e
);
return;
}
};
let _ = collections
let results = collections
.iter()
.filter_map(|collection| {
match chroma_types::DatabaseName::new(collection.database.clone()) {
Expand All @@ -299,6 +302,25 @@ impl CompactionManager {
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>()
.await;

// Log results
for (result, collection) in results.into_iter().zip(collections.iter()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important

[Logic] There are actually two bugs causing the zip pairing between results and collections to be incorrect:

  1. filter_map misalignment: If any collection has an invalid DatabaseName, it's filtered out, making results shorter than collections. The Nth result will then be logged as belonging to the wrong collection.

  2. FuturesUnordered ordering (even more critical): FuturesUnordered yields results in arbitrary completion order — whichever future finishes first, not insertion order. So even if every collection has a valid database name, results will be in a random order that doesn't match collections, and zip will mis-pair them virtually every time there are 2+ concurrent rebuilds.

The fix is to carry the collection reference through the future chain so each result is self-describing, rather than relying on positional alignment:

let results = collections
    .iter()
    .filter_map(|collection| {
        match chroma_types::DatabaseName::new(collection.database.clone()) {
            Some(database_name) => Some((
                collection,
                self.context.clone().compact(
                    collection.collection_id,
                    database_name,
                    collection.tenant.clone(),
                    true,
                    segment_scopes.clone(),
                ),
            )),
            None => {
                tracing::error!(
                    "Invalid database name '{}' for collection {} (must be at least 3 characters)",
                    collection.database,
                    collection.collection_id
                );
                None
            }
        }
    })
    .map(|(collection, fut)| async move { (collection.collection_id, fut.await) })
    .collect::<FuturesUnordered<_>>()
    .collect::<Vec<_>>()
    .await;

Then log using the collection_id embedded in each tuple, removing the zip entirely.

Context for Agents
There are actually **two** bugs causing the `zip` pairing between `results` and `collections` to be incorrect:

1. **`filter_map` misalignment**: If any collection has an invalid `DatabaseName`, it's filtered out, making `results` shorter than `collections`. The Nth result will then be logged as belonging to the wrong collection.

2. **`FuturesUnordered` ordering** (even more critical): `FuturesUnordered` yields results in arbitrary completion order — whichever future finishes first, not insertion order. So even if every collection has a valid database name, `results` will be in a random order that doesn't match `collections`, and `zip` will mis-pair them virtually every time there are 2+ concurrent rebuilds.

The fix is to carry the collection reference through the future chain so each result is self-describing, rather than relying on positional alignment:

```rust
let results = collections
    .iter()
    .filter_map(|collection| {
        match chroma_types::DatabaseName::new(collection.database.clone()) {
            Some(database_name) => Some((
                collection,
                self.context.clone().compact(
                    collection.collection_id,
                    database_name,
                    collection.tenant.clone(),
                    true,
                    segment_scopes.clone(),
                ),
            )),
            None => {
                tracing::error!(
                    "Invalid database name '{}' for collection {} (must be at least 3 characters)",
                    collection.database,
                    collection.collection_id
                );
                None
            }
        }
    })
    .map(|(collection, fut)| async move { (collection.collection_id, fut.await) })
    .collect::<FuturesUnordered<_>>()
    .collect::<Vec<_>>()
    .await;
```
Then log using the `collection_id` embedded in each tuple, removing the `zip` entirely.

File: rust/worker/src/compactor/compaction_manager.rs
Line: 304

match result {
Ok(_) => {
tracing::info!(
"Rebuild succeeded for collection {}",
collection.collection_id
);
}
Err(e) => {
tracing::error!(
"Rebuild failed for collection {}: {}",
collection.collection_id,
e
);
}
}
}
}

#[instrument(name = "CompactionManager::purge_dirty_log", skip(ctx))]
Expand Down
Loading