
Commit acde656

[ENH] Commit eagerly in ordered blockfile writer (#6109)
## Description of changes

_Summarize the changes made by this PR._
- Improvements & Bug fixes
  - N/A
- New functionality
  - Eagerly commit blocks in ordered blockfile writer to avoid excessive memory usage

## Test plan

_How are these changes tested?_

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Migration plan

_Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_

## Observability plan

_What is the plan to instrument and monitor this change?_

## Documentation Changes

_Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
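For context on the new functionality above: the memory saving comes from turning each block delta into its final block as soon as it can no longer receive writes, rather than keeping every completed delta resident until the final `commit()` call. The sketch below is illustrative only; `Delta`, `Block`, `commit_delta`, and `EagerWriter` are hypothetical stand-ins, not the actual blockstore types.

```rust
// Illustrative sketch only: `Delta`, `Block`, `commit_delta`, and `EagerWriter`
// are hypothetical stand-ins for the real blockstore types. A delta holds
// buffered writes (large); a committed block retains far less state.

struct Delta {
    buffered: Vec<(String, String)>, // pending key/value writes
}

struct Block {
    count: usize, // e.g. number of entries; enough for sparse-index bookkeeping
}

fn commit_delta(delta: Delta) -> Block {
    // Stand-in for serializing the delta into its final block representation.
    Block { count: delta.buffered.len() }
}

#[derive(Default)]
struct EagerWriter {
    // In the real writer this corresponds to replacing
    // `completed_block_deltas: Vec<OrderedBlockDelta>` with `completed_blocks: Vec<Block>`.
    completed_blocks: Vec<Block>,
}

impl EagerWriter {
    /// Called when a delta can receive no further writes: commit it right away
    /// so its buffered data is dropped now instead of at the final commit.
    fn seal(&mut self, delta: Delta) {
        self.completed_blocks.push(commit_delta(delta));
    }

    /// The final commit just hands back the blocks that were committed eagerly.
    fn commit(self) -> Vec<Block> {
        self.completed_blocks
    }
}

fn main() {
    let mut writer = EagerWriter::default();
    for i in 0..3 {
        let delta = Delta {
            buffered: vec![(format!("key{i}"), "value".to_string())],
        };
        writer.seal(delta); // delta memory is released here, not at commit()
    }
    let total: usize = writer.commit().iter().map(|b| b.count).sum();
    assert_eq!(total, 3);
}
```

The actual change (in the diffs below) applies this idea to `ArrowOrderedBlockfileWriter` by replacing `completed_block_deltas: Vec<OrderedBlockDelta>` with `completed_blocks: Vec<Block>`.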
1 parent 97b1a01 commit acde656

2 files changed: 54 additions, 73 deletions

rust/blockstore/src/arrow/block/delta/ordered_block_delta.rs

Lines changed: 0 additions & 4 deletions

```diff
@@ -127,10 +127,6 @@ impl OrderedBlockDelta {
         }
     }
 
-    pub fn len(&self) -> usize {
-        self.builder.len()
-    }
-
     fn copy_up_to<'me, K: ArrowReadableKey<'me>, V: ArrowReadableValue<'me>>(
         &'me mut self,
         excluded_prefix: &str,
```

rust/blockstore/src/arrow/ordered_blockfile_writer.rs

Lines changed: 54 additions & 69 deletions

```diff
@@ -1,4 +1,3 @@
-use super::block::delta::types::Delta;
 use super::block::delta::OrderedBlockDelta;
 use super::migrations::apply_migrations_to_blockfile;
 use super::migrations::MigrationError;
@@ -10,14 +9,14 @@ use super::{
     flusher::ArrowBlockfileFlusher,
     types::{ArrowWriteableKey, ArrowWriteableValue},
 };
+use crate::arrow::block::Block;
 use crate::arrow::root::CURRENT_VERSION;
 use crate::arrow::sparse_index::SparseIndexWriter;
 use crate::key::CompositeKey;
 use chroma_error::ChromaError;
 use chroma_error::ErrorCodes;
 use chroma_types::Cmek;
 use itertools::Itertools;
-use std::collections::HashSet;
 use std::collections::VecDeque;
 use std::sync::Arc;
 use thiserror::Error;
@@ -36,8 +35,8 @@ struct Inner {
     remaining_block_stack: VecDeque<BlockIdAndEndKey>,
     /// Holds the current block delta and its end key. When we receive a write past the end key, this delta is moved into `completed_block_deltas`.
     current_block_delta: Option<CurrentDeltaAndEndKey>,
-    /// Deltas in this vec can no longer receive writes and are ready to be committed.
-    completed_block_deltas: Vec<OrderedBlockDelta>,
+    /// Blocks in this vec can no longer receive writes and are ready to be committed.
+    completed_blocks: Vec<Block>,
 }
 
 #[derive(Clone)]
@@ -90,7 +89,7 @@ impl ArrowOrderedBlockfileWriter {
             id,
             inner: Arc::new(Mutex::new(Inner {
                 current_block_delta: Some((initial_block, None)),
-                completed_block_deltas: Vec::new(),
+                completed_blocks: Vec::new(),
                 remaining_block_stack: VecDeque::new(),
             })),
             cmek,
@@ -133,7 +132,7 @@ impl ArrowOrderedBlockfileWriter {
             id,
             inner: Arc::new(Mutex::new(Inner {
                 current_block_delta: None,
-                completed_block_deltas: Vec::new(),
+                completed_blocks: Vec::new(),
                 remaining_block_stack,
             })),
             cmek,
@@ -145,68 +144,26 @@ impl ArrowOrderedBlockfileWriter {
     ) -> Result<ArrowBlockfileFlusher, Box<dyn ChromaError>> {
         let mut inner = std::mem::take(&mut *self.inner.lock().await);
 
-        Self::complete_current_delta::<K, V>(&mut inner);
-
-        let mut split_block_deltas = Vec::new();
-        for delta in inner.completed_block_deltas.drain(..) {
-            // Don't we split on-mutation (.set() calls)?
-            // Yes, but that is only a performance optimization. For correctness, we must also split on commit. Why?
-            //
-            // We need to defer copying old forked data until:
-            // - we receive a set()/delete() for a later key
-            // - we are committing the delta (it will receive no further writes)
-            //
-            // Because of this constraint, we cannot always effectively split on-mutation if the writer is over a forked blockfile. Imagine this scenario:
-            // 1. There is 1 existing block whose size == limit.
-            // 2. We receive a .set() for a key before the existing block's start key.
-            // 3. We turn the existing block into a delta and add the new KV pair.
-            // 4. At this point, the total size of the delta (materialized + pending forked data) is above the limit.
-            // 5. We would like to split our delta into two immediately after the newly-added key. However, this means that the right half of the split is empty (there is no materialized data), which violates a fundamental assumption made by our blockstore code. And we cannot materialize only the first key in the right half from the pending forked data because that would violate the above constraint.
-            //
-            // Thus, we handle splitting in two places:
-            //
-            // 1. Split deltas in half on-mutation if the materialized size is over the limit (just a performance optimization).
-            // 2. During the commit phase, after all deltas have been fully materialized, split if necessary.
-            //
-            // An alternative would be to create a fresh delta that does not fork from an existing block if we receive a .set() for a key that is not contained in any existing block key range, however this complicates writing logic and potentially increases fragmentation.
-            if delta.get_size::<K, V>() > self.root.max_block_size_bytes {
-                let split_blocks = delta.split::<K, V>(self.root.max_block_size_bytes);
-                for (split_key, split_delta) in split_blocks {
-                    self.root
-                        .sparse_index
-                        .add_block(split_key, split_delta.id)
-                        .map_err(|e| Box::new(e) as Box<dyn ChromaError>)?;
-                    split_block_deltas.push(split_delta);
-                }
-            }
-            split_block_deltas.push(delta);
-        }
+        self.complete_current_delta::<K, V>(&mut inner).await?;
 
-        let mut blocks = Vec::new();
-        let mut new_block_ids = HashSet::new();
-        for delta in split_block_deltas.drain(..) {
-            new_block_ids.insert(delta.id());
-            let mut removed = false;
-            // Skip empty blocks. Also, remove from sparse index.
-            if delta.len() == 0 {
-                tracing::info!("Delta with id {:?} is empty", delta.id());
-                removed = self.root.sparse_index.remove_block(&delta.id());
-            }
-            if !removed {
+        let mut blocks = Vec::with_capacity(inner.completed_blocks.len());
+        for block in inner.completed_blocks {
+            if block.len() > 0 || !self.root.sparse_index.remove_block(&block.id) {
                 self.root
                     .sparse_index
-                    .set_count(delta.id(), delta.len() as u32)
+                    .set_count(block.id, block.len() as u32)
                     .map_err(|e| Box::new(e) as Box<dyn ChromaError>)?;
-                let block = self.block_manager.commit::<K, V>(delta).await;
                 blocks.push(block);
             }
         }
 
-        apply_migrations_to_blockfile(&mut self.root, &self.block_manager, &new_block_ids)
-            .await
-            .map_err(|e| {
-                Box::new(ArrowBlockfileError::MigrationError(e)) as Box<dyn ChromaError>
-            })?;
+        apply_migrations_to_blockfile(
+            &mut self.root,
+            &self.block_manager,
+            &blocks.iter().map(|block| block.id).collect(),
+        )
+        .await
+        .map_err(|e| Box::new(ArrowBlockfileError::MigrationError(e)) as Box<dyn ChromaError>)?;
 
         let count = self
             .root
@@ -231,11 +188,28 @@ impl ArrowOrderedBlockfileWriter {
         Ok(flusher)
     }
 
-    fn complete_current_delta<K: ArrowWriteableKey, V: ArrowWriteableValue>(inner: &mut Inner) {
-        if let Some((mut delta, _)) = inner.current_block_delta.take() {
-            delta.copy_to_end::<K, V>();
-            inner.completed_block_deltas.push(delta);
+    async fn complete_current_delta<K: ArrowWriteableKey, V: ArrowWriteableValue>(
+        &self,
+        inner: &mut Inner,
+    ) -> Result<(), Box<dyn ChromaError>> {
+        let Some((mut delta, _)) = inner.current_block_delta.take() else {
+            return Ok(());
+        };
+        delta.copy_to_end::<K, V>();
+        if delta.get_size::<K, V>() > self.root.max_block_size_bytes {
+            let split_blocks = delta.split::<K, V>(self.root.max_block_size_bytes);
+            for (split_key, split_delta) in split_blocks {
+                self.root
+                    .sparse_index
+                    .add_block(split_key, split_delta.id)
+                    .map_err(|e| Box::new(e) as Box<dyn ChromaError>)?;
+                let block = self.block_manager.commit::<K, V>(split_delta).await;
+                inner.completed_blocks.push(block);
+            }
         }
+        let block = self.block_manager.commit::<K, V>(delta).await;
+        inner.completed_blocks.push(block);
+        Ok(())
     }
 
     async fn swap_current_delta<K: ArrowWriteableKey, V: ArrowWriteableValue>(
@@ -244,7 +218,7 @@ impl ArrowOrderedBlockfileWriter {
         new_delta_block_id: &Uuid,
         new_delta_end_key: Option<CompositeKey>,
     ) -> Result<(), Box<dyn ChromaError>> {
-        Self::complete_current_delta::<K, V>(inner);
+        self.complete_current_delta::<K, V>(inner).await?;
 
         let new_delta = self
             .block_manager
@@ -325,8 +299,7 @@ impl ArrowOrderedBlockfileWriter {
             delta.get_size::<K, V>()
         };
 
-        let max_block_size_bytes = self.root.max_block_size_bytes;
-        if current_materialized_delta_size > max_block_size_bytes {
+        if current_materialized_delta_size > self.root.max_block_size_bytes {
             let (mut current_delta, current_end_key) = inner
                 .current_block_delta
                 .take()
@@ -343,7 +316,19 @@ impl ArrowOrderedBlockfileWriter {
                 )
                 .map_err(|e| Box::new(e) as Box<dyn ChromaError>)?;
 
-            inner.completed_block_deltas.push(current_delta);
+            if current_delta.get_size::<K, V>() > self.root.max_block_size_bytes {
+                let split_blocks = current_delta.split::<K, V>(self.root.max_block_size_bytes);
+                for (split_key, split_delta) in split_blocks {
+                    self.root
+                        .sparse_index
+                        .add_block(split_key, split_delta.id)
+                        .map_err(|e| Box::new(e) as Box<dyn ChromaError>)?;
+                    let block = self.block_manager.commit::<K, V>(split_delta).await;
+                    inner.completed_blocks.push(block);
+                }
+            }
+            let block = self.block_manager.commit::<K, V>(current_delta).await;
+            inner.completed_blocks.push(block);
             inner.current_block_delta = Some((new_delta, current_end_key));
         }
 
@@ -970,7 +955,7 @@ mod tests {
             inner: Arc::new(Mutex::new(Inner {
                 remaining_block_stack: VecDeque::new(),
                 current_block_delta: Some((initial_block, None)),
-                completed_block_deltas: Vec::new(),
+                completed_blocks: Vec::new(),
             })),
            cmek: None,
        };
```
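One design note worth calling out: the splitting logic that previously ran over all completed deltas inside `commit()` now runs per delta at the moment it is sealed (`complete_current_delta` and the swap path), and every resulting piece is committed to a block immediately. The following is a loose, hypothetical restatement of that per-delta flow with simplified stand-in types (`Delta`, `Block`, `seal`), not the real `OrderedBlockDelta`/`BlockManager` API.

```rust
// Hypothetical restatement of the per-delta seal path introduced here, with
// simplified stand-in types rather than the real blockstore API: an oversized
// delta is split first, then every piece is committed to a block immediately.

struct Delta {
    entries: Vec<(String, String)>,
}

struct Block {
    len: usize,
}

impl Delta {
    fn size(&self) -> usize {
        self.entries.iter().map(|(k, v)| k.len() + v.len()).sum()
    }

    /// Greedily split this delta into pieces whose sizes stay under `max`.
    fn split(self, max: usize) -> Vec<Delta> {
        let mut pieces: Vec<Vec<(String, String)>> = vec![Vec::new()];
        let mut current_size = 0;
        for (key, value) in self.entries {
            let entry_size = key.len() + value.len();
            if !pieces.last().unwrap().is_empty() && current_size + entry_size > max {
                pieces.push(Vec::new());
                current_size = 0;
            }
            current_size += entry_size;
            pieces.last_mut().unwrap().push((key, value));
        }
        pieces.into_iter().map(|entries| Delta { entries }).collect()
    }
}

fn commit(delta: Delta) -> Block {
    Block { len: delta.entries.len() } // stand-in for Arrow serialization
}

/// Seal a delta: split it if it exceeds the block size limit, then commit each
/// resulting piece right away so no delta outlives its last write.
fn seal(delta: Delta, max_block_size: usize, completed_blocks: &mut Vec<Block>) {
    if delta.size() > max_block_size {
        for piece in delta.split(max_block_size) {
            completed_blocks.push(commit(piece));
        }
    } else {
        completed_blocks.push(commit(delta));
    }
}

fn main() {
    let delta = Delta {
        entries: (0..10).map(|i| (format!("key{i}"), "x".repeat(20))).collect(),
    };
    let mut completed_blocks = Vec::new();
    seal(delta, 64, &mut completed_blocks);
    assert!(completed_blocks.len() > 1); // the oversized delta became several blocks
    assert!(completed_blocks.iter().all(|b| b.len > 0));
}
```

With sealing handled this way, the final `commit()` in the diff above only has to skip empty blocks (removing them from the sparse index), record counts for the rest, and apply migrations.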
