Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions acceptance/tests/dat_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::path::Path;
use std::sync::Arc;

use acceptance::read_dat_case;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;

// TODO(zach): skip iceberg_compat_v1 test until DAT is fixed
Expand Down Expand Up @@ -31,7 +30,7 @@ fn reader_test(path: &Path) -> datatest_stable::Result<()> {
DefaultEngine::try_new(
&table_root,
std::iter::empty::<(&str, &str)>(),
Arc::new(TokioBackgroundExecutor::new()),
tokio::runtime::Handle::current(),
)
.unwrap(),
);
Expand Down
4 changes: 2 additions & 2 deletions ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ release = false
crate-type = ["lib", "cdylib", "staticlib"]

[dependencies]
tokio = { version = "1.47", features = ["rt-multi-thread"], optional = true }
tracing = "0.1"
tracing-core = { version = "0.1", optional = true }
tracing-subscriber = { version = "0.3", optional = true, features = [ "json" ] }
Expand All @@ -36,7 +37,6 @@ rand = "0.9.2"
serde = "1.0.219"
serde_json = "1.0.142"
test_utils = { path = "../test-utils" }
tokio = { version = "1.47" }
trybuild = "1.0"
tempfile = "3.20.0"
itertools = "0.14.0"
Expand All @@ -50,7 +50,7 @@ default-engine-rustls = ["delta_kernel/default-engine-rustls", "default-engine-b
# This is an 'internal' feature flag which has all the shared bits from default-engine-native-tls and
# default-engine-rustls. There is a check in kernel/lib.rs to ensure you have enabled one of
# default-engine-native-tls or default-engine-rustls, so default-engine-base will not work by itself
default-engine-base = ["delta_kernel/default-engine-base", "delta_kernel/arrow"]
default-engine-base = ["delta_kernel/default-engine-base", "delta_kernel/arrow", "tokio"]

tracing = [ "tracing-core", "tracing-subscriber" ]
internal-api = []
Expand Down
11 changes: 9 additions & 2 deletions ffi/src/domain_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,27 @@ mod tests {
allocate_err, allocate_str, assert_extern_result_error_with_message, ok_or_panic,
recover_string,
};
#[cfg(feature = "default-engine-base")]
use crate::{engine_to_handle, free_engine, free_snapshot, kernel_string_slice, snapshot};
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
#[cfg(feature = "default-engine-base")]
use delta_kernel::engine::default::DefaultEngine;
#[cfg(feature = "default-engine-base")]
use delta_kernel::DeltaResult;
#[cfg(feature = "default-engine-base")]
use object_store::memory::InMemory;
#[cfg(feature = "default-engine-base")]
use serde_json::json;
#[cfg(feature = "default-engine-base")]
use std::sync::Arc;
#[cfg(feature = "default-engine-base")]
use test_utils::add_commit;

#[cfg(feature = "default-engine-base")]
#[tokio::test]
async fn test_domain_metadata() -> DeltaResult<()> {
let storage = Arc::new(InMemory::new());

let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
let engine = DefaultEngine::new(storage.clone(), ::tokio::runtime::Handle::current());
let engine = engine_to_handle(Arc::new(engine), allocate_err);
let path = "memory:///";

Expand Down
5 changes: 4 additions & 1 deletion ffi/src/engine_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,16 @@ fn evaluate_expression_impl(
#[cfg(test)]
mod tests {
use super::{free_expression_evaluator, new_expression_evaluator};
use crate::{free_engine, handle::Handle, tests::get_default_engine, SharedSchema};
#[cfg(feature = "default-engine-base")]
use crate::tests::get_default_engine;
use crate::{free_engine, handle::Handle, SharedSchema};
use delta_kernel::{
schema::{DataType, StructField, StructType},
Expression,
};
use std::sync::Arc;

#[cfg(feature = "default-engine-base")]
#[test]
fn test_new_expression_evaluator() {
let engine = get_default_engine("memory:///doesntmatter/foo");
Expand Down
41 changes: 30 additions & 11 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
//!
//! Exposes that an engine needs to call from C/C++ to interface with kernel

#[cfg(feature = "default-engine-base")]
use std::collections::HashMap;
use std::default::Default;
use std::os::raw::{c_char, c_void};
Expand Down Expand Up @@ -540,16 +539,28 @@ fn get_default_engine_impl(
options: HashMap<String, String>,
allocate_error: AllocateErrorFn,
) -> DeltaResult<Handle<SharedExternEngine>> {
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
let engine = DefaultEngine::<TokioBackgroundExecutor>::try_new(
&url,
options,
Arc::new(TokioBackgroundExecutor::new()),
);
// Create a tokio runtime and get its handle
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create tokio runtime");
let handle = rt.handle().clone();
let engine = DefaultEngine::try_new(&url, options, handle);
Ok(engine_to_handle(Arc::new(engine?), allocate_error))
}

#[cfg(not(feature = "default-engine-base"))]
fn get_default_engine_impl(
_url: Url,
_options: HashMap<String, String>,
_allocate_error: AllocateErrorFn,
) -> DeltaResult<Handle<SharedExternEngine>> {
Err(delta_kernel::Error::Generic(
"Default engine not available. Enable 'default-engine-rustls' or 'default-engine-native-tls' feature.".to_string()
))
}

/// # Safety
///
/// Caller is responsible for passing a valid handle.
Expand Down Expand Up @@ -794,8 +805,11 @@ mod tests {
allocate_err, allocate_str, assert_extern_result_error_with_message, ok_or_panic,
recover_string,
};
use delta_kernel::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine};
#[cfg(feature = "default-engine-base")]
use delta_kernel::engine::default::DefaultEngine;
#[cfg(feature = "default-engine-base")]
use object_store::memory::InMemory;
#[cfg(feature = "default-engine-base")]
use test_utils::{actions_to_string, actions_to_string_partitioned, add_commit, TestAction};

#[no_mangle]
Expand All @@ -818,12 +832,14 @@ mod tests {
}
}

#[cfg(feature = "default-engine-base")]
pub(crate) fn get_default_engine(path: &str) -> Handle<SharedExternEngine> {
let path = kernel_string_slice!(path);
let builder = unsafe { ok_or_panic(get_engine_builder(path, allocate_err)) };
unsafe { ok_or_panic(builder_build(builder)) }
}

#[cfg(feature = "default-engine-base")]
#[test]
fn engine_builder() {
let engine = get_default_engine("memory:///doesntmatter/foo");
Expand All @@ -832,6 +848,7 @@ mod tests {
}
}

#[cfg(feature = "default-engine-base")]
#[tokio::test]
async fn test_snapshot() -> Result<(), Box<dyn std::error::Error>> {
let storage = Arc::new(InMemory::new());
Expand All @@ -841,7 +858,7 @@ mod tests {
actions_to_string(vec![TestAction::Metadata]),
)
.await?;
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
let engine = DefaultEngine::new(storage.clone(), ::tokio::runtime::Handle::current());
let engine = engine_to_handle(Arc::new(engine), allocate_err);
let path = "memory:///";

Expand Down Expand Up @@ -878,6 +895,7 @@ mod tests {
Ok(())
}

#[cfg(feature = "default-engine-base")]
#[tokio::test]
async fn test_snapshot_partition_cols() -> Result<(), Box<dyn std::error::Error>> {
let storage = Arc::new(InMemory::new());
Expand All @@ -887,7 +905,7 @@ mod tests {
actions_to_string_partitioned(vec![TestAction::Metadata]),
)
.await?;
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
let engine = DefaultEngine::new(storage.clone(), ::tokio::runtime::Handle::current());
let engine = engine_to_handle(Arc::new(engine), allocate_err);
let path = "memory:///";

Expand All @@ -914,6 +932,7 @@ mod tests {
Ok(())
}

#[cfg(feature = "default-engine-base")]
#[tokio::test]
async fn allocate_null_err_okay() -> Result<(), Box<dyn std::error::Error>> {
let storage = Arc::new(InMemory::new());
Expand All @@ -923,7 +942,7 @@ mod tests {
actions_to_string(vec![TestAction::Metadata]),
)
.await?;
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
let engine = DefaultEngine::new(storage.clone(), ::tokio::runtime::Handle::current());
let engine = engine_to_handle(Arc::new(engine), allocate_null_err);
let path = "memory:///";

Expand Down
7 changes: 7 additions & 0 deletions ffi/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,13 @@ mod tests {
use delta_kernel::parquet::file::properties::WriterProperties;
use delta_kernel::transaction::add_files_schema;

#[cfg(feature = "default-engine-base")]
use delta_kernel_ffi::engine_data::get_engine_data;
#[cfg(feature = "default-engine-base")]
use delta_kernel_ffi::engine_data::ArrowFFIData;

use delta_kernel_ffi::ffi_test_utils::{allocate_str, ok_or_panic, recover_string};
#[cfg(feature = "default-engine-base")]
use delta_kernel_ffi::tests::get_default_engine;

use crate::{free_engine, free_schema, kernel_string_slice};
Expand Down Expand Up @@ -170,6 +173,7 @@ mod tests {
.expect("txnId should be present in commitInfo");
}

#[cfg(feature = "default-engine-base")]
fn create_arrow_ffi_from_json(
schema: ArrowSchema,
json_string: &str,
Expand All @@ -188,6 +192,7 @@ mod tests {
})
}

#[cfg(feature = "default-engine-base")]
fn create_file_metadata(
path: &str,
num_rows: i64,
Expand All @@ -206,6 +211,7 @@ mod tests {
create_arrow_ffi_from_json(schema, file_metadata.as_str())
}

#[cfg(feature = "default-engine-base")]
fn write_parquet_file(
delta_path: &str,
file_path: &str,
Expand All @@ -226,6 +232,7 @@ mod tests {
create_file_metadata(file_path, res.num_rows)
}

#[cfg(feature = "default-engine-base")]
#[tokio::test]
#[cfg_attr(miri, ignore)] // FIXME: re-enable miri (can't call foreign function `linkat` on OS `linux`)
async fn test_basic_append() -> Result<(), Box<dyn std::error::Error>> {
Expand Down
12 changes: 7 additions & 5 deletions kernel/benches/metadata_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
use std::collections::HashMap;
use std::sync::Arc;

use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::snapshot::Snapshot;
use delta_kernel::try_parse_uri;
Expand All @@ -34,16 +33,19 @@ use url::Url;
// force scan metadata bench to use smaller sample size so test runs faster (100 -> 20)
const SCAN_METADATA_BENCH_SAMPLE_SIZE: usize = 20;

fn setup() -> (TempDir, Url, Arc<DefaultEngine<TokioBackgroundExecutor>>) {
fn setup() -> (TempDir, Url, Arc<DefaultEngine>) {
// note this table _only_ has a _delta_log, no data files (can only do metadata reads)
let table = "300k-add-files-100-col-partitioned";
let tempdir = load_test_data("./tests/data", table).unwrap();
let table_path = tempdir.path().join(table);
let url = try_parse_uri(table_path.to_str().unwrap()).expect("Failed to parse table path");
// TODO: use multi-threaded executor
let executor = Arc::new(TokioBackgroundExecutor::new());
let engine = DefaultEngine::try_new(&url, HashMap::<String, String>::new(), executor)
.expect("Failed to create engine");
let engine = DefaultEngine::try_new(
&url,
HashMap::<String, String>::new(),
tokio::runtime::Handle::current(),
)
.expect("Failed to create engine");

(tempdir, url, Arc::new(engine))
}
Expand Down
1 change: 1 addition & 0 deletions kernel/examples/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ delta_kernel = { path = "../../../kernel", features = [
"default-engine-rustls",
"internal-api",
] }
tokio = { version = "1.47", features = ["rt-multi-thread"] }
url = "2"
12 changes: 3 additions & 9 deletions kernel/examples/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use std::{collections::HashMap, sync::Arc};

use clap::Args;
use delta_kernel::{
arrow::array::RecordBatch,
engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine},
scan::Scan,
schema::Schema,
arrow::array::RecordBatch, engine::default::DefaultEngine, scan::Scan, schema::Schema,
DeltaResult, SnapshotRef,
};

Expand Down Expand Up @@ -45,10 +42,7 @@ pub struct ScanArgs {
}

/// Get an engine configured to read table at `url` and `LocationArgs`
pub fn get_engine(
url: &Url,
args: &LocationArgs,
) -> DeltaResult<DefaultEngine<TokioBackgroundExecutor>> {
pub fn get_engine(url: &Url, args: &LocationArgs) -> DeltaResult<DefaultEngine> {
let mut options = if let Some(ref region) = args.region {
HashMap::from([("region", region.clone())])
} else {
Expand All @@ -57,7 +51,7 @@ pub fn get_engine(
if args.public {
options.insert("skip_signature", "true".to_string());
}
DefaultEngine::try_new(url, options, Arc::new(TokioBackgroundExecutor::new()))
DefaultEngine::try_new(url, options, tokio::runtime::Handle::current())
}

/// Construct a scan at the latest snapshot. This is over the specified table and using the passed
Expand Down
8 changes: 2 additions & 6 deletions kernel/examples/write-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use uuid::Uuid;
use delta_kernel::arrow::array::TimestampMicrosecondArray;
use delta_kernel::engine::arrow_conversion::TryIntoArrow;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
use delta_kernel::transaction::CommitResult;
Expand Down Expand Up @@ -75,7 +74,7 @@ async fn try_main() -> DeltaResult<()> {
let engine = DefaultEngine::try_new(
&url,
HashMap::<String, String>::new(),
Arc::new(TokioBackgroundExecutor::new()),
tokio::runtime::Handle::current(),
)?;

// Create or get the table
Expand Down Expand Up @@ -290,10 +289,7 @@ fn create_sample_data(schema: &SchemaRef, num_rows: usize) -> DeltaResult<ArrowE
}

/// Read and display data from the table.
async fn read_and_display_data(
table_url: &Url,
engine: DefaultEngine<TokioBackgroundExecutor>,
) -> DeltaResult<()> {
async fn read_and_display_data(table_url: &Url, engine: DefaultEngine) -> DeltaResult<()> {
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let scan = snapshot.scan_builder().build()?;

Expand Down
Loading
Loading