From edfc65a795bfc90d76a0ebd8b136f9e4b615e8a7 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Tue, 21 Oct 2025 18:21:49 +0000 Subject: [PATCH 1/5] First draft --- kernel/Cargo.toml | 1 + kernel/src/actions/deletion_vector.rs | 306 +++++++- kernel/src/actions/deletion_vector_writer.rs | 711 +++++++++++++++++++ kernel/src/actions/mod.rs | 1 + kernel/src/transaction/mod.rs | 97 +++ 5 files changed, 1097 insertions(+), 19 deletions(-) create mode 100644 kernel/src/actions/deletion_vector_writer.rs diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index eb21b3702..8e5190754 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -42,6 +42,7 @@ pre-release-hook = [ delta_kernel_derive = { path = "../derive-macros", version = "0.16.0" } bytes = "1.10" chrono = "0.4.41" +crc = "3.2.2" indexmap = "2.10.0" itertools = "0.14" roaring = "0.11.2" diff --git a/kernel/src/actions/deletion_vector.rs b/kernel/src/actions/deletion_vector.rs index 2c269f036..4485f6a0b 100644 --- a/kernel/src/actions/deletion_vector.rs +++ b/kernel/src/actions/deletion_vector.rs @@ -10,6 +10,8 @@ use delta_kernel_derive::ToSchema; use roaring::RoaringTreemap; use url::Url; +use crc::{Crc, CRC_32_ISO_HDLC}; + use crate::schema::DataType; use crate::utils::require; use crate::{DeltaResult, Error, StorageHandler}; @@ -57,6 +59,53 @@ impl ToDataType for DeletionVectorStorageType { } } +pub struct DeletionVectorPath { + table_path: Url, + uuid: uuid::Uuid, + prefix: String, +} + +impl DeletionVectorPath { + pub(crate) fn new(table_path: Url, prefix: String) -> Self { + Self { + table_path, + uuid: uuid::Uuid::new_v4(), + prefix, + } + } + + #[cfg(test)] + pub(crate) fn new_with_uuid(table_path: Url, prefix: String, uuid: uuid::Uuid) -> Self { + Self { + table_path, + uuid, + prefix, + } + } + + fn relative_path(prefix: &str, uuid: &uuid::Uuid) -> String { + let uuid_as_string = uuid::Uuid::to_string(uuid); + if !prefix.is_empty() { + format!("{prefix}/deletion_vector_{uuid_as_string}.bin") + } else { + format!("deletion_vector_{uuid_as_string}.bin") + } + } + + pub fn absolute_path(&self) -> DeltaResult { + let dv_suffix = Self::relative_path(&self.prefix, &self.uuid); + let dv_path = self + .table_path + .join(&dv_suffix) + .map_err(|_| Error::DeletionVector(format!("invalid path: {dv_suffix}")))?; + Ok(dv_path) + } + + pub(crate) fn encoded_relative_path(&self) -> String { + format!("{}{}", self.prefix, z85::encode(self.uuid.as_bytes())) + } +} + #[derive(Debug, Clone, PartialEq, Eq, ToSchema)] #[cfg_attr(test, derive(serde::Serialize), serde(rename_all = "camelCase"))] pub struct DeletionVectorDescriptor { @@ -122,14 +171,8 @@ impl DeletionVectorDescriptor { .map_err(|_| Error::deletion_vector("Failed to decode DV uuid"))?; let uuid = uuid::Uuid::from_slice(&decoded) .map_err(|err| Error::DeletionVector(err.to_string()))?; - let dv_suffix = if prefix_len > 0 { - format!( - "{}/deletion_vector_{uuid}.bin", - &self.path_or_inline_dv[..prefix_len] - ) - } else { - format!("deletion_vector_{uuid}.bin") - }; + let dv_suffix = + DeletionVectorPath::relative_path(&self.path_or_inline_dv[..prefix_len], &uuid); let dv_path = parent .join(&dv_suffix) .map_err(|_| Error::DeletionVector(format!("invalid path: {dv_suffix}")))?; @@ -154,6 +197,16 @@ impl DeletionVectorDescriptor { &self, storage: Arc, parent: &Url, + ) -> DeltaResult { + self.read_with_possible_validation(storage, parent, false) + } + + /// Same as above, but optionally validate the CRC32 checksum in the file. + pub(crate) fn read_with_possible_validation( + &self, + storage: Arc, + parent: &Url, + validate_crc: bool, ) -> DeltaResult { match self.absolute_path(parent)? { None => { @@ -206,16 +259,35 @@ impl DeletionVectorDescriptor { ); // get the Bytes back out and limit it to dv_size - let position = cursor.position(); - let mut bytes = cursor.into_inner(); - let truncate_pos = position + dv_size as u64; - assert!( - truncate_pos <= usize::MAX as u64, - "Can't truncate as truncate_pos is > usize::MAX" - ); - bytes.truncate(truncate_pos as usize); - let mut cursor = Cursor::new(bytes); - cursor.set_position(position); + let position = cursor.position() as usize; + let bytes = cursor.into_inner(); + // -4 to remove CRC portion. + let truncate_pos = position + dv_size as usize - 4; + + if validate_crc { + require!( + bytes.len() >= truncate_pos + 4, + Error::DeletionVector(format!( + "Can't validate CRC as there are not enough bytes {} < {}", + bytes.len(), + truncate_pos + 4 + )) + ); + let mut crc_cursor: Cursor = + Cursor::new(bytes.slice(truncate_pos..truncate_pos + 4)); + let crc = read_u32(&mut crc_cursor, Endian::Big)?; + let crc32 = create_dv_crc32(); + // -4 to include magic portion of the CRC + let expected_crc = crc32.checksum(&bytes.slice(position - 4..truncate_pos)); + require!( + crc == expected_crc, + Error::DeletionVector(format!( + "CRC32 checksum mismatch: {crc} != {expected_crc}" + )) + ); + } + let dv_bytes = bytes.slice(position..truncate_pos); + let cursor = Cursor::new(dv_bytes.clone()); RoaringTreemap::deserialize_from(cursor) .map_err(|err| Error::DeletionVector(err.to_string())) } @@ -238,6 +310,12 @@ enum Endian { Little, } +/// Factory function to create a CRC-32 instance using the ISO HDLC algorithm. +/// This ensures consistent CRC algorithm usage for deletion vectors. +pub(crate) fn create_dv_crc32() -> Crc { + Crc::::new(&CRC_32_ISO_HDLC) +} + /// small helper to read a big or little endian u32 from a cursor fn read_u32(cursor: &mut Cursor, endian: Endian) -> DeltaResult { let mut buf = [0; 4]; @@ -437,7 +515,11 @@ mod tests { let storage = sync_engine.storage_handler(); let example = dv_example(); - let tree_map = example.read(storage, &parent).unwrap(); + let tree_map = example.read(storage.clone(), &parent).unwrap(); + let validated_treemap = example + .read_with_possible_validation(storage.clone(), &parent, true) + .unwrap(); + assert_eq!(tree_map, validated_treemap); let expected: Vec = vec![0, 9]; let found = tree_map.iter().collect::>(); @@ -562,4 +644,190 @@ mod tests { assert_eq!(variant, parsed); } } + + #[test] + fn test_deletion_vector_path_absolute_path_uniqueness() { + // Verify that two DeletionVectorPath instances created with the same arguments + // produce different absolute paths due to unique UUIDs + let table_path = Url::parse("file:///tmp/test_table/").unwrap(); + let prefix = String::from("deletion_vectors"); + + let dv_path1 = DeletionVectorPath::new(table_path.clone(), prefix.clone()); + let dv_path2 = DeletionVectorPath::new(table_path.clone(), prefix.clone()); + + let abs_path1 = dv_path1.absolute_path().unwrap(); + let abs_path2 = dv_path2.absolute_path().unwrap(); + + // The absolute paths should be different because each DeletionVectorPath + // gets a unique UUID + assert_ne!(abs_path1, abs_path2); + + // But they should share the same parent path + assert_eq!(abs_path1.join("..").unwrap(), abs_path2.join("..").unwrap()); + } + + #[test] + fn test_deletion_vector_path_absolute_path_with_prefix() { + let table_path = Url::parse("file:///tmp/test_table/").unwrap(); + let prefix = String::from("dv"); + let known_uuid = uuid::Uuid::parse_str("abcdef01-2345-6789-abcd-ef0123456789").unwrap(); + + let dv_path = DeletionVectorPath::new_with_uuid(table_path.clone(), prefix, known_uuid); + let abs_path = dv_path.absolute_path().unwrap(); + + // Verify the exact path with known UUID + let expected = + "file:///tmp/test_table/dv/deletion_vector_abcdef01-2345-6789-abcd-ef0123456789.bin"; + assert_eq!(abs_path.as_str(), expected); + } + + #[test] + fn test_deletion_vector_path_absolute_path_with_known_uuid() { + // Test with a known UUID to verify exact path construction + let table_path = Url::parse("file:///tmp/test_table/").unwrap(); + let prefix = String::from("dv"); + let known_uuid = uuid::Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap(); + + let dv_path = DeletionVectorPath::new_with_uuid(table_path, prefix, known_uuid); + let abs_path = dv_path.absolute_path().unwrap(); + + // Verify the exact path is constructed correctly + let expected_path = + "file:///tmp/test_table/dv/deletion_vector_550e8400-e29b-41d4-a716-446655440000.bin"; + assert_eq!(abs_path.as_str(), expected_path); + } + + #[test] + fn test_deletion_vector_path_absolute_path_with_known_uuid_empty_prefix() { + // Test with a known UUID and empty prefix + let table_path = Url::parse("file:///tmp/test_table/").unwrap(); + let prefix = String::from(""); + let known_uuid = uuid::Uuid::parse_str("123e4567-e89b-12d3-a456-426614174000").unwrap(); + + let dv_path = DeletionVectorPath::new_with_uuid(table_path, prefix, known_uuid); + let abs_path = dv_path.absolute_path().unwrap(); + + // Verify the exact path is constructed correctly without prefix directory + let expected_path = + "file:///tmp/test_table/deletion_vector_123e4567-e89b-12d3-a456-426614174000.bin"; + assert_eq!(abs_path.as_str(), expected_path); + } + + #[test] + fn test_deletion_vector_path_encoded_with_known_uuid() { + // Test encoded relative path with a known UUID + let table_path = Url::parse("file:///tmp/test_table/").unwrap(); + let prefix = String::from("prefix"); + let known_uuid = uuid::Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap(); + + let dv_path = DeletionVectorPath::new_with_uuid(table_path, prefix.clone(), known_uuid); + let encoded = dv_path.encoded_relative_path(); + + // The encoded path should start with the prefix + assert!(encoded.starts_with(&prefix)); + + // Verify it has the correct length (prefix + 20 for z85 encoded UUID) + assert_eq!(encoded.len(), prefix.len() + 20); + + // Verify the z85 encoding of the UUID (550e8400-e29b-41d4-a716-446655440000 -> rsTVZ&*Sl-RXRWjryu/!) + assert_eq!(encoded, "prefixrsTVZ&*Sl-RXRWjryu/!"); + } + + #[test] + fn test_deletion_vector_path_absolute_path_empty_prefix() { + let table_path = Url::parse("file:///tmp/test_table/").unwrap(); + let prefix = String::from(""); + + let dv_path = DeletionVectorPath::new(table_path.clone(), prefix); + let abs_path = dv_path.absolute_path().unwrap(); + + // With empty prefix, the deletion vector file should be directly under table_path + let path_str = abs_path.as_str(); + assert!(path_str.starts_with("file:///tmp/test_table/deletion_vector_")); + assert!(path_str.ends_with(".bin")); + } + + #[test] + fn test_deletion_vector_path_encoded_relative_path() { + let table_path = Url::parse("file:///tmp/test_table/").unwrap(); + let prefix = String::from("dv"); + + let dv_path = DeletionVectorPath::new(table_path, prefix.clone()); + let encoded = dv_path.encoded_relative_path(); + + // The encoded path should start with the prefix + assert!(encoded.starts_with(&prefix)); + + // The encoded path should be at least prefix length + 20 (z85 encoded UUID) + assert!(encoded.len() >= prefix.len() + 20); + + // The part after the prefix should be exactly 20 characters (z85 encoded 16-byte UUID) + assert_eq!(encoded.len() - prefix.len(), 20); + } + + #[test] + fn test_deletion_vector_path_encoded_relative_path_empty_prefix() { + let table_path = Url::parse("file:///tmp/test_table/").unwrap(); + let prefix = String::from(""); + + let dv_path = DeletionVectorPath::new(table_path, prefix); + let encoded = dv_path.encoded_relative_path(); + + // With empty prefix, the encoded path should be exactly 20 characters + assert_eq!(encoded.len(), 20); + } + + #[test] + fn test_deletion_vector_path_with_s3_url() { + let table_path = Url::parse("s3://my-bucket/warehouse/delta_table/").unwrap(); + let prefix = String::from("deletion_vectors"); + + let dv_path = DeletionVectorPath::new(table_path, prefix); + let abs_path = dv_path.absolute_path().unwrap(); + + // Should maintain the s3 scheme + assert!(abs_path.as_str().starts_with("s3://")); + assert!(abs_path.as_str().contains("my-bucket")); + assert!(abs_path + .as_str() + .contains("/deletion_vectors/deletion_vector_")); + } + + #[test] + fn test_deletion_vector_path_with_nested_prefix() { + let table_path = Url::parse("file:///data/warehouse/my_table/").unwrap(); + let prefix = String::from("_delta_log/deletion_vectors"); + + let dv_path = DeletionVectorPath::new(table_path, prefix.clone()); + let abs_path = dv_path.absolute_path().unwrap(); + + // Should contain the nested path structure + assert!(abs_path + .as_str() + .contains("/_delta_log/deletion_vectors/deletion_vector_")); + + let encoded = dv_path.encoded_relative_path(); + // The encoded path should start with the full prefix + assert!(encoded.starts_with(&prefix)); + } + + #[test] + fn test_deletion_vector_path_encoded_uniqueness() { + // Verify that encoded relative paths are unique for different instances + let table_path = Url::parse("file:///tmp/test_table/").unwrap(); + let prefix = String::from("dv"); + + let dv_path1 = DeletionVectorPath::new(table_path.clone(), prefix.clone()); + let dv_path2 = DeletionVectorPath::new(table_path.clone(), prefix.clone()); + + let encoded1 = dv_path1.encoded_relative_path(); + let encoded2 = dv_path2.encoded_relative_path(); + + // Each instance should have a different encoded path due to unique UUIDs + assert_ne!(encoded1, encoded2); + + // But they should both start with the same prefix + assert!(encoded1.starts_with("dv")); + assert!(encoded2.starts_with("dv")); + } } diff --git a/kernel/src/actions/deletion_vector_writer.rs b/kernel/src/actions/deletion_vector_writer.rs new file mode 100644 index 000000000..d53e51054 --- /dev/null +++ b/kernel/src/actions/deletion_vector_writer.rs @@ -0,0 +1,711 @@ +//! Code for writing deletion vectors to object storage. +//! +//! This module provides APIs for engines to write deletion vectors as part of a Delta transaction. +//! It follows the design outlined in the "Deletion Vector Writes in Rust Kernel" design document. + +use std::io::Write; + +use roaring::RoaringTreemap; + +use crate::actions::deletion_vector::{ + create_dv_crc32, DeletionVectorDescriptor, DeletionVectorPath, DeletionVectorStorageType, +}; +use crate::{DeltaResult, Error}; + +/// A trait that allows engines to provide deletion vectors in various formats. +/// +/// Engines can implement this trait to provide their own deletion vector implementations, +/// or use the provided [`KernelDeletionVector`] implementation backed by RoaringTreemap. +/// +/// # Examples +/// +/// ```rust,ignore +/// use delta_kernel::actions::deletion_vector_writer::DeletionVector; +/// +/// struct MyDeletionVector { +/// deleted_indexes: Vec, +/// } +/// +/// impl DeletionVector for MyDeletionVector { +/// type IndexIterator = std::vec::IntoIter; +/// +/// fn into_iter(self) -> Self::IndexIterator { +/// self.deleted_indexes.into_iter() +/// } +/// } +/// ``` +pub trait DeletionVector: Sized { + /// Iterator type that yields deleted row indexes. + type IndexIterator: Iterator; + + /// Consume the deletion vector and return an iterator over deleted row indexes. + fn into_iter(self) -> Self::IndexIterator; + + /// Convert the deletion vector into a RoaringTreemap. + /// + /// This method has a default implementation that collects from the iterator, + /// but types can override it for more efficient conversion. + fn into_roaring_treemap(self) -> RoaringTreemap { + self.into_iter().collect() + } +} + +/// Metadata about a written deletion vector, excluding the storage path. +/// +/// This structure contains the information needed to construct a full +/// [`DeletionVectorDescriptor`](crate::actions::deletion_vector::DeletionVectorDescriptor) +/// after writing the DV to storage. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DeletionVectorWriteResult { + /// Start of the data for this DV in number of bytes from the beginning of the file. + /// Does not include CRC length or size in bytes prefix. + pub offset: i32, + + /// Size of the serialized DV in bytes (raw data size). + pub size_in_bytes: i32, + + /// Number of rows the deletion vector logically removes from the file. + pub cardinality: i64, +} + +impl DeletionVectorWriteResult { + /// Convert the write result to a deletion vector descriptor. + /// + /// As an implementation detail, this method will always use the persisted relative storage type. + /// + /// # Arguments + /// + /// * `path` - The path to the deletion vector file. + pub fn to_descriptor(self, path: &DeletionVectorPath) -> DeletionVectorDescriptor { + DeletionVectorDescriptor { + storage_type: DeletionVectorStorageType::PersistedRelative, + path_or_inline_dv: path.encoded_relative_path(), + offset: Some(self.offset), + size_in_bytes: self.size_in_bytes, + cardinality: self.cardinality, + } + } +} + +/// A Kernel-provided deletion vector implementation backed by [`RoaringTreemap`]. +/// +/// This is the default implementation that engines can use. It provides memory-efficient +/// storage of deleted row indexes using compressed bitmaps. +/// +/// # Examples +/// +/// ```rust,ignore +/// use delta_kernel::actions::deletion_vector_writer::KernelDeletionVector; +/// +/// let mut dv = KernelDeletionVector::new(); +/// dv.add_deleted_row_indexes([0, 5, 10]; +/// ``` +#[derive(Debug, Clone)] +pub struct KernelDeletionVector { + dv: RoaringTreemap, +} + +impl Default for KernelDeletionVector { + fn default() -> Self { + Self::new() + } +} + +impl KernelDeletionVector { + /// Create a new empty deletion vector. + pub fn new() -> Self { + Self { + dv: RoaringTreemap::new(), + } + } + + /// Adds indexes to be deleted to this deletion vector. + pub fn add_deleted_row_indexes(&mut self, iter: impl Iterator) { + for index in iter { + self.dv.insert(index); + } + } + /// Get the number of deleted rows in this deletion vector. + pub fn cardinality(&self) -> u64 { + self.dv.len() + } +} + +impl DeletionVector for KernelDeletionVector { + type IndexIterator = roaring::treemap::IntoIter; + + fn into_iter(self) -> Self::IndexIterator { + self.dv.into_iter() + } + + /// Optimized conversion that returns the internal RoaringTreemap directly. + fn into_roaring_treemap(self) -> RoaringTreemap { + self.dv + } +} + +/// A streaming writer for deletion vectors. +/// +/// This writer allows for writing multiple deletion vectors to a single file in a streaming +/// fashion, which is memory-efficient for distributed workloads where deletion vectors are +/// generated on executors. +/// +/// # Format +/// +/// The writer produces deletion vector files in the Delta Lake format: +/// - The first byte of the file is a version byte (currently 1) +/// - Each DV is prefixed with a 4-byte size (big-endian) of the serialized data +/// - Followed by a 4-byte magic number (0x64485871, little-endian) +/// - Followed by the serialized RoaringTreemap +/// - Followed by a 4-byte CRC32 checksum (big-endian) of the serialized data +/// +/// # Examples +/// +/// ```rust,ignore +/// use std::io::Cursor; +/// use delta_kernel::actions::deletion_vector_writer::{StreamingDeletionVectorWriter, KernelDeletionVector}; +/// +/// let mut buffer = Vec::new(); +/// let mut writer = StreamingDeletionVectorWriter::new(&mut buffer); +/// +/// let mut dv = KernelDeletionVector::new(); +/// dv.add_deleted_row_indexes([1, 5, 10].iter().copied()); +/// +/// let descriptor = writer.write_deletion_vector(dv)?; +/// writer.finalize()?; +/// # Ok::<(), delta_kernel::Error>(()) +/// ``` +pub struct StreamingDeletionVectorWriter<'a, W: Write> { + writer: &'a mut W, + current_offset: i32, + has_written_version: bool, +} + +impl<'a, W: Write> StreamingDeletionVectorWriter<'a, W> { + /// Create a new streaming deletion vector writer. + /// + /// # Arguments + /// + /// * `writer` - A mutable reference to any type implementing [`std::io::Write`] + /// + /// # Examples + /// + /// ```rust,ignore + /// let mut buffer = Vec::new(); + /// let writer = StreamingDeletionVectorWriter::new(&mut buffer); + /// ``` + pub fn new(writer: &'a mut W) -> Self { + Self { + writer, + current_offset: 0, + has_written_version: false, + } + } + + /// Write a deletion vector to the underlying writer. + /// + /// This method can be called multiple times to write multiple deletion vectors to the same + /// file. The caller is responsible for keeping track of which deletion vector corresponds to + /// which data file. + /// + /// # Arguments + /// + /// * `deletion_vector` - The deletion vector to write + /// + /// # Returns + /// + /// A [`DeletionVectorWriteResult`] containing the offset, size, and cardinality + /// of the written deletion vector. + /// + /// # Errors + /// + /// Returns an error if: + /// - The writer fails to write data + /// - The deletion vector cannot be serialized + /// - The offset or size would overflow an i32 + /// + /// # Examples + /// + /// ```rust,ignore + /// let mut dv = KernelDeletionVector::new(); + /// dv.add_deleted_row_indexes([1, 5, 10].iter().copied()); + /// + /// let descriptor = writer.write_deletion_vector(dv)?; + /// println!("Written DV at offset {} with size {}", descriptor.offset, descriptor.size_in_bytes); + /// # Ok::<(), delta_kernel::Error>(()) + /// ``` + pub fn write_deletion_vector( + &mut self, + deletion_vector: impl DeletionVector, + ) -> DeltaResult { + // Write version byte on first write + if !self.has_written_version { + self.writer + .write_all(&[1u8]) + .map_err(|e| Error::generic(format!("Failed to write version byte: {}", e)))?; + self.current_offset = 1; + self.has_written_version = true; + } + + // Convert deletion vector to RoaringTreemap + let treemap: RoaringTreemap = deletion_vector.into_roaring_treemap(); + let cardinality = treemap.len(); + + // Serialize the treemap to bytes + let mut serialized = Vec::new(); + treemap + .serialize_into(&mut serialized) + .map_err(|e| Error::generic(format!("Failed to serialize deletion vector: {}", e)))?; + + // Calculate sizes + + // The size field contains the size of data + magic(4) (doesn't include CRC) + let dv_size = serialized.len() + 4; + if dv_size > i32::MAX as usize { + return Err(Error::generic( + "Deletion vector size exceeds maximum allowed size", + )); + } + + // Record the offset where this DV size starts. + let dv_offset = self.current_offset; + + // Write size (big-endian, as per Delta spec) + let size_bytes = (dv_size as u32).to_be_bytes(); + self.writer + .write_all(&size_bytes) + .map_err(|e| Error::generic(format!("Failed to write size: {}", e)))?; + + // Write magic number (little-endian, 0x64485856 = 1681511510) + // This is the RoaringBitmapArray format magic + let magic: u32 = 1681511377; + self.writer + .write_all(&magic.to_le_bytes()) + .map_err(|e| Error::generic(format!("Failed to write magic: {}", e)))?; + + // Write the serialized treemap + self.writer + .write_all(&serialized) + .map_err(|e| Error::generic(format!("Failed to write deletion vector data: {}", e)))?; + + // Calculate and write CRC32 checksum (big-endian) + // The CRC must include both the magic and the serialized data + let crc_instance = create_dv_crc32(); + let mut digest = crc_instance.digest(); + digest.update(&magic.to_le_bytes()); + digest.update(&serialized); + let checksum = digest.finalize(); + self.writer + .write_all(&checksum.to_be_bytes()) + .map_err(|e| Error::generic(format!("Failed to write CRC32 checksum: {}", e)))?; + + // Update offset for next write (size_prefix + magic + data + crc) + let bytes_written = 4 + dv_size + 4; // size + (magic + data) + crc + self.current_offset = self + .current_offset + .checked_add(bytes_written as i32) + .ok_or_else(|| Error::generic("Deletion vector offset overflow"))?; + + Ok(DeletionVectorWriteResult { + offset: dv_offset, + size_in_bytes: dv_size as i32, + cardinality: cardinality as i64, + }) + } + + /// Finalize all writes and flush the underlying writer. + /// + /// This method should be called after all deletion vectors have been written. + /// After calling this method, the writer should not be used anymore. + /// + /// # Errors + /// + /// Returns an error if flushing the writer fails. + /// + /// # Examples + /// + /// ```rust,ignore + /// writer.write_deletion_vector(dv1)?; + /// writer.write_deletion_vector(dv2)?; + /// writer.finalize()?; + /// # Ok::<(), delta_kernel::Error>(()) + /// ``` + pub fn finalize(self) -> DeltaResult<()> { + self.writer + .flush() + .map_err(|e| Error::generic(format!("Failed to flush writer: {}", e))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Cursor; + + #[test] + fn test_kernel_deletion_vector_new() { + let dv = KernelDeletionVector::new(); + assert_eq!(dv.cardinality(), 0); + } + + #[test] + fn test_kernel_deletion_vector_add_indexes() { + let mut dv = KernelDeletionVector::new(); + dv.add_deleted_row_indexes([1u64, 5, 10].iter().copied()); + + assert_eq!(dv.cardinality(), 3); + assert_eq!( + dv.into_iter().collect::(), + RoaringTreemap::from_iter([1, 5, 10]) + ); + } + + #[test] + fn test_streaming_writer_single_dv() { + let mut buffer = Vec::new(); + let mut writer = StreamingDeletionVectorWriter::new(&mut buffer); + + let mut dv = KernelDeletionVector::new(); + dv.add_deleted_row_indexes([0u64, 9].iter().copied()); + + let descriptor = writer.write_deletion_vector(dv).unwrap(); + writer.finalize().unwrap(); + + // Check descriptor values + assert_eq!(descriptor.offset, 1); // After version byte + assert_eq!(descriptor.cardinality, 2); + assert!(descriptor.size_in_bytes > 0); + + // Check buffer contents + assert!(!buffer.is_empty()); + assert_eq!(buffer[0], 1); // Version byte + } + + #[test] + fn test_streaming_writer_multiple_dvs() { + let mut buffer = Vec::new(); + let mut writer = StreamingDeletionVectorWriter::new(&mut buffer); + + let mut dv1 = KernelDeletionVector::new(); + dv1.add_deleted_row_indexes([0u64, 9].iter().copied()); + + let mut dv2 = KernelDeletionVector::new(); + dv2.add_deleted_row_indexes([5u64, 15, 25].iter().copied()); + + let desc1 = writer.write_deletion_vector(dv1).unwrap(); + let desc2 = writer.write_deletion_vector(dv2).unwrap(); + writer.finalize().unwrap(); + + // Check that offsets are different and sequential + assert_eq!(desc1.offset, 1); + assert!(desc2.offset > desc1.offset); + assert_eq!(desc1.cardinality, 2); + assert_eq!(desc2.cardinality, 3); + } + + #[test] + fn test_streaming_writer_roundtrip() { + // Write a deletion vector + let mut buffer = Vec::new(); + let mut writer = StreamingDeletionVectorWriter::new(&mut buffer); + + let mut dv = KernelDeletionVector::new(); + let test_indexes = vec![3, 4, 7, 11, 18, 29]; + dv.add_deleted_row_indexes(test_indexes.iter().copied()); + + let descriptor = writer.write_deletion_vector(dv).unwrap(); + writer.finalize().unwrap(); + + // Now try to read it back + let mut cursor = Cursor::new(buffer); + cursor.set_position(descriptor.offset as u64); + + // Read size + let mut size_buf = [0u8; 4]; + std::io::Read::read_exact(&mut cursor, &mut size_buf).unwrap(); + let size = u32::from_be_bytes(size_buf); + assert_eq!(size, descriptor.size_in_bytes as u32); + + // Read magic + let mut magic_buf = [0u8; 4]; + std::io::Read::read_exact(&mut cursor, &mut magic_buf).unwrap(); + let magic = u32::from_le_bytes(magic_buf); + assert_eq!(magic, 1681511377); + + // Read the serialized data (size includes magic, so actual data is size - 4) + let serialized_data_len = (size - 4) as usize; + let mut serialized_data = vec![0u8; serialized_data_len]; + std::io::Read::read_exact(&mut cursor, &mut serialized_data).unwrap(); + + // Read and verify CRC32 checksum + let mut crc_buf = [0u8; 4]; + std::io::Read::read_exact(&mut cursor, &mut crc_buf).unwrap(); + let stored_checksum = u32::from_be_bytes(crc_buf); + + // Calculate expected checksum (must include magic + serialized data) + let crc_instance = create_dv_crc32(); + let mut digest = crc_instance.digest(); + digest.update(&magic_buf); + digest.update(&serialized_data); + let expected_checksum = digest.finalize(); + assert_eq!( + stored_checksum, expected_checksum, + "CRC32 checksum mismatch" + ); + + // Deserialize the treemap + let treemap = RoaringTreemap::deserialize_from(&serialized_data[..]).unwrap(); + assert_eq!(treemap.len(), test_indexes.len() as u64); + for idx in test_indexes { + assert!(treemap.contains(idx)); + } + } + + #[test] + fn test_deletion_vector_trait() { + struct TestDV { + indexes: Vec, + } + + impl DeletionVector for TestDV { + type IndexIterator = std::vec::IntoIter; + + fn into_iter(self) -> Self::IndexIterator { + self.indexes.into_iter() + } + } + + let test_dv = TestDV { + indexes: vec![1, 2, 3], + }; + + let mut buffer = Vec::new(); + let mut writer = StreamingDeletionVectorWriter::new(&mut buffer); + let descriptor = writer.write_deletion_vector(test_dv).unwrap(); + + assert_eq!(descriptor.cardinality, 3); + } + + #[test] + fn test_array_based_deletion_vector() { + use crate::Engine; + use std::fs::File; + use tempfile::tempdir; + use url::Url; + + // Custom DeletionVector implementation that wraps an array of u64 + struct ArrayDeletionVector { + deleted_rows: Vec, + } + + impl ArrayDeletionVector { + fn new(deleted_rows: Vec) -> Self { + Self { deleted_rows } + } + } + + impl DeletionVector for ArrayDeletionVector { + type IndexIterator = std::vec::IntoIter; + + fn into_iter(self) -> Self::IndexIterator { + self.deleted_rows.into_iter() + } + } + + // Create a temporary directory and file + let temp_dir = tempdir().unwrap(); + let table_url = Url::from_directory_path(temp_dir.path()).unwrap(); + + let dv_path = DeletionVectorPath::new(table_url.clone(), String::from("test")); + let file_path = dv_path.absolute_path().unwrap().to_file_path().unwrap(); + + // Create parent directory if it doesn't exist + if let Some(parent) = file_path.parent() { + std::fs::create_dir_all(parent).unwrap(); + } + + let mut file = File::create(&file_path).unwrap(); + + // Create an array-based deletion vector with specific deleted row indexes + let deleted_indexes = vec![5u64, 12, 23, 45, 67, 89, 100]; + let array_dv = ArrayDeletionVector::new(deleted_indexes.clone()); + + // Write using StreamingDeletionVectorWriter + let mut writer = StreamingDeletionVectorWriter::new(&mut file); + let write_result = writer.write_deletion_vector(array_dv).unwrap(); + writer.finalize().unwrap(); + drop(file); // Ensure file is closed + + // Verify the write result metadata + assert_eq!(write_result.cardinality, deleted_indexes.len() as i64); + assert_eq!(write_result.offset, 1); // After version byte + assert!(write_result.size_in_bytes > 0); + + // Read back using the descriptor to verify the data was written correctly + use crate::engine::sync::SyncEngine; + let engine = SyncEngine::new(); + let storage = engine.storage_handler(); + + let descriptor = write_result.to_descriptor(&dv_path); + let treemap = descriptor + .read_with_possible_validation(storage, &table_url, true) + .unwrap(); + + // Verify the exact set of indexes matches + let read_indexes: Vec = treemap.into_iter().collect(); + assert_eq!(read_indexes, deleted_indexes); + } + + #[test] + fn test_to_descriptor_preserves_absolute_path() { + use url::Url; + + let table_path = Url::parse("file:///tmp/test_table/").unwrap(); + let prefix = String::from("deletion_vectors"); + + let dv_path = DeletionVectorPath::new(table_path.clone(), prefix); + + // Get the absolute path from DeletionVectorPath + let expected_absolute_path = dv_path.absolute_path().unwrap(); + + // Create a write result and convert to descriptor + let write_result = DeletionVectorWriteResult { + offset: 1, + size_in_bytes: 100, + cardinality: 42, + }; + + let descriptor = write_result.to_descriptor(&dv_path); + + // Get the absolute path from the descriptor + let actual_absolute_path = descriptor.absolute_path(&table_path).unwrap(); + + // Verify they match + assert_eq!(Some(expected_absolute_path), actual_absolute_path); + } + + #[test] + fn test_to_descriptor_preserves_absolute_path_empty_prefix() { + use url::Url; + + let table_path = Url::parse("file:///tmp/test_table/").unwrap(); + let prefix = String::from(""); + + let dv_path = DeletionVectorPath::new(table_path.clone(), prefix); + + // Get the absolute path from DeletionVectorPath + let expected_absolute_path = dv_path.absolute_path().unwrap(); + + // Create a write result and convert to descriptor + let write_result = DeletionVectorWriteResult { + offset: 10, + size_in_bytes: 50, + cardinality: 5, + }; + + let descriptor = write_result.to_descriptor(&dv_path); + + // Get the absolute path from the descriptor + let actual_absolute_path = descriptor.absolute_path(&table_path).unwrap(); + + // Verify they match + assert_eq!(Some(expected_absolute_path), actual_absolute_path); + } + + #[test] + fn test_to_descriptor_fields() { + use url::Url; + + let table_path = Url::parse("s3://my-bucket/delta_table/").unwrap(); + let prefix = String::from("dv"); + + let dv_path = DeletionVectorPath::new(table_path.clone(), prefix); + + let write_result = DeletionVectorWriteResult { + offset: 42, + size_in_bytes: 256, + cardinality: 100, + }; + + let descriptor = write_result.to_descriptor(&dv_path); + + // Verify descriptor fields match write result + assert_eq!(descriptor.offset, Some(42)); + assert_eq!(descriptor.size_in_bytes, 256); + assert_eq!(descriptor.cardinality, 100); + assert_eq!( + descriptor.storage_type, + DeletionVectorStorageType::PersistedRelative + ); + } + + #[test] + fn test_multiple_deletion_vectors_roundtrip_with_descriptor() { + use crate::Engine; + use std::fs::File; + use tempfile::tempdir; + use url::Url; + + // Create a temporary directory and file + let temp_dir = tempdir().unwrap(); + let table_url = Url::from_directory_path(temp_dir.path()).unwrap(); + + let dv_path = DeletionVectorPath::new(table_url.clone(), String::from("abc")); + let file_path = dv_path.absolute_path().unwrap().to_file_path().unwrap(); + + // Create parent directory if it doesn't exist + if let Some(parent) = file_path.parent() { + std::fs::create_dir_all(parent).unwrap(); + } + + let mut file = File::create(&file_path).unwrap(); + + // Create multiple deletion vectors with different data + let test_data = vec![ + vec![0u64, 5, 10, 15], + vec![1u64, 2, 3, 100, 200], + vec![50u64, 51, 52, 53, 54, 55], + ]; + + // Write all deletion vectors and collect their descriptors + let mut descriptors = Vec::new(); + let mut writer = StreamingDeletionVectorWriter::new(&mut file); + + for indexes in &test_data { + let mut dv = KernelDeletionVector::new(); + dv.add_deleted_row_indexes(indexes.iter().copied()); + + let write_result = writer.write_deletion_vector(dv).unwrap(); + descriptors.push(write_result); + } + + writer.finalize().unwrap(); + drop(file); // Ensure file is closed + + // Create a storage handler using sync engine + use crate::engine::sync::SyncEngine; + let engine = SyncEngine::new(); + let storage = engine.storage_handler(); + + // Now read back each deletion vector using the descriptors + for (write_result, expected_indexes) in descriptors.iter().zip(&test_data) { + // Create a new DeletionVectorPath for each DV (they would have different UUIDs normally, + // but for this test we're writing multiple to the same file) + let descriptor = write_result.clone().to_descriptor(&dv_path); + + // Read the deletion vector back using the descriptor + let treemap = descriptor + .read_with_possible_validation(storage.clone(), &table_url, true) + .unwrap(); + + // Verify the content matches + assert_eq!( + treemap, + expected_indexes.iter().collect::(), + "read {:?} != expected {:?}", + treemap, + expected_indexes + ); + } + } +} diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index ae7669912..625b8b752 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -33,6 +33,7 @@ const KERNEL_VERSION: &str = env!("CARGO_PKG_VERSION"); const UNKNOWN_OPERATION: &str = "UNKNOWN"; pub mod deletion_vector; +pub mod deletion_vector_writer; pub mod set_transaction; pub(crate) mod crc; diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index f647ba0e8..6c5cd9900 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -5,6 +5,7 @@ use std::sync::{Arc, LazyLock}; use url::Url; +use crate::actions::deletion_vector::DeletionVectorPath; use crate::actions::{ as_log_add_schema, domain_metadata::scan_domain_metadatas, get_log_commit_info_schema, get_log_domain_metadata_schema, get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction, @@ -696,6 +697,69 @@ impl WriteContext { pub fn logical_to_physical(&self) -> ExpressionRef { self.logical_to_physical.clone() } + + /// Generate a new unique absolute URL for a deletion vector file. + /// + /// This method generates a unique file name in the table directory. + /// Each call to this method returns a new unique path. + /// + /// # Arguments + /// + /// * `random_prefix` - A random prefix to use for the deletion vector file name. + /// Making this non-empty can help distributed load on object storage when writing/reading + /// to avoid throttling. Typically a random string fo 2-4 characters is sufficient + /// for this purpose. + /// + /// + /// # Examples + /// + /// ```rust,ignore + /// let write_context = transaction.get_write_context(); + /// let dv_path = write_context.new_deletion_vector_path(String::from(rand_string())); + /// // dv_url might be: s3://bucket/table/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin + /// ``` + pub fn new_deletion_vector_path(&self, random_prefix: String) -> DeletionVectorPath { + DeletionVectorPath::new(self.target_dir.clone(), random_prefix) + } + + /// Relativize a URL with respect to the table root. + /// + /// This converts an absolute URL to a relative path suitable for storage in + /// deletion vector descriptors. The path is made relative to the table root. + /// + /// # Arguments + /// + /// * `url` - The absolute URL to relativize + /// + /// # Returns + /// + /// A relative path string, or an error if the URL cannot be made relative to the table root. + /// + /// # Examples + /// + /// ```rust,ignore + /// let write_context = transaction.get_write_context(); + /// let absolute_url = Url::parse("s3://bucket/table/deletion_vector_abc.bin")?; + /// let relative = write_context.relativize_url(&absolute_url)?; + /// // relative might be: "deletion_vector_abc.bin" + /// # Ok::<(), delta_kernel::Error>(()) + /// ``` + pub fn relativize_url(&self, url: &Url) -> DeltaResult { + // Get the path component of both URLs + let table_path = self.target_dir.path(); + let file_path = url.path(); + + // Check if the file path starts with the table path + if let Some(relative_path) = file_path.strip_prefix(table_path) { + // Remove leading slash if present + Ok(relative_path.trim_start_matches('/').to_string()) + } else { + Err(Error::generic(format!( + "Cannot relativize URL {} with respect to table root {}", + url, self.target_dir + ))) + } + } } /// Kernel exposes information about the state of the table that engines might want to use to @@ -848,4 +912,37 @@ mod tests { assert_eq!(*schema, expected.into()); Ok(()) } + + #[test] + fn test_new_deletion_vector_path() -> Result<(), Box> { + let engine = SyncEngine::new(); + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); + let url = url::Url::from_directory_path(path).unwrap(); + let snapshot = Snapshot::builder_for(url.clone()) + .at_version(1) + .build(&engine) + .unwrap(); + let txn = snapshot.transaction()?.with_engine_info("default engine"); + let write_context = txn.get_write_context(); + + // Test with empty prefix + let dv_path1 = write_context.new_deletion_vector_path(String::from("")); + let abs_path1 = dv_path1.absolute_path()?; + assert!(abs_path1.as_str().contains(url.as_str())); + + // Test with non-empty prefix + let prefix = String::from("dv_test"); + let dv_path2 = write_context.new_deletion_vector_path(prefix.clone()); + let abs_path2 = dv_path2.absolute_path()?; + assert!(abs_path2.as_str().contains(url.as_str())); + assert!(abs_path2.as_str().contains(&prefix)); + + // Test that two paths with same prefix are different (unique UUIDs) + let dv_path3 = write_context.new_deletion_vector_path(prefix.clone()); + let abs_path3 = dv_path3.absolute_path()?; + assert_ne!(abs_path2, abs_path3); + + Ok(()) + } } From 29d2bb9de76e2af55340527cc742c52bed282549 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Thu, 23 Oct 2025 22:09:13 +0000 Subject: [PATCH 2/5] nits --- kernel/src/actions/deletion_vector.rs | 6 ++++++ kernel/src/actions/deletion_vector_writer.rs | 8 ++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/kernel/src/actions/deletion_vector.rs b/kernel/src/actions/deletion_vector.rs index 4485f6a0b..0fcd431f9 100644 --- a/kernel/src/actions/deletion_vector.rs +++ b/kernel/src/actions/deletion_vector.rs @@ -59,6 +59,8 @@ impl ToDataType for DeletionVectorStorageType { } } +/// Represents and abstract path to a deltion vector. This is used to construct the path to a deletion vector file +/// and translate it to the relative path the delta spec requires (this is only exposed via [`to_deletion_vector_descriptor`]). pub struct DeletionVectorPath { table_path: Url, uuid: uuid::Uuid, @@ -83,6 +85,8 @@ impl DeletionVectorPath { } } + /// Helper method to cosntruct the relative path to a deletion vector file, + /// the prefix and the UUID suffix. fn relative_path(prefix: &str, uuid: &uuid::Uuid) -> String { let uuid_as_string = uuid::Uuid::to_string(uuid); if !prefix.is_empty() { @@ -92,6 +96,7 @@ impl DeletionVectorPath { } } + /// Returns the absolute path to the deletion vector file. pub fn absolute_path(&self) -> DeltaResult { let dv_suffix = Self::relative_path(&self.prefix, &self.uuid); let dv_path = self @@ -101,6 +106,7 @@ impl DeletionVectorPath { Ok(dv_path) } + /// Returns the compressed encoded path for use in descriptor (prefix + z85 encoded UUID). pub(crate) fn encoded_relative_path(&self) -> String { format!("{}{}", self.prefix, z85::encode(self.uuid.as_bytes())) } diff --git a/kernel/src/actions/deletion_vector_writer.rs b/kernel/src/actions/deletion_vector_writer.rs index d53e51054..7a03626d8 100644 --- a/kernel/src/actions/deletion_vector_writer.rs +++ b/kernel/src/actions/deletion_vector_writer.rs @@ -156,7 +156,7 @@ impl DeletionVector for KernelDeletionVector { /// - The first byte of the file is a version byte (currently 1) /// - Each DV is prefixed with a 4-byte size (big-endian) of the serialized data /// - Followed by a 4-byte magic number (0x64485871, little-endian) -/// - Followed by the serialized RoaringTreemap +/// - Followed by the serialized 64-bit Roaring Bitmap /// - Followed by a 4-byte CRC32 checksum (big-endian) of the serialized data /// /// # Examples @@ -186,7 +186,7 @@ impl<'a, W: Write> StreamingDeletionVectorWriter<'a, W> { /// /// # Arguments /// - /// * `writer` - A mutable reference to any type implementing [`std::io::Write`] + /// * `writer` - A mutable reference to any type implementing [`std::io::Write`]. /// /// # Examples /// @@ -205,7 +205,7 @@ impl<'a, W: Write> StreamingDeletionVectorWriter<'a, W> { /// Write a deletion vector to the underlying writer. /// /// This method can be called multiple times to write multiple deletion vectors to the same - /// file. The caller is responsible for keeping track of which deletion vector corresponds to + /// writer. The caller is responsible for keeping track of which deletion vector corresponds to /// which data file. /// /// # Arguments @@ -276,7 +276,7 @@ impl<'a, W: Write> StreamingDeletionVectorWriter<'a, W> { .write_all(&size_bytes) .map_err(|e| Error::generic(format!("Failed to write size: {}", e)))?; - // Write magic number (little-endian, 0x64485856 = 1681511510) + // Write magic number (little-endian) // This is the RoaringBitmapArray format magic let magic: u32 = 1681511377; self.writer From 79e733ea0df95f08c5b0246fcee7c5204eaffb30 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Thu, 23 Oct 2025 22:21:09 +0000 Subject: [PATCH 3/5] better method for writes --- kernel/src/actions/deletion_vector_writer.rs | 41 +++++++++++++------- kernel/src/transaction/mod.rs | 4 +- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/kernel/src/actions/deletion_vector_writer.rs b/kernel/src/actions/deletion_vector_writer.rs index 7a03626d8..27638b14b 100644 --- a/kernel/src/actions/deletion_vector_writer.rs +++ b/kernel/src/actions/deletion_vector_writer.rs @@ -5,6 +5,7 @@ use std::io::Write; +use bytes::Bytes; use roaring::RoaringTreemap; use crate::actions::deletion_vector::{ @@ -41,12 +42,20 @@ pub trait DeletionVector: Sized { /// Consume the deletion vector and return an iterator over deleted row indexes. fn into_iter(self) -> Self::IndexIterator; - /// Convert the deletion vector into a RoaringTreemap. + /// Serialize the deletion vector into bytes. /// - /// This method has a default implementation that collects from the iterator, - /// but types can override it for more efficient conversion. - fn into_roaring_treemap(self) -> RoaringTreemap { - self.into_iter().collect() + /// This method has a default implementation that collects the iterator into a + /// RoaringTreemap and serializes it. Types can override this for more efficient + /// serialization if they already have the data in a suitable format. + /// + /// Returns a `Bytes` object which is reference-counted and FFI-friendly. + fn serialize(self) -> DeltaResult { + let treemap: RoaringTreemap = self.into_iter().collect(); + let mut serialized = Vec::new(); + treemap + .serialize_into(&mut serialized) + .map_err(|e| Error::generic(format!("Failed to serialize deletion vector: {}", e)))?; + Ok(Bytes::from(serialized)) } } @@ -138,9 +147,13 @@ impl DeletionVector for KernelDeletionVector { self.dv.into_iter() } - /// Optimized conversion that returns the internal RoaringTreemap directly. - fn into_roaring_treemap(self) -> RoaringTreemap { + /// Optimized serialization that directly serializes the internal RoaringTreemap. + fn serialize(self) -> DeltaResult { + let mut serialized = Vec::new(); self.dv + .serialize_into(&mut serialized) + .map_err(|e| Error::generic(format!("Failed to serialize deletion vector: {}", e)))?; + Ok(Bytes::from(serialized)) } } @@ -247,15 +260,13 @@ impl<'a, W: Write> StreamingDeletionVectorWriter<'a, W> { self.has_written_version = true; } - // Convert deletion vector to RoaringTreemap - let treemap: RoaringTreemap = deletion_vector.into_roaring_treemap(); - let cardinality = treemap.len(); + // Serialize the deletion vector to bytes + let serialized = deletion_vector.serialize()?; - // Serialize the treemap to bytes - let mut serialized = Vec::new(); - treemap - .serialize_into(&mut serialized) - .map_err(|e| Error::generic(format!("Failed to serialize deletion vector: {}", e)))?; + // Deserialize to get cardinality (we need this for the metadata) + let treemap = RoaringTreemap::deserialize_from(&serialized[..]) + .map_err(|e| Error::generic(format!("Failed to deserialize deletion vector for cardinality: {}", e)))?; + let cardinality = treemap.len(); // Calculate sizes diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 6c5cd9900..288cbdac9 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -923,7 +923,9 @@ mod tests { .at_version(1) .build(&engine) .unwrap(); - let txn = snapshot.transaction()?.with_engine_info("default engine"); + let txn = snapshot + .transaction(Box::new(FileSystemCommitter::new()))? + .with_engine_info("default engine"); let write_context = txn.get_write_context(); // Test with empty prefix From 8c2a39bc05c06dd272d909bb70ee46748d717f6e Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Fri, 24 Oct 2025 01:31:47 +0000 Subject: [PATCH 4/5] use serialize instead --- kernel/src/actions/deletion_vector_writer.rs | 29 ++++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/kernel/src/actions/deletion_vector_writer.rs b/kernel/src/actions/deletion_vector_writer.rs index 27638b14b..fe1dd8d70 100644 --- a/kernel/src/actions/deletion_vector_writer.rs +++ b/kernel/src/actions/deletion_vector_writer.rs @@ -39,16 +39,17 @@ pub trait DeletionVector: Sized { /// Iterator type that yields deleted row indexes. type IndexIterator: Iterator; - /// Consume the deletion vector and return an iterator over deleted row indexes. + /// Return an iterator over deleted row indexes. fn into_iter(self) -> Self::IndexIterator; + /// Return the number of deleted rows in the deletion vector. + fn cardinality(&self) -> u64; + /// Serialize the deletion vector into bytes. /// - /// This method has a default implementation that collects the iterator into a - /// RoaringTreemap and serializes it. Types can override this for more efficient - /// serialization if they already have the data in a suitable format. - /// - /// Returns a `Bytes` object which is reference-counted and FFI-friendly. + /// This serializes the deletion vector in the format expected by the Delta Lake protocol. + /// it may be overridden for more efficient serialization if the implementation already has the data in a suitable format. + /// But generally, only do this if you fully understand the the format requirements. fn serialize(self) -> DeltaResult { let treemap: RoaringTreemap = self.into_iter().collect(); let mut serialized = Vec::new(); @@ -155,6 +156,10 @@ impl DeletionVector for KernelDeletionVector { .map_err(|e| Error::generic(format!("Failed to serialize deletion vector: {}", e)))?; Ok(Bytes::from(serialized)) } + + fn cardinality(&self) -> u64 { + self.dv.len() + } } /// A streaming writer for deletion vectors. @@ -260,13 +265,11 @@ impl<'a, W: Write> StreamingDeletionVectorWriter<'a, W> { self.has_written_version = true; } + let cardinality = deletion_vector.cardinality(); // Serialize the deletion vector to bytes let serialized = deletion_vector.serialize()?; // Deserialize to get cardinality (we need this for the metadata) - let treemap = RoaringTreemap::deserialize_from(&serialized[..]) - .map_err(|e| Error::generic(format!("Failed to deserialize deletion vector for cardinality: {}", e)))?; - let cardinality = treemap.len(); // Calculate sizes @@ -484,6 +487,10 @@ mod tests { fn into_iter(self) -> Self::IndexIterator { self.indexes.into_iter() } + + fn cardinality(&self) -> u64 { + self.indexes.len() as u64 + } } let test_dv = TestDV { @@ -521,6 +528,10 @@ mod tests { fn into_iter(self) -> Self::IndexIterator { self.deleted_rows.into_iter() } + + fn cardinality(&self) -> u64 { + self.deleted_rows.len() as u64 + } } // Create a temporary directory and file From 0abb739c2eb44481870e72e76e945b5672b746f9 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Fri, 24 Oct 2025 01:57:10 +0000 Subject: [PATCH 5/5] polish --- kernel/src/actions/deletion_vector.rs | 137 ++----------------- kernel/src/actions/deletion_vector_writer.rs | 4 +- kernel/src/transaction/mod.rs | 39 ------ 3 files changed, 17 insertions(+), 163 deletions(-) diff --git a/kernel/src/actions/deletion_vector.rs b/kernel/src/actions/deletion_vector.rs index 0fcd431f9..39f7a0e6c 100644 --- a/kernel/src/actions/deletion_vector.rs +++ b/kernel/src/actions/deletion_vector.rs @@ -59,8 +59,10 @@ impl ToDataType for DeletionVectorStorageType { } } -/// Represents and abstract path to a deltion vector. This is used to construct the path to a deletion vector file -/// and translate it to the relative path the delta spec requires (this is only exposed via [`to_deletion_vector_descriptor`]). +/// Represents and abstract path to a deltion vector. This is used in the public +/// API to construct the path to a deletion vector file and +/// have business logic to help convert [`crate::actions::deletion_vector_writer::DeletionVectorWriteResult`] to a [`DeletionVectorDescriptor`] +/// with appropriate storage type and path. pub struct DeletionVectorPath { table_path: Url, uuid: uuid::Uuid, @@ -652,7 +654,7 @@ mod tests { } #[test] - fn test_deletion_vector_path_absolute_path_uniqueness() { + fn test_deletion_vector_path_uniqueness() { // Verify that two DeletionVectorPath instances created with the same arguments // produce different absolute paths due to unique UUIDs let table_path = Url::parse("file:///tmp/test_table/").unwrap(); @@ -667,9 +669,10 @@ mod tests { // The absolute paths should be different because each DeletionVectorPath // gets a unique UUID assert_ne!(abs_path1, abs_path2); - - // But they should share the same parent path - assert_eq!(abs_path1.join("..").unwrap(), abs_path2.join("..").unwrap()); + assert_ne!( + dv_path1.encoded_relative_path(), + dv_path2.encoded_relative_path() + ); } #[test] @@ -701,6 +704,10 @@ mod tests { let expected_path = "file:///tmp/test_table/dv/deletion_vector_550e8400-e29b-41d4-a716-446655440000.bin"; assert_eq!(abs_path.as_str(), expected_path); + + // Verify the encoded_relative_path is exactly as expected (prefix + z85 encoded UUID: 20 chars) + let encoded = dv_path.encoded_relative_path(); + assert_eq!(encoded, "dvrsTVZ&*Sl-RXRWjryu/!"); } #[test] @@ -717,123 +724,9 @@ mod tests { let expected_path = "file:///tmp/test_table/deletion_vector_123e4567-e89b-12d3-a456-426614174000.bin"; assert_eq!(abs_path.as_str(), expected_path); - } - - #[test] - fn test_deletion_vector_path_encoded_with_known_uuid() { - // Test encoded relative path with a known UUID - let table_path = Url::parse("file:///tmp/test_table/").unwrap(); - let prefix = String::from("prefix"); - let known_uuid = uuid::Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap(); - - let dv_path = DeletionVectorPath::new_with_uuid(table_path, prefix.clone(), known_uuid); - let encoded = dv_path.encoded_relative_path(); - - // The encoded path should start with the prefix - assert!(encoded.starts_with(&prefix)); - - // Verify it has the correct length (prefix + 20 for z85 encoded UUID) - assert_eq!(encoded.len(), prefix.len() + 20); - - // Verify the z85 encoding of the UUID (550e8400-e29b-41d4-a716-446655440000 -> rsTVZ&*Sl-RXRWjryu/!) - assert_eq!(encoded, "prefixrsTVZ&*Sl-RXRWjryu/!"); - } - - #[test] - fn test_deletion_vector_path_absolute_path_empty_prefix() { - let table_path = Url::parse("file:///tmp/test_table/").unwrap(); - let prefix = String::from(""); - - let dv_path = DeletionVectorPath::new(table_path.clone(), prefix); - let abs_path = dv_path.absolute_path().unwrap(); - - // With empty prefix, the deletion vector file should be directly under table_path - let path_str = abs_path.as_str(); - assert!(path_str.starts_with("file:///tmp/test_table/deletion_vector_")); - assert!(path_str.ends_with(".bin")); - } - - #[test] - fn test_deletion_vector_path_encoded_relative_path() { - let table_path = Url::parse("file:///tmp/test_table/").unwrap(); - let prefix = String::from("dv"); - - let dv_path = DeletionVectorPath::new(table_path, prefix.clone()); - let encoded = dv_path.encoded_relative_path(); - - // The encoded path should start with the prefix - assert!(encoded.starts_with(&prefix)); - - // The encoded path should be at least prefix length + 20 (z85 encoded UUID) - assert!(encoded.len() >= prefix.len() + 20); - - // The part after the prefix should be exactly 20 characters (z85 encoded 16-byte UUID) - assert_eq!(encoded.len() - prefix.len(), 20); - } - - #[test] - fn test_deletion_vector_path_encoded_relative_path_empty_prefix() { - let table_path = Url::parse("file:///tmp/test_table/").unwrap(); - let prefix = String::from(""); - - let dv_path = DeletionVectorPath::new(table_path, prefix); - let encoded = dv_path.encoded_relative_path(); - - // With empty prefix, the encoded path should be exactly 20 characters - assert_eq!(encoded.len(), 20); - } - - #[test] - fn test_deletion_vector_path_with_s3_url() { - let table_path = Url::parse("s3://my-bucket/warehouse/delta_table/").unwrap(); - let prefix = String::from("deletion_vectors"); - - let dv_path = DeletionVectorPath::new(table_path, prefix); - let abs_path = dv_path.absolute_path().unwrap(); - - // Should maintain the s3 scheme - assert!(abs_path.as_str().starts_with("s3://")); - assert!(abs_path.as_str().contains("my-bucket")); - assert!(abs_path - .as_str() - .contains("/deletion_vectors/deletion_vector_")); - } - - #[test] - fn test_deletion_vector_path_with_nested_prefix() { - let table_path = Url::parse("file:///data/warehouse/my_table/").unwrap(); - let prefix = String::from("_delta_log/deletion_vectors"); - - let dv_path = DeletionVectorPath::new(table_path, prefix.clone()); - let abs_path = dv_path.absolute_path().unwrap(); - - // Should contain the nested path structure - assert!(abs_path - .as_str() - .contains("/_delta_log/deletion_vectors/deletion_vector_")); + // Verify the encoded_relative_path is exactly as expected (z85 encoded UUID: 20 chars) let encoded = dv_path.encoded_relative_path(); - // The encoded path should start with the full prefix - assert!(encoded.starts_with(&prefix)); - } - - #[test] - fn test_deletion_vector_path_encoded_uniqueness() { - // Verify that encoded relative paths are unique for different instances - let table_path = Url::parse("file:///tmp/test_table/").unwrap(); - let prefix = String::from("dv"); - - let dv_path1 = DeletionVectorPath::new(table_path.clone(), prefix.clone()); - let dv_path2 = DeletionVectorPath::new(table_path.clone(), prefix.clone()); - - let encoded1 = dv_path1.encoded_relative_path(); - let encoded2 = dv_path2.encoded_relative_path(); - - // Each instance should have a different encoded path due to unique UUIDs - assert_ne!(encoded1, encoded2); - - // But they should both start with the same prefix - assert!(encoded1.starts_with("dv")); - assert!(encoded2.starts_with("dv")); + assert_eq!(encoded, "5:JjlQ/G/]6C<1m"); } } diff --git a/kernel/src/actions/deletion_vector_writer.rs b/kernel/src/actions/deletion_vector_writer.rs index fe1dd8d70..62af1cbd8 100644 --- a/kernel/src/actions/deletion_vector_writer.rs +++ b/kernel/src/actions/deletion_vector_writer.rs @@ -63,7 +63,7 @@ pub trait DeletionVector: Sized { /// Metadata about a written deletion vector, excluding the storage path. /// /// This structure contains the information needed to construct a full -/// [`DeletionVectorDescriptor`](crate::actions::deletion_vector::DeletionVectorDescriptor) +/// [`DeletionVectorDescriptor`] /// after writing the DV to storage. #[derive(Debug, Clone, PartialEq, Eq)] pub struct DeletionVectorWriteResult { @@ -108,7 +108,7 @@ impl DeletionVectorWriteResult { /// use delta_kernel::actions::deletion_vector_writer::KernelDeletionVector; /// /// let mut dv = KernelDeletionVector::new(); -/// dv.add_deleted_row_indexes([0, 5, 10]; +/// dv.add_deleted_row_indexes([0, 5, 10]); /// ``` #[derive(Debug, Clone)] pub struct KernelDeletionVector { diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 288cbdac9..f2d691230 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -721,45 +721,6 @@ impl WriteContext { pub fn new_deletion_vector_path(&self, random_prefix: String) -> DeletionVectorPath { DeletionVectorPath::new(self.target_dir.clone(), random_prefix) } - - /// Relativize a URL with respect to the table root. - /// - /// This converts an absolute URL to a relative path suitable for storage in - /// deletion vector descriptors. The path is made relative to the table root. - /// - /// # Arguments - /// - /// * `url` - The absolute URL to relativize - /// - /// # Returns - /// - /// A relative path string, or an error if the URL cannot be made relative to the table root. - /// - /// # Examples - /// - /// ```rust,ignore - /// let write_context = transaction.get_write_context(); - /// let absolute_url = Url::parse("s3://bucket/table/deletion_vector_abc.bin")?; - /// let relative = write_context.relativize_url(&absolute_url)?; - /// // relative might be: "deletion_vector_abc.bin" - /// # Ok::<(), delta_kernel::Error>(()) - /// ``` - pub fn relativize_url(&self, url: &Url) -> DeltaResult { - // Get the path component of both URLs - let table_path = self.target_dir.path(); - let file_path = url.path(); - - // Check if the file path starts with the table path - if let Some(relative_path) = file_path.strip_prefix(table_path) { - // Remove leading slash if present - Ok(relative_path.trim_start_matches('/').to_string()) - } else { - Err(Error::generic(format!( - "Cannot relativize URL {} with respect to table root {}", - url, self.target_dir - ))) - } - } } /// Kernel exposes information about the state of the table that engines might want to use to