Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kernel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pre-release-hook = [
delta_kernel_derive = { path = "../derive-macros", version = "0.16.0" }
bytes = "1.10"
chrono = "0.4.41"
crc = "3.2.2"
indexmap = "2.10.0"
itertools = "0.14"
roaring = "0.11.2"
Expand Down
205 changes: 186 additions & 19 deletions kernel/src/actions/deletion_vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use delta_kernel_derive::ToSchema;
use roaring::RoaringTreemap;
use url::Url;

use crc::{Crc, CRC_32_ISO_HDLC};

use crate::schema::DataType;
use crate::utils::require;
use crate::{DeltaResult, Error, StorageHandler};
Expand Down Expand Up @@ -57,6 +59,61 @@ impl ToDataType for DeletionVectorStorageType {
}
}

/// Represents and abstract path to a deltion vector. This is used in the public
/// API to construct the path to a deletion vector file and
/// have business logic to help convert [`crate::actions::deletion_vector_writer::DeletionVectorWriteResult`] to a [`DeletionVectorDescriptor`]
/// with appropriate storage type and path.
pub struct DeletionVectorPath {
table_path: Url,
uuid: uuid::Uuid,
prefix: String,
}

impl DeletionVectorPath {
pub(crate) fn new(table_path: Url, prefix: String) -> Self {
Self {
table_path,
uuid: uuid::Uuid::new_v4(),
prefix,
}
}

#[cfg(test)]
pub(crate) fn new_with_uuid(table_path: Url, prefix: String, uuid: uuid::Uuid) -> Self {
Self {
table_path,
uuid,
prefix,
}
}

/// Helper method to cosntruct the relative path to a deletion vector file,
/// the prefix and the UUID suffix.
fn relative_path(prefix: &str, uuid: &uuid::Uuid) -> String {
let uuid_as_string = uuid::Uuid::to_string(uuid);
if !prefix.is_empty() {
format!("{prefix}/deletion_vector_{uuid_as_string}.bin")
} else {
format!("deletion_vector_{uuid_as_string}.bin")
}
}

/// Returns the absolute path to the deletion vector file.
pub fn absolute_path(&self) -> DeltaResult<Url> {
let dv_suffix = Self::relative_path(&self.prefix, &self.uuid);
let dv_path = self
.table_path
.join(&dv_suffix)
.map_err(|_| Error::DeletionVector(format!("invalid path: {dv_suffix}")))?;
Ok(dv_path)
}

/// Returns the compressed encoded path for use in descriptor (prefix + z85 encoded UUID).
pub(crate) fn encoded_relative_path(&self) -> String {
format!("{}{}", self.prefix, z85::encode(self.uuid.as_bytes()))
}
}

#[derive(Debug, Clone, PartialEq, Eq, ToSchema)]
#[cfg_attr(test, derive(serde::Serialize), serde(rename_all = "camelCase"))]
pub struct DeletionVectorDescriptor {
Expand Down Expand Up @@ -122,14 +179,8 @@ impl DeletionVectorDescriptor {
.map_err(|_| Error::deletion_vector("Failed to decode DV uuid"))?;
let uuid = uuid::Uuid::from_slice(&decoded)
.map_err(|err| Error::DeletionVector(err.to_string()))?;
let dv_suffix = if prefix_len > 0 {
format!(
"{}/deletion_vector_{uuid}.bin",
&self.path_or_inline_dv[..prefix_len]
)
} else {
format!("deletion_vector_{uuid}.bin")
};
let dv_suffix =
DeletionVectorPath::relative_path(&self.path_or_inline_dv[..prefix_len], &uuid);
let dv_path = parent
.join(&dv_suffix)
.map_err(|_| Error::DeletionVector(format!("invalid path: {dv_suffix}")))?;
Expand All @@ -154,6 +205,16 @@ impl DeletionVectorDescriptor {
&self,
storage: Arc<dyn StorageHandler>,
parent: &Url,
) -> DeltaResult<RoaringTreemap> {
self.read_with_possible_validation(storage, parent, false)
}

/// Same as above, but optionally validate the CRC32 checksum in the file.
pub(crate) fn read_with_possible_validation(
&self,
storage: Arc<dyn StorageHandler>,
parent: &Url,
validate_crc: bool,
) -> DeltaResult<RoaringTreemap> {
match self.absolute_path(parent)? {
None => {
Expand Down Expand Up @@ -206,16 +267,35 @@ impl DeletionVectorDescriptor {
);

// get the Bytes back out and limit it to dv_size
let position = cursor.position();
let mut bytes = cursor.into_inner();
let truncate_pos = position + dv_size as u64;
assert!(
truncate_pos <= usize::MAX as u64,
"Can't truncate as truncate_pos is > usize::MAX"
);
bytes.truncate(truncate_pos as usize);
let mut cursor = Cursor::new(bytes);
cursor.set_position(position);
let position = cursor.position() as usize;
let bytes = cursor.into_inner();
// -4 to remove CRC portion.
let truncate_pos = position + dv_size as usize - 4;

if validate_crc {
require!(
bytes.len() >= truncate_pos + 4,
Error::DeletionVector(format!(
"Can't validate CRC as there are not enough bytes {} < {}",
bytes.len(),
truncate_pos + 4
))
);
let mut crc_cursor: Cursor<Bytes> =
Cursor::new(bytes.slice(truncate_pos..truncate_pos + 4));
let crc = read_u32(&mut crc_cursor, Endian::Big)?;
let crc32 = create_dv_crc32();
// -4 to include magic portion of the CRC
let expected_crc = crc32.checksum(&bytes.slice(position - 4..truncate_pos));
require!(
crc == expected_crc,
Error::DeletionVector(format!(
"CRC32 checksum mismatch: {crc} != {expected_crc}"
))
);
}
let dv_bytes = bytes.slice(position..truncate_pos);
let cursor = Cursor::new(dv_bytes.clone());
RoaringTreemap::deserialize_from(cursor)
.map_err(|err| Error::DeletionVector(err.to_string()))
}
Expand All @@ -238,6 +318,12 @@ enum Endian {
Little,
}

/// Factory function to create a CRC-32 instance using the ISO HDLC algorithm.
/// This ensures consistent CRC algorithm usage for deletion vectors.
pub(crate) fn create_dv_crc32() -> Crc<u32> {
Crc::<u32>::new(&CRC_32_ISO_HDLC)
}

/// small helper to read a big or little endian u32 from a cursor
fn read_u32(cursor: &mut Cursor<Bytes>, endian: Endian) -> DeltaResult<u32> {
let mut buf = [0; 4];
Expand Down Expand Up @@ -437,7 +523,11 @@ mod tests {
let storage = sync_engine.storage_handler();

let example = dv_example();
let tree_map = example.read(storage, &parent).unwrap();
let tree_map = example.read(storage.clone(), &parent).unwrap();
let validated_treemap = example
.read_with_possible_validation(storage.clone(), &parent, true)
.unwrap();
assert_eq!(tree_map, validated_treemap);

let expected: Vec<u64> = vec![0, 9];
let found = tree_map.iter().collect::<Vec<_>>();
Expand Down Expand Up @@ -562,4 +652,81 @@ mod tests {
assert_eq!(variant, parsed);
}
}

#[test]
fn test_deletion_vector_path_uniqueness() {
// Verify that two DeletionVectorPath instances created with the same arguments
// produce different absolute paths due to unique UUIDs
let table_path = Url::parse("file:///tmp/test_table/").unwrap();
let prefix = String::from("deletion_vectors");

let dv_path1 = DeletionVectorPath::new(table_path.clone(), prefix.clone());
let dv_path2 = DeletionVectorPath::new(table_path.clone(), prefix.clone());

let abs_path1 = dv_path1.absolute_path().unwrap();
let abs_path2 = dv_path2.absolute_path().unwrap();

// The absolute paths should be different because each DeletionVectorPath
// gets a unique UUID
assert_ne!(abs_path1, abs_path2);
assert_ne!(
dv_path1.encoded_relative_path(),
dv_path2.encoded_relative_path()
);
}

#[test]
fn test_deletion_vector_path_absolute_path_with_prefix() {
let table_path = Url::parse("file:///tmp/test_table/").unwrap();
let prefix = String::from("dv");
let known_uuid = uuid::Uuid::parse_str("abcdef01-2345-6789-abcd-ef0123456789").unwrap();

let dv_path = DeletionVectorPath::new_with_uuid(table_path.clone(), prefix, known_uuid);
let abs_path = dv_path.absolute_path().unwrap();

// Verify the exact path with known UUID
let expected =
"file:///tmp/test_table/dv/deletion_vector_abcdef01-2345-6789-abcd-ef0123456789.bin";
assert_eq!(abs_path.as_str(), expected);
}

#[test]
fn test_deletion_vector_path_absolute_path_with_known_uuid() {
// Test with a known UUID to verify exact path construction
let table_path = Url::parse("file:///tmp/test_table/").unwrap();
let prefix = String::from("dv");
let known_uuid = uuid::Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap();

let dv_path = DeletionVectorPath::new_with_uuid(table_path, prefix, known_uuid);
let abs_path = dv_path.absolute_path().unwrap();

// Verify the exact path is constructed correctly
let expected_path =
"file:///tmp/test_table/dv/deletion_vector_550e8400-e29b-41d4-a716-446655440000.bin";
assert_eq!(abs_path.as_str(), expected_path);

// Verify the encoded_relative_path is exactly as expected (prefix + z85 encoded UUID: 20 chars)
let encoded = dv_path.encoded_relative_path();
assert_eq!(encoded, "dvrsTVZ&*Sl-RXRWjryu/!");
}

#[test]
fn test_deletion_vector_path_absolute_path_with_known_uuid_empty_prefix() {
// Test with a known UUID and empty prefix
let table_path = Url::parse("file:///tmp/test_table/").unwrap();
let prefix = String::from("");
let known_uuid = uuid::Uuid::parse_str("123e4567-e89b-12d3-a456-426614174000").unwrap();

let dv_path = DeletionVectorPath::new_with_uuid(table_path, prefix, known_uuid);
let abs_path = dv_path.absolute_path().unwrap();

// Verify the exact path is constructed correctly without prefix directory
let expected_path =
"file:///tmp/test_table/deletion_vector_123e4567-e89b-12d3-a456-426614174000.bin";
assert_eq!(abs_path.as_str(), expected_path);

// Verify the encoded_relative_path is exactly as expected (z85 encoded UUID: 20 chars)
let encoded = dv_path.encoded_relative_path();
assert_eq!(encoded, "5<w-%>:JjlQ/G/]6C<1m");
}
}
Loading
Loading