Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions ffi/src/domain_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,51 @@ fn get_domain_metadata_impl(
.and_then(|config: String| allocate_fn(kernel_string_slice!(config))))
}

/// Visit every user domain metadata entry in this snapshot, calling `visitor` once per domain
/// with the domain name (`key`) and its configuration (`value`).
///
/// # Safety
///
/// Caller is responsible for passing in valid snapshot and engine handles
#[no_mangle]
pub unsafe extern "C" fn visit_domain_metadata(
snapshot: Handle<SharedSnapshot>,
engine: Handle<SharedExternEngine>,
engine_context: NullableCvoid,
visitor: extern "C" fn(
engine_context: NullableCvoid,
key: KernelStringSlice,
value: KernelStringSlice,
Comment on lines +52 to +53
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's call these `domain` and `configuration` to match the protocol. We should probably also include whether the domain is removed.

),
) -> ExternResult<bool> {
let snapshot = unsafe { snapshot.as_ref() };
let engine = unsafe { engine.as_ref() };

visit_domain_metadata_impl(snapshot, engine, engine_context, visitor)
.into_extern_result(&engine)
}

/// Loads all domain metadata via `Snapshot::get_all_domain_metadata` and hands each
/// (domain, configuration) pair to `visitor`, forwarding `engine_context` unchanged.
///
/// Returns `Ok(true)` once every entry has been visited. If loading the domain metadata fails,
/// the error is returned and `visitor` is never called.
fn visit_domain_metadata_impl(
    snapshot: &Snapshot,
    extern_engine: &dyn ExternEngine,
    engine_context: NullableCvoid,
    visitor: extern "C" fn(
        engine_context: NullableCvoid,
        key: KernelStringSlice,
        value: KernelStringSlice,
    ),
) -> DeltaResult<bool> {
    // Bail out before touching the visitor if the metadata cannot be read.
    let all_domain_metadata =
        snapshot.get_all_domain_metadata(extern_engine.engine().as_ref())?;

    for (domain, configuration) in &all_domain_metadata {
        visitor(
            engine_context,
            kernel_string_slice!(domain),
            kernel_string_slice!(configuration),
        );
    }

    Ok(true)
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -51,6 +96,8 @@ mod tests {
use delta_kernel::DeltaResult;
use object_store::memory::InMemory;
use serde_json::json;
use std::collections::HashMap;
use std::ptr::NonNull;
use std::sync::Arc;
use test_utils::add_commit;

Expand Down Expand Up @@ -148,6 +195,8 @@ mod tests {
)
};

// First, we test fetching the domain metadata one-by-one

let domain1 = "domain1";
let res = ok_or_panic(get_domain_metadata_helper(domain1));
assert!(res.is_none());
Expand All @@ -160,6 +209,49 @@ mod tests {
let res = get_domain_metadata_helper(domain3);
assert_extern_result_error_with_message(res, KernelError::GenericError, "Generic delta kernel error: User DomainMetadata are not allowed to use system-controlled 'delta.*' domain");

// Secondly, we visit the entire domain metadata

// Create visitor state
let visitor_state: Box<HashMap<String, String>> = Box::default();
let visitor_state_ptr = Box::into_raw(visitor_state);

// Test visitor function
extern "C" fn visitor(
state: NullableCvoid,
key: KernelStringSlice,
value: KernelStringSlice,
) {
let mut collected_metadata = unsafe {
Box::from_raw(
state.unwrap().as_ptr() as *mut std::collections::HashMap<String, String>
)
};
let key: DeltaResult<String> = unsafe { TryFromStringSlice::try_from_slice(&key) };
let value: DeltaResult<String> = unsafe { TryFromStringSlice::try_from_slice(&value) };
collected_metadata.insert(key.unwrap(), value.unwrap());
Box::leak(collected_metadata);
}

// Visit all (user) domain metadata
let res = unsafe {
ok_or_panic(visit_domain_metadata(
snapshot.shallow_copy(),
engine.shallow_copy(),
Some(NonNull::new_unchecked(visitor_state_ptr).cast()),
visitor,
))
};

// Confirm visitor picked up all entries in map
let collected_metadata = unsafe { Box::from_raw(visitor_state_ptr) };
assert!(res);
assert!(collected_metadata.get("domain1").is_none());
assert!(collected_metadata.get("delta.domain3").is_none());
assert_eq!(
collected_metadata.get("domain2").unwrap(),
"domain2_commit1"
);

unsafe { free_snapshot(snapshot) }
unsafe { free_engine(engine) }

Expand Down
13 changes: 12 additions & 1 deletion ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
//! Exposes the functions that an engine needs to call from C/C++ to interface with kernel

#[cfg(feature = "default-engine-base")]
use std::collections::HashMap;
use std::default::Default;
use std::os::raw::{c_char, c_void};
use std::ptr::NonNull;
Expand All @@ -16,6 +15,7 @@ use delta_kernel::snapshot::Snapshot;
use delta_kernel::Version;
use delta_kernel::{DeltaResult, Engine, EngineData};
use delta_kernel_ffi_macros::handle_descriptor;
use std::collections::HashMap;

// cbindgen doesn't understand our use of feature flags here, and by default it parses `mod handle`
// twice. So we tell it to ignore one of the declarations to avoid double-definition errors.
Expand Down Expand Up @@ -52,6 +52,17 @@ pub mod transaction;

pub(crate) type NullableCvoid = Option<NonNull<c_void>>;

#[derive(Default)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're not using CStringMap in this PR? Can we revert this change if it's not needed?

/// Owned map from string keys to string values.
pub struct CStringMap {
    /// The wrapped key/value pairs.
    values: HashMap<String, String>,
}

impl From<HashMap<String, String>> for CStringMap {
    /// Wraps `val` as-is; the map is moved, not copied.
    fn from(val: HashMap<String, String>) -> Self {
        CStringMap { values: val }
    }
}

/// Model iterators. This allows an engine to specify iteration however it likes, and we simply wrap
/// the engine functions. The engine retains ownership of the iterator.
#[repr(C)]
Expand Down
12 changes: 1 addition & 11 deletions ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use url::Url;

use crate::expressions::kernel_visitor::{unwrap_kernel_predicate, KernelExpressionVisitorState};
use crate::expressions::SharedExpression;
use crate::CStringMap;
use crate::{
kernel_string_slice, unwrap_and_parse_path_as_url, AllocateStringFn, ExternEngine,
ExternResult, IntoExternResult, KernelBoolSlice, KernelRowIndexArray, KernelStringSlice,
Expand Down Expand Up @@ -309,17 +310,6 @@ type CScanCallback = extern "C" fn(
partition_map: &CStringMap,
);

/// Owned map from string keys to string values.
#[derive(Default)]
pub struct CStringMap {
    /// The wrapped key/value pairs.
    values: HashMap<String, String>,
}

impl From<HashMap<String, String>> for CStringMap {
    /// Wraps `val` as-is; the map is moved, not copied.
    fn from(val: HashMap<String, String>) -> Self {
        CStringMap { values: val }
    }
}

#[no_mangle]
/// allow probing into a CStringMap. If the specified key is in the map, kernel will call
/// allocate_fn with the value associated with the key and return the value returned from that
Expand Down
2 changes: 1 addition & 1 deletion ffi/src/schema.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::os::raw::c_void;

use crate::handle::Handle;
use crate::scan::CStringMap;
use crate::CStringMap;
use crate::{kernel_string_slice, KernelStringSlice, SharedSchema};
use delta_kernel::schema::{ArrayType, DataType, MapType, PrimitiveType, StructType};

Expand Down
11 changes: 11 additions & 0 deletions kernel/src/actions/domain_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ pub(crate) fn domain_metadata_configuration(
.remove(domain)
.map(|domain_metadata| domain_metadata.configuration))
}
pub(crate) fn all_domain_metadata_configuration(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add a newline above

log_segment: &LogSegment,
engine: &dyn Engine,
) -> DeltaResult<HashMap<String, String>> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels a bit like an odd api. I'd prefer to return a Vec<DomainMetadata> where:

struct DomainMetadata {
  domain: String,
  configuration: String,
  removed: bool,
}

We already have exactly this type in actions/mod.rs but it's #[internal_api]. I think we should just make the get_all_domain_metadata in Snapshot also #[internal_api], we already enable that feature in the ffi crate.

let domain_metadatas = scan_domain_metadatas(log_segment, None, engine)?;

Ok(domain_metadatas
.into_iter()
.map(|(key, domain_metadata)| (key, domain_metadata.configuration))
.collect())
}

/// Scan the entire log for all domain metadata actions but terminate early if a specific domain
/// is provided. Note that this returns the latest domain metadata for each domain, accounting for
Expand Down
28 changes: 27 additions & 1 deletion kernel/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
//! In-memory representation of snapshots of tables (snapshot is a table at given point in time, it
//! has schema etc.)

use std::collections::HashMap;
use std::sync::Arc;

use crate::action_reconciliation::calculate_transaction_expiration_timestamp;
use crate::actions::domain_metadata::domain_metadata_configuration;
use crate::actions::domain_metadata::{
all_domain_metadata_configuration, domain_metadata_configuration,
};
use crate::actions::set_transaction::SetTransactionScanner;
use crate::actions::INTERNAL_DOMAIN_PREFIX;
use crate::checkpoint::CheckpointWriter;
Expand Down Expand Up @@ -369,6 +372,16 @@ impl Snapshot {

domain_metadata_configuration(self.log_segment(), domain, engine)
}
/// Returns the configuration of every domain metadata entry in this snapshot, keyed by domain
/// name. Domains whose name starts with `INTERNAL_DOMAIN_PREFIX` are left out of the result.
///
/// # Errors
///
/// Propagates any error returned while collecting the domain metadata from the log segment.
pub fn get_all_domain_metadata(
    &self,
    engine: &dyn Engine,
) -> DeltaResult<HashMap<String, String>> {
    let mut user_domains = all_domain_metadata_configuration(self.log_segment(), engine)?;
    // Strip internal domains in place so only the remaining entries are handed back.
    user_domains.retain(|domain, _| !domain.starts_with(INTERNAL_DOMAIN_PREFIX));
    Ok(user_domains)
}

/// Get the In-Commit Timestamp (ICT) for this snapshot.
///
Expand Down Expand Up @@ -1121,6 +1134,8 @@ mod tests {

let snapshot = Snapshot::builder_for(url.clone()).build(&engine)?;

// Test get_domain_metadata

assert_eq!(snapshot.get_domain_metadata("domain1", &engine)?, None);
assert_eq!(
snapshot.get_domain_metadata("domain2", &engine)?,
Expand All @@ -1135,6 +1150,17 @@ mod tests {
.unwrap_err();
assert!(matches!(err, Error::Generic(msg) if
msg == "User DomainMetadata are not allowed to use system-controlled 'delta.*' domain"));

// Test get_all_domain_metadata

let metadata = snapshot.get_all_domain_metadata(&engine)?;

let mut expected = HashMap::new();
expected.insert("domain2".to_string(), "domain2_commit1".to_string());
expected.insert("domain3".to_string(), "domain3_commit0".to_string());

assert_eq!(metadata, expected);

Ok(())
}

Expand Down
Loading