Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rust/blockstore/src/arrow/blockfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
});

let block_iters: Vec<_> = futures::stream::iter(block_futures)
.buffer_unordered(self.block_manager.max_concurrent_block_loads())
.buffered(self.block_manager.max_concurrent_block_loads())
.try_collect()
.await?;

Expand Down Expand Up @@ -691,7 +691,7 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me

let blocks: Vec<&Block> = if !block_futures_is_empty {
futures::stream::iter(block_futures)
.buffer_unordered(self.block_manager.max_concurrent_block_loads())
.buffered(self.block_manager.max_concurrent_block_loads())
.try_collect()
.await?
} else {
Expand Down
145 changes: 145 additions & 0 deletions rust/index/src/sparse/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,4 +680,149 @@ mod tests {
"Should have 3 results from mixed query"
);
}

/// Regression test: when a dimension's block-max entries span multiple blocks
/// and the dimension has no updates in the next batch, the block maxes are
/// copied forward from the old reader via get_block_maxes() -> get_prefix().
/// get_prefix() must return entries in sorted key order; otherwise the ordered
/// max_writer panics on out-of-order keys.
#[tokio::test]
async fn test_incremental_commit_multiblock_block_maxes_ordering() {
use chroma_types::SignedRoaringBitmap;

// Use a small test block size so we can span multiple blocks without
// needing millions of entries. The argument is 1024 — presumably the
// max block size in bytes for the test provider; confirm against
// test_arrow_blockfile_provider's signature.
let (_temp_dir, provider) = test_arrow_blockfile_provider(1024);

let num_offsets: u32 = 2000;
let sparse_block_size: u32 = 2;

// =================================================================
// Batch 1: Write many offsets on dimension 1 so its block-max
// entries span multiple blocks in the max blockfile.
// With block_size=2, 2000 offsets produce 1000 block-max entries.
// =================================================================
// Both blockfiles use ordered_mutations(), which is what makes the
// writer panic if keys ever arrive out of order — the condition this
// regression test is designed to trigger.
let max_writer = provider
.write::<u32, f32>(BlockfileWriterOptions::new("".to_string()).ordered_mutations())
.await
.unwrap();
let offset_value_writer = provider
.write::<u32, f32>(BlockfileWriterOptions::new("".to_string()).ordered_mutations())
.await
.unwrap();

let writer = SparseWriter::new(sparse_block_size, max_writer, offset_value_writer, None);

// Dimension 1 only; monotonically increasing values per offset.
for offset in 0..num_offsets {
writer.set(offset, vec![(1, 0.1 * (offset as f32))]).await;
}

// Box::pin: commit()/flush() futures are large; pinning keeps the
// enclosing async fn's state machine small (matches crate convention).
let flusher = Box::pin(writer.commit()).await.unwrap();
let max_id_1 = flusher.max_id();
let ov_id_1 = flusher.offset_value_id();
Box::pin(flusher.flush()).await.unwrap();

// Verify dimension 1's block maxes span multiple blocks — otherwise
// this test would not exercise the multi-block copy-forward path at all.
let max_r1 = provider
.read::<u32, f32>(BlockfileReaderOptions::new(max_id_1, "".to_string()))
.await
.unwrap();
let dim1_encoded = encode_u32(1);
let block_maxes_batch1: Vec<(u32, f32)> =
max_r1.get_prefix(&dim1_encoded).await.unwrap().collect();
assert!(
block_maxes_batch1.len() > 1,
"Dimension 1 should have multiple block-max entries, got {}",
block_maxes_batch1.len()
);

// =================================================================
// Batch 2: Write ONLY to dimension 2 (disjoint from dimension 1).
// Dimension 1 has no delta, so its block maxes must be copied
// forward from the old reader via get_block_maxes().
// =================================================================
let max_writer2 = provider
.write::<u32, f32>(BlockfileWriterOptions::new("".to_string()).ordered_mutations())
.await
.unwrap();
// Fork the offset-value blockfile from batch 1 so batch 2 is a true
// incremental commit on top of the existing data.
let offset_value_writer2 = provider
.write::<u32, f32>(
BlockfileWriterOptions::new("".to_string())
.ordered_mutations()
.fork(ov_id_1),
)
.await
.unwrap();

let max_r = provider
.read::<u32, f32>(BlockfileReaderOptions::new(max_id_1, "".to_string()))
.await
.unwrap();
let ov_r = provider
.read::<u32, f32>(BlockfileReaderOptions::new(ov_id_1, "".to_string()))
.await
.unwrap();
let old_reader = SparseReader::new(max_r, ov_r);

let writer2 = SparseWriter::new(
sparse_block_size,
max_writer2,
offset_value_writer2,
Some(old_reader),
);
// Single write on a dimension disjoint from dimension 1, so dimension 1
// takes the copy-forward path during commit.
writer2.set(0, vec![(2, 1.0)]).await;

// This commit would panic with "Keys are not in order" if
// get_prefix() returns block-max entries out of order.
let flusher2 = Box::pin(writer2.commit()).await.unwrap();
let max_id_2 = flusher2.max_id();
let ov_id_2 = flusher2.offset_value_id();
Box::pin(flusher2.flush()).await.unwrap();

// =================================================================
// Verify: dimension 1's block maxes are present and sorted
// =================================================================
let final_max = provider
.read::<u32, f32>(BlockfileReaderOptions::new(max_id_2, "".to_string()))
.await
.unwrap();
let final_ov = provider
.read::<u32, f32>(BlockfileReaderOptions::new(ov_id_2, "".to_string()))
.await
.unwrap();

let block_maxes_batch2: Vec<(u32, f32)> =
final_max.get_prefix(&dim1_encoded).await.unwrap().collect();
assert_eq!(
block_maxes_batch1.len(),
block_maxes_batch2.len(),
"Dimension 1 block-max count should be preserved across incremental commit"
);

// Adjacent-pair scan: keys must be strictly ascending (duplicates would
// also indicate a copy-forward bug, hence `<` rather than `<=`).
for window in block_maxes_batch2.windows(2) {
assert!(
window[0].0 < window[1].0,
"Block-max keys must be in strictly ascending order, but found {} >= {}",
window[0].0,
window[1].0
);
}

// Verify dimension 2 also exists
assert!(
final_max.get(DIMENSION_PREFIX, 2).await.unwrap().is_some(),
"Dimension 2 should have a DIM entry"
);

// Verify WAND still works for dimension 1 (end-to-end read path sanity
// check on the merged blockfiles, not just the raw prefix scan above).
let reader = SparseReader::new(final_max, final_ov);
let results = reader
.wand(vec![(1, 1.0)], 5, SignedRoaringBitmap::full())
.await
.unwrap();
assert!(
!results.is_empty(),
"WAND on dimension 1 should return results after incremental commit"
);
}
}
Loading