From 8377f940dde884eb2425974f8867f3212ba0adf5 Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Fri, 10 Oct 2025 10:09:58 +0200 Subject: [PATCH 1/5] feat: allow scanning the whole domain metadata --- ffi/src/domain_metadata.rs | 88 +++++++++++++++++++++++++++ ffi/src/lib.rs | 11 ++++ ffi/src/scan.rs | 12 +--- ffi/src/schema.rs | 2 +- kernel/src/actions/domain_metadata.rs | 11 ++++ kernel/src/snapshot.rs | 15 ++++- 6 files changed, 126 insertions(+), 13 deletions(-) diff --git a/ffi/src/domain_metadata.rs b/ffi/src/domain_metadata.rs index 7f51dd607..d1c2e5f56 100644 --- a/ffi/src/domain_metadata.rs +++ b/ffi/src/domain_metadata.rs @@ -37,6 +37,51 @@ fn get_domain_metadata_impl( .and_then(|config: String| allocate_fn(kernel_string_slice!(config)))) } +/// Get the domain metadata as an optional string allocated by `AllocatedStringFn` for a specific domain in this snapshot +/// +/// # Safety +/// +/// Caller is responsible for passing in a valid handle +#[no_mangle] +pub unsafe extern "C" fn get_all_domain_metadata( + snapshot: Handle, + engine: Handle, + engine_context: NullableCvoid, + engine_visitor: extern "C" fn( + engine_context: NullableCvoid, + key: KernelStringSlice, + value: KernelStringSlice, + ), +) -> ExternResult { + let snapshot = unsafe { snapshot.as_ref() }; + let engine = unsafe { engine.as_ref() }; + + get_all_domain_metadata_impl(snapshot, engine, engine_context, engine_visitor) + .into_extern_result(&engine) +} + +fn get_all_domain_metadata_impl( + snapshot: &Snapshot, + extern_engine: &dyn ExternEngine, + engine_context: NullableCvoid, + engine_visitor: extern "C" fn( + engine_context: NullableCvoid, + key: KernelStringSlice, + value: KernelStringSlice, + ), +) -> DeltaResult { + let res = snapshot.get_all_domain_metadata(extern_engine.engine().as_ref()); + res?.iter().for_each(|(key, value)| { + engine_visitor( + engine_context, + kernel_string_slice!(key), + kernel_string_slice!(value), + ); + }); + + Ok(true) +} + #[cfg(test)] mod tests { use super::*; @@ -51,6 +96,8 @@ mod tests { use delta_kernel::DeltaResult; use object_store::memory::InMemory; use serde_json::json; + use std::collections::HashMap; + use std::ptr::NonNull; use std::sync::Arc; use test_utils::add_commit; @@ -148,6 +195,7 @@ mod tests { ) }; + // First, we test fetching the domain metadata one-by-one let domain1 = "domain1"; let res = ok_or_panic(get_domain_metadata_helper(domain1)); assert!(res.is_none()); @@ -160,6 +208,46 @@ mod tests { let res = get_domain_metadata_helper(domain3); assert_extern_result_error_with_message(res, KernelError::GenericError, "Generic delta kernel error: User DomainMetadata are not allowed to use system-controlled 'delta.*' domain"); + // Secondly, we scan the entire domain metadata + let visitor_state: Box> = + Box::new(std::collections::HashMap::new()); + let visitor_state_ptr = Box::into_raw(visitor_state); + + extern "C" fn visitor( + state: NullableCvoid, + key: KernelStringSlice, + value: KernelStringSlice, + ) { + let mut collected_metadata = unsafe { + Box::from_raw( + state.unwrap().as_ptr() as *mut std::collections::HashMap + ) + }; + let key: DeltaResult = unsafe { TryFromStringSlice::try_from_slice(&key) }; + let value: DeltaResult = unsafe { TryFromStringSlice::try_from_slice(&value) }; + collected_metadata.insert(key.unwrap(), value.unwrap()); + Box::leak(collected_metadata); + } + + let res = unsafe { + ok_or_panic(get_all_domain_metadata( + snapshot.shallow_copy(), + engine.shallow_copy(), + Some(NonNull::new_unchecked(visitor_state_ptr).cast()), + visitor, + )) + }; + + // Confirm visitor picked up all entries in map + let collected_metadata = unsafe { Box::from_raw(visitor_state_ptr) }; + assert!(res); + assert!(collected_metadata.get("domain1").is_none()); + assert!(collected_metadata.get("delta.domain3").is_none()); + assert_eq!( + collected_metadata.get("domain2").unwrap(), + "domain2_commit1" + ); + unsafe { free_snapshot(snapshot) } unsafe { free_engine(engine) } diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index 4308e6bb2..6a4457ebd 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -52,6 +52,17 @@ pub mod transaction; pub(crate) type NullableCvoid = Option>; +#[derive(Default)] +pub struct CStringMap { + values: HashMap, +} + +impl From> for CStringMap { + fn from(val: HashMap) -> Self { + Self { values: val } + } +} + /// Model iterators. This allows an engine to specify iteration however it likes, and we simply wrap /// the engine functions. The engine retains ownership of the iterator. #[repr(C)] diff --git a/ffi/src/scan.rs b/ffi/src/scan.rs index 4ec116793..2dddc46bd 100644 --- a/ffi/src/scan.rs +++ b/ffi/src/scan.rs @@ -14,6 +14,7 @@ use url::Url; use crate::expressions::kernel_visitor::{unwrap_kernel_predicate, KernelExpressionVisitorState}; use crate::expressions::SharedExpression; +use crate::CStringMap; use crate::{ kernel_string_slice, unwrap_and_parse_path_as_url, AllocateStringFn, ExternEngine, ExternResult, IntoExternResult, KernelBoolSlice, KernelRowIndexArray, KernelStringSlice, @@ -309,17 +310,6 @@ type CScanCallback = extern "C" fn( partition_map: &CStringMap, ); -#[derive(Default)] -pub struct CStringMap { - values: HashMap, -} - -impl From> for CStringMap { - fn from(val: HashMap) -> Self { - Self { values: val } - } -} - #[no_mangle] /// allow probing into a CStringMap. If the specified key is in the map, kernel will call /// allocate_fn with the value associated with the key and return the value returned from that diff --git a/ffi/src/schema.rs b/ffi/src/schema.rs index e891aaff3..82342557f 100644 --- a/ffi/src/schema.rs +++ b/ffi/src/schema.rs @@ -1,7 +1,7 @@ use std::os::raw::c_void; use crate::handle::Handle; -use crate::scan::CStringMap; +use crate::CStringMap; use crate::{kernel_string_slice, KernelStringSlice, SharedSchema}; use delta_kernel::schema::{ArrayType, DataType, MapType, PrimitiveType, StructType}; diff --git a/kernel/src/actions/domain_metadata.rs b/kernel/src/actions/domain_metadata.rs index 8ef5e89bd..57a80ad0e 100644 --- a/kernel/src/actions/domain_metadata.rs +++ b/kernel/src/actions/domain_metadata.rs @@ -34,6 +34,17 @@ pub(crate) fn domain_metadata_configuration( .remove(domain) .map(|domain_metadata| domain_metadata.configuration)) } +pub(crate) fn all_domain_metadata_configuration( + log_segment: &LogSegment, + engine: &dyn Engine, +) -> DeltaResult> { + let domain_metadatas = scan_domain_metadatas(log_segment, None, engine)?; + + Ok(domain_metadatas + .into_iter() + .map(|(key, domain_metadata)| (key, domain_metadata.configuration)) + .collect()) +} /// Scan the entire log for all domain metadata actions but terminate early if a specific domain /// is provided. Note that this returns the latest domain metadata for each domain, accounting for diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 9665a7497..7b885f2de 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -1,10 +1,13 @@ //! In-memory representation of snapshots of tables (snapshot is a table at given point in time, it //! has schema etc.) +use std::collections::HashMap; use std::sync::Arc; use crate::action_reconciliation::calculate_transaction_expiration_timestamp; -use crate::actions::domain_metadata::domain_metadata_configuration; +use crate::actions::domain_metadata::{ + all_domain_metadata_configuration, domain_metadata_configuration, +}; use crate::actions::set_transaction::SetTransactionScanner; use crate::actions::INTERNAL_DOMAIN_PREFIX; use crate::checkpoint::CheckpointWriter; @@ -369,6 +372,16 @@ impl Snapshot { domain_metadata_configuration(self.log_segment(), domain, engine) } + pub fn get_all_domain_metadata( + &self, + engine: &dyn Engine, + ) -> DeltaResult> { + let all_metadata = all_domain_metadata_configuration(self.log_segment(), engine)?; + Ok(all_metadata + .into_iter() + .filter(|(key, _)| !key.starts_with(INTERNAL_DOMAIN_PREFIX)) + .collect()) + } /// Get the In-Commit Timestamp (ICT) for this snapshot. /// From 574d32ad73d3a4440d1c99e0036e593d5f1b9360 Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Thu, 9 Oct 2025 12:53:29 +0200 Subject: [PATCH 2/5] fix: rename and cleanup --- ffi/src/domain_metadata.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/ffi/src/domain_metadata.rs b/ffi/src/domain_metadata.rs index d1c2e5f56..4ddeda00d 100644 --- a/ffi/src/domain_metadata.rs +++ b/ffi/src/domain_metadata.rs @@ -43,7 +43,7 @@ fn get_domain_metadata_impl( /// /// Caller is responsible for passing in a valid handle #[no_mangle] -pub unsafe extern "C" fn get_all_domain_metadata( +pub unsafe extern "C" fn visit_domain_metadata( snapshot: Handle, engine: Handle, engine_context: NullableCvoid, @@ -56,11 +56,11 @@ pub unsafe extern "C" fn get_all_domain_metadata( let snapshot = unsafe { snapshot.as_ref() }; let engine = unsafe { engine.as_ref() }; - get_all_domain_metadata_impl(snapshot, engine, engine_context, engine_visitor) + visit_domain_metadata_impl(snapshot, engine, engine_context, engine_visitor) .into_extern_result(&engine) } -fn get_all_domain_metadata_impl( +fn visit_domain_metadata_impl( snapshot: &Snapshot, extern_engine: &dyn ExternEngine, engine_context: NullableCvoid, @@ -196,6 +196,7 @@ mod tests { }; // First, we test fetching the domain metadata one-by-one + let domain1 = "domain1"; let res = ok_or_panic(get_domain_metadata_helper(domain1)); assert!(res.is_none()); @@ -208,11 +209,14 @@ mod tests { let res = get_domain_metadata_helper(domain3); assert_extern_result_error_with_message(res, KernelError::GenericError, "Generic delta kernel error: User DomainMetadata are not allowed to use system-controlled 'delta.*' domain"); - // Secondly, we scan the entire domain metadata + // Secondly, we visit the entire domain metadata + + // Create visitor state let visitor_state: Box> = Box::new(std::collections::HashMap::new()); let visitor_state_ptr = Box::into_raw(visitor_state); + // Test visitor function extern "C" fn visitor( state: NullableCvoid, key: KernelStringSlice, @@ -229,8 +233,9 @@ mod tests { Box::leak(collected_metadata); } + // Visit all (user) domain metadata let res = unsafe { - ok_or_panic(get_all_domain_metadata( + ok_or_panic(visit_domain_metadata( snapshot.shallow_copy(), engine.shallow_copy(), Some(NonNull::new_unchecked(visitor_state_ptr).cast()), From 0c2b4bee4956fd8ae3d5ec6c930e0498e365fc11 Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Thu, 9 Oct 2025 12:59:38 +0200 Subject: [PATCH 3/5] test: get_all_domain_metadata --- kernel/src/snapshot.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 7b885f2de..f4b94a8e0 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -1134,6 +1134,8 @@ mod tests { let snapshot = Snapshot::builder_for(url.clone()).build(&engine)?; + // Test get_domain_metadata + assert_eq!(snapshot.get_domain_metadata("domain1", &engine)?, None); assert_eq!( snapshot.get_domain_metadata("domain2", &engine)?, @@ -1148,6 +1150,17 @@ mod tests { .unwrap_err(); assert!(matches!(err, Error::Generic(msg) if msg == "User DomainMetadata are not allowed to use system-controlled 'delta.*' domain")); + + // Test get_all_domain_metadata + + let metadata = snapshot.get_all_domain_metadata(&engine)?; + + let mut expected = HashMap::new(); + expected.insert("domain2".to_string(), "domain2_commit1".to_string()); + expected.insert("domain3".to_string(), "domain3_commit0".to_string()); + + assert_eq!(metadata, expected); + Ok(()) } From 1065f8a63983adcebe649b1bc07c28cf430f0222 Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Thu, 9 Oct 2025 13:16:16 +0200 Subject: [PATCH 4/5] refactor: naming consistency with other code --- ffi/src/domain_metadata.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/ffi/src/domain_metadata.rs b/ffi/src/domain_metadata.rs index 4ddeda00d..e10cdac66 100644 --- a/ffi/src/domain_metadata.rs +++ b/ffi/src/domain_metadata.rs @@ -47,7 +47,7 @@ pub unsafe extern "C" fn visit_domain_metadata( snapshot: Handle, engine: Handle, engine_context: NullableCvoid, - engine_visitor: extern "C" fn( + visitor: extern "C" fn( engine_context: NullableCvoid, key: KernelStringSlice, value: KernelStringSlice, @@ -56,7 +56,7 @@ pub unsafe extern "C" fn visit_domain_metadata( let snapshot = unsafe { snapshot.as_ref() }; let engine = unsafe { engine.as_ref() }; - visit_domain_metadata_impl(snapshot, engine, engine_context, engine_visitor) + visit_domain_metadata_impl(snapshot, engine, engine_context, visitor) .into_extern_result(&engine) } @@ -64,7 +64,7 @@ fn visit_domain_metadata_impl( snapshot: &Snapshot, extern_engine: &dyn ExternEngine, engine_context: NullableCvoid, - engine_visitor: extern "C" fn( + visitor: extern "C" fn( engine_context: NullableCvoid, key: KernelStringSlice, value: KernelStringSlice, @@ -72,7 +72,7 @@ fn visit_domain_metadata_impl( ) -> DeltaResult { let res = snapshot.get_all_domain_metadata(extern_engine.engine().as_ref()); res?.iter().for_each(|(key, value)| { - engine_visitor( + visitor( engine_context, kernel_string_slice!(key), kernel_string_slice!(value), @@ -212,8 +212,7 @@ mod tests { // Secondly, we visit the entire domain metadata // Create visitor state - let visitor_state: Box> = - Box::new(std::collections::HashMap::new()); + let visitor_state: Box> = Box::default(); let visitor_state_ptr = Box::into_raw(visitor_state); // Test visitor function From a27ad6fd90eec1abedb5bee3945bc989930afc8b Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Thu, 9 Oct 2025 13:46:51 +0200 Subject: [PATCH 5/5] fix: add missing use --- ffi/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index 6a4457ebd..2aff22241 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -3,7 +3,6 @@ //! Exposes that an engine needs to call from C/C++ to interface with kernel #[cfg(feature = "default-engine-base")] -use std::collections::HashMap; use std::default::Default; use std::os::raw::{c_char, c_void}; use std::ptr::NonNull; @@ -16,6 +15,7 @@ use delta_kernel::snapshot::Snapshot; use delta_kernel::Version; use delta_kernel::{DeltaResult, Engine, EngineData}; use delta_kernel_ffi_macros::handle_descriptor; +use std::collections::HashMap; // cbindgen doesn't understand our use of feature flags here, and by default it parses `mod handle` // twice. So we tell it to ignore one of the declarations to avoid double-definition errors.