4 changes: 0 additions & 4 deletions rust/blockstore/src/arrow/block/delta/ordered_block_delta.rs
@@ -127,10 +127,6 @@ impl OrderedBlockDelta {
         }
     }
 
-    pub fn len(&self) -> usize {
-        self.builder.len()
-    }
-
     fn copy_up_to<'me, K: ArrowReadableKey<'me>, V: ArrowReadableValue<'me>>(
         &'me mut self,
         excluded_prefix: &str,
123 changes: 54 additions & 69 deletions rust/blockstore/src/arrow/ordered_blockfile_writer.rs
@@ -1,4 +1,3 @@
-use super::block::delta::types::Delta;
 use super::block::delta::OrderedBlockDelta;
 use super::migrations::apply_migrations_to_blockfile;
 use super::migrations::MigrationError;
@@ -10,14 +9,14 @@ use super::{
     flusher::ArrowBlockfileFlusher,
     types::{ArrowWriteableKey, ArrowWriteableValue},
 };
+use crate::arrow::block::Block;
 use crate::arrow::root::CURRENT_VERSION;
 use crate::arrow::sparse_index::SparseIndexWriter;
 use crate::key::CompositeKey;
 use chroma_error::ChromaError;
 use chroma_error::ErrorCodes;
 use chroma_types::Cmek;
 use itertools::Itertools;
-use std::collections::HashSet;
 use std::collections::VecDeque;
 use std::sync::Arc;
 use thiserror::Error;
@@ -36,8 +35,8 @@ struct Inner {
     remaining_block_stack: VecDeque<BlockIdAndEndKey>,
     /// Holds the current block delta and its end key. When we receive a write past the end key, this delta is moved into `completed_block_deltas`.
     current_block_delta: Option<CurrentDeltaAndEndKey>,
-    /// Deltas in this vec can no longer receive writes and are ready to be committed.
-    completed_block_deltas: Vec<OrderedBlockDelta>,
+    /// Blocks in this vec can no longer receive writes and are ready to be committed.
+    completed_blocks: Vec<Block>,
 }
 
 #[derive(Clone)]
@@ -90,7 +89,7 @@ impl ArrowOrderedBlockfileWriter {
             id,
             inner: Arc::new(Mutex::new(Inner {
                 current_block_delta: Some((initial_block, None)),
-                completed_block_deltas: Vec::new(),
+                completed_blocks: Vec::new(),
                 remaining_block_stack: VecDeque::new(),
             })),
             cmek,
@@ -133,7 +132,7 @@ impl ArrowOrderedBlockfileWriter {
             id,
             inner: Arc::new(Mutex::new(Inner {
                 current_block_delta: None,
-                completed_block_deltas: Vec::new(),
+                completed_blocks: Vec::new(),
                 remaining_block_stack,
             })),
             cmek,
@@ -145,68 +144,26 @@ impl ArrowOrderedBlockfileWriter {
     ) -> Result<ArrowBlockfileFlusher, Box<dyn ChromaError>> {
         let mut inner = std::mem::take(&mut *self.inner.lock().await);
 
-        Self::complete_current_delta::<K, V>(&mut inner);
-
-        let mut split_block_deltas = Vec::new();
-        for delta in inner.completed_block_deltas.drain(..) {
-            // Don't we split on-mutation (.set() calls)?
-            // Yes, but that is only a performance optimization. For correctness, we must also split on commit. Why?
-            //
-            // We need to defer copying old forked data until:
-            // - we receive a set()/delete() for a later key
-            // - we are committing the delta (it will receive no further writes)
-            //
-            // Because of this constraint, we cannot always effectively split on-mutation if the writer is over a forked blockfile. Imagine this scenario:
-            // 1. There is 1 existing block whose size == limit.
-            // 2. We receive a .set() for a key before the existing block's start key.
-            // 3. We turn the existing block into a delta and add the new KV pair.
-            // 4. At this point, the total size of the delta (materialized + pending forked data) is above the limit.
-            // 5. We would like to split our delta into two immediately after the newly-added key. However, this means that the right half of the split is empty (there is no materialized data), which violates a fundamental assumption made by our blockstore code. And we cannot materialize only the first key in the right half from the pending forked data because that would violate the above constraint.
-            //
-            // Thus, we handle splitting in two places:
-            //
-            // 1. Split deltas in half on-mutation if the materialized size is over the limit (just a performance optimization).
-            // 2. During the commit phase, after all deltas have been fully materialized, split if necessary.
-            //
-            // An alternative would be to create a fresh delta that does not fork from an existing block if we receive a .set() for a key that is not contained in any existing block key range, however this complicates writing logic and potentially increases fragmentation.
-            if delta.get_size::<K, V>() > self.root.max_block_size_bytes {
-                let split_blocks = delta.split::<K, V>(self.root.max_block_size_bytes);
-                for (split_key, split_delta) in split_blocks {
-                    self.root
-                        .sparse_index
-                        .add_block(split_key, split_delta.id)
-                        .map_err(|e| Box::new(e) as Box<dyn ChromaError>)?;
-                    split_block_deltas.push(split_delta);
-                }
-            }
-            split_block_deltas.push(delta);
-        }
+        self.complete_current_delta::<K, V>(&mut inner).await?;
 
-        let mut blocks = Vec::new();
-        let mut new_block_ids = HashSet::new();
-        for delta in split_block_deltas.drain(..) {
-            new_block_ids.insert(delta.id());
-            let mut removed = false;
-            // Skip empty blocks. Also, remove from sparse index.
-            if delta.len() == 0 {
-                tracing::info!("Delta with id {:?} is empty", delta.id());
-                removed = self.root.sparse_index.remove_block(&delta.id());
-            }
-            if !removed {
+        let mut blocks = Vec::with_capacity(inner.completed_blocks.len());
+        for block in inner.completed_blocks {
+            if block.len() > 0 || !self.root.sparse_index.remove_block(&block.id) {
                 self.root
                     .sparse_index
-                    .set_count(delta.id(), delta.len() as u32)
+                    .set_count(block.id, block.len() as u32)
                     .map_err(|e| Box::new(e) as Box<dyn ChromaError>)?;
-                let block = self.block_manager.commit::<K, V>(delta).await;
                 blocks.push(block);
             }
         }
 
-        apply_migrations_to_blockfile(&mut self.root, &self.block_manager, &new_block_ids)
-            .await
-            .map_err(|e| {
-                Box::new(ArrowBlockfileError::MigrationError(e)) as Box<dyn ChromaError>
-            })?;
+        apply_migrations_to_blockfile(
+            &mut self.root,
+            &self.block_manager,
+            &blocks.iter().map(|block| block.id).collect(),
+        )
+        .await
+        .map_err(|e| Box::new(ArrowBlockfileError::MigrationError(e)) as Box<dyn ChromaError>)?;
 
         let count = self
             .root
@@ -231,11 +188,28 @@ impl ArrowOrderedBlockfileWriter {
         Ok(flusher)
     }
 
-    fn complete_current_delta<K: ArrowWriteableKey, V: ArrowWriteableValue>(inner: &mut Inner) {
-        if let Some((mut delta, _)) = inner.current_block_delta.take() {
-            delta.copy_to_end::<K, V>();
-            inner.completed_block_deltas.push(delta);
-        }
+    async fn complete_current_delta<K: ArrowWriteableKey, V: ArrowWriteableValue>(
+        &self,
+        inner: &mut Inner,
+    ) -> Result<(), Box<dyn ChromaError>> {
+        let Some((mut delta, _)) = inner.current_block_delta.take() else {
+            return Ok(());
+        };
+        delta.copy_to_end::<K, V>();
+        if delta.get_size::<K, V>() > self.root.max_block_size_bytes {
+            let split_blocks = delta.split::<K, V>(self.root.max_block_size_bytes);
+            for (split_key, split_delta) in split_blocks {
+                self.root
+                    .sparse_index
+                    .add_block(split_key, split_delta.id)
+                    .map_err(|e| Box::new(e) as Box<dyn ChromaError>)?;
+                let block = self.block_manager.commit::<K, V>(split_delta).await;
+                inner.completed_blocks.push(block);
+            }
+        }
+        let block = self.block_manager.commit::<K, V>(delta).await;
+        inner.completed_blocks.push(block);
+        Ok(())
     }
 
     async fn swap_current_delta<K: ArrowWriteableKey, V: ArrowWriteableValue>(
@@ -244,7 +218,7 @@ impl ArrowOrderedBlockfileWriter {
         new_delta_block_id: &Uuid,
         new_delta_end_key: Option<CompositeKey>,
     ) -> Result<(), Box<dyn ChromaError>> {
-        Self::complete_current_delta::<K, V>(inner);
+        self.complete_current_delta::<K, V>(inner).await?;
 
         let new_delta = self
             .block_manager
@@ -325,8 +299,7 @@ impl ArrowOrderedBlockfileWriter {
             delta.get_size::<K, V>()
         };
 
-        let max_block_size_bytes = self.root.max_block_size_bytes;
-        if current_materialized_delta_size > max_block_size_bytes {
+        if current_materialized_delta_size > self.root.max_block_size_bytes {
             let (mut current_delta, current_end_key) = inner
                 .current_block_delta
                 .take()
@@ -343,7 +316,19 @@ impl ArrowOrderedBlockfileWriter {
                 )
                 .map_err(|e| Box::new(e) as Box<dyn ChromaError>)?;
 
-            inner.completed_block_deltas.push(current_delta);
+            if current_delta.get_size::<K, V>() > self.root.max_block_size_bytes {
+                let split_blocks = current_delta.split::<K, V>(self.root.max_block_size_bytes);
+                for (split_key, split_delta) in split_blocks {
+                    self.root
+                        .sparse_index
+                        .add_block(split_key, split_delta.id)
+                        .map_err(|e| Box::new(e) as Box<dyn ChromaError>)?;
+                    let block = self.block_manager.commit::<K, V>(split_delta).await;
+                    inner.completed_blocks.push(block);
+                }
+            }
+            let block = self.block_manager.commit::<K, V>(current_delta).await;
+            inner.completed_blocks.push(block);
             inner.current_block_delta = Some((new_delta, current_end_key));
         }
 
@@ -970,7 +955,7 @@ mod tests {
             inner: Arc::new(Mutex::new(Inner {
                 remaining_block_stack: VecDeque::new(),
                 current_block_delta: Some((initial_block, None)),
-                completed_block_deltas: Vec::new(),
+                completed_blocks: Vec::new(),
             })),
             cmek: None,
         };
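
The comment block removed from commit() explains why a delta over a forked blockfile cannot always be split at mutation time, so splitting must also happen when a delta is completed; after this change, every completed piece is committed to an immutable block immediately instead of being held as a delta. The sketch below illustrates that commit-path shape (finish the delta, split while it is over the size budget, commit every piece into the completed-blocks list). It is a minimal stand-alone sketch: SketchDelta and SketchBlock are hypothetical simplified types, not the real OrderedBlockDelta/Block APIs, and the real writer is async, registers split keys in a sparse index, and commits through a block manager.

// Minimal sketch of the split-on-commit pattern. SketchDelta and SketchBlock
// are hypothetical stand-ins; the real code is async and uses a block manager.

#[derive(Debug)]
struct SketchDelta {
    // Sorted key/value pairs that have been materialized into this delta.
    entries: Vec<(String, Vec<u8>)>,
}

#[derive(Debug)]
struct SketchBlock {
    start_key: String,
    len: usize,
    size_bytes: usize,
}

impl SketchDelta {
    fn size_bytes(&self) -> usize {
        self.entries.iter().map(|(k, v)| k.len() + v.len()).sum()
    }

    // Peel off left halves while the remainder is over budget. The split point
    // always leaves the right side non-empty, so no empty delta is produced
    // (the invariant the removed comment block insists on).
    fn split(mut self, max_bytes: usize) -> Vec<SketchDelta> {
        let mut parts = Vec::new();
        while self.size_bytes() > max_bytes && self.entries.len() > 1 {
            let mut used = 0;
            let mut split_at = self.entries.len() - 1;
            for (i, (k, v)) in self.entries.iter().enumerate() {
                used += k.len() + v.len();
                if used > max_bytes / 2 {
                    split_at = (i + 1).min(self.entries.len() - 1);
                    break;
                }
            }
            let right = self.entries.split_off(split_at);
            let left = std::mem::replace(&mut self.entries, right);
            parts.push(SketchDelta { entries: left });
        }
        parts.push(self);
        parts
    }

    // Committing freezes the delta into an immutable block.
    fn commit(self) -> SketchBlock {
        SketchBlock {
            start_key: self
                .entries
                .first()
                .map(|(k, _)| k.clone())
                .unwrap_or_default(),
            len: self.entries.len(),
            size_bytes: self.size_bytes(),
        }
    }
}

// The commit-path shape from the diff: split if oversized (split() returns the
// delta unchanged when it is under budget), then commit every piece straight
// into the completed list instead of keeping deltas around until flush.
fn complete_delta(delta: SketchDelta, max_bytes: usize, completed: &mut Vec<SketchBlock>) {
    for part in delta.split(max_bytes) {
        completed.push(part.commit());
    }
}

fn main() {
    let delta = SketchDelta {
        entries: (0..8).map(|i| (format!("key{i}"), vec![0u8; 40])).collect(),
    };
    let mut completed = Vec::new();
    complete_delta(delta, 128, &mut completed);
    for block in &completed {
        println!(
            "block @ {:?}: {} entries, {} bytes",
            block.start_key, block.len, block.size_bytes
        );
    }
}

With a 128-byte budget, the 352-byte delta above ends up as four 88-byte blocks, mirroring how a single oversized delta can now produce several committed blocks on the commit path.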