From 3c6446dd3b11e02eb39bf0c747cf4176657fe49a Mon Sep 17 00:00:00 2001
From: Zach Schuermann
Date: Wed, 24 Sep 2025 16:52:59 -0700
Subject: [PATCH 1/2] replace TaskExecutor with tokio runtime Handle

---
 kernel/src/engine/default/executor.rs    | 250 -----------------------
 kernel/src/engine/default/file_stream.rs |   7 +-
 kernel/src/engine/default/filesystem.rs  |  18 +-
 kernel/src/engine/default/json.rs        |  18 +-
 kernel/src/engine/default/mod.rs         |  34 ++-
 kernel/src/engine/default/parquet.rs     |  16 +-
 6 files changed, 45 insertions(+), 298 deletions(-)
 delete mode 100644 kernel/src/engine/default/executor.rs

diff --git a/kernel/src/engine/default/executor.rs b/kernel/src/engine/default/executor.rs
deleted file mode 100644
index 86831bc11..000000000
--- a/kernel/src/engine/default/executor.rs
+++ /dev/null
@@ -1,250 +0,0 @@
-// TODO: (#1112) remove panics in this file
-#![allow(clippy::unwrap_used)]
-#![allow(clippy::expect_used)]
-#![allow(clippy::panic)]
-
-//! The default engine uses Async IO to read files, but the kernel APIs are all
-//! synchronous. Therefore, we need an executor to run the async IO on in the
-//! background.
-//!
-//! A generic trait [TaskExecutor] can be implemented with your preferred async
-//! runtime. Behind the `tokio` feature flag, we provide both a single-threaded
-//! and a multi-threaded executor based on Tokio.
-use futures::{future::BoxFuture, Future};
-
-use crate::DeltaResult;
-
-/// An executor that can be used to run async tasks. This is used by IO functions
-/// within the `DefaultEngine`.
-///
-/// This must be capable of running within an async context and running futures
-/// on another thread. This could be a multi-threaded runtime, like Tokio's, or
-/// could be a single-threaded runtime on a background thread.
-pub trait TaskExecutor: Send + Sync + 'static {
-    /// Block on the given future, returning its output.
-    ///
-    /// This should NOT panic if called within an async context. Thus it can't
-    /// be implemented by `tokio::runtime::Runtime::block_on`.
-    fn block_on<T>(&self, task: T) -> T::Output
-    where
-        T: Future + Send + 'static,
-        T::Output: Send + 'static;
-
-    /// Run the future in the background.
-    fn spawn<F>(&self, task: F)
-    where
-        F: Future<Output = ()> + Send + 'static;
-
-    fn spawn_blocking<T, R>(&self, task: T) -> BoxFuture<'_, DeltaResult<R>>
-    where
-        T: FnOnce() -> R + Send + 'static,
-        R: Send + 'static;
-}
-
-#[cfg(any(feature = "tokio", test))]
-pub mod tokio {
-    use super::TaskExecutor;
-    use futures::TryFutureExt;
-    use futures::{future::BoxFuture, Future};
-    use std::sync::mpsc::channel;
-    use tokio::runtime::RuntimeFlavor;
-
-    use crate::DeltaResult;
-
-    /// A [`TaskExecutor`] that uses the tokio single-threaded runtime in a
-    /// background thread to service tasks.
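The `block_on` contract spelled out above exists because tokio's own `Runtime::block_on` refuses to run inside an async context. A minimal sketch of that failure mode, using only the tokio crate:

    #[tokio::main]
    async fn main() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        // Panics with a message like "Cannot start a runtime from within a
        // runtime": block_on would block a thread that is already being used
        // to drive async tasks.
        let _ = rt.block_on(async { 2 + 2 });
    }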
- #[derive(Debug)] - pub struct TokioBackgroundExecutor { - sender: tokio::sync::mpsc::Sender>, - _thread: std::thread::JoinHandle<()>, - } - - impl Default for TokioBackgroundExecutor { - fn default() -> Self { - Self::new() - } - } - - impl TokioBackgroundExecutor { - pub fn new() -> Self { - let (sender, mut receiver) = tokio::sync::mpsc::channel::>(50); - let thread = std::thread::spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - rt.block_on(async move { - while let Some(task) = receiver.recv().await { - tokio::task::spawn(task); - } - }); - }); - Self { - sender, - _thread: thread, - } - } - } - - impl TokioBackgroundExecutor { - fn send_future(&self, fut: BoxFuture<'static, ()>) { - // We cannot call `blocking_send()` because that calls `block_on` - // internally and panics if called within an async context. 🤦 - let mut fut = Some(fut); - loop { - match self.sender.try_send(fut.take().unwrap()) { - Ok(()) => break, - Err(tokio::sync::mpsc::error::TrySendError::Full(original)) => { - std::thread::yield_now(); - fut.replace(original); - } - Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { - panic!("TokioBackgroundExecutor channel closed") - } - }; - } - } - } - - impl TaskExecutor for TokioBackgroundExecutor { - fn block_on(&self, task: T) -> T::Output - where - T: Future + Send + 'static, - T::Output: Send + 'static, - { - // We cannot call `tokio::runtime::Runtime::block_on` here because - // it panics if called within an async context. So instead we spawn - // the future on the runtime and send the result back using a channel. - let (sender, receiver) = channel::(); - - let fut = Box::pin(async move { - let task_output = task.await; - tokio::task::spawn_blocking(move || { - sender.send(task_output).ok(); - }) - .await - .unwrap(); - }); - - self.send_future(fut); - - receiver - .recv() - .expect("TokioBackgroundExecutor has crashed") - } - - fn spawn(&self, task: F) - where - F: Future + Send + 'static, - { - self.send_future(Box::pin(task)); - } - - fn spawn_blocking(&self, task: T) -> BoxFuture<'_, DeltaResult> - where - T: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - Box::pin(tokio::task::spawn_blocking(task).map_err(crate::Error::join_failure)) - } - } - - /// A [`TaskExecutor`] that uses the tokio multi-threaded runtime. You can - /// create one based on a handle to an existing runtime, so it can share - /// the runtime with other parts of your application. - #[derive(Debug)] - pub struct TokioMultiThreadExecutor { - handle: tokio::runtime::Handle, - } - - impl TokioMultiThreadExecutor { - pub fn new(handle: tokio::runtime::Handle) -> Self { - assert_eq!( - handle.runtime_flavor(), - RuntimeFlavor::MultiThread, - "TokioExecutor must be created with a multi-threaded runtime" - ); - Self { handle } - } - } - - impl TaskExecutor for TokioMultiThreadExecutor { - fn block_on(&self, task: T) -> T::Output - where - T: Future + Send + 'static, - T::Output: Send + 'static, - { - // We cannot call `tokio::runtime::Runtime::block_on` here because - // it panics if called within an async context. So instead we spawn - // the future on the runtime and send the result back using a channel. - let (sender, receiver) = channel::(); - - let fut = Box::pin(async move { - let task_output = task.await; - tokio::task::spawn_blocking(move || { - sender.send(task_output).ok(); - }) - .await - .unwrap(); - }); - - // We throw away the handle, but it should continue on. 
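    // (Dropping the JoinHandle returned by spawn detaches the task rather
    // than aborting it, which is why discarding the handle below is safe.)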
- self.handle.spawn(fut); - - receiver - .recv() - .expect("TokioMultiThreadExecutor has crashed") - } - - fn spawn(&self, task: F) - where - F: Future + Send + 'static, - { - self.handle.spawn(task); - } - - fn spawn_blocking(&self, task: T) -> BoxFuture<'_, DeltaResult> - where - T: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - Box::pin(tokio::task::spawn_blocking(task).map_err(crate::Error::join_failure)) - } - } - - #[cfg(test)] - mod test { - use super::*; - - async fn test_executor(executor: impl TaskExecutor) { - // Can run a task - let task = async { - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - 2 + 2 - }; - let result = executor.block_on(task); - assert_eq!(result, 4); - - // Can spawn a task - let (sender, receiver) = channel::(); - executor.spawn(async move { - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - sender.send(2 + 2).unwrap(); - }); - let result = receiver.recv().unwrap(); - assert_eq!(result, 4); - } - - #[tokio::test] - async fn test_tokio_background_executor() { - let executor = TokioBackgroundExecutor::new(); - test_executor(executor).await; - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn test_tokio_multi_thread_executor() { - let executor = TokioMultiThreadExecutor::new(tokio::runtime::Handle::current()); - test_executor(executor).await; - } - } -} diff --git a/kernel/src/engine/default/file_stream.rs b/kernel/src/engine/default/file_stream.rs index 23b151062..fcf18b389 100644 --- a/kernel/src/engine/default/file_stream.rs +++ b/kernel/src/engine/default/file_stream.rs @@ -2,7 +2,6 @@ use std::collections::VecDeque; use std::mem; use std::ops::Range; use std::pin::Pin; -use std::sync::Arc; use std::task::{ready, Context, Poll}; use crate::arrow::array::RecordBatch; @@ -10,9 +9,9 @@ use crate::arrow::datatypes::SchemaRef as ArrowSchemaRef; use futures::future::BoxFuture; use futures::stream::{BoxStream, Stream, StreamExt}; use futures::FutureExt; +use tokio::runtime::Handle; use tracing::error; -use super::executor::TaskExecutor; use crate::engine::arrow_data::ArrowEngineData; use crate::{DeltaResult, FileDataReadResultIterator, FileMeta}; @@ -102,8 +101,8 @@ impl FileStream { /// Creates a new `FileStream` from a given schema, `FileOpener`, and files list; the files are /// processed asynchronously by the provided `TaskExecutor`. Returns an `Iterator` that consumes /// the results. 
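For reference, a rough standalone sketch of the async-to-sync bridge pattern this function implements, assuming only a `Handle` to a live multi-threaded runtime (all names below are hypothetical, not kernel internals):

    use futures::stream::{BoxStream, StreamExt};
    use tokio::runtime::Handle;

    fn stream_to_iter<T: Send + 'static>(
        handle: Handle,
        mut stream: BoxStream<'static, T>,
    ) -> impl Iterator<Item = T> {
        // A bounded sync_channel applies backpressure to the producer task.
        let (tx, rx) = std::sync::mpsc::sync_channel(4);
        handle.spawn(async move {
            while let Some(item) = stream.next().await {
                // A send error means the consumer dropped the iterator; stop early.
                if tx.send(item).is_err() {
                    break;
                }
            }
        });
        rx.into_iter()
    }

Note that `tx.send` blocks a runtime worker thread when the channel is full, which is one reason a multi-threaded runtime is assumed here.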
- pub fn new_async_read_iterator( - task_executor: Arc, + pub fn new_async_read_iterator( + task_executor: Handle, schema: ArrowSchemaRef, file_opener: Box, files: &[FileMeta], diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 08d03107c..fac7eaaff 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -6,25 +6,25 @@ use futures::stream::StreamExt; use itertools::Itertools; use object_store::path::Path; use object_store::{DynObjectStore, ObjectStore}; +use tokio::runtime::Handle; use url::Url; use super::UrlExt; -use crate::engine::default::executor::TaskExecutor; use crate::{DeltaResult, Error, FileMeta, FileSlice, StorageHandler}; #[derive(Debug)] -pub struct ObjectStoreStorageHandler { +pub struct ObjectStoreStorageHandler { inner: Arc, - task_executor: Arc, + runtime: Handle, readahead: usize, } -impl ObjectStoreStorageHandler { +impl ObjectStoreStorageHandler { #[internal_api] - pub(crate) fn new(store: Arc, task_executor: Arc) -> Self { + pub(crate) fn new(store: Arc, runtime: Handle) -> Self { Self { inner: store, - task_executor, + runtime, readahead: 10, } } @@ -36,7 +36,7 @@ impl ObjectStoreStorageHandler { } } -impl StorageHandler for ObjectStoreStorageHandler { +impl StorageHandler for ObjectStoreStorageHandler { fn list_from( &self, path: &Url, @@ -83,7 +83,7 @@ impl StorageHandler for ObjectStoreStorageHandler { // This channel will become the iterator let (sender, receiver) = std::sync::mpsc::sync_channel(4_000); let url = path.clone(); - self.task_executor.spawn(async move { + self.runtime.spawn(async move { let mut stream = store.list_with_offset(Some(&prefix), &offset); while let Some(meta) = stream.next().await { @@ -133,7 +133,7 @@ impl StorageHandler for ObjectStoreStorageHandler { // buffer size to 0. let (sender, receiver) = std::sync::mpsc::sync_channel(0); - self.task_executor.spawn( + self.runtime.spawn( futures::stream::iter(files) .map(move |(url, range)| { let store = store.clone(); diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs index a86727184..c8c358e23 100644 --- a/kernel/src/engine/default/json.rs +++ b/kernel/src/engine/default/json.rs @@ -13,10 +13,10 @@ use futures::stream::{self, BoxStream}; use futures::{ready, StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::{self, DynObjectStore, GetResultPayload, PutMode}; +use tokio::runtime::Handle; use tracing::warn; use url::Url; -use super::executor::TaskExecutor; use crate::engine::arrow_conversion::TryFromKernel as _; use crate::engine::arrow_data::ArrowEngineData; use crate::engine::arrow_utils::parse_json as arrow_parse_json; @@ -30,11 +30,11 @@ const DEFAULT_BUFFER_SIZE: usize = 1000; const DEFAULT_BATCH_SIZE: usize = 1000; #[derive(Debug)] -pub struct DefaultJsonHandler { +pub struct DefaultJsonHandler { /// The object store to read files from store: Arc, /// The executor to run async tasks on - task_executor: Arc, + runtime: Handle, /// The maximum number of read requests to buffer in memory at once. Note that this actually /// controls two things: the number of concurrent requests (done by `buffered`) and the size of /// the buffer (via our `sync_channel`). 
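To illustrate the dual role described above, here is a standalone sketch of how a single value can bound both concurrency and buffering via `StreamExt::buffered` (hypothetical names, not the handler's actual code):

    use futures::stream::{self, StreamExt};

    async fn fetch_all(urls: Vec<String>, buffer_size: usize) -> Vec<usize> {
        let fetches = urls.into_iter().map(|url| async move {
            // hypothetical per-file request
            url.len()
        });
        // At most `buffer_size` futures are polled concurrently, and completed
        // but unconsumed results queue up, so one knob bounds both. Outputs
        // keep their input order.
        stream::iter(fetches).buffered(buffer_size).collect().await
    }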
@@ -44,11 +44,11 @@ pub struct DefaultJsonHandler { batch_size: usize, } -impl DefaultJsonHandler { - pub fn new(store: Arc, task_executor: Arc) -> Self { +impl DefaultJsonHandler { + pub fn new(store: Arc, runtime: Handle) -> Self { Self { store, - task_executor, + runtime, buffer_size: DEFAULT_BUFFER_SIZE, batch_size: DEFAULT_BATCH_SIZE, } @@ -83,7 +83,7 @@ impl DefaultJsonHandler { } } -impl JsonHandler for DefaultJsonHandler { +impl JsonHandler for DefaultJsonHandler { fn parse_json( &self, json_strings: Box, @@ -109,7 +109,7 @@ impl JsonHandler for DefaultJsonHandler { let files = files.to_vec(); let buffer_size = self.buffer_size; - self.task_executor.spawn(async move { + self.runtime.spawn(async move { // an iterator of futures that open each file let file_futures = files.into_iter().map(|file| file_opener.open(file, None)); @@ -149,7 +149,7 @@ impl JsonHandler for DefaultJsonHandler { let store = self.store.clone(); // cheap Arc let path = Path::from_url_path(path.path())?; let path_str = path.to_string(); - self.task_executor + self.runtime .block_on(async move { store.put_opts(&path, buffer.into(), put_mode.into()).await }) .map_err(|e| match e { object_store::Error::AlreadyExists { .. } => Error::FileAlreadyExists(path_str), diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 051a6a32c..3d35f2901 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -11,9 +11,9 @@ use std::sync::Arc; use self::storage::parse_url_opts; use object_store::DynObjectStore; +use tokio::runtime::Handle; use url::Url; -use self::executor::TaskExecutor; use self::filesystem::ObjectStoreStorageHandler; use self::json::DefaultJsonHandler; use self::parquet::DefaultParquetHandler; @@ -26,7 +26,6 @@ use crate::{ DeltaResult, Engine, EngineData, EvaluationHandler, JsonHandler, ParquetHandler, StorageHandler, }; -pub mod executor; pub mod file_stream; pub mod filesystem; pub mod json; @@ -34,15 +33,16 @@ pub mod parquet; pub mod storage; #[derive(Debug)] -pub struct DefaultEngine { +pub struct DefaultEngine { + runtime: Handle, object_store: Arc, - storage: Arc>, - json: Arc>, - parquet: Arc>, + storage: Arc, + json: Arc, + parquet: Arc, evaluation: Arc, } -impl DefaultEngine { +impl DefaultEngine { /// Create a new [`DefaultEngine`] instance /// /// # Parameters @@ -53,7 +53,7 @@ impl DefaultEngine { pub fn try_new( table_root: &Url, options: impl IntoIterator, - task_executor: Arc, + runtime: Handle, ) -> DeltaResult where K: AsRef, @@ -61,7 +61,7 @@ impl DefaultEngine { { // table root is the path of the table in the ObjectStore let (object_store, _table_root) = parse_url_opts(table_root, options)?; - Ok(Self::new(Arc::new(object_store), task_executor)) + Ok(Self::new(Arc::new(object_store), runtime)) } /// Create a new [`DefaultEngine`] instance @@ -69,21 +69,19 @@ impl DefaultEngine { /// # Parameters /// /// - `object_store`: The object store to use. - /// - `task_executor`: Used to spawn async IO tasks. See [executor::TaskExecutor]. - pub fn new(object_store: Arc, task_executor: Arc) -> Self { + /// - `runtime`: tokio handle to use for async tasks. 
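For example, a caller that owns its runtime might build the engine like this (a sketch; a `Handle` does not keep its runtime alive, so the runtime must outlive the engine):

    use std::sync::Arc;
    use delta_kernel::engine::default::DefaultEngine;
    use object_store::local::LocalFileSystem;
    use tokio::runtime::Runtime;

    fn build_engine() -> Result<(Runtime, DefaultEngine), Box<dyn std::error::Error>> {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()?;
        let engine = DefaultEngine::new(Arc::new(LocalFileSystem::new()), rt.handle().clone());
        // Return the runtime alongside the engine: dropping `rt` would leave
        // the engine holding a Handle to a shut-down runtime.
        Ok((rt, engine))
    }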
+ pub fn new(object_store: Arc, runtime: Handle) -> Self { Self { + runtime: runtime.clone(), storage: Arc::new(ObjectStoreStorageHandler::new( object_store.clone(), - task_executor.clone(), + runtime.clone(), )), json: Arc::new(DefaultJsonHandler::new( object_store.clone(), - task_executor.clone(), - )), - parquet: Arc::new(DefaultParquetHandler::new( - object_store.clone(), - task_executor, + runtime.clone(), )), + parquet: Arc::new(DefaultParquetHandler::new(object_store.clone(), runtime)), object_store, evaluation: Arc::new(ArrowEvaluationHandler {}), } @@ -120,7 +118,7 @@ impl DefaultEngine { } } -impl Engine for DefaultEngine { +impl Engine for DefaultEngine { fn evaluation_handler(&self) -> Arc { self.evaluation.clone() } diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index 7609510e0..110c61937 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -15,6 +15,7 @@ use crate::parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatc use futures::StreamExt; use object_store::path::Path; use object_store::DynObjectStore; +use tokio::runtime::Handle; use uuid::Uuid; use super::file_stream::{FileOpenFuture, FileOpener, FileStream}; @@ -25,7 +26,6 @@ use crate::engine::arrow_utils::{ fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes, RowIndexBuilder, }; -use crate::engine::default::executor::TaskExecutor; use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping; use crate::schema::SchemaRef; use crate::transaction::add_files_schema; @@ -35,9 +35,9 @@ use crate::{ }; #[derive(Debug)] -pub struct DefaultParquetHandler { +pub struct DefaultParquetHandler { store: Arc, - task_executor: Arc, + runtime: Handle, readahead: usize, } @@ -122,11 +122,11 @@ impl DataFileMetadata { } } -impl DefaultParquetHandler { - pub fn new(store: Arc, task_executor: Arc) -> Self { +impl DefaultParquetHandler { + pub fn new(store: Arc, runtime: Handle) -> Self { Self { store, - task_executor, + runtime, readahead: 10, } } @@ -205,7 +205,7 @@ impl DefaultParquetHandler { } } -impl ParquetHandler for DefaultParquetHandler { +impl ParquetHandler for DefaultParquetHandler { fn read_parquet_files( &self, files: &[FileMeta], @@ -239,7 +239,7 @@ impl ParquetHandler for DefaultParquetHandler { )) }; FileStream::new_async_read_iterator( - self.task_executor.clone(), + self.runtime.clone(), Arc::new(physical_schema.as_ref().try_into_arrow()?), file_opener, files, From c23fa43f7055490d13b403b6a13f9cfb5514c2ce Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 24 Sep 2025 16:53:11 -0700 Subject: [PATCH 2/2] vibe: finish it --- acceptance/tests/dat_reader.rs | 3 +- ffi/Cargo.toml | 4 +-- ffi/src/domain_metadata.rs | 11 +++++-- ffi/src/engine_funcs.rs | 5 ++- ffi/src/lib.rs | 41 ++++++++++++++++++------- ffi/src/transaction/mod.rs | 7 +++++ kernel/benches/metadata_bench.rs | 12 +++++--- kernel/examples/common/Cargo.toml | 1 + kernel/examples/common/src/lib.rs | 12 ++------ kernel/examples/write-table/src/main.rs | 8 ++--- kernel/src/checkpoint/tests.rs | 14 ++++----- kernel/src/engine/default/filesystem.rs | 7 ++--- kernel/src/engine/default/json.rs | 35 +++++++++------------ kernel/src/engine/default/mod.rs | 7 ++--- kernel/src/engine/default/parquet.rs | 7 ++--- kernel/src/listed_log_files.rs | 3 +- kernel/src/log_segment/tests.rs | 19 ++++++------ kernel/src/snapshot.rs | 20 +++++------- kernel/src/snapshot/builder.rs | 10 ++---- kernel/tests/golden_tables.rs | 18 +++-------- 
kernel/tests/hdfs.rs | 4 +-- kernel/tests/log_tail.rs | 9 ++---- kernel/tests/read.rs | 19 ++++++------ kernel/tests/row_tracking.rs | 9 ++---- kernel/tests/write.rs | 7 ++--- test-utils/Cargo.toml | 1 + test-utils/src/lib.rs | 34 +++++--------------- 27 files changed, 149 insertions(+), 178 deletions(-) diff --git a/acceptance/tests/dat_reader.rs b/acceptance/tests/dat_reader.rs index 0d8d283e7..552214749 100644 --- a/acceptance/tests/dat_reader.rs +++ b/acceptance/tests/dat_reader.rs @@ -2,7 +2,6 @@ use std::path::Path; use std::sync::Arc; use acceptance::read_dat_case; -use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; // TODO(zach): skip iceberg_compat_v1 test until DAT is fixed @@ -31,7 +30,7 @@ fn reader_test(path: &Path) -> datatest_stable::Result<()> { DefaultEngine::try_new( &table_root, std::iter::empty::<(&str, &str)>(), - Arc::new(TokioBackgroundExecutor::new()), + tokio::runtime::Handle::current(), ) .unwrap(), ); diff --git a/ffi/Cargo.toml b/ffi/Cargo.toml index 438bb0e80..d27eddb49 100644 --- a/ffi/Cargo.toml +++ b/ffi/Cargo.toml @@ -18,6 +18,7 @@ release = false crate-type = ["lib", "cdylib", "staticlib"] [dependencies] +tokio = { version = "1.47", features = ["rt-multi-thread"], optional = true } tracing = "0.1" tracing-core = { version = "0.1", optional = true } tracing-subscriber = { version = "0.3", optional = true, features = [ "json" ] } @@ -36,7 +37,6 @@ rand = "0.9.2" serde = "1.0.219" serde_json = "1.0.142" test_utils = { path = "../test-utils" } -tokio = { version = "1.47" } trybuild = "1.0" tempfile = "3.20.0" itertools = "0.14.0" @@ -50,7 +50,7 @@ default-engine-rustls = ["delta_kernel/default-engine-rustls", "default-engine-b # This is an 'internal' feature flag which has all the shared bits from default-engine-native-tls and # default-engine-rustls. 
There is a check in kernel/lib.rs to ensure you have enabled one of # default-engine-native-tls or default-engine-rustls, so default-engine-base will not work by itself -default-engine-base = ["delta_kernel/default-engine-base", "delta_kernel/arrow"] +default-engine-base = ["delta_kernel/default-engine-base", "delta_kernel/arrow", "tokio"] tracing = [ "tracing-core", "tracing-subscriber" ] internal-api = [] diff --git a/ffi/src/domain_metadata.rs b/ffi/src/domain_metadata.rs index 7f51dd607..13ad9cb41 100644 --- a/ffi/src/domain_metadata.rs +++ b/ffi/src/domain_metadata.rs @@ -45,20 +45,27 @@ mod tests { allocate_err, allocate_str, assert_extern_result_error_with_message, ok_or_panic, recover_string, }; + #[cfg(feature = "default-engine-base")] use crate::{engine_to_handle, free_engine, free_snapshot, kernel_string_slice, snapshot}; - use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; + #[cfg(feature = "default-engine-base")] use delta_kernel::engine::default::DefaultEngine; + #[cfg(feature = "default-engine-base")] use delta_kernel::DeltaResult; + #[cfg(feature = "default-engine-base")] use object_store::memory::InMemory; + #[cfg(feature = "default-engine-base")] use serde_json::json; + #[cfg(feature = "default-engine-base")] use std::sync::Arc; + #[cfg(feature = "default-engine-base")] use test_utils::add_commit; + #[cfg(feature = "default-engine-base")] #[tokio::test] async fn test_domain_metadata() -> DeltaResult<()> { let storage = Arc::new(InMemory::new()); - let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(storage.clone(), ::tokio::runtime::Handle::current()); let engine = engine_to_handle(Arc::new(engine), allocate_err); let path = "memory:///"; diff --git a/ffi/src/engine_funcs.rs b/ffi/src/engine_funcs.rs index 5eb4dacb3..2760291dc 100644 --- a/ffi/src/engine_funcs.rs +++ b/ffi/src/engine_funcs.rs @@ -215,13 +215,16 @@ fn evaluate_expression_impl( #[cfg(test)] mod tests { use super::{free_expression_evaluator, new_expression_evaluator}; - use crate::{free_engine, handle::Handle, tests::get_default_engine, SharedSchema}; + #[cfg(feature = "default-engine-base")] + use crate::tests::get_default_engine; + use crate::{free_engine, handle::Handle, SharedSchema}; use delta_kernel::{ schema::{DataType, StructField, StructType}, Expression, }; use std::sync::Arc; + #[cfg(feature = "default-engine-base")] #[test] fn test_new_expression_evaluator() { let engine = get_default_engine("memory:///doesntmatter/foo"); diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index a70b531f1..ecf153bd2 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -2,7 +2,6 @@ //! //! 
Exposes that an engine needs to call from C/C++ to interface with kernel -#[cfg(feature = "default-engine-base")] use std::collections::HashMap; use std::default::Default; use std::os::raw::{c_char, c_void}; @@ -540,16 +539,28 @@ fn get_default_engine_impl( options: HashMap, allocate_error: AllocateErrorFn, ) -> DeltaResult> { - use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; - let engine = DefaultEngine::::try_new( - &url, - options, - Arc::new(TokioBackgroundExecutor::new()), - ); + // Create a tokio runtime and get its handle + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create tokio runtime"); + let handle = rt.handle().clone(); + let engine = DefaultEngine::try_new(&url, options, handle); Ok(engine_to_handle(Arc::new(engine?), allocate_error)) } +#[cfg(not(feature = "default-engine-base"))] +fn get_default_engine_impl( + _url: Url, + _options: HashMap, + _allocate_error: AllocateErrorFn, +) -> DeltaResult> { + Err(delta_kernel::Error::Generic( + "Default engine not available. Enable 'default-engine-rustls' or 'default-engine-native-tls' feature.".to_string() + )) +} + /// # Safety /// /// Caller is responsible for passing a valid handle. @@ -794,8 +805,11 @@ mod tests { allocate_err, allocate_str, assert_extern_result_error_with_message, ok_or_panic, recover_string, }; - use delta_kernel::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine}; + #[cfg(feature = "default-engine-base")] + use delta_kernel::engine::default::DefaultEngine; + #[cfg(feature = "default-engine-base")] use object_store::memory::InMemory; + #[cfg(feature = "default-engine-base")] use test_utils::{actions_to_string, actions_to_string_partitioned, add_commit, TestAction}; #[no_mangle] @@ -818,12 +832,14 @@ mod tests { } } + #[cfg(feature = "default-engine-base")] pub(crate) fn get_default_engine(path: &str) -> Handle { let path = kernel_string_slice!(path); let builder = unsafe { ok_or_panic(get_engine_builder(path, allocate_err)) }; unsafe { ok_or_panic(builder_build(builder)) } } + #[cfg(feature = "default-engine-base")] #[test] fn engine_builder() { let engine = get_default_engine("memory:///doesntmatter/foo"); @@ -832,6 +848,7 @@ mod tests { } } + #[cfg(feature = "default-engine-base")] #[tokio::test] async fn test_snapshot() -> Result<(), Box> { let storage = Arc::new(InMemory::new()); @@ -841,7 +858,7 @@ mod tests { actions_to_string(vec![TestAction::Metadata]), ) .await?; - let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(storage.clone(), ::tokio::runtime::Handle::current()); let engine = engine_to_handle(Arc::new(engine), allocate_err); let path = "memory:///"; @@ -878,6 +895,7 @@ mod tests { Ok(()) } + #[cfg(feature = "default-engine-base")] #[tokio::test] async fn test_snapshot_partition_cols() -> Result<(), Box> { let storage = Arc::new(InMemory::new()); @@ -887,7 +905,7 @@ mod tests { actions_to_string_partitioned(vec![TestAction::Metadata]), ) .await?; - let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(storage.clone(), ::tokio::runtime::Handle::current()); let engine = engine_to_handle(Arc::new(engine), allocate_err); let path = "memory:///"; @@ -914,6 +932,7 @@ mod tests { Ok(()) } + #[cfg(feature = "default-engine-base")] #[tokio::test] async fn allocate_null_err_okay() -> Result<(), Box> { 
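        // These tests run under #[tokio::test], so the Handle::current()
        // calls below resolve to the test's ambient runtime; outside of a
        // running runtime, Handle::current() panics.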
let storage = Arc::new(InMemory::new()); @@ -923,7 +942,7 @@ mod tests { actions_to_string(vec![TestAction::Metadata]), ) .await?; - let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(storage.clone(), ::tokio::runtime::Handle::current()); let engine = engine_to_handle(Arc::new(engine), allocate_null_err); let path = "memory:///"; diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 8bb3681b0..7cac03838 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -139,10 +139,13 @@ mod tests { use delta_kernel::parquet::file::properties::WriterProperties; use delta_kernel::transaction::add_files_schema; + #[cfg(feature = "default-engine-base")] use delta_kernel_ffi::engine_data::get_engine_data; + #[cfg(feature = "default-engine-base")] use delta_kernel_ffi::engine_data::ArrowFFIData; use delta_kernel_ffi::ffi_test_utils::{allocate_str, ok_or_panic, recover_string}; + #[cfg(feature = "default-engine-base")] use delta_kernel_ffi::tests::get_default_engine; use crate::{free_engine, free_schema, kernel_string_slice}; @@ -170,6 +173,7 @@ mod tests { .expect("txnId should be present in commitInfo"); } + #[cfg(feature = "default-engine-base")] fn create_arrow_ffi_from_json( schema: ArrowSchema, json_string: &str, @@ -188,6 +192,7 @@ mod tests { }) } + #[cfg(feature = "default-engine-base")] fn create_file_metadata( path: &str, num_rows: i64, @@ -206,6 +211,7 @@ mod tests { create_arrow_ffi_from_json(schema, file_metadata.as_str()) } + #[cfg(feature = "default-engine-base")] fn write_parquet_file( delta_path: &str, file_path: &str, @@ -226,6 +232,7 @@ mod tests { create_file_metadata(file_path, res.num_rows) } + #[cfg(feature = "default-engine-base")] #[tokio::test] #[cfg_attr(miri, ignore)] // FIXME: re-enable miri (can't call foreign function `linkat` on OS `linux`) async fn test_basic_append() -> Result<(), Box> { diff --git a/kernel/benches/metadata_bench.rs b/kernel/benches/metadata_bench.rs index 86168baf1..6e7898eb9 100644 --- a/kernel/benches/metadata_bench.rs +++ b/kernel/benches/metadata_bench.rs @@ -20,7 +20,6 @@ use std::collections::HashMap; use std::sync::Arc; -use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::snapshot::Snapshot; use delta_kernel::try_parse_uri; @@ -34,16 +33,19 @@ use url::Url; // force scan metadata bench to use smaller sample size so test runs faster (100 -> 20) const SCAN_METADATA_BENCH_SAMPLE_SIZE: usize = 20; -fn setup() -> (TempDir, Url, Arc>) { +fn setup() -> (TempDir, Url, Arc) { // note this table _only_ has a _delta_log, no data files (can only do metadata reads) let table = "300k-add-files-100-col-partitioned"; let tempdir = load_test_data("./tests/data", table).unwrap(); let table_path = tempdir.path().join(table); let url = try_parse_uri(table_path.to_str().unwrap()).expect("Failed to parse table path"); // TODO: use multi-threaded executor - let executor = Arc::new(TokioBackgroundExecutor::new()); - let engine = DefaultEngine::try_new(&url, HashMap::::new(), executor) - .expect("Failed to create engine"); + let engine = DefaultEngine::try_new( + &url, + HashMap::::new(), + tokio::runtime::Handle::current(), + ) + .expect("Failed to create engine"); (tempdir, url, Arc::new(engine)) } diff --git a/kernel/examples/common/Cargo.toml b/kernel/examples/common/Cargo.toml index 5c8f75f94..843aa237c 100644 --- a/kernel/examples/common/Cargo.toml +++ 
b/kernel/examples/common/Cargo.toml @@ -17,4 +17,5 @@ delta_kernel = { path = "../../../kernel", features = [ "default-engine-rustls", "internal-api", ] } +tokio = { version = "1.47", features = ["rt-multi-thread"] } url = "2" diff --git a/kernel/examples/common/src/lib.rs b/kernel/examples/common/src/lib.rs index ab1a2f03a..cf344b604 100644 --- a/kernel/examples/common/src/lib.rs +++ b/kernel/examples/common/src/lib.rs @@ -4,10 +4,7 @@ use std::{collections::HashMap, sync::Arc}; use clap::Args; use delta_kernel::{ - arrow::array::RecordBatch, - engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine}, - scan::Scan, - schema::Schema, + arrow::array::RecordBatch, engine::default::DefaultEngine, scan::Scan, schema::Schema, DeltaResult, SnapshotRef, }; @@ -45,10 +42,7 @@ pub struct ScanArgs { } /// Get an engine configured to read table at `url` and `LocationArgs` -pub fn get_engine( - url: &Url, - args: &LocationArgs, -) -> DeltaResult> { +pub fn get_engine(url: &Url, args: &LocationArgs) -> DeltaResult { let mut options = if let Some(ref region) = args.region { HashMap::from([("region", region.clone())]) } else { @@ -57,7 +51,7 @@ pub fn get_engine( if args.public { options.insert("skip_signature", "true".to_string()); } - DefaultEngine::try_new(url, options, Arc::new(TokioBackgroundExecutor::new())) + DefaultEngine::try_new(url, options, tokio::runtime::Handle::current()) } /// Construct a scan at the latest snapshot. This is over the specified table and using the passed diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index 138724ff4..ed965fa99 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -15,7 +15,6 @@ use uuid::Uuid; use delta_kernel::arrow::array::TimestampMicrosecondArray; use delta_kernel::engine::arrow_conversion::TryIntoArrow; use delta_kernel::engine::arrow_data::ArrowEngineData; -use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType}; use delta_kernel::transaction::CommitResult; @@ -75,7 +74,7 @@ async fn try_main() -> DeltaResult<()> { let engine = DefaultEngine::try_new( &url, HashMap::::new(), - Arc::new(TokioBackgroundExecutor::new()), + tokio::runtime::Handle::current(), )?; // Create or get the table @@ -290,10 +289,7 @@ fn create_sample_data(schema: &SchemaRef, num_rows: usize) -> DeltaResult, -) -> DeltaResult<()> { +async fn read_and_display_data(table_url: &Url, engine: DefaultEngine) -> DeltaResult<()> { let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; let scan = snapshot.scan_builder().build()?; diff --git a/kernel/src/checkpoint/tests.rs b/kernel/src/checkpoint/tests.rs index bcc4e96c0..0179eca51 100644 --- a/kernel/src/checkpoint/tests.rs +++ b/kernel/src/checkpoint/tests.rs @@ -8,7 +8,7 @@ use crate::arrow::array::{ArrayRef, StructArray}; use crate::arrow::datatypes::{DataType, Schema}; use crate::checkpoint::create_last_checkpoint_data; use crate::engine::arrow_data::ArrowEngineData; -use crate::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine}; +use crate::engine::default::DefaultEngine; use crate::utils::test_utils::Action; use crate::{DeltaResult, FileMeta, Snapshot}; @@ -58,7 +58,7 @@ fn test_deleted_file_retention_timestamp() -> DeltaResult<()> { #[test] fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> { let (store, _) = 
new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone(), tokio::runtime::Handle::current()); // 1st commit (version 0) - metadata and protocol actions // Protocol action does not include the v2Checkpoint reader/writer feature. @@ -116,7 +116,7 @@ fn test_create_last_checkpoint_data() -> DeltaResult<()> { let add_actions_counter = 75; let size_in_bytes: i64 = 1024 * 1024; // 1MB let (store, _) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone(), tokio::runtime::Handle::current()); // Create last checkpoint metadata let last_checkpoint_batch = create_last_checkpoint_data( @@ -271,7 +271,7 @@ fn read_last_checkpoint_file(store: &Arc) -> DeltaResult { #[test] fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone(), tokio::runtime::Handle::current()); // 1st commit: adds `fake_path_1` write_commit_to_store(&store, vec![create_add_action("fake_path_1")], 0)?; @@ -341,7 +341,7 @@ fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> { #[test] fn test_v1_checkpoint_specific_version() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone(), tokio::runtime::Handle::current()); // 1st commit (version 0) - metadata and protocol actions // Protocol action does not include the v2Checkpoint reader/writer feature. 
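One note on the pattern used throughout these tests: `tokio::runtime::Handle::current()` panics unless called from within a runtime, so a synchronous #[test] needs an ambient runtime. A sketch of one way to provide it:

    use std::sync::Arc;
    use delta_kernel::engine::default::DefaultEngine;
    use object_store::memory::InMemory;

    #[test]
    fn sync_test_with_ambient_runtime() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        // While the guard lives, Handle::current() resolves to `rt`, even
        // though this test fn is not async.
        let _guard = rt.enter();
        let _engine = DefaultEngine::new(
            Arc::new(InMemory::new()),
            tokio::runtime::Handle::current(),
        );
    }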
@@ -403,7 +403,7 @@ fn test_v1_checkpoint_specific_version() -> DeltaResult<()> { #[test] fn test_finalize_errors_if_checkpoint_data_iterator_is_not_exhausted() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone(), tokio::runtime::Handle::current()); // 1st commit (version 0) - metadata and protocol actions write_commit_to_store( @@ -445,7 +445,7 @@ fn test_finalize_errors_if_checkpoint_data_iterator_is_not_exhausted() -> DeltaR #[test] fn test_v2_checkpoint_supported_table() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone(), tokio::runtime::Handle::current()); // 1st commit: adds `fake_path_2` & removes `fake_path_1` write_commit_to_store( diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index fac7eaaff..e2cf9ae3e 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -188,7 +188,6 @@ mod tests { use test_utils::delta_path_for_version; - use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::engine::default::DefaultEngine; use crate::utils::current_time_duration; use crate::Engine as _; @@ -217,7 +216,7 @@ mod tests { let mut url = Url::from_directory_path(tmp.path()).unwrap(); let store = Arc::new(LocalFileSystem::new()); - let executor = Arc::new(TokioBackgroundExecutor::new()); + let executor = tokio::runtime::Handle::current(); let storage = ObjectStoreStorageHandler::new(store, executor); let mut slices: Vec = Vec::new(); @@ -249,7 +248,7 @@ mod tests { store.put(&name, data.clone().into()).await.unwrap(); let table_root = Url::parse("memory:///").expect("valid url"); - let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store, tokio::runtime::Handle::current()); let files: Vec<_> = engine .storage_handler() .list_from(&table_root.join("_delta_log").unwrap().join("0").unwrap()) @@ -279,7 +278,7 @@ mod tests { let url = Url::from_directory_path(tmp.path()).unwrap(); let store = Arc::new(LocalFileSystem::new()); - let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store, tokio::runtime::Handle::current()); let files = engine .storage_handler() .list_from(&url.join("_delta_log").unwrap().join("0").unwrap()) diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs index c8c358e23..9024dbaf5 100644 --- a/kernel/src/engine/default/json.rs +++ b/kernel/src/engine/default/json.rs @@ -149,12 +149,16 @@ impl JsonHandler for DefaultJsonHandler { let store = self.store.clone(); // cheap Arc let path = Path::from_url_path(path.path())?; let path_str = path.to_string(); - self.runtime - .block_on(async move { store.put_opts(&path, buffer.into(), put_mode.into()).await }) - .map_err(|e| match e { - object_store::Error::AlreadyExists { .. } => Error::FileAlreadyExists(path_str), - e => e.into(), - })?; + + // Use futures::executor::block_on for synchronous execution + // This works both inside and outside of a tokio runtime context + futures::executor::block_on(async move { + store.put_opts(&path, buffer.into(), put_mode.into()).await + }) + .map_err(|e| match e { + object_store::Error::AlreadyExists { .. 
} => Error::FileAlreadyExists(path_str), + e => e.into(), + })?; Ok(()) } } @@ -255,9 +259,6 @@ mod tests { use crate::arrow::array::{AsArray, Int32Array, RecordBatch, StringArray}; use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use crate::engine::arrow_data::ArrowEngineData; - use crate::engine::default::executor::tokio::{ - TokioBackgroundExecutor, TokioMultiThreadExecutor, - }; use crate::schema::{DataType as DeltaDataType, Schema, StructField}; use crate::utils::test_utils::string_array_to_engine_data; use futures::future; @@ -479,7 +480,7 @@ mod tests { #[test] fn test_parse_json() { let store = Arc::new(LocalFileSystem::new()); - let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); + let handler = DefaultJsonHandler::new(store, tokio::runtime::Handle::current()); let json_strings = StringArray::from(vec![ r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#, @@ -498,7 +499,7 @@ mod tests { #[test] fn test_parse_json_drop_field() { let store = Arc::new(LocalFileSystem::new()); - let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); + let handler = DefaultJsonHandler::new(store, tokio::runtime::Handle::current()); let json_strings = StringArray::from(vec![ r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2, "maxRowId": 3}}}"#, ]); @@ -539,7 +540,7 @@ mod tests { size: meta.size, }]; - let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); + let handler = DefaultJsonHandler::new(store, tokio::runtime::Handle::current()); let data: Vec = handler .read_json_files(files, get_log_schema().clone(), None) .unwrap() @@ -694,12 +695,7 @@ mod tests { let files = future::join_all(ordered_file_meta).await; // fire off the read_json_files call (for all the files in order) - let handler = DefaultJsonHandler::new( - store, - Arc::new(TokioMultiThreadExecutor::new( - tokio::runtime::Handle::current(), - )), - ); + let handler = DefaultJsonHandler::new(store, tokio::runtime::Handle::current()); let handler = handler.with_buffer_size(*buffer_size); let physical_schema = Arc::new(Schema::new_unchecked(vec![StructField::nullable( "val", @@ -767,8 +763,7 @@ mod tests { async fn do_test_write_json_file(overwrite: bool) -> DeltaResult<()> { let store = Arc::new(InMemory::new()); - let executor = Arc::new(TokioBackgroundExecutor::new()); - let handler = DefaultJsonHandler::new(store.clone(), executor); + let handler = DefaultJsonHandler::new(store.clone(), tokio::runtime::Handle::current()); let path = 
Url::parse("memory:///test/data/00000000000000000001.json")?; let object_path = Path::from("/test/data/00000000000000000001.json"); diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 3d35f2901..d44e92d59 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -167,17 +167,16 @@ impl UrlExt for Url { #[cfg(test)] mod tests { - use super::executor::tokio::TokioBackgroundExecutor; use super::*; use crate::engine::tests::test_arrow_engine; use object_store::local::LocalFileSystem; - #[test] - fn test_default_engine() { + #[tokio::test] + async fn test_default_engine() { let tmp = tempfile::tempdir().unwrap(); let url = Url::from_directory_path(tmp.path()).unwrap(); let object_store = Arc::new(LocalFileSystem::new()); - let engine = DefaultEngine::new(object_store, Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(object_store, tokio::runtime::Handle::current()); test_arrow_engine(&engine, &url); } diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index 110c61937..2ca3e7892 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -435,7 +435,6 @@ mod tests { use crate::engine::arrow_conversion::TryIntoKernel as _; use crate::engine::arrow_data::ArrowEngineData; - use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::EngineData; use itertools::Itertools; @@ -479,7 +478,7 @@ mod tests { size: meta.size, }]; - let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); + let handler = DefaultParquetHandler::new(store, tokio::runtime::Handle::current()); let data: Vec = handler .read_parquet_files( files, @@ -557,7 +556,7 @@ mod tests { async fn test_write_parquet() { let store = Arc::new(InMemory::new()); let parquet_handler = - DefaultParquetHandler::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + DefaultParquetHandler::new(store.clone(), tokio::runtime::Handle::current()); let data = Box::new(ArrowEngineData::new( RecordBatch::try_from_iter(vec![( @@ -627,7 +626,7 @@ mod tests { async fn test_disallow_non_trailing_slash() { let store = Arc::new(InMemory::new()); let parquet_handler = - DefaultParquetHandler::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + DefaultParquetHandler::new(store.clone(), tokio::runtime::Handle::current()); let data = Box::new(ArrowEngineData::new( RecordBatch::try_from_iter(vec![( diff --git a/kernel/src/listed_log_files.rs b/kernel/src/listed_log_files.rs index 73d82541f..210014d7e 100644 --- a/kernel/src/listed_log_files.rs +++ b/kernel/src/listed_log_files.rs @@ -359,7 +359,6 @@ mod list_log_files_with_log_tail_tests { use object_store::{memory::InMemory, path::Path as ObjectPath, ObjectStore}; use url::Url; - use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::engine::default::filesystem::ObjectStoreStorageHandler; use crate::FileMeta; @@ -416,7 +415,7 @@ mod list_log_files_with_log_tail_tests { } }); - let executor = Arc::new(TokioBackgroundExecutor::new()); + let executor = tokio::runtime::Handle::current(); let storage = Box::new(ObjectStoreStorageHandler::new(store, executor)); (storage, log_root) } diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index 692cf563a..177d9089e 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -13,7 +13,6 @@ use crate::actions::{ SIDECAR_NAME, }; use 
crate::engine::arrow_data::ArrowEngineData; -use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::engine::default::filesystem::ObjectStoreStorageHandler; use crate::engine::default::DefaultEngine; use crate::engine::sync::SyncEngine; @@ -114,7 +113,7 @@ fn build_log_with_paths_and_checkpoint( } }); - let storage = ObjectStoreStorageHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); + let storage = ObjectStoreStorageHandler::new(store, tokio::runtime::Handle::current()); let table_root = Url::parse("memory:///").expect("valid url"); let log_root = table_root.join("_delta_log/").unwrap(); @@ -959,7 +958,7 @@ fn test_checkpoint_batch_with_no_sidecars_returns_none() -> DeltaResult<()> { #[test] fn test_checkpoint_batch_with_sidecars_returns_sidecar_batches() -> DeltaResult<()> { let (store, log_root) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone(), tokio::runtime::Handle::current()); let read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME, SIDECAR_NAME])?; add_sidecar_to_store( @@ -999,7 +998,7 @@ fn test_checkpoint_batch_with_sidecars_returns_sidecar_batches() -> DeltaResult< #[test] fn test_checkpoint_batch_with_sidecar_files_that_do_not_exist() -> DeltaResult<()> { let (store, log_root) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone(), tokio::runtime::Handle::current()); let checkpoint_batch = sidecar_batch_with_given_paths( vec!["sidecarfile1.parquet", "sidecarfile2.parquet"], @@ -1026,7 +1025,7 @@ fn test_checkpoint_batch_with_sidecar_files_that_do_not_exist() -> DeltaResult<( #[test] fn test_reading_sidecar_files_with_predicate() -> DeltaResult<()> { let (store, log_root) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone(), tokio::runtime::Handle::current()); let read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME, SIDECAR_NAME])?; let checkpoint_batch = @@ -1124,7 +1123,7 @@ fn test_create_checkpoint_stream_errors_when_schema_has_add_but_no_sidecar_actio fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schema_has_no_file_actions( ) -> DeltaResult<()> { let (store, log_root) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone(), tokio::runtime::Handle::current()); add_checkpoint_to_store( &store, // Create a checkpoint batch with sidecar actions to verify that the sidecar actions are not read. @@ -1170,7 +1169,7 @@ fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schema_has_ fn test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_is_multi_part( ) -> DeltaResult<()> { let (store, log_root) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone(), tokio::runtime::Handle::current()); // Multi-part checkpoints should never contain sidecar actions. 
// This test intentionally includes batches with sidecar actions in multi-part checkpoints @@ -1237,7 +1236,7 @@ fn test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_is_mul fn test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_sidecars() -> DeltaResult<()> { let (store, log_root) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone(), tokio::runtime::Handle::current()); add_checkpoint_to_store( &store, @@ -1279,7 +1278,7 @@ fn test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_sidecars #[test] fn test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidecars() -> DeltaResult<()> { let (store, log_root) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone(), tokio::runtime::Handle::current()); let filename = "00000000000000000010.checkpoint.80a083e8-7026-4e79-81be-64bd76c43a11.json"; @@ -1336,7 +1335,7 @@ fn test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidecars() fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar_batches( ) -> DeltaResult<()> { let (store, log_root) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone(), tokio::runtime::Handle::current()); add_checkpoint_to_store( &store, diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 5a7d4b013..710536cc2 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -402,7 +402,6 @@ mod tests { use crate::parquet::arrow::ArrowWriter; use crate::engine::arrow_data::ArrowEngineData; - use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::engine::default::filesystem::ObjectStoreStorageHandler; use crate::engine::default::DefaultEngine; use crate::engine::sync::SyncEngine; @@ -510,7 +509,7 @@ mod tests { // in each test we will modify versions 1 and 2 to test different scenarios fn test_new_from(store: Arc) -> DeltaResult<()> { let url = Url::parse("memory:///")?; - let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store, tokio::runtime::Handle::current()); let base_snapshot = Snapshot::builder_for(url.clone()) .at_version(0) .build(&engine)?; @@ -560,10 +559,7 @@ mod tests { // 3. new version > existing version // a. 
no new log segment let url = Url::parse("memory:///")?; - let engine = DefaultEngine::new( - Arc::new(store.fork()), - Arc::new(TokioBackgroundExecutor::new()), - ); + let engine = DefaultEngine::new(Arc::new(store.fork()), tokio::runtime::Handle::current()); let base_snapshot = Snapshot::builder_for(url.clone()) .at_version(0) .build(&engine)?; @@ -636,7 +632,7 @@ mod tests { // new commits AND request version > end of log let url = Url::parse("memory:///")?; - let engine = DefaultEngine::new(store_3c_i, Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store_3c_i, tokio::runtime::Handle::current()); let base_snapshot = Snapshot::builder_for(url.clone()) .at_version(0) .build(&engine)?; @@ -680,7 +676,7 @@ mod tests { async fn test_snapshot_new_from_crc() -> Result<(), Box> { let store = Arc::new(InMemory::new()); let url = Url::parse("memory:///")?; - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone(), tokio::runtime::Handle::current()); let protocol = |reader_version, writer_version| { json!({ "protocol": { @@ -810,7 +806,7 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let store = Arc::new(LocalFileSystem::new()); - let executor = Arc::new(TokioBackgroundExecutor::new()); + let executor = tokio::runtime::Handle::current(); let storage = ObjectStoreStorageHandler::new(store, executor); let cp = LastCheckpointHint::try_read(&storage, &url).unwrap(); assert!(cp.is_none()); @@ -838,7 +834,7 @@ mod tests { .expect("put _last_checkpoint"); }); - let executor = Arc::new(TokioBackgroundExecutor::new()); + let executor = tokio::runtime::Handle::current(); let storage = ObjectStoreStorageHandler::new(store, executor); let url = Url::parse("memory:///invalid/").expect("valid url"); let invalid = LastCheckpointHint::try_read(&storage, &url).expect("read last checkpoint"); @@ -869,7 +865,7 @@ mod tests { .expect("put _last_checkpoint"); }); - let executor = Arc::new(TokioBackgroundExecutor::new()); + let executor = tokio::runtime::Handle::current(); let storage = ObjectStoreStorageHandler::new(store, executor); let url = Url::parse("memory:///valid/").expect("valid url"); let valid = LastCheckpointHint::try_read(&storage, &url).expect("read last checkpoint"); @@ -924,7 +920,7 @@ mod tests { async fn test_domain_metadata() -> DeltaResult<()> { let url = Url::parse("memory:///")?; let store = Arc::new(InMemory::new()); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone(), tokio::runtime::Handle::current()); // commit0 // - domain1: not removed diff --git a/kernel/src/snapshot/builder.rs b/kernel/src/snapshot/builder.rs index 15cf59c62..796388318 100644 --- a/kernel/src/snapshot/builder.rs +++ b/kernel/src/snapshot/builder.rs @@ -105,7 +105,7 @@ impl SnapshotBuilder { mod tests { use std::sync::Arc; - use crate::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine}; + use crate::engine::default::DefaultEngine; use itertools::Itertools; use object_store::memory::InMemory; @@ -114,16 +114,12 @@ mod tests { use super::*; - fn setup_test() -> ( - Arc>, - Arc, - Url, - ) { + fn setup_test() -> (Arc, Arc, Url) { let table_root = Url::parse("memory:///test_table").unwrap(); let store = Arc::new(InMemory::new()); let engine = Arc::new(DefaultEngine::new( store.clone(), - Arc::new(TokioBackgroundExecutor::new()), + tokio::runtime::Handle::current(), )); (engine, store, 
table_root) } diff --git a/kernel/tests/golden_tables.rs b/kernel/tests/golden_tables.rs index 758902e7f..87c108050 100644 --- a/kernel/tests/golden_tables.rs +++ b/kernel/tests/golden_tables.rs @@ -16,7 +16,6 @@ use delta_kernel::parquet::arrow::async_reader::{ }; use delta_kernel::engine::arrow_conversion::TryFromKernel as _; -use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::{DeltaResult, Snapshot}; @@ -164,7 +163,7 @@ fn assert_eq(actual: &StructArray, expected: &StructArray) { // do a full table scan at the latest snapshot of the table and compare with the expected data async fn latest_snapshot_test( - engine: DefaultEngine, + engine: DefaultEngine, url: Url, expected_path: Option, ) -> Result<(), Box> { @@ -199,14 +198,7 @@ async fn latest_snapshot_test( Ok(()) } -fn setup_golden_table( - test_name: &str, -) -> ( - DefaultEngine, - Url, - Option, - tempfile::TempDir, -) { +fn setup_golden_table(test_name: &str) -> (DefaultEngine, Url, Option, tempfile::TempDir) { let test_dir = load_test_data("tests/golden_data", test_name).unwrap(); let test_path = test_dir.path().join(test_name); let table_path = test_path.join("delta"); @@ -215,7 +207,7 @@ fn setup_golden_table( let engine = DefaultEngine::try_new( &url, std::iter::empty::<(&str, &str)>(), - Arc::new(TokioBackgroundExecutor::new()), + tokio::runtime::Handle::current(), ) .unwrap(); let expected_path = test_path.join("expected"); @@ -266,7 +258,7 @@ macro_rules! golden_test { // TODO use in canonicalized paths tests #[allow(dead_code)] async fn canonicalized_paths_test( - engine: DefaultEngine, + engine: DefaultEngine, table_root: Url, _expected: Option, ) -> Result<(), Box> { @@ -280,7 +272,7 @@ async fn canonicalized_paths_test( } async fn checkpoint_test( - engine: DefaultEngine, + engine: DefaultEngine, table_root: Url, _expected: Option, ) -> Result<(), Box> { diff --git a/kernel/tests/hdfs.rs b/kernel/tests/hdfs.rs index da39c9c77..6f6b10934 100644 --- a/kernel/tests/hdfs.rs +++ b/kernel/tests/hdfs.rs @@ -7,7 +7,6 @@ // cargo test --features integration-test --test hdfs #![cfg(all(feature = "integration-test", not(target_os = "windows")))] -use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::Snapshot; use hdfs_native::{Client, WriteOptions}; @@ -15,7 +14,6 @@ use hdfs_native_object_store::minidfs::MiniDfs; use std::collections::HashSet; use std::fs; use std::path::Path; -use std::sync::Arc; extern crate walkdir; use walkdir::WalkDir; @@ -70,7 +68,7 @@ async fn read_table_version_hdfs() -> Result<(), Box> { let engine = DefaultEngine::try_new( &url, std::iter::empty::<(&str, &str)>(), - Arc::new(TokioBackgroundExecutor::new()), + tokio::runtime::Handle::current(), )?; let snapshot = Snapshot::builder_for(url).build(&engine)?; diff --git a/kernel/tests/log_tail.rs b/kernel/tests/log_tail.rs index c4f602fd4..6f8007a45 100644 --- a/kernel/tests/log_tail.rs +++ b/kernel/tests/log_tail.rs @@ -4,7 +4,6 @@ use object_store::memory::InMemory; use object_store::path::Path; use url::Url; -use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::{FileMeta, LogPath, Snapshot}; @@ -27,16 +26,12 @@ fn create_file_meta(table_root: &Url, commit_path: Path) -> FileMeta { } } -fn setup_test() -> ( - Arc, - Arc>, - Url, -) { +fn setup_test() -> (Arc, Arc, Url) { let storage 
diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs
index 817c85cbc..da2ec8bd6 100644
--- a/kernel/tests/read.rs
+++ b/kernel/tests/read.rs
@@ -7,7 +7,6 @@ use delta_kernel::arrow::array::AsArray as _;
 use delta_kernel::arrow::compute::{concat_batches, filter_record_batch};
 use delta_kernel::arrow::datatypes::{Int64Type, Schema as ArrowSchema};
 use delta_kernel::engine::arrow_conversion::TryFromKernel as _;
-use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
 use delta_kernel::engine::default::DefaultEngine;
 use delta_kernel::expressions::{
     column_expr, column_pred, Expression as Expr, ExpressionRef, Predicate as Pred,
@@ -63,7 +62,7 @@ async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>>
     let location = Url::parse("memory:///")?;
     let engine = Arc::new(DefaultEngine::new(
         storage.clone(),
-        Arc::new(TokioBackgroundExecutor::new()),
+        tokio::runtime::Handle::current(),
     ));
 
     let expected_data = vec![batch.clone(), batch];
@@ -116,7 +115,7 @@ async fn two_commits() -> Result<(), Box<dyn std::error::Error>> {
         .await?;
 
     let location = Url::parse("memory:///").unwrap();
-    let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
+    let engine = DefaultEngine::new(storage.clone(), tokio::runtime::Handle::current());
 
     let expected_data = vec![batch.clone(), batch];
@@ -169,7 +168,7 @@ async fn remove_action() -> Result<(), Box<dyn std::error::Error>> {
         .await?;
 
     let location = Url::parse("memory:///").unwrap();
-    let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
+    let engine = DefaultEngine::new(storage.clone(), tokio::runtime::Handle::current());
 
     let expected_data = vec![batch];
@@ -242,7 +241,7 @@ async fn stats() -> Result<(), Box<dyn std::error::Error>> {
     let location = Url::parse("memory:///").unwrap();
     let engine = Arc::new(DefaultEngine::new(
         storage.clone(),
-        Arc::new(TokioBackgroundExecutor::new()),
+        tokio::runtime::Handle::current(),
     ));
 
     let snapshot = Snapshot::builder_for(location).build(engine.as_ref())?;
@@ -432,7 +431,7 @@ fn read_table_data(
     let engine = Arc::new(DefaultEngine::try_new(
         &url,
         std::iter::empty::<(&str, &str)>(),
-        Arc::new(TokioBackgroundExecutor::new()),
+        tokio::runtime::Handle::current(),
     )?);
 
     let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref())?;
@@ -1057,7 +1056,7 @@ async fn predicate_on_non_nullable_partition_column() -> Result<(), Box<dyn std::error::Error>>
[garbled in source: the body of this hunk, three similar hunks in other read.rs tests returning Result<(), Box<dyn std::error::Error>>, and the diff header for the row-tracking test file were destroyed by tag-stripping; each presumably applied the same Arc::new(TokioBackgroundExecutor::new()) -> tokio::runtime::Handle::current() substitution]
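`read_table_data` above uses the fallible `try_new` constructor, which resolves an object store from the table URL plus key/value storage options rather than taking a store directly. A sketch of that form under the same running-runtime assumption (the memory URL and empty options mirror the calls in this diff; the test name is illustrative):

    use delta_kernel::engine::default::DefaultEngine;
    use url::Url;

    #[tokio::test(flavor = "multi_thread")]
    async fn engine_from_url() {
        let url = Url::parse("memory:///").unwrap();
        // No extra object-store options are needed for an in-memory store,
        // hence the empty (&str, &str) iterator, exactly as in the hunks above.
        let engine = DefaultEngine::try_new(
            &url,
            std::iter::empty::<(&str, &str)>(),
            tokio::runtime::Handle::current(),
        )
        .unwrap();
        let _engine = engine;
    }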
-) -> DeltaResult<(
-    Url,
-    Arc<DefaultEngine<TokioBackgroundExecutor>>,
-    Arc<dyn ObjectStore>,
-)> {
+) -> DeltaResult<(Url, Arc<DefaultEngine>, Arc<dyn ObjectStore>)> {
     let tmp_test_dir_url = Url::from_directory_path(tmp_dir.path())
         .map_err(|_| Error::generic("Failed to convert directory path to URL"))?;
     let (store, engine, table_location) = engine_store_setup(table_name, Some(&tmp_test_dir_url));
@@ -53,7 +48,7 @@ async fn create_row_tracking_table(
 /// Helper function to write data and return the number of records written.
 async fn write_data_to_table(
     table_url: &Url,
-    engine: Arc<DefaultEngine<TokioBackgroundExecutor>>,
+    engine: Arc<DefaultEngine>,
     data: Vec<RecordBatch>,
 ) -> DeltaResult<i64> {
     let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs
index bd3da49e7..5e4c53039 100644
--- a/kernel/tests/write.rs
+++ b/kernel/tests/write.rs
@@ -14,7 +14,6 @@ use delta_kernel::arrow::record_batch::RecordBatch;
 use delta_kernel::engine::arrow_conversion::{TryFromKernel, TryIntoArrow as _};
 use delta_kernel::engine::arrow_data::ArrowEngineData;
-use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
 use delta_kernel::engine::default::parquet::DefaultParquetHandler;
 use delta_kernel::engine::default::DefaultEngine;
 
@@ -140,7 +139,7 @@ async fn get_and_check_all_parquet_sizes(store: Arc<dyn ObjectStore>, path: &str
 async fn write_data_and_check_result_and_stats(
     table_url: Url,
     schema: SchemaRef,
-    engine: Arc<DefaultEngine<TokioBackgroundExecutor>>,
+    engine: Arc<DefaultEngine>,
     expected_since_commit: u64,
 ) -> Result<(), Box<dyn std::error::Error>> {
     let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
@@ -1007,7 +1006,7 @@ async fn test_append_variant() -> Result<(), Box<dyn std::error::Error>> {
     let add_files_metadata = (*engine)
         .parquet_handler()
         .as_any()
-        .downcast_ref::<DefaultParquetHandler<TokioBackgroundExecutor>>()
+        .downcast_ref::<DefaultParquetHandler>()
         .unwrap()
         .write_parquet_file(
             write_context.target_dir(),
@@ -1179,7 +1178,7 @@ async fn test_shredded_variant_read_rejection() -> Result<(), Box<dyn std::error::Error>> {
     let add_files_metadata = (*engine)
         .parquet_handler()
         .as_any()
-        .downcast_ref::<DefaultParquetHandler<TokioBackgroundExecutor>>()
+        .downcast_ref::<DefaultParquetHandler>()
         .unwrap()
         .write_parquet_file(
             write_context.target_dir(),
diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml
index a515dc721..9c484a281 100644
--- a/test-utils/Cargo.toml
+++ b/test-utils/Cargo.toml
@@ -19,6 +19,7 @@ itertools = "0.14.0"
 serde_json = "1.0.142"
 tar = "0.4"
 tempfile = "3"
+tokio = { version = "1.47", features = ["rt-multi-thread"] }
 url = "2.5.4"
 uuid = { version = "1", features = ["v4"] }
 zstd = "0.13"
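test-utils gains a direct tokio dependency with `rt-multi-thread` because `Handle::current()` can only hand out a handle to a runtime that is already running. Synchronous code that is not inside one can instead own a runtime explicitly and pass a clone of its handle; a sketch under that assumption (the local-filesystem store and function name are illustrative, not from the patch):

    use std::sync::Arc;

    use delta_kernel::engine::default::DefaultEngine;
    use object_store::local::LocalFileSystem;
    use tokio::runtime::Runtime;

    fn engine_from_owned_runtime() -> (Runtime, DefaultEngine) {
        // Keep the runtime alive alongside the engine: dropping it would tear
        // down the background tasks servicing the engine's async IO.
        let rt = Runtime::new().expect("failed to build tokio runtime");
        let engine = DefaultEngine::new(Arc::new(LocalFileSystem::new()), rt.handle().clone());
        (rt, engine)
    }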
diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs
index 4840b90e2..ae8df3fdb 100644
--- a/test-utils/src/lib.rs
+++ b/test-utils/src/lib.rs
@@ -10,8 +10,6 @@ use delta_kernel::arrow::compute::filter_record_batch;
 use delta_kernel::arrow::error::ArrowError;
 use delta_kernel::arrow::util::pretty::pretty_format_batches;
 use delta_kernel::engine::arrow_data::ArrowEngineData;
-use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
-use delta_kernel::engine::default::executor::TaskExecutor;
 use delta_kernel::engine::default::DefaultEngine;
 use delta_kernel::parquet::arrow::arrow_writer::ArrowWriter;
 use delta_kernel::parquet::file::properties::WriterProperties;
@@ -200,19 +198,15 @@ pub fn into_record_batch(engine_data: Box<dyn EngineData>) -> RecordBatch {
 /// Note: we implement this extension trait here so that we can import this trait (from test-utils
 /// crate) and get to use all these test-only helper methods from places where we don't have access
 pub trait DefaultEngineExtension {
-    type Executor: TaskExecutor;
-
-    fn new_local() -> Arc<DefaultEngine<Self::Executor>>;
+    fn new_local() -> Arc<DefaultEngine>;
 }
 
-impl DefaultEngineExtension for DefaultEngine<TokioBackgroundExecutor> {
-    type Executor = TokioBackgroundExecutor;
-
-    fn new_local() -> Arc<DefaultEngine<TokioBackgroundExecutor>> {
+impl DefaultEngineExtension for DefaultEngine {
+    fn new_local() -> Arc<DefaultEngine> {
         let object_store = Arc::new(LocalFileSystem::new());
         Arc::new(DefaultEngine::new(
             object_store,
-            TokioBackgroundExecutor::new().into(),
+            tokio::runtime::Handle::current(),
         ))
     }
 }
@@ -221,11 +215,7 @@ impl DefaultEngineExtension for DefaultEngine<TokioBackgroundExecutor> {
 pub fn engine_store_setup(
     table_name: &str,
     local_directory: Option<&Url>,
-) -> (
-    Arc<dyn ObjectStore>,
-    DefaultEngine<TokioBackgroundExecutor>,
-    Url,
-) {
+) -> (Arc<dyn ObjectStore>, DefaultEngine, Url) {
     let (storage, url): (Arc<dyn ObjectStore>, Url) = match local_directory {
         None => (
             Arc::new(InMemory::new()),
@@ -236,8 +226,7 @@ pub fn engine_store_setup(
             Url::parse(format!("{dir}{table_name}/").as_str()).expect("valid url"),
         ),
     };
-    let executor = Arc::new(TokioBackgroundExecutor::new());
-    let engine = DefaultEngine::new(Arc::clone(&storage), executor);
+    let engine = DefaultEngine::new(Arc::clone(&storage), tokio::runtime::Handle::current());
     (storage, engine, url)
 }
 
@@ -333,15 +322,8 @@ pub async fn setup_test_tables(
     partition_columns: &[&str],
     local_directory: Option<&Url>,
     table_base_name: &str,
-) -> Result<
-    Vec<(
-        Url,
-        DefaultEngine<TokioBackgroundExecutor>,
-        Arc<dyn ObjectStore>,
-        &'static str,
-    )>,
-    Box<dyn std::error::Error>,
-> {
+) -> Result<Vec<(Url, DefaultEngine, Arc<dyn ObjectStore>, &'static str)>, Box<dyn std::error::Error>>
+{
     let table_name_11 = format!("{table_base_name}_11");
     let table_name_37 = format!("{table_base_name}_37");
     let (store_11, engine_11, table_location_11) =