diff --git a/acceptance/src/data.rs b/acceptance/src/data.rs index 95e489228..bdd7e9da2 100644 --- a/acceptance/src/data.rs +++ b/acceptance/src/data.rs @@ -115,7 +115,7 @@ pub async fn assert_scan_metadata( test_case: &TestCaseInfo, ) -> TestResult<()> { let table_root = test_case.table_root()?; - let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?; + let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref()).await?; let scan = snapshot.scan_builder().build()?; let mut schema = None; let batches: Vec = scan diff --git a/acceptance/src/meta.rs b/acceptance/src/meta.rs index 93144d76e..196455103 100644 --- a/acceptance/src/meta.rs +++ b/acceptance/src/meta.rs @@ -103,13 +103,13 @@ impl TestCaseInfo { let engine = engine.as_ref(); let (latest, versions) = self.versions().await?; - let snapshot = Snapshot::builder_for(self.table_root()?).build(engine)?; + let snapshot = Snapshot::builder_for(self.table_root()?).build(engine).await?; self.assert_snapshot_meta(&latest, &snapshot)?; for table_version in versions { let snapshot = Snapshot::builder_for(self.table_root()?) .at_version(table_version.version) - .build(engine)?; + .build(engine).await?; self.assert_snapshot_meta(&table_version, &snapshot)?; } diff --git a/ffi/Cargo.toml b/ffi/Cargo.toml index 438bb0e80..19590881d 100644 --- a/ffi/Cargo.toml +++ b/ffi/Cargo.toml @@ -18,6 +18,7 @@ release = false crate-type = ["lib", "cdylib", "staticlib"] [dependencies] +futures = "0.3" tracing = "0.1" tracing-core = { version = "0.1", optional = true } tracing-subscriber = { version = "0.3", optional = true, features = [ "json" ] } diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index a70b531f1..b498af3a3 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -608,7 +608,7 @@ fn snapshot_impl( } else { builder }; - let snapshot = builder.build(extern_engine.engine().as_ref())?; + let snapshot = futures::executor::block_on(builder.build(extern_engine.engine().as_ref()))?; Ok(snapshot.into()) } diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 8bb3681b0..0eae71499 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -37,7 +37,9 @@ fn transaction_impl( url: DeltaResult, extern_engine: &dyn ExternEngine, ) -> DeltaResult> { - let snapshot = Snapshot::builder_for(url?).build(extern_engine.engine().as_ref())?; + let snapshot = futures::executor::block_on( + Snapshot::builder_for(url?).build(extern_engine.engine().as_ref()) + )?; let transaction = snapshot.transaction(); Ok(Box::new(transaction?).into()) } @@ -372,7 +374,7 @@ mod tests { // Confirm that the data matches what we appended let test_batch = ArrowEngineData::from(batch); - test_read(&test_batch, &table_url, unsafe { engine.as_ref().engine() })?; + test_read(&test_batch, &table_url, unsafe { engine.as_ref().engine() }).await?; unsafe { free_schema(write_schema) }; unsafe { free_write_context(write_context) }; diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index eb21b3702..dfb748447 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -42,6 +42,7 @@ pre-release-hook = [ delta_kernel_derive = { path = "../derive-macros", version = "0.16.0" } bytes = "1.10" chrono = "0.4.41" +futures = "0.3" indexmap = "2.10.0" itertools = "0.14" roaring = "0.11.2" @@ -56,7 +57,6 @@ uuid = { version = "1.18.0", features = ["v4", "fast-rng"] } z85 = "3.0.6" # optional deps -futures = { version = "0.3", optional = true } # Used for fetching direct urls (like pre-signed urls) reqwest = { version = "0.12.23", default-features = false, 
optional = true } # optionally used with default engine (though not required) @@ -116,7 +116,6 @@ catalog-managed = [] default-engine-base = [ "arrow-conversion", "arrow-expression", - "futures", "need-arrow", "tokio", ] diff --git a/kernel/examples/inspect-table/Cargo.toml b/kernel/examples/inspect-table/Cargo.toml index 545baf911..1ccb75c85 100644 --- a/kernel/examples/inspect-table/Cargo.toml +++ b/kernel/examples/inspect-table/Cargo.toml @@ -13,6 +13,7 @@ delta_kernel = { path = "../../../kernel", features = [ "internal-api", ] } env_logger = "0.11.8" +tokio = { version = "1", features = ["rt", "macros"] } # for cargo-release [package.metadata.release] diff --git a/kernel/examples/inspect-table/src/main.rs b/kernel/examples/inspect-table/src/main.rs index 67a651ae1..8f754023e 100644 --- a/kernel/examples/inspect-table/src/main.rs +++ b/kernel/examples/inspect-table/src/main.rs @@ -51,7 +51,8 @@ enum Commands { fn main() -> ExitCode { env_logger::init(); - match try_main() { + let runtime = tokio::runtime::Runtime::new().unwrap(); + match runtime.block_on(try_main()) { Ok(()) => ExitCode::SUCCESS, Err(e) => { println!("{e:#?}"); @@ -177,12 +178,12 @@ fn print_scan_file( ); } -fn try_main() -> DeltaResult<()> { +async fn try_main() -> DeltaResult<()> { let cli = Cli::parse(); let url = delta_kernel::try_parse_uri(&cli.location_args.path)?; let engine = common::get_engine(&url, &cli.location_args)?; - let snapshot = Snapshot::builder_for(url).build(&engine)?; + let snapshot = Snapshot::builder_for(url).build(&engine).await?; match cli.command { Commands::TableVersion => { diff --git a/kernel/examples/read-table-changes/Cargo.toml b/kernel/examples/read-table-changes/Cargo.toml index 17c57ef57..95f3859b2 100644 --- a/kernel/examples/read-table-changes/Cargo.toml +++ b/kernel/examples/read-table-changes/Cargo.toml @@ -16,4 +16,6 @@ delta_kernel = { path = "../../../kernel", features = [ "default-engine-rustls", "internal-api", ] } +futures = "0.3" itertools = "0.14" +tokio = { version = "1", features = ["rt", "macros"] } diff --git a/kernel/examples/read-table-changes/src/main.rs b/kernel/examples/read-table-changes/src/main.rs index 8ffee30ff..1a275f29c 100644 --- a/kernel/examples/read-table-changes/src/main.rs +++ b/kernel/examples/read-table-changes/src/main.rs @@ -7,6 +7,7 @@ use delta_kernel::arrow::{compute::filter_record_batch, util::pretty::print_batc use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::table_changes::TableChanges; use delta_kernel::DeltaResult; +use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; #[derive(Parser)] @@ -24,15 +25,16 @@ struct Cli { end_version: Option, } -fn main() -> DeltaResult<()> { +#[tokio::main] +async fn main() -> DeltaResult<()> { let cli = Cli::parse(); let url = delta_kernel::try_parse_uri(cli.location_args.path.as_str())?; let engine = common::get_engine(&url, &cli.location_args)?; - let table_changes = TableChanges::try_new(url, &engine, cli.start_version, cli.end_version)?; + let table_changes = TableChanges::try_new(url, &engine, cli.start_version, cli.end_version).await?; let table_changes_scan = table_changes.into_scan_builder().build()?; let batches: Vec = table_changes_scan - .execute(Arc::new(engine))? + .execute(Arc::new(engine)).await? 
.map(|scan_result| -> DeltaResult<_> { let scan_result = scan_result?; let mask = scan_result.full_mask(); @@ -48,7 +50,7 @@ fn main() -> DeltaResult<()> { Ok(record_batch) } }) - .try_collect()?; + .try_collect().await?; print_batches(&batches)?; Ok(()) } diff --git a/kernel/examples/read-table-multi-threaded/Cargo.toml b/kernel/examples/read-table-multi-threaded/Cargo.toml index 17253b98d..261dd05c7 100644 --- a/kernel/examples/read-table-multi-threaded/Cargo.toml +++ b/kernel/examples/read-table-multi-threaded/Cargo.toml @@ -15,6 +15,7 @@ delta_kernel = { path = "../../../kernel", features = [ ] } env_logger = "0.11.8" spmc = "0.3.0" +tokio = { version = "1", features = ["rt", "macros"] } url = "2" # for cargo-release diff --git a/kernel/examples/read-table-multi-threaded/src/main.rs b/kernel/examples/read-table-multi-threaded/src/main.rs index e5afea11c..b495845e2 100644 --- a/kernel/examples/read-table-multi-threaded/src/main.rs +++ b/kernel/examples/read-table-multi-threaded/src/main.rs @@ -41,7 +41,8 @@ struct Cli { fn main() -> ExitCode { env_logger::init(); - match try_main() { + let runtime = tokio::runtime::Runtime::new().unwrap(); + match runtime.block_on(try_main()) { Ok(()) => ExitCode::SUCCESS, Err(e) => { println!("{e:#?}"); @@ -93,13 +94,13 @@ struct ScanState { logical_schema: SchemaRef, } -fn try_main() -> DeltaResult<()> { +async fn try_main() -> DeltaResult<()> { let cli = Cli::parse(); let url = delta_kernel::try_parse_uri(&cli.location_args.path)?; println!("Reading {url}"); let engine = common::get_engine(&url, &cli.location_args)?; - let snapshot = Snapshot::builder_for(url).build(&engine)?; + let snapshot = Snapshot::builder_for(url).build(&engine).await?; let Some(scan) = common::get_scan(snapshot, &cli.scan_args)? else { return Ok(()); }; diff --git a/kernel/examples/read-table-single-threaded/Cargo.toml b/kernel/examples/read-table-single-threaded/Cargo.toml index d13457a5a..47b29c5dd 100644 --- a/kernel/examples/read-table-single-threaded/Cargo.toml +++ b/kernel/examples/read-table-single-threaded/Cargo.toml @@ -15,6 +15,7 @@ delta_kernel = { path = "../../../kernel", features = [ ] } env_logger = "0.11.8" itertools = "0.14" +tokio = { version = "1", features = ["rt", "macros"] } # for cargo-release [package.metadata.release] diff --git a/kernel/examples/read-table-single-threaded/src/main.rs b/kernel/examples/read-table-single-threaded/src/main.rs index b9ba43eeb..e65058004 100644 --- a/kernel/examples/read-table-single-threaded/src/main.rs +++ b/kernel/examples/read-table-single-threaded/src/main.rs @@ -29,7 +29,8 @@ struct Cli { fn main() -> ExitCode { env_logger::init(); - match try_main() { + let runtime = tokio::runtime::Runtime::new().unwrap(); + match runtime.block_on(try_main()) { Ok(()) => ExitCode::SUCCESS, Err(e) => { println!("{e:#?}"); @@ -38,12 +39,12 @@ fn main() -> ExitCode { } } -fn try_main() -> DeltaResult<()> { +async fn try_main() -> DeltaResult<()> { let cli = Cli::parse(); let url = delta_kernel::try_parse_uri(&cli.location_args.path)?; println!("Reading {url}"); let engine = common::get_engine(&url, &cli.location_args)?; - let snapshot = Snapshot::builder_for(url).build(&engine)?; + let snapshot = Snapshot::builder_for(url).build(&engine).await?; let Some(scan) = common::get_scan(snapshot, &cli.scan_args)? 
else { return Ok(()); }; diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index 138724ff4..a83f2eaea 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -125,7 +125,7 @@ async fn create_or_get_base_snapshot( schema_str: &str, ) -> DeltaResult { // Check if table already exists - match Snapshot::builder_for(url.clone()).build(engine) { + match Snapshot::builder_for(url.clone()).build(engine).await { Ok(snapshot) => { println!("✓ Found existing table at version {}", snapshot.version()); Ok(snapshot) @@ -135,7 +135,7 @@ async fn create_or_get_base_snapshot( println!("Creating new Delta table..."); let schema = parse_schema(schema_str)?; create_table(url, &schema).await?; - Snapshot::builder_for(url.clone()).build(engine) + Snapshot::builder_for(url.clone()).build(engine).await } } } @@ -294,7 +294,7 @@ async fn read_and_display_data( table_url: &Url, engine: DefaultEngine, ) -> DeltaResult<()> { - let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine).await?; let scan = snapshot.scan_builder().build()?; let batches: Vec = scan diff --git a/kernel/src/actions/set_transaction.rs b/kernel/src/actions/set_transaction.rs index d263521f5..0238ee7a0 100644 --- a/kernel/src/actions/set_transaction.rs +++ b/kernel/src/actions/set_transaction.rs @@ -105,7 +105,7 @@ mod tests { use crate::arrow::array::StringArray; use itertools::Itertools; - fn get_latest_transactions( + async fn get_latest_transactions( path: &str, app_id: &str, ) -> (SetTransactionMap, Option) { @@ -113,7 +113,7 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(url).build(&engine).await.unwrap(); let log_segment = snapshot.log_segment(); ( @@ -122,13 +122,13 @@ mod tests { ) } - #[test] - fn test_txn() { - let (txns, txn) = get_latest_transactions("./tests/data/basic_partitioned/", "test"); + #[tokio::test] + async fn test_txn() { + let (txns, txn) = get_latest_transactions("./tests/data/basic_partitioned/", "test").await; assert!(txn.is_none()); assert_eq!(txns.len(), 0); - let (txns, txn) = get_latest_transactions("./tests/data/app-txn-no-checkpoint/", "my-app"); + let (txns, txn) = get_latest_transactions("./tests/data/app-txn-no-checkpoint/", "my-app").await; assert!(txn.is_some()); assert_eq!(txns.len(), 2); assert_eq!(txns.get("my-app"), txn.as_ref()); @@ -142,7 +142,7 @@ mod tests { .as_ref() ); - let (txns, txn) = get_latest_transactions("./tests/data/app-txn-checkpoint/", "my-app"); + let (txns, txn) = get_latest_transactions("./tests/data/app-txn-checkpoint/", "my-app").await; assert!(txn.is_some()); assert_eq!(txns.len(), 2); assert_eq!(txns.get("my-app"), txn.as_ref()); @@ -157,13 +157,13 @@ mod tests { ); } - #[test] - fn test_replay_for_app_ids() { + #[tokio::test] + async fn test_replay_for_app_ids() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/")); let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(url).build(&engine).await.unwrap(); let log_segment = snapshot.log_segment(); // The checkpoint has five parts, each containing one action. There are two app ids. 
@@ -174,13 +174,13 @@ mod tests { assert_eq!(data.len(), 2); } - #[test] - fn test_txn_retention_filtering() { + #[tokio::test] + async fn test_txn_retention_filtering() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/app-txn-with-last-updated/")); let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(url).build(&engine).await.unwrap(); let log_segment = snapshot.log_segment(); // Test with no retention (should get all transactions) diff --git a/kernel/src/checkpoint/tests.rs b/kernel/src/checkpoint/tests.rs index bcc4e96c0..ce0f32a27 100644 --- a/kernel/src/checkpoint/tests.rs +++ b/kernel/src/checkpoint/tests.rs @@ -22,8 +22,8 @@ use serde_json::{from_slice, json, Value}; use test_utils::delta_path_for_version; use url::Url; -#[test] -fn test_deleted_file_retention_timestamp() -> DeltaResult<()> { +#[tokio::test] +async fn test_deleted_file_retention_timestamp() -> DeltaResult<()> { const MILLIS_PER_SECOND: i64 = 1_000; let reference_time_secs = 10_000; @@ -55,8 +55,8 @@ fn test_deleted_file_retention_timestamp() -> DeltaResult<()> { Ok(()) } -#[test] -fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> { +#[tokio::test] +async fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); @@ -72,7 +72,7 @@ fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> { )?; let table_root = Url::parse("memory:///")?; - let snapshot = Snapshot::builder_for(table_root).build(&engine)?; + let snapshot = Snapshot::builder_for(table_root).build(&engine).await?; let writer = snapshot.checkpoint()?; let checkpoint_batch = writer.create_checkpoint_metadata_batch(&engine)?; @@ -109,8 +109,8 @@ fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> { Ok(()) } -#[test] -fn test_create_last_checkpoint_data() -> DeltaResult<()> { +#[tokio::test] +async fn test_create_last_checkpoint_data() -> DeltaResult<()> { let version = 10; let total_actions_counter = 100; let add_actions_counter = 75; @@ -268,8 +268,8 @@ fn read_last_checkpoint_file(store: &Arc) -> DeltaResult { /// Tests the `checkpoint()` API with: /// - A table that does not support v2Checkpoint /// - No version specified (latest version is used) -#[test] -fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> { +#[tokio::test] +async fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); @@ -295,7 +295,7 @@ fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> { )?; let table_root = Url::parse("memory:///")?; - let snapshot = Snapshot::builder_for(table_root).build(&engine)?; + let snapshot = Snapshot::builder_for(table_root).build(&engine).await?; let writer = snapshot.checkpoint()?; // Verify the checkpoint file path is the latest version by default. 
@@ -338,8 +338,8 @@ fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> { /// Tests the `checkpoint()` API with: /// - A table that does not support v2Checkpoint /// - A specific version specified (version 0) -#[test] -fn test_v1_checkpoint_specific_version() -> DeltaResult<()> { +#[tokio::test] +async fn test_v1_checkpoint_specific_version() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); @@ -365,7 +365,7 @@ fn test_v1_checkpoint_specific_version() -> DeltaResult<()> { // Specify version 0 for checkpoint let snapshot = Snapshot::builder_for(table_root) .at_version(0) - .build(&engine)?; + .build(&engine).await?; let writer = snapshot.checkpoint()?; // Verify the checkpoint file path is the specified version. @@ -400,8 +400,8 @@ fn test_v1_checkpoint_specific_version() -> DeltaResult<()> { Ok(()) } -#[test] -fn test_finalize_errors_if_checkpoint_data_iterator_is_not_exhausted() -> DeltaResult<()> { +#[tokio::test] +async fn test_finalize_errors_if_checkpoint_data_iterator_is_not_exhausted() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); @@ -415,7 +415,7 @@ fn test_finalize_errors_if_checkpoint_data_iterator_is_not_exhausted() -> DeltaR let table_root = Url::parse("memory:///")?; let snapshot = Snapshot::builder_for(table_root) .at_version(0) - .build(&engine)?; + .build(&engine).await?; let writer = snapshot.checkpoint()?; let data_iter = writer.checkpoint_data(&engine)?; @@ -442,8 +442,8 @@ fn test_finalize_errors_if_checkpoint_data_iterator_is_not_exhausted() -> DeltaR /// Tests the `checkpoint()` API with: /// - A table that does supports v2Checkpoint /// - No version specified (latest version is used) -#[test] -fn test_v2_checkpoint_supported_table() -> DeltaResult<()> { +#[tokio::test] +async fn test_v2_checkpoint_supported_table() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); @@ -469,7 +469,7 @@ fn test_v2_checkpoint_supported_table() -> DeltaResult<()> { )?; let table_root = Url::parse("memory:///")?; - let snapshot = Snapshot::builder_for(table_root).build(&engine)?; + let snapshot = Snapshot::builder_for(table_root).build(&engine).await?; let writer = snapshot.checkpoint()?; // Verify the checkpoint file path is the latest version by default. diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 08d03107c..48079fd82 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use bytes::Bytes; use delta_kernel_derive::internal_api; -use futures::stream::StreamExt; +use futures::stream::{BoxStream, StreamExt, TryStreamExt}; use itertools::Itertools; use object_store::path::Path; use object_store::{DynObjectStore, ObjectStore}; @@ -37,10 +37,7 @@ impl ObjectStoreStorageHandler { } impl StorageHandler for ObjectStoreStorageHandler { - fn list_from( - &self, - path: &Url, - ) -> DeltaResult>>> { + fn list_from(&self, path: &Url) -> DeltaResult>> { // The offset is used for list-after; the prefix is used to restrict the listing to a specific directory. // Unfortunately, `Path` provides no easy way to check whether a name is directory-like, // because it strips trailing /, so we're reduced to manually checking the original URL. 
@@ -80,39 +77,53 @@ impl StorageHandler for ObjectStoreStorageHandler { // So we just need to know if we're local and then if so, we sort the returned file list let has_ordered_listing = path.scheme() != "file"; - // This channel will become the iterator - let (sender, receiver) = std::sync::mpsc::sync_channel(4_000); let url = path.clone(); - self.task_executor.spawn(async move { - let mut stream = store.list_with_offset(Some(&prefix), &offset); - - while let Some(meta) = stream.next().await { - match meta { - Ok(meta) => { - let mut location = url.clone(); - location.set_path(&format!("/{}", meta.location.as_ref())); - sender - .send(Ok(FileMeta { + + if !has_ordered_listing { + // For local filesystem, we need to collect and sort first + let stream = store + .list_with_offset(Some(&prefix), &offset) + .map(move |meta_result| { + meta_result + .map(|meta| { + let mut location = url.clone(); + location.set_path(&format!("/{}", meta.location.as_ref())); + FileMeta { location, last_modified: meta.last_modified.timestamp_millis(), size: meta.size, - })) - .ok(); - } - Err(e) => { - sender.send(Err(e.into())).ok(); - } - } - } - }); - - if !has_ordered_listing { - // This FS doesn't return things in the order we require - let mut fms: Vec = receiver.into_iter().try_collect()?; - fms.sort_unstable(); - Ok(Box::new(fms.into_iter().map(Ok))) + } + }) + .map_err(|e: object_store::Error| -> Error { e.into() }) + }); + + // Collect all items, sort them, then return as a stream + let collected = Box::pin(async move { + let mut items: Vec = stream.try_collect().await?; + items.sort_unstable(); + Ok::<_, Error>(futures::stream::iter(items.into_iter().map(Ok))) + }); + + Ok(Box::pin(futures::stream::once(collected).try_flatten())) } else { - Ok(Box::new(receiver.into_iter())) + // For cloud storage, we can stream directly since it's already sorted + let stream = store + .list_with_offset(Some(&prefix), &offset) + .map(move |meta_result| { + meta_result + .map(|meta| { + let mut location = url.clone(); + location.set_path(&format!("/{}", meta.location.as_ref())); + FileMeta { + location, + last_modified: meta.last_modified.timestamp_millis(), + size: meta.size, + } + }) + .map_err(|e: object_store::Error| -> Error { e.into() }) + }); + + Ok(Box::pin(stream)) } } @@ -182,6 +193,7 @@ mod tests { use std::ops::Range; use std::time::Duration; + use futures::TryStreamExt; use itertools::Itertools; use object_store::memory::InMemory; use object_store::{local::LocalFileSystem, ObjectStore}; @@ -255,6 +267,7 @@ mod tests { .list_from(&table_root.join("_delta_log").unwrap().join("0").unwrap()) .unwrap() .try_collect() + .await .unwrap(); assert!(!files.is_empty()); @@ -280,20 +293,21 @@ mod tests { let url = Url::from_directory_path(tmp.path()).unwrap(); let store = Arc::new(LocalFileSystem::new()); let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new())); - let files = engine + let files: Vec<_> = engine .storage_handler() .list_from(&url.join("_delta_log").unwrap().join("0").unwrap()) + .unwrap() + .try_collect() + .await .unwrap(); let mut len = 0; - for (file, expected) in files.zip(expected_names.iter()) { + for (file, expected) in files.iter().zip(expected_names.iter()) { assert!( - file.as_ref() - .unwrap() - .location + file.location .path() .ends_with(expected.as_ref()), "{} does not end with {}", - file.unwrap().location.path(), + file.location.path(), expected ); len += 1; diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 
051a6a32c..7b3e9e927 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -174,13 +174,13 @@ mod tests { use crate::engine::tests::test_arrow_engine; use object_store::local::LocalFileSystem; - #[test] - fn test_default_engine() { + #[tokio::test] + async fn test_default_engine() { let tmp = tempfile::tempdir().unwrap(); let url = Url::from_directory_path(tmp.path()).unwrap(); let object_store = Arc::new(LocalFileSystem::new()); let engine = DefaultEngine::new(object_store, Arc::new(TokioBackgroundExecutor::new())); - test_arrow_engine(&engine, &url); + test_arrow_engine(&engine, &url).await; } #[test] diff --git a/kernel/src/engine/mod.rs b/kernel/src/engine/mod.rs index 6290e6310..d21e59684 100644 --- a/kernel/src/engine/mod.rs +++ b/kernel/src/engine/mod.rs @@ -29,6 +29,7 @@ pub mod parquet_row_group_skipping; #[cfg(test)] mod tests { + use futures::TryStreamExt; use itertools::Itertools; use object_store::path::Path; use std::sync::Arc; @@ -41,7 +42,7 @@ mod tests { use test_utils::delta_path_for_version; - fn test_list_from_should_sort_and_filter( + async fn test_list_from_should_sort_and_filter( engine: &dyn Engine, base_url: &Url, engine_data: impl Fn() -> Box, @@ -64,7 +65,7 @@ mod tests { // list files after an offset let test_url = base_url.join(expected_names[0].as_ref()).unwrap(); - let files: Vec<_> = storage.list_from(&test_url).unwrap().try_collect().unwrap(); + let files: Vec<_> = storage.list_from(&test_url).unwrap().try_collect().await.unwrap(); assert_eq!(files.len(), expected_names.len() - 1); for (file, expected) in files.iter().zip(expected_names.iter().skip(1)) { assert_eq!(file.location, base_url.join(expected.as_ref()).unwrap()); @@ -73,12 +74,12 @@ mod tests { let test_url = base_url .join(delta_path_for_version(0, "json").as_ref()) .unwrap(); - let files: Vec<_> = storage.list_from(&test_url).unwrap().try_collect().unwrap(); + let files: Vec<_> = storage.list_from(&test_url).unwrap().try_collect().await.unwrap(); assert_eq!(files.len(), expected_names.len()); // list files inside a directory / key prefix let test_url = base_url.join("_delta_log/").unwrap(); - let files: Vec<_> = storage.list_from(&test_url).unwrap().try_collect().unwrap(); + let files: Vec<_> = storage.list_from(&test_url).unwrap().try_collect().await.unwrap(); assert_eq!(files.len(), expected_names.len()); for (file, expected) in files.iter().zip(expected_names.iter()) { assert_eq!(file.location, base_url.join(expected.as_ref()).unwrap()); @@ -99,7 +100,7 @@ mod tests { Box::new(ArrowEngineData::new(data)) } - pub(crate) fn test_arrow_engine(engine: &dyn Engine, base_url: &Url) { - test_list_from_should_sort_and_filter(engine, base_url, get_arrow_data); + pub(crate) async fn test_arrow_engine(engine: &dyn Engine, base_url: &Url) { + test_list_from_should_sort_and_filter(engine, base_url, get_arrow_data).await; } } diff --git a/kernel/src/engine/sync/mod.rs b/kernel/src/engine/sync/mod.rs index 10e3cc6b1..49f0fdad9 100644 --- a/kernel/src/engine/sync/mod.rs +++ b/kernel/src/engine/sync/mod.rs @@ -103,11 +103,11 @@ mod tests { use super::*; use crate::engine::tests::test_arrow_engine; - #[test] - fn test_sync_engine() { + #[tokio::test] + async fn test_sync_engine() { let tmp = tempfile::tempdir().unwrap(); let url = url::Url::from_directory_path(tmp.path()).unwrap(); let engine = SyncEngine::new(); - test_arrow_engine(&engine, &url); + test_arrow_engine(&engine, &url).await; } } diff --git a/kernel/src/engine/sync/storage.rs b/kernel/src/engine/sync/storage.rs 
index fd48e7f8e..58819c5a2 100644 --- a/kernel/src/engine/sync/storage.rs +++ b/kernel/src/engine/sync/storage.rs @@ -1,4 +1,5 @@ use bytes::Bytes; +use futures::stream::BoxStream; use itertools::Itertools; use url::Url; @@ -9,10 +10,7 @@ pub(crate) struct SyncStorageHandler; impl StorageHandler for SyncStorageHandler { /// List the paths in the same directory that are lexicographically greater or equal to /// (UTF-8 sorting) the given `path`. The result is sorted by the file name. - fn list_from( - &self, - url_path: &Url, - ) -> DeltaResult>>> { + fn list_from(&self, url_path: &Url) -> DeltaResult>> { if url_path.scheme() == "file" { let path = url_path .to_file_path() @@ -46,7 +44,7 @@ impl StorageHandler for SyncStorageHandler { .into_iter() .sorted_by_key(|ent| ent.path()) .map(TryFrom::try_from); - Ok(Box::new(it)) + Ok(Box::pin(futures::stream::iter(it))) } else { Err(Error::generic("Can only read local filesystem")) } @@ -79,6 +77,7 @@ mod tests { use std::time::Duration; use bytes::{BufMut, BytesMut}; + use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use url::Url; @@ -91,8 +90,8 @@ mod tests { format!("{index:020}.json") } - #[test] - fn test_file_meta_is_correct() -> Result<(), Box> { + #[tokio::test] + async fn test_file_meta_is_correct() -> Result<(), Box> { let storage = SyncStorageHandler; let tmp_dir = tempfile::tempdir().unwrap(); @@ -105,7 +104,7 @@ mod tests { let url_path = tmp_dir.path().join(get_json_filename(0)); let url = Url::from_file_path(url_path).unwrap(); - let files: Vec<_> = storage.list_from(&url)?.try_collect()?; + let files: Vec<_> = storage.list_from(&url)?.try_collect().await?; assert!(!files.is_empty()); for meta in files.iter() { @@ -115,8 +114,8 @@ mod tests { Ok(()) } - #[test] - fn test_list_from() -> Result<(), Box> { + #[tokio::test] + async fn test_list_from() -> Result<(), Box> { let storage = SyncStorageHandler; let tmp_dir = tempfile::tempdir().unwrap(); let mut expected = vec![]; @@ -128,28 +127,36 @@ mod tests { } let url_path = tmp_dir.path().join(get_json_filename(1)); let url = Url::from_file_path(url_path).unwrap(); - let list = storage.list_from(&url)?; + let mut list = storage.list_from(&url)?; let mut file_count = 0; - for (i, file) in list.enumerate() { + let mut i = 0; + while let Some(file) = list.try_next().await? { // i+1 in index because we started at 0001 in the listing assert_eq!( - file?.location.to_file_path().unwrap().to_str().unwrap(), + file.location.to_file_path().unwrap().to_str().unwrap(), expected[i + 2].to_str().unwrap() ); file_count += 1; + i += 1; } assert_eq!(file_count, 1); let url_path = tmp_dir.path().join(""); let url = Url::from_file_path(url_path).unwrap(); - let list = storage.list_from(&url)?; - file_count = list.count(); + let mut list = storage.list_from(&url)?; + file_count = 0; + while let Some(_) = list.try_next().await? { + file_count += 1; + } assert_eq!(file_count, 3); let url_path = tmp_dir.path().join(format!("{:020}", 1)); let url = Url::from_file_path(url_path).unwrap(); - let list = storage.list_from(&url)?; - file_count = list.count(); + let mut list = storage.list_from(&url)?; + file_count = 0; + while let Some(_) = list.try_next().await? { + file_count += 1; + } assert_eq!(file_count, 2); Ok(()) } diff --git a/kernel/src/last_checkpoint_hint.rs b/kernel/src/last_checkpoint_hint.rs index 363180856..7e10fd1c3 100644 --- a/kernel/src/last_checkpoint_hint.rs +++ b/kernel/src/last_checkpoint_hint.rs @@ -50,7 +50,7 @@ impl LastCheckpointHint { /// are assumed to cause failure. 
// TODO(#1047): weird that we propagate FileNotFound as part of the iterator instead of top- // level result coming from storage.read_files - pub(crate) fn try_read( + pub(crate) async fn try_read( storage: &dyn StorageHandler, log_root: &Url, ) -> DeltaResult> { diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index bcbb7b44c..296b53a84 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -80,6 +80,7 @@ use std::time::SystemTime; use std::{cmp::Ordering, ops::Range}; use bytes::Bytes; +use futures::stream::BoxStream; use url::Url; use self::schema::{DataType, SchemaRef}; @@ -521,8 +522,7 @@ pub trait StorageHandler: AsAny { /// /// If the path is directory-like (ends with '/'), the result should contain /// all the files in the directory. - fn list_from(&self, path: &Url) - -> DeltaResult>>>; + fn list_from(&self, path: &Url) -> DeltaResult>>; /// Read data specified by the start and end offset from the file. fn read_files( diff --git a/kernel/src/listed_log_files.rs b/kernel/src/listed_log_files.rs index 2aaa10b1c..88625cf43 100644 --- a/kernel/src/listed_log_files.rs +++ b/kernel/src/listed_log_files.rs @@ -20,7 +20,8 @@ use crate::{DeltaResult, Error, StorageHandler, Version}; use delta_kernel_derive::internal_api; -use itertools::Itertools; +use futures::stream::{self, BoxStream}; +use futures::{StreamExt, TryStreamExt}; use tracing::{info, warn}; use url::Url; @@ -40,9 +41,9 @@ pub(crate) struct ListedLogFiles { pub(crate) latest_crc_file: Option, } -/// Returns a fallible iterator of [`ParsedLogPath`] over versions `start_version..=end_version` +/// Returns a fallible stream of [`ParsedLogPath`] over versions `start_version..=end_version` /// taking into account the `log_tail` which was (ostentibly) returned from the catalog. If there -/// are fewer files than requested (e.g. `end_version` is past the end of the log), the iterator +/// are fewer files than requested (e.g. `end_version` is past the end of the log), the stream /// will simply end before reaching `end_version`. /// /// Note that the `log_tail` must strictly adhere to being a 'tail' - that is, it is a contiguous @@ -59,13 +60,13 @@ pub(crate) struct ListedLogFiles { /// 1. list from the storage handler and filter based on [`ParsedLogPath::should_list`] (to prevent /// listing staged commits) /// 2. add the log_tail from the catalog -fn list_log_files( - storage: &dyn StorageHandler, +fn list_log_files<'a>( + storage: &'a dyn StorageHandler, log_root: &Url, log_tail: Vec, start_version: impl Into>, end_version: impl Into>, -) -> DeltaResult>> { +) -> DeltaResult>> { // check log_tail is only commits // note that LogSegment checks no gaps/duplicates so we don't duplicate that here debug_assert!( @@ -86,38 +87,48 @@ fn list_log_files( // if the log_tail covers the entire requested range (i.e. starts at or before start_version), // we skip listing entirely. note that if we don't include this check, we will end up listing // and then just filtering out all the files we listed. - let listed_files = log_tail_start - // log_tail covers the entire requested range, so no listing is required + let listed_stream: BoxStream<'a, DeltaResult> = if log_tail_start .is_none_or(|tail_start| start_version < tail_start.version) - .then(|| -> DeltaResult<_> { - // NOTE: since engine APIs don't limit listing, we list from start_version and filter - Ok(storage - .list_from(&start_from)? 
- .map(|meta| ParsedLogPath::try_from(meta?)) + { + // NOTE: since engine APIs don't limit listing, we list from start_version and filter + let file_stream = storage.list_from(&start_from)?; + + Box::pin( + file_stream + .map(move |meta_result| { + meta_result.and_then(|meta| ParsedLogPath::try_from(meta).map_err(Into::into)) + }) // NOTE: this filters out .crc files etc which start with "." - some engines // produce `.something.parquet.crc` corresponding to `something.parquet`. Kernel // doesn't care about these files. Critically, note these are _different_ than // normal `version.crc` files which are listed + captured normally. Additionally // we likely aren't even 'seeing' these files since lexicographically the string // "." comes before the string "0". - .filter_map_ok(|path_opt| path_opt.filter(|p| p.should_list())) - .take_while(move |path_res| match path_res { + .try_filter_map(move |path_opt| async move { + Ok(path_opt.filter(|p| p.should_list())) + }) + .take_while(move |path_res| { // discard any path with too-large version; keep errors - Ok(path) => path.version <= list_end_version, - Err(_) => true, - })) - }) - .transpose()? - .into_iter() - .flatten(); + match path_res { + Ok(path) => futures::future::ready(path.version <= list_end_version), + Err(_) => futures::future::ready(true), + } + }) + ) + } else { + // log_tail covers the entire requested range, so no listing is required + Box::pin(stream::empty()) + }; // return chained [listed_files..log_tail], filtering log_tail by the requested range - let filtered_log_tail = log_tail - .into_iter() - .filter(move |entry| entry.version >= start_version && entry.version <= end_version) - .map(Ok); + let filtered_log_tail = stream::iter( + log_tail + .into_iter() + .filter(move |entry| entry.version >= start_version && entry.version <= end_version) + .map(Ok) + ); - Ok(listed_files.chain(filtered_log_tail)) + Ok(Box::pin(listed_stream.chain(filtered_log_tail))) } /// Groups all checkpoint parts according to the checkpoint they belong to. @@ -222,7 +233,7 @@ impl ListedLogFiles { /// List all commits between the provided `start_version` (inclusive) and `end_version` /// (inclusive). All other types are ignored. - pub(crate) fn list_commits( + pub(crate) async fn list_commits( storage: &dyn StorageHandler, log_root: &Url, start_version: Option, @@ -230,10 +241,19 @@ impl ListedLogFiles { ) -> DeltaResult { // TODO: plumb through a log_tail provided by our caller let log_tail = vec![]; - let listed_commits = - list_log_files(storage, log_root, log_tail, start_version, end_version)? - .filter_ok(|log_file| log_file.is_commit()) - .try_collect()?; + let mut stream = list_log_files(storage, log_root, log_tail, start_version, end_version)?; + + let mut listed_commits = Vec::new(); + + use futures::TryStreamExt; + + // Process stream incrementally, filtering for commits only + while let Some(log_file) = stream.try_next().await? { + if log_file.is_commit() { + listed_commits.push(log_file); + } + } + ListedLogFiles::try_new(listed_commits, vec![], vec![], None) } @@ -242,7 +262,7 @@ impl ListedLogFiles { // TODO: encode some of these guarantees in the output types. e.g. 
we could have: // - SortedCommitFiles: Vec, is_ascending: bool, end_version: Version // - CheckpointParts: Vec, checkpoint_version: Version (guarantee all same version) - pub(crate) fn list( + pub(crate) async fn list( storage: &dyn StorageHandler, log_root: &Url, start_version: Option, @@ -250,71 +270,114 @@ impl ListedLogFiles { ) -> DeltaResult { // TODO: plumb through a log_tail provided by our caller let log_tail = vec![]; - let log_files = list_log_files(storage, log_root, log_tail, start_version, end_version)?; - - log_files.process_results(|iter| { - let mut ascending_commit_files = Vec::new(); - let mut ascending_compaction_files = Vec::new(); - let mut checkpoint_parts = vec![]; - let mut latest_crc_file: Option = None; - - // Group log files by version - let log_files_per_version = iter.chunk_by(|x| x.version); - - for (version, files) in &log_files_per_version { - let mut new_checkpoint_parts = vec![]; - for file in files { - use LogPathFileType::*; - match file.file_type { - Commit | StagedCommit => ascending_commit_files.push(file), - CompactedCommit { hi } if end_version.is_none_or(|end| hi <= end) => { - ascending_compaction_files.push(file); - } - CompactedCommit { .. } => (), // Failed the bounds check above - SinglePartCheckpoint | UuidCheckpoint(_) | MultiPartCheckpoint { .. } => { - new_checkpoint_parts.push(file) - } - Crc => { - let latest_crc_ref = latest_crc_file.as_ref(); - if latest_crc_ref.is_none_or(|latest| latest.version < file.version) { - latest_crc_file = Some(file); - } - } - Unknown => { - warn!( - "Found file {} with unknown file type {:?} at version {}", - file.filename, file.file_type, version - ); - } + let mut stream = list_log_files(storage, log_root, log_tail, start_version, end_version)?; + + let mut ascending_commit_files = Vec::new(); + let mut ascending_compaction_files = Vec::new(); + let mut checkpoint_parts = vec![]; + let mut latest_crc_file: Option = None; + + // Process stream incrementally, manually handling version grouping + let mut current_version_files = Vec::new(); + let mut current_version = None; + + use futures::TryStreamExt; + + while let Some(file) = stream.try_next().await? 
{ + // If version changed, process the accumulated files for the previous version + if current_version.is_some() && current_version != Some(file.version) { + Self::process_version_group( + current_version.unwrap(), + ¤t_version_files, + &mut ascending_commit_files, + &mut ascending_compaction_files, + &mut checkpoint_parts, + &mut latest_crc_file, + end_version, + ); + current_version_files.clear(); + } + + current_version = Some(file.version); + current_version_files.push(file); + } + + // Process the final version group + if let Some(version) = current_version { + Self::process_version_group( + version, + ¤t_version_files, + &mut ascending_commit_files, + &mut ascending_compaction_files, + &mut checkpoint_parts, + &mut latest_crc_file, + end_version, + ); + } + + ListedLogFiles::try_new( + ascending_commit_files, + ascending_compaction_files, + checkpoint_parts, + latest_crc_file, + ) + } + + fn process_version_group( + version: Version, + files: &[ParsedLogPath], + ascending_commit_files: &mut Vec, + ascending_compaction_files: &mut Vec, + checkpoint_parts: &mut Vec, + latest_crc_file: &mut Option, + end_version: Option, + ) { + let mut new_checkpoint_parts = vec![]; + + for file in files { + use LogPathFileType::*; + match file.file_type { + Commit | StagedCommit => ascending_commit_files.push(file.clone()), + CompactedCommit { hi } if end_version.is_none_or(|end| hi <= end) => { + ascending_compaction_files.push(file.clone()); + } + CompactedCommit { .. } => (), // Failed the bounds check above + SinglePartCheckpoint | UuidCheckpoint(_) | MultiPartCheckpoint { .. } => { + new_checkpoint_parts.push(file.clone()) + } + Crc => { + let latest_crc_ref = latest_crc_file.as_ref(); + if latest_crc_ref.is_none_or(|latest| latest.version < file.version) { + *latest_crc_file = Some(file.clone()); } } - // Group and find the first complete checkpoint for this version. - // All checkpoints for the same version are equivalent, so we only take one. - if let Some((_, complete_checkpoint)) = group_checkpoint_parts(new_checkpoint_parts) - .into_iter() - // `num_parts` is guaranteed to be non-negative and within `usize` range - .find(|(num_parts, part_files)| part_files.len() == *num_parts as usize) - { - checkpoint_parts = complete_checkpoint; - // Log replay only uses commits/compactions after a complete checkpoint - ascending_commit_files.clear(); - ascending_compaction_files.clear(); + Unknown => { + warn!( + "Found file {} with unknown file type {:?} at version {}", + file.filename, file.file_type, version + ); } } + } - ListedLogFiles::try_new( - ascending_commit_files, - ascending_compaction_files, - checkpoint_parts, - latest_crc_file, - ) - })? + // Group and find the first complete checkpoint for this version. + // All checkpoints for the same version are equivalent, so we only take one. + if let Some((_, complete_checkpoint)) = group_checkpoint_parts(new_checkpoint_parts) + .into_iter() + // `num_parts` is guaranteed to be non-negative and within `usize` range + .find(|(num_parts, part_files)| part_files.len() == *num_parts as usize) + { + *checkpoint_parts = complete_checkpoint; + // Log replay only uses commits/compactions after a complete checkpoint + ascending_commit_files.clear(); + ascending_compaction_files.clear(); + } } /// List all commit and checkpoint files after the provided checkpoint. It is guaranteed that all /// the returned [`ParsedLogPath`]s will have a version less than or equal to the `end_version`. /// See [`list_log_files_with_version`] for details on the return type. 
- pub(crate) fn list_with_checkpoint_hint( + pub(crate) async fn list_with_checkpoint_hint( checkpoint_metadata: &LastCheckpointHint, storage: &dyn StorageHandler, log_root: &Url, @@ -325,7 +388,7 @@ impl ListedLogFiles { log_root, Some(checkpoint_metadata.version), end_version, - )?; + ).await?; let Some(latest_checkpoint) = listed_files.checkpoint_parts.last() else { // TODO: We could potentially recover here @@ -472,10 +535,11 @@ mod list_log_files_with_log_tail_tests { ]; let (storage, log_root) = create_storage(log_files); - let result: Vec<_> = list_log_files(storage.as_ref(), &log_root, vec![], Some(1), Some(2)) - .unwrap() - .try_collect() - .unwrap(); + use futures::executor::block_on; + let stream = list_log_files(storage.as_ref(), &log_root, vec![], Some(1), Some(2)).unwrap(); + let result: Vec<_> = block_on(async { + stream.try_collect().await + }).unwrap(); assert_eq!(result.len(), 2); assert_eq!(result[0].version, 1); @@ -487,6 +551,7 @@ mod list_log_files_with_log_tail_tests { #[test] fn test_log_tail_as_latest_commits() { + use futures::executor::block_on; // Filesystem has commits 0-2, log_tail has commits 3-5 (the latest) let log_files = vec![ (0, LogPathFileType::Commit, CommitSource::Filesystem), @@ -502,11 +567,10 @@ mod list_log_files_with_log_tail_tests { make_parsed_log_path_with_source(5, LogPathFileType::Commit, CommitSource::Catalog), ]; - let result: Vec<_> = - list_log_files(storage.as_ref(), &log_root, log_tail, Some(0), Some(5)) - .unwrap() - .try_collect() - .unwrap(); + let stream = list_log_files(storage.as_ref(), &log_root, log_tail, Some(0), Some(5)).unwrap(); + let result: Vec<_> = block_on(async { + stream.try_collect().await + }).unwrap(); assert_eq!(result.len(), 6); // filesystem @@ -527,6 +591,8 @@ mod list_log_files_with_log_tail_tests { #[test] fn test_request_subset_with_log_tail() { + use futures::executor::block_on; + // Test requesting a subset when log_tail is the latest commits let log_files = vec![ (0, LogPathFileType::Commit, CommitSource::Filesystem), @@ -542,11 +608,10 @@ mod list_log_files_with_log_tail_tests { ]; // list for only versions 1-3 - let result: Vec<_> = - list_log_files(storage.as_ref(), &log_root, log_tail, Some(1), Some(3)) - .unwrap() - .try_collect() - .unwrap(); + let stream = list_log_files(storage.as_ref(), &log_root, log_tail.clone(), Some(1), Some(3)).unwrap(); + let result: Vec<_> = block_on(async { + stream.try_collect().await + }).unwrap(); // The result includes version 1 from filesystem, and log_tail until requested version (2-3) assert_eq!(result.len(), 3); @@ -576,10 +641,10 @@ mod list_log_files_with_log_tail_tests { CommitSource::Catalog, )]; - let result: Vec<_> = list_log_files(storage.as_ref(), &log_root, log_tail, Some(0), None) - .unwrap() - .try_collect() - .unwrap(); + let stream = list_log_files(storage.as_ref(), &log_root, log_tail, Some(0), None).unwrap(); + let result: Vec<_> = block_on(async { + stream.try_collect().await + }).unwrap(); // expect only 0 from file system and 1 from log tail assert_eq!(result.len(), 2); @@ -591,13 +656,11 @@ mod list_log_files_with_log_tail_tests { #[test] fn test_log_tail_covers_entire_range_no_listing() { + use futures::executor::block_on; // test-only storage handler that panics if you use it struct StorageThatPanics {} impl StorageHandler for StorageThatPanics { - fn list_from( - &self, - _path: &Url, - ) -> DeltaResult>>> { + fn list_from(&self, _path: &Url) -> DeltaResult>> { panic!("list_from used"); } fn read_files( @@ -622,10 +685,10 @@ mod 
list_log_files_with_log_tail_tests { let storage = StorageThatPanics {}; let url = Url::parse("memory:///anything").unwrap(); - let result: Vec<_> = list_log_files(&storage, &url, log_tail, Some(0), Some(2)) - .unwrap() - .try_collect() - .unwrap(); + let stream = list_log_files(&storage, &url, log_tail, Some(0), Some(2)).unwrap(); + let result: Vec<_> = block_on(async { + stream.try_collect().await + }).unwrap(); assert_eq!(result.len(), 3); assert_eq!(result[0].version, 0); @@ -651,10 +714,10 @@ mod list_log_files_with_log_tail_tests { ]; let (storage, log_root) = create_storage(log_files); - let result: Vec<_> = list_log_files(storage.as_ref(), &log_root, vec![], None, None) - .unwrap() - .try_collect() - .unwrap(); + let stream = list_log_files(storage.as_ref(), &log_root, vec![], None, None).unwrap(); + let result: Vec<_> = block_on(async { + stream.try_collect().await + }).unwrap(); // we must only see two regular commits assert_eq!(result.len(), 2); @@ -674,10 +737,10 @@ mod list_log_files_with_log_tail_tests { let (storage, log_root) = create_storage(log_files); // note we let you request end version past the end of log. up to consumer to interpret - let result: Vec<_> = list_log_files(storage.as_ref(), &log_root, vec![], None, Some(3)) - .unwrap() - .try_collect() - .unwrap(); + let stream = list_log_files(storage.as_ref(), &log_root, vec![], None, Some(3)).unwrap(); + let result: Vec<_> = block_on(async { + stream.try_collect().await + }).unwrap(); // we must only see two regular commits assert_eq!(result.len(), 2); diff --git a/kernel/src/log_compaction/tests.rs b/kernel/src/log_compaction/tests.rs index 49dfeb8ab..dda081df2 100644 --- a/kernel/src/log_compaction/tests.rs +++ b/kernel/src/log_compaction/tests.rs @@ -4,27 +4,27 @@ use crate::engine::sync::SyncEngine; use crate::snapshot::Snapshot; use crate::SnapshotRef; -fn create_mock_snapshot() -> SnapshotRef { +async fn create_mock_snapshot() -> SnapshotRef { let path = std::fs::canonicalize(std::path::PathBuf::from( "./tests/data/table-with-dv-small/", )) .unwrap(); let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - Snapshot::builder_for(url).build(&engine).unwrap() + Snapshot::builder_for(url).build(&engine).await.unwrap() } -fn create_multi_version_snapshot() -> SnapshotRef { +async fn create_multi_version_snapshot() -> SnapshotRef { let path = std::fs::canonicalize(std::path::PathBuf::from("./tests/data/basic_partitioned/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - Snapshot::builder_for(url).build(&engine).unwrap() + Snapshot::builder_for(url).build(&engine).await.unwrap() } -#[test] -fn test_log_compaction_writer_creation() { - let snapshot = create_mock_snapshot(); +#[tokio::test] +async fn test_log_compaction_writer_creation() { + let snapshot = create_mock_snapshot().await; let start_version = 0; let end_version = 1; @@ -36,12 +36,12 @@ fn test_log_compaction_writer_creation() { assert!(path.to_string().ends_with(expected_filename)); } -#[test] -fn test_invalid_version_range() { +#[tokio::test] +async fn test_invalid_version_range() { let start_version = 20; let end_version = 10; // Invalid: start > end - let result = LogCompactionWriter::try_new(create_mock_snapshot(), start_version, end_version); + let result = LogCompactionWriter::try_new(create_mock_snapshot().await, start_version, end_version); assert!(result.is_err()); assert!(result @@ -50,12 +50,12 @@ fn test_invalid_version_range() { .contains("Invalid version 
range")); } -#[test] -fn test_equal_version_range_invalid() { +#[tokio::test] +async fn test_equal_version_range_invalid() { let start_version = 5; let end_version = 5; // Invalid: start == end (must be start < end) - let result = LogCompactionWriter::try_new(create_mock_snapshot(), start_version, end_version); + let result = LogCompactionWriter::try_new(create_mock_snapshot().await, start_version, end_version); assert!(result.is_err()); assert!(result @@ -64,8 +64,8 @@ fn test_equal_version_range_invalid() { .contains("Invalid version range")); } -#[test] -fn test_should_compact() { +#[tokio::test] +async fn test_should_compact() { assert!(should_compact(9, 10)); assert!(!should_compact(5, 10)); assert!(!should_compact(10, 0)); @@ -73,8 +73,8 @@ fn test_should_compact() { assert!(should_compact(19, 10)); } -#[test] -fn test_compaction_actions_schema_access() { +#[tokio::test] +async fn test_compaction_actions_schema_access() { let schema = &*COMPACTION_ACTIONS_SCHEMA; assert!(schema.fields().len() > 0); @@ -86,22 +86,22 @@ fn test_compaction_actions_schema_access() { assert!(field_names.contains(&"protocol")); } -#[test] -fn test_writer_debug_impl() { - let snapshot = create_mock_snapshot(); +#[tokio::test] +async fn test_writer_debug_impl() { + let snapshot = create_mock_snapshot().await; let writer = LogCompactionWriter::try_new(snapshot, 1, 5).unwrap(); let debug_str = format!("{:?}", writer); assert!(debug_str.contains("LogCompactionWriter")); } -#[test] -fn test_compaction_data() { - let snapshot = create_mock_snapshot(); +#[tokio::test] +async fn test_compaction_data() { + let snapshot = create_mock_snapshot().await; let mut writer = LogCompactionWriter::try_new(snapshot, 0, 1).unwrap(); let engine = SyncEngine::new(); - let result = writer.compaction_data(&engine); + let result = writer.compaction_data(&engine).await; assert!(result.is_ok()); let iterator = result.unwrap(); @@ -117,16 +117,16 @@ fn test_compaction_data() { assert!(debug_str.contains("add_actions_count")); } -#[test] -fn test_end_version_exceeds_snapshot_version() { - let snapshot = create_mock_snapshot(); +#[tokio::test] +async fn test_end_version_exceeds_snapshot_version() { + let snapshot = create_mock_snapshot().await; let snapshot_version = snapshot.version(); // Negative test to create a writer with end_version greater than snapshot version let mut writer = LogCompactionWriter::try_new(snapshot, 0, snapshot_version + 100).unwrap(); let engine = SyncEngine::new(); - let result = writer.compaction_data(&engine); + let result = writer.compaction_data(&engine).await; assert!(result.is_err()); assert!(result .unwrap_err() @@ -134,22 +134,22 @@ fn test_end_version_exceeds_snapshot_version() { .contains("exceeds snapshot version")); } -#[test] -fn test_retention_calculator() { - let snapshot = create_mock_snapshot(); +#[tokio::test] +async fn test_retention_calculator() { + let snapshot = create_mock_snapshot().await; let writer = LogCompactionWriter::try_new(snapshot.clone(), 0, 1).unwrap(); let table_props = writer.table_properties(); assert_eq!(table_props, snapshot.table_properties()); } -#[test] -fn test_compaction_data_with_actual_iterator() { - let snapshot = create_multi_version_snapshot(); +#[tokio::test] +async fn test_compaction_data_with_actual_iterator() { + let snapshot = create_multi_version_snapshot().await; let mut writer = LogCompactionWriter::try_new(snapshot, 0, 1).unwrap(); let engine = SyncEngine::new(); - let mut iterator = writer.compaction_data(&engine).unwrap(); + let mut iterator = 
writer.compaction_data(&engine).await.unwrap(); let mut batch_count = 0; let initial_actions = iterator.total_actions(); @@ -171,9 +171,9 @@ fn test_compaction_data_with_actual_iterator() { assert!(batch_count > 0, "Expected to process at least one batch"); } -#[test] -fn test_compaction_paths() { - let snapshot = create_mock_snapshot(); +#[tokio::test] +async fn test_compaction_paths() { + let snapshot = create_mock_snapshot().await; // Test various version ranges produce correct paths let test_cases = vec![ @@ -206,16 +206,16 @@ fn test_compaction_paths() { } } -#[test] -fn test_version_filtering() { - let snapshot = create_multi_version_snapshot(); +#[tokio::test] +async fn test_version_filtering() { + let snapshot = create_multi_version_snapshot().await; let engine = SyncEngine::new(); let snapshot_version = snapshot.version(); if snapshot_version >= 1 { let mut writer = LogCompactionWriter::try_new(snapshot.clone(), 0, 1).unwrap(); - let result = writer.compaction_data(&engine); + let result = writer.compaction_data(&engine).await; assert!( result.is_ok(), "Failed to get compaction data: {:?}", diff --git a/kernel/src/log_compaction/writer.rs b/kernel/src/log_compaction/writer.rs index 0195e0944..cab85df1a 100644 --- a/kernel/src/log_compaction/writer.rs +++ b/kernel/src/log_compaction/writer.rs @@ -73,7 +73,7 @@ impl LogCompactionWriter { /// Get an iterator over the compaction data to be written /// /// Performs action reconciliation for the version range specified in the constructor - pub fn compaction_data( + pub async fn compaction_data( &mut self, engine: &dyn Engine, ) -> DeltaResult { @@ -93,7 +93,7 @@ impl LogCompactionWriter { self.snapshot.log_segment().log_root.clone(), self.start_version, Some(self.end_version), - )?; + ).await?; // Read actions from the version-filtered log segment let actions_iter = compaction_log_segment.read_actions( diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 5500e5522..8d730df0f 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -141,18 +141,18 @@ impl LogSegment { /// /// [`Snapshot`]: crate::snapshot::Snapshot #[internal_api] - pub(crate) fn for_snapshot( + pub(crate) async fn for_snapshot( storage: &dyn StorageHandler, log_root: Url, time_travel_version: impl Into>, ) -> DeltaResult { let time_travel_version = time_travel_version.into(); - let checkpoint_hint = LastCheckpointHint::try_read(storage, &log_root)?; - Self::for_snapshot_impl(storage, log_root, checkpoint_hint, time_travel_version) + let checkpoint_hint = LastCheckpointHint::try_read(storage, &log_root).await?; + Self::for_snapshot_impl(storage, log_root, checkpoint_hint, time_travel_version).await } // factored out for testing - pub(crate) fn for_snapshot_impl( + pub(crate) async fn for_snapshot_impl( storage: &dyn StorageHandler, log_root: Url, checkpoint_hint: Option, @@ -160,7 +160,7 @@ impl LogSegment { ) -> DeltaResult { let listed_files = match (checkpoint_hint, time_travel_version) { (Some(cp), None) => { - ListedLogFiles::list_with_checkpoint_hint(&cp, storage, &log_root, None)? + ListedLogFiles::list_with_checkpoint_hint(&cp, storage, &log_root, None).await? } (Some(cp), Some(end_version)) if cp.version <= end_version => { ListedLogFiles::list_with_checkpoint_hint( @@ -168,9 +168,9 @@ impl LogSegment { storage, &log_root, Some(end_version), - )? + ).await? 
} - _ => ListedLogFiles::list(storage, &log_root, None, time_travel_version)?, + _ => ListedLogFiles::list(storage, &log_root, None, time_travel_version).await?, }; LogSegment::try_new(listed_files, log_root, time_travel_version) @@ -181,7 +181,7 @@ impl LogSegment { /// between versions `start_version` (inclusive) and `end_version` (inclusive). If no `end_version` /// is specified it will be the most recent version by default. #[internal_api] - pub(crate) fn for_table_changes( + pub(crate) async fn for_table_changes( storage: &dyn StorageHandler, log_root: Url, start_version: Version, @@ -198,7 +198,7 @@ impl LogSegment { // TODO: compactions? let listed_files = - ListedLogFiles::list_commits(storage, &log_root, Some(start_version), end_version)?; + ListedLogFiles::list_commits(storage, &log_root, Some(start_version), end_version).await?; // - Here check that the start version is correct. // - [`LogSegment::try_new`] will verify that the `end_version` is correct if present. // - [`ListedLogFiles::list_commits`] also checks that there are no gaps between commits. @@ -223,7 +223,7 @@ impl LogSegment { /// // This lists all files starting from `end-limit` if `limit` is defined. For large tables, // listing with a `limit` can be a significant speedup over listing _all_ the files in the log. - pub(crate) fn for_timestamp_conversion( + pub(crate) async fn for_timestamp_conversion( storage: &dyn StorageHandler, log_root: Url, end_version: Version, @@ -242,7 +242,7 @@ impl LogSegment { // this is a list of commits with possible gaps, we want to take the latest contiguous // chunk of commits let mut listed_commits = - ListedLogFiles::list_commits(storage, &log_root, start_from, Some(end_version))?; + ListedLogFiles::list_commits(storage, &log_root, start_from, Some(end_version)).await?; // remove gaps - return latest contiguous chunk of commits let commits = &mut listed_commits.ascending_commit_files; diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index 0ddab72af..3b7a14f78 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -44,8 +44,8 @@ use super::*; // type nulls min / max // txn.appId BINARY 0 "3ae45b72-24e1-865a-a211-3..." / "3ae45b72-24e1-865a-a211-3..." 
// txn.version INT64 0 "4390" / "4390" -#[test] -fn test_replay_for_metadata() { +#[tokio::test] +async fn test_replay_for_metadata() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/")); let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 8b06a18be..560f1edd9 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -1002,8 +1002,8 @@ mod tests { use super::*; - #[test] - fn test_static_skipping() { + #[tokio::test] + async fn test_static_skipping() { const NULL: Pred = Pred::null_literal(); let test_cases = [ (false, column_pred!("a")), @@ -1027,8 +1027,8 @@ mod tests { } } - #[test] - fn test_physical_predicate() { + #[tokio::test] + async fn test_physical_predicate() { let logical_schema = StructType::new_unchecked(vec![ StructField::nullable("a", DataType::LONG), StructField::nullable("b", DataType::LONG).with_metadata([( @@ -1202,14 +1202,14 @@ mod tests { Ok(files) } - #[test] - fn test_scan_metadata_paths() { + #[tokio::test] + async fn test_scan_metadata_paths() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(url).build(&engine).await.unwrap(); let scan = snapshot.scan_builder().build().unwrap(); let files = get_files_for_scan(scan, &engine).unwrap(); assert_eq!(files.len(), 1); @@ -1219,14 +1219,14 @@ mod tests { ); } - #[test_log::test] - fn test_scan_metadata() { + #[tokio::test] + async fn test_scan_metadata() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap(); + let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).await.unwrap(); let scan = snapshot.scan_builder().build().unwrap(); let files: Vec = scan.execute(engine).unwrap().try_collect().unwrap(); @@ -1235,14 +1235,14 @@ mod tests { assert_eq!(num_rows, 10) } - #[test_log::test] - fn test_scan_metadata_from_same_version() { + #[tokio::test] + async fn test_scan_metadata_from_same_version() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap(); + let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).await.unwrap(); let version = snapshot.version(); let scan = snapshot.scan_builder().build().unwrap(); let files: Vec<_> = scan @@ -1270,8 +1270,8 @@ mod tests { // reading v0 with 3 files. // updating to v1 with 3 more files added. 
- #[test_log::test] - fn test_scan_metadata_from_with_update() { + #[tokio::test] + async fn test_scan_metadata_from_with_update() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/basic_partitioned/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); let engine = Arc::new(SyncEngine::new()); @@ -1279,7 +1279,7 @@ mod tests { let snapshot = Snapshot::builder_for(url.clone()) .at_version(0) .build(engine.as_ref()) - .unwrap(); + .await.unwrap(); let scan = snapshot.scan_builder().build().unwrap(); let files: Vec<_> = scan .scan_metadata(engine.as_ref()) @@ -1303,7 +1303,7 @@ mod tests { let snapshot = Snapshot::builder_for(url) .at_version(1) .build(engine.as_ref()) - .unwrap(); + .await.unwrap(); let scan = snapshot.scan_builder().build().unwrap(); let new_files: Vec<_> = scan .scan_metadata_from(engine.as_ref(), 0, files, None) @@ -1322,8 +1322,8 @@ mod tests { assert_eq!(new_files[1].num_rows(), 3); } - #[test] - fn test_get_partition_value() { + #[tokio::test] + async fn test_get_partition_value() { let cases = [ ( "string", @@ -1366,13 +1366,13 @@ mod tests { } } - #[test] - fn test_replay_for_scan_metadata() { + #[tokio::test] + async fn test_replay_for_scan_metadata() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/")); let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(url).build(&engine).await.unwrap(); let scan = snapshot.scan_builder().build().unwrap(); let data: Vec<_> = scan .replay_for_scan_metadata(&engine) @@ -1386,13 +1386,13 @@ mod tests { assert_eq!(data.len(), 5); } - #[test] - fn test_data_row_group_skipping() { + #[tokio::test] + async fn test_data_row_group_skipping() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/")); let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap(); + let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).await.unwrap(); // No predicate pushdown attempted, so the one data file should be returned. // @@ -1429,13 +1429,13 @@ mod tests { assert_eq!(data.len(), 1); } - #[test] - fn test_missing_column_row_group_skipping() { + #[tokio::test] + async fn test_missing_column_row_group_skipping() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/")); let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap(); + let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).await.unwrap(); // Predicate over a logically valid but physically missing column. No data files should be // returned because the column is inferred to be all-null. 
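Note: the test conversions in this file are mechanical: `#[test]`/`#[test_log::test]` becomes `#[tokio::test]`, the function becomes `async`, and each `SnapshotBuilder::build` call gains an `.await`. A minimal sketch of the resulting shape, assuming the test module's existing imports (`Snapshot`, `SyncEngine`, `PathBuf`, `DeltaResult`) and an illustrative test name:

#[tokio::test]
async fn async_snapshot_build_pattern() -> DeltaResult<()> {
    // Same style of local table path the surrounding tests use.
    let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/"))?;
    let url = url::Url::from_directory_path(path).unwrap();
    let engine = SyncEngine::new();
    // `build` now returns a future, so the snapshot is awaited before building a scan.
    let snapshot = Snapshot::builder_for(url).build(&engine).await?;
    let _scan = snapshot.scan_builder().build()?;
    Ok(())
}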
@@ -1461,8 +1461,8 @@ mod tests { .expect_err("unknown column"); } - #[test_log::test] - fn test_scan_with_checkpoint() -> DeltaResult<()> { + #[tokio::test] + async fn test_scan_with_checkpoint() -> DeltaResult<()> { let path = std::fs::canonicalize(PathBuf::from( "./tests/data/with_checkpoint_no_last_checkpoint/", ))?; @@ -1470,7 +1470,7 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(url).build(&engine).await.unwrap(); let scan = snapshot.scan_builder().build()?; let files = get_files_for_scan(scan, &engine)?; // test case: diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 3b41383d0..4c75466cb 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -97,7 +97,7 @@ impl Snapshot { /// Create a new [`Snapshot`] instance from an existing [`Snapshot`]. This is useful when you /// already have a [`Snapshot`] lying around and want to do the minimal work to 'update' the /// snapshot to a later version. - fn try_new_from( + async fn try_new_from( existing_snapshot: SnapshotRef, engine: &dyn Engine, version: impl Into>, @@ -130,7 +130,7 @@ impl Snapshot { &log_root, Some(listing_start), new_version, - )?; + ).await?; // NB: we need to check both checkpoints and commits since we filter commits at and below // the checkpoint version. Example: if we have a checkpoint + commit at version 1, the log @@ -414,8 +414,8 @@ mod tests { use crate::utils::test_utils::string_array_to_engine_data; use test_utils::{add_commit, delta_path_for_version}; - #[test] - fn test_snapshot_read_metadata() { + #[tokio::test] + async fn test_snapshot_read_metadata() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); @@ -424,7 +424,7 @@ mod tests { let snapshot = Snapshot::builder_for(url) .at_version(1) .build(&engine) - .unwrap(); + .await.unwrap(); let expected = Protocol::try_new(3, 7, Some(["deletionVectors"]), Some(["deletionVectors"])).unwrap(); @@ -435,14 +435,14 @@ mod tests { assert_eq!(snapshot.schema(), expected); } - #[test] - fn test_new_snapshot() { + #[tokio::test] + async fn test_new_snapshot() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(url).build(&engine).await.unwrap(); let expected = Protocol::try_new(3, 7, Some(["deletionVectors"]), Some(["deletionVectors"])).unwrap(); @@ -484,11 +484,11 @@ mod tests { let old_snapshot = Snapshot::builder_for(url.clone()) .at_version(1) .build(&engine) - .unwrap(); + .await.unwrap(); // 1. 
new version < existing version: error let snapshot_res = Snapshot::builder_from(old_snapshot.clone()) .at_version(0) - .build(&engine); + .build(&engine).await; assert!(matches!( snapshot_res, Err(Error::Generic(msg)) if msg == "Requested snapshot version 0 is older than snapshot hint version 1" @@ -498,7 +498,7 @@ mod tests { let snapshot = Snapshot::builder_from(old_snapshot.clone()) .at_version(1) .build(&engine) - .unwrap(); + .await.unwrap(); let expected = old_snapshot.clone(); assert_eq!(snapshot, expected); @@ -511,18 +511,18 @@ mod tests { // - commit 1 -> final snapshots at this version // // in each test we will modify versions 1 and 2 to test different scenarios - fn test_new_from(store: Arc) -> DeltaResult<()> { + async fn test_new_from(store: Arc) -> DeltaResult<()> { let url = Url::parse("memory:///")?; let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new())); let base_snapshot = Snapshot::builder_for(url.clone()) .at_version(0) - .build(&engine)?; + .build(&engine).await?; let snapshot = Snapshot::builder_from(base_snapshot.clone()) .at_version(1) - .build(&engine)?; + .build(&engine).await?; let expected = Snapshot::builder_for(url.clone()) .at_version(1) - .build(&engine)?; + .build(&engine).await?; assert_eq!(snapshot, expected); Ok(()) } @@ -569,15 +569,15 @@ mod tests { ); let base_snapshot = Snapshot::builder_for(url.clone()) .at_version(0) - .build(&engine)?; - let snapshot = Snapshot::builder_from(base_snapshot.clone()).build(&engine)?; + .build(&engine).await?; + let snapshot = Snapshot::builder_from(base_snapshot.clone()).build(&engine).await?; let expected = Snapshot::builder_for(url.clone()) .at_version(0) - .build(&engine)?; + .build(&engine).await?; assert_eq!(snapshot, expected); // version exceeds latest version of the table = err assert!(matches!( - Snapshot::builder_from(base_snapshot.clone()).at_version(1).build(&engine), + Snapshot::builder_from(base_snapshot.clone()).at_version(1).build(&engine).await, Err(Error::Generic(msg)) if msg == "Requested snapshot version 1 is newer than the latest version 0" )); @@ -621,7 +621,7 @@ mod tests { ) .await .unwrap(); - test_new_from(store_3a.into())?; + test_new_from(store_3a.into()).await?; // c. log segment for old..=new version has no checkpoint // i. commits have (new protocol, new metadata) @@ -635,16 +635,16 @@ mod tests { }); commit1[2]["partitionColumns"] = serde_json::to_value(["some_partition_column"])?; commit(store_3c_i.as_ref(), 1, commit1).await; - test_new_from(store_3c_i.clone())?; + test_new_from(store_3c_i.clone()).await?; // new commits AND request version > end of log let url = Url::parse("memory:///")?; let engine = DefaultEngine::new(store_3c_i, Arc::new(TokioBackgroundExecutor::new())); let base_snapshot = Snapshot::builder_for(url.clone()) .at_version(0) - .build(&engine)?; + .build(&engine).await?; assert!(matches!( - Snapshot::builder_from(base_snapshot.clone()).at_version(2).build(&engine), + Snapshot::builder_from(base_snapshot.clone()).at_version(2).build(&engine).await, Err(Error::Generic(msg)) if msg == "LogSegment end version 1 not the same as the specified end version 2" )); @@ -659,7 +659,7 @@ mod tests { }); commit1.remove(2); // remove metadata commit(&store_3c_ii, 1, commit1).await; - test_new_from(store_3c_ii.into())?; + test_new_from(store_3c_ii.into()).await?; // iii. 
commits have (no protocol, new metadata) let store_3c_iii = store.fork(); @@ -667,13 +667,13 @@ mod tests { commit1[2]["partitionColumns"] = serde_json::to_value(["some_partition_column"])?; commit1.remove(1); // remove protocol commit(&store_3c_iii, 1, commit1).await; - test_new_from(store_3c_iii.into())?; + test_new_from(store_3c_iii.into()).await?; // iv. commits have (no protocol, no metadata) let store_3c_iv = store.fork(); let commit1 = vec![commit0[0].clone()]; commit(&store_3c_iv, 1, commit1).await; - test_new_from(store_3c_iv.into())?; + test_new_from(store_3c_iv.into()).await?; Ok(()) } @@ -751,15 +751,15 @@ mod tests { // base snapshot is at version 0 let base_snapshot = Snapshot::builder_for(url.clone()) .at_version(0) - .build(&engine)?; + .build(&engine).await?; // first test: no new crc let snapshot = Snapshot::builder_from(base_snapshot.clone()) .at_version(1) - .build(&engine)?; + .build(&engine).await?; let expected = Snapshot::builder_for(url.clone()) .at_version(1) - .build(&engine)?; + .build(&engine).await?; assert_eq!(snapshot, expected); assert_eq!( snapshot @@ -785,10 +785,10 @@ mod tests { store.put(&path, crc.to_string().into()).await?; let snapshot = Snapshot::builder_from(base_snapshot.clone()) .at_version(1) - .build(&engine)?; + .build(&engine).await?; let expected = Snapshot::builder_for(url.clone()) .at_version(1) - .build(&engine)?; + .build(&engine).await?; assert_eq!(snapshot, expected); assert_eq!( snapshot @@ -803,8 +803,8 @@ mod tests { Ok(()) } - #[test] - fn test_read_table_with_missing_last_checkpoint() { + #[tokio::test] + async fn test_read_table_with_missing_last_checkpoint() { // this table doesn't have a _last_checkpoint file let path = std::fs::canonicalize(PathBuf::from( "./tests/data/table-with-dv-small/_delta_log/", @@ -815,7 +815,7 @@ mod tests { let store = Arc::new(LocalFileSystem::new()); let executor = Arc::new(TokioBackgroundExecutor::new()); let storage = ObjectStoreStorageHandler::new(store, executor); - let cp = LastCheckpointHint::try_read(&storage, &url).unwrap(); + let cp = LastCheckpointHint::try_read(&storage, &url).await.unwrap(); assert!(cp.is_none()); } @@ -823,8 +823,8 @@ mod tests { r#"{"size":8,"sizeInBytes":21857,"version":1}"#.as_bytes().to_vec() } - #[test] - fn test_read_table_with_empty_last_checkpoint() { + #[tokio::test] + async fn test_read_table_with_empty_last_checkpoint() { // in memory file system let store = Arc::new(InMemory::new()); @@ -844,12 +844,12 @@ mod tests { let executor = Arc::new(TokioBackgroundExecutor::new()); let storage = ObjectStoreStorageHandler::new(store, executor); let url = Url::parse("memory:///invalid/").expect("valid url"); - let invalid = LastCheckpointHint::try_read(&storage, &url).expect("read last checkpoint"); + let invalid = LastCheckpointHint::try_read(&storage, &url).await.expect("read last checkpoint"); assert!(invalid.is_none()) } - #[test] - fn test_read_table_with_last_checkpoint() { + #[tokio::test] + async fn test_read_table_with_last_checkpoint() { // in memory file system let store = Arc::new(InMemory::new()); @@ -875,9 +875,9 @@ mod tests { let executor = Arc::new(TokioBackgroundExecutor::new()); let storage = ObjectStoreStorageHandler::new(store, executor); let url = Url::parse("memory:///valid/").expect("valid url"); - let valid = LastCheckpointHint::try_read(&storage, &url).expect("read last checkpoint"); + let valid = LastCheckpointHint::try_read(&storage, &url).await.expect("read last checkpoint"); let url = Url::parse("memory:///invalid/").expect("valid url"); 
- let invalid = LastCheckpointHint::try_read(&storage, &url).expect("read last checkpoint"); + let invalid = LastCheckpointHint::try_read(&storage, &url).await.expect("read last checkpoint"); let expected = LastCheckpointHint { version: 1, size: 8, @@ -891,15 +891,15 @@ mod tests { assert!(invalid.is_none()); } - #[test_log::test] - fn test_read_table_with_checkpoint() { + #[tokio::test] + async fn test_read_table_with_checkpoint() { let path = std::fs::canonicalize(PathBuf::from( "./tests/data/with_checkpoint_no_last_checkpoint/", )) .unwrap(); let location = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder_for(location).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(location).build(&engine).await.unwrap(); assert_eq!(snapshot.log_segment.checkpoint_parts.len(), 1); assert_eq!( @@ -1006,7 +1006,7 @@ mod tests { .join("\n"); add_commit(store.as_ref(), 1, commit).await.unwrap(); - let snapshot = Snapshot::builder_for(url.clone()).build(&engine)?; + let snapshot = Snapshot::builder_for(url.clone()).build(&engine).await?; assert_eq!(snapshot.get_domain_metadata("domain1", &engine)?, None); assert_eq!( @@ -1025,14 +1025,14 @@ mod tests { Ok(()) } - #[test] - fn test_log_compaction_writer() { + #[tokio::test] + async fn test_log_compaction_writer() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(url).build(&engine).await.unwrap(); // Test creating a log compaction writer let writer = snapshot.clone().log_compaction_writer(0, 1).unwrap(); diff --git a/kernel/src/snapshot/builder.rs b/kernel/src/snapshot/builder.rs index aa0cbc772..ce6e1aaaa 100644 --- a/kernel/src/snapshot/builder.rs +++ b/kernel/src/snapshot/builder.rs @@ -65,13 +65,13 @@ impl SnapshotBuilder { /// # Parameters /// /// - `engine`: Implementation of [`Engine`] apis. 
- pub fn build(self, engine: &dyn Engine) -> DeltaResult { + pub async fn build(self, engine: &dyn Engine) -> DeltaResult { if let Some(table_root) = self.table_root { let log_segment = LogSegment::for_snapshot( engine.storage_handler().as_ref(), table_root.join("_delta_log/")?, self.version, - )?; + ).await?; Ok(Snapshot::try_new_from_log_segment(table_root, log_segment, engine)?.into()) } else { let existing_snapshot = self.existing_snapshot.ok_or_else(|| { @@ -79,7 +79,7 @@ impl SnapshotBuilder { "SnapshotBuilder should have either table_root or existing_snapshot", ) })?; - Snapshot::try_new_from(existing_snapshot, engine, self.version) + Snapshot::try_new_from(existing_snapshot, engine, self.version).await } } } @@ -177,18 +177,18 @@ mod tests { Ok(()) } - #[test] - fn test_snapshot_builder() -> Result<(), Box> { + #[tokio::test] + async fn test_snapshot_builder() -> Result<(), Box> { let (engine, store, table_root) = setup_test(); let engine = engine.as_ref(); create_table(&store, &table_root)?; - let snapshot = SnapshotBuilder::new_for(table_root.clone()).build(engine)?; + let snapshot = SnapshotBuilder::new_for(table_root.clone()).build(engine).await?; assert_eq!(snapshot.version(), 1); let snapshot = SnapshotBuilder::new_for(table_root.clone()) .at_version(0) - .build(engine)?; + .build(engine).await?; assert_eq!(snapshot.version(), 0); Ok(()) diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index 63f787810..fe21bb1f4 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -25,7 +25,7 @@ use crate::table_properties::TableProperties; use crate::utils::require; use crate::{DeltaResult, Engine, EngineData, Error, PredicateRef, RowVisitor}; -use itertools::Itertools; +use futures::{stream, Stream, StreamExt, TryStreamExt}; #[cfg(test)] mod tests; @@ -43,28 +43,34 @@ pub(crate) struct TableChangesScanMetadata { pub(crate) remove_dvs: Arc>, } -/// Given an iterator of [`ParsedLogPath`] returns an iterator of [`TableChangesScanMetadata`]. +/// Given an iterator of [`ParsedLogPath`] returns a stream of [`TableChangesScanMetadata`]. /// Each row that is selected in the returned `TableChangesScanMetadata.scan_metadata` (according /// to the `selection_vector` field) _must_ be processed to complete the scan. Non-selected /// rows _must_ be ignored. /// /// Note: The [`ParsedLogPath`]s in the `commit_files` iterator must be ordered, contiguous /// (JSON) commit files. 
-pub(crate) fn table_changes_action_iter( +pub(crate) async fn table_changes_action_iter( engine: Arc, commit_files: impl IntoIterator, table_schema: SchemaRef, physical_predicate: Option<(PredicateRef, SchemaRef)>, -) -> DeltaResult>> { +) -> DeltaResult>> { let filter = DataSkippingFilter::new(engine.as_ref(), physical_predicate).map(Arc::new); - let result = commit_files - .into_iter() - .map(move |commit_file| -> DeltaResult<_> { - let scanner = LogReplayScanner::try_new(engine.as_ref(), commit_file, &table_schema)?; - scanner.into_scan_batches(engine.clone(), filter.clone()) - }) //Iterator-Result-Iterator-Result - .flatten_ok() // Iterator-Result-Result - .map(|x| x?); // Iterator-Result + + // Convert the commit_files iterator into a stream and process them + let result = stream::iter(commit_files) + .then(move |commit_file| { + let engine = engine.clone(); + let table_schema = table_schema.clone(); + let filter = filter.clone(); + async move { + let scanner = LogReplayScanner::try_new(engine.as_ref(), commit_file, &table_schema).await?; + Ok::<_, crate::Error>(scanner.into_scan_batches(engine, filter).await?) + } + }) + .try_flatten(); + Ok(result) } @@ -142,7 +148,7 @@ impl LogReplayScanner { /// 3. Perform validation on each protocol and metadata action in the commit. /// /// For more details, see the documentation for [`LogReplayScanner`]. - fn try_new( + async fn try_new( engine: &dyn Engine, commit_file: ParsedLogPath, table_schema: &SchemaRef, @@ -212,14 +218,14 @@ impl LogReplayScanner { remove_dvs, }) } - /// Generates an iterator of [`TableChangesScanMetadata`] by iterating over each action of the + /// Generates a stream of [`TableChangesScanMetadata`] by iterating over each action of the /// commit, generating a selection vector, and transforming the engine data. This performs /// phase 2 of [`LogReplayScanner`]. - fn into_scan_batches( + async fn into_scan_batches( self, engine: Arc, filter: Option>, - ) -> DeltaResult>> { + ) -> DeltaResult>> { let Self { has_cdc_action, remove_dvs, @@ -245,7 +251,8 @@ impl LogReplayScanner { cdf_scan_row_schema().into(), ); - let result = action_iter.map(move |actions| -> DeltaResult<_> { + // Convert the iterator to a stream and process each batch + let result = futures::stream::iter(action_iter).map(move |actions| -> DeltaResult<_> { let actions = actions?; // Apply data skipping to get back a selection vector for actions that passed skipping. 
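With `table_changes_action_iter` now returning a `futures::Stream` rather than an iterator, downstream code (the CDF scan and the tests converted below) collects results with `StreamExt`/`TryStreamExt` and awaits them. A minimal sketch of the resulting end-to-end calling convention, under the assumptions that a tokio runtime is available, that the feature-gated `SyncEngine` lives at `delta_kernel::engine::sync` as in the kernel's tests, and that the table path and version range are illustrative:

use std::sync::Arc;
use delta_kernel::table_changes::TableChanges;
use delta_kernel::DeltaResult;
use futures::TryStreamExt;

#[tokio::main]
async fn main() -> DeltaResult<()> {
    // Illustrative CDF-enabled table, as used by the kernel's own tests.
    let url = delta_kernel::try_parse_uri("./tests/data/table-with-cdf")?;
    let engine = Arc::new(delta_kernel::engine::sync::SyncEngine::new());
    // `TableChanges::try_new` is async after this change.
    let table_changes = TableChanges::try_new(url, engine.as_ref(), 0, Some(1)).await?;
    let scan = table_changes.into_scan_builder().build()?;
    // `execute` now yields a fallible stream of scan results; collect it with `try_collect`.
    let results: Vec<_> = scan.execute(engine).await?.try_collect().await?;
    println!("read {} change-data batches", results.len());
    Ok(())
}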
diff --git a/kernel/src/table_changes/log_replay/tests.rs b/kernel/src/table_changes/log_replay/tests.rs index e9d830b5c..8634cc4c4 100644 --- a/kernel/src/table_changes/log_replay/tests.rs +++ b/kernel/src/table_changes/log_replay/tests.rs @@ -16,6 +16,7 @@ use crate::Predicate; use crate::{DeltaResult, Engine, Error, Version}; use itertools::Itertools; +use futures::{StreamExt, TryStreamExt}; use std::collections::HashMap; use std::path::Path; use std::sync::Arc; @@ -27,7 +28,7 @@ fn get_schema() -> StructType { ]) } -fn get_segment( +async fn get_segment( engine: &dyn Engine, path: &Path, start_version: Version, @@ -40,14 +41,15 @@ fn get_segment( log_root, start_version, end_version, - )?; + ).await?; Ok(log_segment.ascending_commit_files) } -fn result_to_sv(iter: impl Iterator>) -> Vec { - iter.map_ok(|scan_metadata| scan_metadata.selection_vector.into_iter()) - .flatten_ok() +async fn result_to_sv(stream: impl futures::Stream>) -> Vec { + stream.map_ok(|scan_metadata| futures::stream::iter(scan_metadata.selection_vector.into_iter().map(Ok::<_, Error>))) + .try_flatten() .try_collect() + .await .unwrap() } @@ -82,13 +84,13 @@ async fn metadata_protocol() { ]) .await; - let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) + let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None).await .unwrap() .into_iter(); let scan_batches = - table_changes_action_iter(engine, commits, get_schema().into(), None).unwrap(); - let sv = result_to_sv(scan_batches); + table_changes_action_iter(engine, commits, get_schema().into(), None).await.unwrap(); + let sv = result_to_sv(scan_batches).await; assert_eq!(sv, &[false, false]); } #[tokio::test] @@ -107,14 +109,15 @@ async fn cdf_not_enabled() { })]) .await; - let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) + let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None).await .unwrap() .into_iter(); let res: DeltaResult> = table_changes_action_iter(engine, commits, get_schema().into(), None) - .unwrap() - .try_collect(); + .await.unwrap() + .try_collect() + .await; assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_)))); } @@ -135,14 +138,15 @@ async fn unsupported_reader_feature() { )]) .await; - let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) + let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None).await .unwrap() .into_iter(); let res: DeltaResult> = table_changes_action_iter(engine, commits, get_schema().into(), None) - .unwrap() - .try_collect(); + .await.unwrap() + .try_collect() + .await; assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_)))); } @@ -166,14 +170,15 @@ async fn column_mapping_should_fail() { })]) .await; - let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) + let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None).await .unwrap() .into_iter(); let res: DeltaResult> = table_changes_action_iter(engine, commits, get_schema().into(), None) - .unwrap() - .try_collect(); + .await.unwrap() + .try_collect() + .await; assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_)))); } @@ -197,14 +202,15 @@ async fn incompatible_schemas_fail() { })]) .await; - let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) + let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None).await .unwrap() .into_iter(); let res: DeltaResult> = table_changes_action_iter(engine, commits, cdf_schema.into(), None) - 
.unwrap() - .try_collect(); + .await.unwrap() + .try_collect() + .await; assert!(matches!( res, @@ -287,18 +293,20 @@ async fn add_remove() { ]) .await; - let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) + let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None).await .unwrap() .into_iter(); let sv = table_changes_action_iter(engine, commits, get_schema().into(), None) - .unwrap() + .await.unwrap() .flat_map(|scan_metadata| { let scan_metadata = scan_metadata.unwrap(); assert_eq!(scan_metadata.remove_dvs, HashMap::new().into()); - scan_metadata.selection_vector + futures::stream::iter(scan_metadata.selection_vector.into_iter().map(Ok::<_, Error>)) }) - .collect_vec(); + .try_collect::>() + .await + .unwrap(); assert_eq!(sv, &[true, true]); } @@ -337,18 +345,20 @@ async fn filter_data_change() { ]) .await; - let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) + let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None).await .unwrap() .into_iter(); let sv = table_changes_action_iter(engine, commits, get_schema().into(), None) - .unwrap() + .await.unwrap() .flat_map(|scan_metadata| { let scan_metadata = scan_metadata.unwrap(); assert_eq!(scan_metadata.remove_dvs, HashMap::new().into()); - scan_metadata.selection_vector + futures::stream::iter(scan_metadata.selection_vector.into_iter().map(Ok::<_, Error>)) }) - .collect_vec(); + .try_collect::>() + .await + .unwrap(); assert_eq!(sv, &[false; 5]); } @@ -383,18 +393,20 @@ async fn cdc_selection() { ]) .await; - let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) + let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None).await .unwrap() .into_iter(); let sv = table_changes_action_iter(engine, commits, get_schema().into(), None) - .unwrap() + .await.unwrap() .flat_map(|scan_metadata| { let scan_metadata = scan_metadata.unwrap(); assert_eq!(scan_metadata.remove_dvs, HashMap::new().into()); - scan_metadata.selection_vector + futures::stream::iter(scan_metadata.selection_vector.into_iter().map(Ok::<_, Error>)) }) - .collect_vec(); + .try_collect::>() + .await + .unwrap(); assert_eq!(sv, &[true, false, true, true]); } @@ -442,7 +454,7 @@ async fn dv() { ]) .await; - let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) + let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None).await .unwrap() .into_iter(); @@ -454,13 +466,15 @@ async fn dv() { )]) .into(); let sv = table_changes_action_iter(engine, commits, get_schema().into(), None) - .unwrap() + .await.unwrap() .flat_map(|scan_metadata| { let scan_metadata = scan_metadata.unwrap(); assert_eq!(scan_metadata.remove_dvs, expected_remove_dvs); - scan_metadata.selection_vector + futures::stream::iter(scan_metadata.selection_vector.into_iter().map(Ok::<_, Error>)) }) - .collect_vec(); + .try_collect::>() + .await + .unwrap(); assert_eq!(sv, &[false, true, true]); } @@ -526,17 +540,19 @@ async fn data_skipping_filter() { Ok(PhysicalPredicate::Some(p, s)) => Some((p, s)), other => panic!("Unexpected result: {other:?}"), }; - let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) + let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None).await .unwrap() .into_iter(); let sv = table_changes_action_iter(engine, commits, logical_schema.into(), predicate) - .unwrap() + .await.unwrap() .flat_map(|scan_metadata| { let scan_metadata = scan_metadata.unwrap(); - scan_metadata.selection_vector + 
futures::stream::iter(scan_metadata.selection_vector.into_iter().map(Ok::<_, Error>)) }) - .collect_vec(); + .try_collect::>() + .await + .unwrap(); // Note: since the first pair is a dv operation, remove action will always be filtered assert_eq!(sv, &[false, true, false, false, true]); @@ -571,14 +587,15 @@ async fn failing_protocol() { ]) .await; - let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) + let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None).await .unwrap() .into_iter(); let res: DeltaResult> = table_changes_action_iter(engine, commits, get_schema().into(), None) - .unwrap() - .try_collect(); + .await.unwrap() + .try_collect() + .await; assert_result_error_with_message( res, @@ -599,12 +616,12 @@ async fn file_meta_timestamp() { })]) .await; - let mut commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) + let mut commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None).await .unwrap() .into_iter(); let commit = commits.next().unwrap(); let file_meta_ts = commit.location.last_modified; - let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap(); + let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).await.unwrap(); assert_eq!(scanner.timestamp, file_meta_ts); } diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index f39fd7974..cb17a62b8 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -133,7 +133,7 @@ impl TableChanges { /// - `start_version`: The start version of the change data feed /// - `end_version`: The end version (inclusive) of the change data feed. If this is none, this /// defaults to the newest table version. - pub fn try_new( + pub async fn try_new( table_root: Url, engine: &dyn Engine, start_version: Version, @@ -145,19 +145,19 @@ impl TableChanges { log_root, start_version, end_version, - )?; + ).await?; // Both snapshots ensure that reading is supported at the start and end version using // `ensure_read_supported`. Note that we must still verify that reading is // supported for every protocol action in the CDF range. 
let start_snapshot = Snapshot::builder_for(table_root.as_url().clone()) .at_version(start_version) - .build(engine)?; + .build(engine).await?; let end_snapshot = match end_version { Some(version) => Snapshot::builder_from(start_snapshot.clone()) .at_version(version) - .build(engine)?, - None => Snapshot::builder_from(start_snapshot.clone()).build(engine)?, + .build(engine).await?, + None => Snapshot::builder_from(start_snapshot.clone()).build(engine).await?, }; // Verify CDF is enabled at the beginning and end of the interval using @@ -285,8 +285,8 @@ mod tests { use crate::Error; use itertools::assert_equal; - #[test] - fn table_changes_checks_enable_cdf_flag() { + #[tokio::test] + async fn table_changes_checks_enable_cdf_flag() { // Table with CDF enabled, then disabled at version 2 and enabled at version 3 let path = "./tests/data/table-with-cdf"; let engine = Box::new(SyncEngine::new()); @@ -300,7 +300,7 @@ mod tests { start_version, end_version.into(), ) - .unwrap(); + .await.unwrap(); assert_eq!(table_changes.start_version, start_version); assert_eq!(table_changes.end_version(), end_version); } @@ -312,24 +312,24 @@ mod tests { engine.as_ref(), start_version, end_version.into(), - ); + ).await; assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_)))) } } - #[test] - fn schema_evolution_fails() { + #[tokio::test] + async fn schema_evolution_fails() { let path = "./tests/data/table-with-cdf"; let engine = Box::new(SyncEngine::new()); let url = delta_kernel::try_parse_uri(path).unwrap(); let expected_msg = "Failed to build TableChanges: Start and end version schemas are different. Found start version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: true, metadata: {} }}, metadata_columns: {} } and end version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: false, metadata: {} }}, metadata_columns: {} }"; // A field in the schema goes from being nullable to non-nullable - let table_changes_res = TableChanges::try_new(url, engine.as_ref(), 3, Some(4)); + let table_changes_res = TableChanges::try_new(url, engine.as_ref(), 3, Some(4)).await; assert!(matches!(table_changes_res, Err(Error::Generic(msg)) if msg == expected_msg)); } - #[test] - fn table_changes_has_cdf_schema() { + #[tokio::test] + async fn table_changes_has_cdf_schema() { let path = "./tests/data/table-with-cdf"; let engine = Box::new(SyncEngine::new()); let url = delta_kernel::try_parse_uri(path).unwrap(); @@ -341,7 +341,7 @@ mod tests { .chain(CDF_FIELDS.clone()); let table_changes = - TableChanges::try_new(url.clone(), engine.as_ref(), 0, 0.into()).unwrap(); + TableChanges::try_new(url.clone(), engine.as_ref(), 0, 0.into()).await.unwrap(); assert_equal(expected_schema, table_changes.schema().fields().cloned()); } } diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 8cfe627bd..795e92490 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use itertools::Itertools; +use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt}; use tracing::debug; use url::Url; @@ -180,16 +181,16 @@ impl TableChangesScanBuilder { } impl TableChangesScan { - /// Returns 
an iterator of [`TableChangesScanMetadata`] necessary to read CDF. Each row + /// Returns a stream of [`TableChangesScanMetadata`] necessary to read CDF. Each row /// represents an action in the delta log. These rows are filtered to yield only the actions /// necessary to read CDF. Additionally, [`TableChangesScanMetadata`] holds metadata on the /// deletion vectors present in the commit. The engine data in each scan metadata is guaranteed /// to belong to the same commit. Several [`TableChangesScanMetadata`] may belong to the same /// commit. - fn scan_metadata( + async fn scan_metadata( &self, engine: Arc, - ) -> DeltaResult>> { + ) -> DeltaResult>> { let commits = self .table_changes .log_segment @@ -197,13 +198,19 @@ impl TableChangesScan { .clone(); // NOTE: This is a cheap arc clone let physical_predicate = match self.physical_predicate.clone() { - PhysicalPredicate::StaticSkipAll => return Ok(None.into_iter().flatten()), + PhysicalPredicate::StaticSkipAll => None, // We'll handle this after getting the stream PhysicalPredicate::Some(predicate, schema) => Some((predicate, schema)), PhysicalPredicate::None => None, }; let schema = self.table_changes.end_snapshot.schema(); - let it = table_changes_action_iter(engine, commits, schema, physical_predicate)?; - Ok(Some(it).into_iter().flatten()) + let stream = table_changes_action_iter(engine, commits, schema, physical_predicate).await?; + + // Check if we should return an empty stream + if matches!(self.physical_predicate, PhysicalPredicate::StaticSkipAll) { + Ok(futures::stream::empty().boxed()) + } else { + Ok(stream.boxed()) + } } /// Get a shared reference to the logical [`Schema`] of the table changes scan. @@ -234,47 +241,59 @@ impl TableChangesScan { } /// Perform an "all in one" scan to get the change data feed. This will use the provided `engine` - /// to read and process all the data for the query. Each [`ScanResult`] in the resultant iterator + /// to read and process all the data for the query. Each [`ScanResult`] in the resultant stream /// encapsulates the raw data and an optional boolean vector built from the deletion vector if it /// was present. See the documentation for [`ScanResult`] for more details. - pub fn execute( + pub async fn execute( &self, engine: Arc, - ) -> DeltaResult> + use<'_>> { - let scan_metadata = self.scan_metadata(engine.clone())?; + ) -> DeltaResult> + use<'_>> { + let scan_metadata = self.scan_metadata(engine.clone()).await?; let scan_files = scan_metadata_to_scan_file(scan_metadata); let table_root = self.table_changes.table_root().clone(); let all_fields = self.all_fields.clone(); let physical_predicate = self.physical_predicate(); let dv_engine_ref = engine.clone(); + let logical_schema = self.logical_schema().clone(); + let physical_schema = self.physical_schema().clone(); + let table_root_for_read = table_root.clone(); let result = scan_files - .map(move |scan_file| { - resolve_scan_file_dv(dv_engine_ref.as_ref(), &table_root, scan_file?) 
- }) // Iterator-Result-Iterator - .flatten_ok() // Iterator-Result - .map(move |resolved_scan_file| -> DeltaResult<_> { - read_scan_file( - engine.as_ref(), - resolved_scan_file?, - self.table_root(), - self.logical_schema(), - self.physical_schema(), - &all_fields, - physical_predicate.clone(), - ) - }) // Iterator-Result-Iterator-Result - .flatten_ok() // Iterator-Result-Result - .map(|x| x?); // Iterator-Result + .map(move |scan_file| -> DeltaResult<_> { + let scan_file = scan_file?; + let resolved_files = resolve_scan_file_dv(dv_engine_ref.as_ref(), &table_root, scan_file)?; + Ok(futures::stream::iter(resolved_files.map(Ok))) + }) // Stream-Result-Stream + .try_flatten() // Stream-Result + .and_then(move |resolved_scan_file| { + let engine = engine.clone(); + let logical_schema = logical_schema.clone(); + let physical_schema = physical_schema.clone(); + let all_fields = all_fields.clone(); + let physical_predicate = physical_predicate.clone(); + let table_root = table_root_for_read.clone(); + async move { + read_scan_file( + engine.as_ref(), + resolved_scan_file, + &table_root, + &logical_schema, + &physical_schema, + &all_fields, + physical_predicate, + ).await + } + }) // Stream-Result-Stream-Result + .try_flatten(); // Stream-Result Ok(result) } } /// Reads the data at the `resolved_scan_file` and transforms the data from physical to logical. -/// The result is a fallible iterator of [`ScanResult`] containing the logical data. -fn read_scan_file( +/// The result is a fallible stream of [`ScanResult`] containing the logical data. +async fn read_scan_file( engine: &dyn Engine, resolved_scan_file: ResolvedCdfScanFile, table_root: &Url, @@ -282,7 +301,7 @@ fn read_scan_file( physical_schema: &SchemaRef, all_fields: &[ColumnType], _physical_predicate: Option, -) -> DeltaResult>> { +) -> DeltaResult>> { let ResolvedCdfScanFile { scan_file, mut selection_vector, @@ -311,7 +330,7 @@ fn read_scan_file( .parquet_handler() .read_parquet_files(&[file], physical_schema, None)?; - let result = read_result_iter.map(move |batch| -> DeltaResult<_> { + let result = futures::stream::iter(read_result_iter).map(move |batch| -> DeltaResult<_> { let batch = batch?; // to transform the physical data into the correct logical form let logical = phys_to_logical_eval.evaluate(batch.as_ref()); @@ -369,14 +388,14 @@ mod tests { use crate::transforms::ColumnType; use crate::Predicate; - #[test] - fn simple_table_changes_scan_builder() { + #[tokio::test] + async fn simple_table_changes_scan_builder() { let path = "./tests/data/table-with-cdf"; let engine = Box::new(SyncEngine::new()); let url = delta_kernel::try_parse_uri(path).unwrap(); // A field in the schema goes from being nullable to non-nullable - let table_changes = TableChanges::try_new(url, engine.as_ref(), 0, Some(1)).unwrap(); + let table_changes = TableChanges::try_new(url, engine.as_ref(), 0, Some(1)).await.unwrap(); let scan = table_changes.into_scan_builder().build().unwrap(); // Note that this table is not partitioned. 
`part` is a regular field @@ -394,14 +413,14 @@ mod tests { assert_eq!(scan.physical_predicate, PhysicalPredicate::None); } - #[test] - fn projected_and_filtered_table_changes_scan_builder() { + #[tokio::test] + async fn projected_and_filtered_table_changes_scan_builder() { let path = "./tests/data/table-with-cdf"; let engine = Box::new(SyncEngine::new()); let url = delta_kernel::try_parse_uri(path).unwrap(); // A field in the schema goes from being nullable to non-nullable - let table_changes = TableChanges::try_new(url, engine.as_ref(), 0, Some(1)).unwrap(); + let table_changes = TableChanges::try_new(url, engine.as_ref(), 0, Some(1)).await.unwrap(); let schema = table_changes .schema() diff --git a/kernel/src/table_changes/scan_file.rs b/kernel/src/table_changes/scan_file.rs index 642e79b9b..ba33e4d60 100644 --- a/kernel/src/table_changes/scan_file.rs +++ b/kernel/src/table_changes/scan_file.rs @@ -2,7 +2,7 @@ //! metadata required to generate a change data feed. [`CdfScanFile`] can be constructed using //! [`CdfScanFileVisitor`]. The visitor reads from engine data with the schema [`cdf_scan_row_schema`]. //! You can convert engine data to this schema using the [`cdf_scan_row_expression`]. -use itertools::Itertools; +use futures::{StreamExt, TryStreamExt}; use std::collections::HashMap; use std::sync::{Arc, LazyLock}; @@ -47,19 +47,23 @@ pub(crate) struct CdfScanFile { pub(crate) type CdfScanCallback = fn(context: &mut T, scan_file: CdfScanFile); -/// Transforms an iterator of [`TableChangesScanMetadata`] into an iterator of +/// Transforms a stream of [`TableChangesScanMetadata`] into a stream of /// [`CdfScanFile`] by visiting the engine data. pub(crate) fn scan_metadata_to_scan_file( - scan_metadata: impl Iterator>, -) -> impl Iterator> { + scan_metadata: impl futures::Stream>, +) -> impl futures::Stream> { scan_metadata .map(|scan_metadata| -> DeltaResult<_> { let scan_metadata = scan_metadata?; let callback: CdfScanCallback> = |context, scan_file| context.push(scan_file); - Ok(visit_cdf_scan_files(&scan_metadata, vec![], callback)?.into_iter()) - }) // Iterator-Result-Iterator - .flatten_ok() // Iterator-Result + Ok(futures::stream::iter( + visit_cdf_scan_files(&scan_metadata, vec![], callback)? 
+ .into_iter() + .map(Ok) + )) + }) // Stream-Result-Stream + .try_flatten() // Stream-Result } /// Request that the kernel call a callback on each valid file that needs to be read for the @@ -240,6 +244,7 @@ mod tests { use std::collections::HashMap; use std::sync::Arc; + use futures::TryStreamExt; use itertools::Itertools; use super::{scan_metadata_to_scan_file, CdfScanFile, CdfScanFileType}; @@ -328,7 +333,7 @@ mod tests { let log_root = table_root.join("_delta_log/").unwrap(); let log_segment = LogSegment::for_table_changes(engine.storage_handler().as_ref(), log_root, 0, None) - .unwrap(); + .await.unwrap(); let table_schema = StructType::new_unchecked([ StructField::nullable("id", DataType::INTEGER), StructField::nullable("value", DataType::STRING), @@ -339,10 +344,10 @@ mod tests { table_schema.into(), None, ) - .unwrap(); + .await.unwrap(); let scan_files: Vec<_> = scan_metadata_to_scan_file(scan_metadata) .try_collect() - .unwrap(); + .await.unwrap(); // Generate the expected [`CdfScanFile`] let timestamps = log_segment diff --git a/kernel/tests/cdf.rs b/kernel/tests/cdf.rs index 002edc15e..2c64a4e86 100644 --- a/kernel/tests/cdf.rs +++ b/kernel/tests/cdf.rs @@ -3,6 +3,7 @@ use std::error; use delta_kernel::arrow::array::RecordBatch; use delta_kernel::arrow::compute::filter_record_batch; use delta_kernel::arrow::datatypes::Schema as ArrowSchema; +use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use delta_kernel::engine::arrow_conversion::TryFromKernel as _; @@ -15,7 +16,7 @@ mod common; use test_utils::DefaultEngineExtension; use test_utils::{load_test_data, to_arrow}; -fn read_cdf_for_table( +async fn read_cdf_for_table( test_name: impl AsRef, start_version: Version, end_version: impl Into>, @@ -30,7 +31,7 @@ fn read_cdf_for_table( engine.as_ref(), start_version, end_version.into(), - )?; + ).await?; // Project out the commit timestamp since file modification time may change anytime git clones // or switches branches @@ -49,7 +50,7 @@ fn read_cdf_for_table( let scan_schema_as_arrow = ArrowSchema::try_from_kernel(scan.logical_schema().as_ref()).unwrap(); let batches: Vec = scan - .execute(engine)? + .execute(engine).await? .map(|scan_result| -> DeltaResult<_> { let scan_result = scan_result?; let mask = scan_result.full_mask(); @@ -62,13 +63,13 @@ fn read_cdf_for_table( None => Ok(record_batch), } }) - .try_collect()?; + .try_collect().await?; Ok(batches) } -#[test] -fn cdf_with_deletion_vector() -> Result<(), Box> { - let batches = read_cdf_for_table("cdf-table-with-dv", 0, None, None)?; +#[tokio::test] +async fn cdf_with_deletion_vector() -> Result<(), Box> { + let batches = read_cdf_for_table("cdf-table-with-dv", 0, None, None).await?; // Each commit performs the following: // 0. Insert 0..=9 // 1. 
Remove [0, 9] @@ -112,9 +113,9 @@ fn cdf_with_deletion_vector() -> Result<(), Box> { Ok(()) } -#[test] -fn basic_cdf() -> Result<(), Box> { - let batches = read_cdf_for_table("cdf-table", 0, None, None)?; +#[tokio::test] +async fn basic_cdf() -> Result<(), Box> { + let batches = read_cdf_for_table("cdf-table", 0, None, None).await?; let mut expected = vec![ "+----+--------+------------+------------------+-----------------+", "| id | name | birthday | _change_type | _commit_version |", @@ -149,9 +150,9 @@ fn basic_cdf() -> Result<(), Box> { Ok(()) } -#[test] -fn cdf_non_partitioned() -> Result<(), Box> { - let batches = read_cdf_for_table("cdf-table-non-partitioned", 0, None, None)?; +#[tokio::test] +async fn cdf_non_partitioned() -> Result<(), Box> { + let batches = read_cdf_for_table("cdf-table-non-partitioned", 0, None, None).await?; let mut expected = vec![ "+----+--------+------------+-------------------+---------------+--------------+----------------+------------------+-----------------+", "| id | name | birthday | long_field | boolean_field | double_field | smallint_field | _change_type | _commit_version |", @@ -188,9 +189,9 @@ fn cdf_non_partitioned() -> Result<(), Box> { Ok(()) } -#[test] -fn cdf_with_cdc_and_dvs() -> Result<(), Box> { - let batches = read_cdf_for_table("cdf-table-with-cdc-and-dvs", 0, None, None)?; +#[tokio::test] +async fn cdf_with_cdc_and_dvs() -> Result<(), Box> { + let batches = read_cdf_for_table("cdf-table-with-cdc-and-dvs", 0, None, None).await?; let mut expected = vec![ "+----+--------------------+------------------+-----------------+", "| id | comment | _change_type | _commit_version |", @@ -245,9 +246,9 @@ fn cdf_with_cdc_and_dvs() -> Result<(), Box> { Ok(()) } -#[test] -fn simple_cdf_version_ranges() -> DeltaResult<()> { - let batches = read_cdf_for_table("cdf-table-simple", 0, 0, None)?; +#[tokio::test] +async fn simple_cdf_version_ranges() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-simple", 0, 0, None).await?; let mut expected = vec![ "+----+--------------+-----------------+", "| id | _change_type | _commit_version |", @@ -267,7 +268,7 @@ fn simple_cdf_version_ranges() -> DeltaResult<()> { sort_lines!(expected); assert_batches_sorted_eq!(expected, &batches); - let batches = read_cdf_for_table("cdf-table-simple", 1, 1, None)?; + let batches = read_cdf_for_table("cdf-table-simple", 1, 1, None).await?; let mut expected = vec![ "+----+--------------+-----------------+", "| id | _change_type | _commit_version |", @@ -287,7 +288,7 @@ fn simple_cdf_version_ranges() -> DeltaResult<()> { sort_lines!(expected); assert_batches_sorted_eq!(expected, &batches); - let batches = read_cdf_for_table("cdf-table-simple", 2, 2, None)?; + let batches = read_cdf_for_table("cdf-table-simple", 2, 2, None).await?; let mut expected = vec![ "+----+--------------+-----------------+", "| id | _change_type | _commit_version |", @@ -302,7 +303,7 @@ fn simple_cdf_version_ranges() -> DeltaResult<()> { sort_lines!(expected); assert_batches_sorted_eq!(expected, &batches); - let batches = read_cdf_for_table("cdf-table-simple", 0, 2, None)?; + let batches = read_cdf_for_table("cdf-table-simple", 0, 2, None).await?; let mut expected = vec![ "+----+--------------+-----------------+", "| id | _change_type | _commit_version |", @@ -339,9 +340,9 @@ fn simple_cdf_version_ranges() -> DeltaResult<()> { Ok(()) } -#[test] -fn update_operations() -> DeltaResult<()> { - let batches = read_cdf_for_table("cdf-table-update-ops", 0, 2, None)?; +#[tokio::test] +async fn 
update_operations() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-update-ops", 0, 2, None).await?; // Note: `update_pre` and `update_post` are technically not part of the delta spec, and instead // should be `update_preimage` and `update_postimage` respectively. However, the tests in // delta-spark use the post and pre. @@ -376,9 +377,9 @@ fn update_operations() -> DeltaResult<()> { Ok(()) } -#[test] -fn false_data_change_is_ignored() -> DeltaResult<()> { - let batches = read_cdf_for_table("cdf-table-data-change", 0, 1, None)?; +#[tokio::test] +async fn false_data_change_is_ignored() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-data-change", 0, 1, None).await?; let mut expected = vec![ "+----+--------------+-----------------+", "| id | _change_type | _commit_version |", @@ -400,24 +401,24 @@ fn false_data_change_is_ignored() -> DeltaResult<()> { Ok(()) } -#[test] -fn invalid_range_end_before_start() { - let res = read_cdf_for_table("cdf-table-simple", 1, 0, None); +#[tokio::test] +async fn invalid_range_end_before_start() { + let res = read_cdf_for_table("cdf-table-simple", 1, 0, None).await; let expected_msg = "Failed to build LogSegment: start_version cannot be greater than end_version"; assert!(matches!(res, Err(Error::Generic(msg)) if msg == expected_msg)); } -#[test] -fn invalid_range_start_after_last_version_of_table() { - let res = read_cdf_for_table("cdf-table-simple", 3, 4, None); +#[tokio::test] +async fn invalid_range_start_after_last_version_of_table() { + let res = read_cdf_for_table("cdf-table-simple", 3, 4, None).await; let expected_msg = "Expected the first commit to have version 3"; assert!(matches!(res, Err(Error::Generic(msg)) if msg == expected_msg)); } -#[test] -fn partition_table() -> DeltaResult<()> { - let batches = read_cdf_for_table("cdf-table-partitioned", 0, 2, None)?; +#[tokio::test] +async fn partition_table() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-partitioned", 0, 2, None).await?; let mut expected = vec![ "+----+------+------+------------------+-----------------+", "| id | text | part | _change_type | _commit_version |", @@ -441,9 +442,9 @@ fn partition_table() -> DeltaResult<()> { Ok(()) } -#[test] -fn backtick_column_names() -> DeltaResult<()> { - let batches = read_cdf_for_table("cdf-table-backtick-column-names", 0, None, None)?; +#[tokio::test] +async fn backtick_column_names() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-backtick-column-names", 0, None, None).await?; let mut expected = vec![ "+--------+----------+--------------------------+--------------+-----------------+", "| id.num | id.num`s | struct_col | _change_type | _commit_version |", @@ -460,9 +461,9 @@ fn backtick_column_names() -> DeltaResult<()> { Ok(()) } -#[test] -fn unconditional_delete() -> DeltaResult<()> { - let batches = read_cdf_for_table("cdf-table-delete-unconditional", 0, None, None)?; +#[tokio::test] +async fn unconditional_delete() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-delete-unconditional", 0, None, None).await?; let mut expected = vec![ "+----+--------------+-----------------+", "| id | _change_type | _commit_version |", @@ -494,9 +495,9 @@ fn unconditional_delete() -> DeltaResult<()> { Ok(()) } -#[test] -fn conditional_delete_all_rows() -> DeltaResult<()> { - let batches = read_cdf_for_table("cdf-table-delete-conditional-all-rows", 0, None, None)?; +#[tokio::test] +async fn conditional_delete_all_rows() -> DeltaResult<()> { + let batches = 
read_cdf_for_table("cdf-table-delete-conditional-all-rows", 0, None, None).await?; let mut expected = vec![ "+----+--------------+-----------------+", "| id | _change_type | _commit_version |", @@ -528,9 +529,9 @@ fn conditional_delete_all_rows() -> DeltaResult<()> { Ok(()) } -#[test] -fn conditional_delete_two_rows() -> DeltaResult<()> { - let batches = read_cdf_for_table("cdf-table-delete-conditional-two-rows", 0, None, None)?; +#[tokio::test] +async fn conditional_delete_two_rows() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-delete-conditional-two-rows", 0, None, None).await?; let mut expected = vec![ "+----+--------------+-----------------+", "| id | _change_type | _commit_version |", diff --git a/kernel/tests/dv.rs b/kernel/tests/dv.rs index a8def7b29..8bbbe17ab 100644 --- a/kernel/tests/dv.rs +++ b/kernel/tests/dv.rs @@ -26,13 +26,13 @@ fn count_total_scan_rows( .fold_ok(0, Add::add) } -#[test] -fn dv_table() -> Result<(), Box> { +#[tokio::test] +async fn dv_table() -> Result<(), Box> { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/"))?; let url = url::Url::from_directory_path(path).unwrap(); let engine = DefaultEngine::new_local(); - let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?; + let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).await?; let scan = snapshot.scan_builder().build()?; let stream = scan.execute(engine)?; @@ -41,13 +41,13 @@ fn dv_table() -> Result<(), Box> { Ok(()) } -#[test] -fn non_dv_table() -> Result<(), Box> { +#[tokio::test] +async fn non_dv_table() -> Result<(), Box> { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/"))?; let url = url::Url::from_directory_path(path).unwrap(); let engine = DefaultEngine::new_local(); - let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?; + let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).await?; let scan = snapshot.scan_builder().build()?; let stream = scan.execute(engine)?; diff --git a/kernel/tests/golden_tables.rs b/kernel/tests/golden_tables.rs index 758902e7f..1c68907c3 100644 --- a/kernel/tests/golden_tables.rs +++ b/kernel/tests/golden_tables.rs @@ -168,7 +168,7 @@ async fn latest_snapshot_test( url: Url, expected_path: Option, ) -> Result<(), Box> { - let snapshot = Snapshot::builder_for(url).build(&engine)?; + let snapshot = Snapshot::builder_for(url).build(&engine).await?; let scan = snapshot.scan_builder().build()?; let scan_res = scan.execute(Arc::new(engine))?; let batches: Vec = scan_res @@ -271,7 +271,7 @@ async fn canonicalized_paths_test( _expected: Option, ) -> Result<(), Box> { // assert latest version is 1 and there are no files in the snapshot (add is removed) - let snapshot = Snapshot::builder_for(table_root).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(table_root).build(&engine).await.unwrap(); assert_eq!(snapshot.version(), 1); let scan = snapshot.scan_builder().build().expect("build the scan"); let mut scan_metadata = scan.scan_metadata(&engine).expect("scan metadata"); @@ -284,7 +284,7 @@ async fn checkpoint_test( table_root: Url, _expected: Option, ) -> Result<(), Box> { - let snapshot = Snapshot::builder_for(table_root).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(table_root).build(&engine).await.unwrap(); let version = snapshot.version(); let scan = snapshot.scan_builder().build().expect("build the scan"); let scan_metadata: Vec<_> = scan diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index 
817c85cbc..5bb52b33d 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -68,7 +68,7 @@ async fn single_commit_two_add_files() -> Result<(), Box> let expected_data = vec![batch.clone(), batch]; - let snapshot = Snapshot::builder_for(location).build(engine.as_ref())?; + let snapshot = Snapshot::builder_for(location).build(engine.as_ref()).await?; let scan = snapshot.scan_builder().build()?; let mut files = 0; @@ -120,7 +120,7 @@ async fn two_commits() -> Result<(), Box> { let expected_data = vec![batch.clone(), batch]; - let snapshot = Snapshot::builder_for(location).build(&engine)?; + let snapshot = Snapshot::builder_for(location).build(&engine).await?; let scan = snapshot.scan_builder().build()?; let mut files = 0; @@ -173,7 +173,7 @@ async fn remove_action() -> Result<(), Box> { let expected_data = vec![batch]; - let snapshot = Snapshot::builder_for(location).build(&engine)?; + let snapshot = Snapshot::builder_for(location).build(&engine).await?; let scan = snapshot.scan_builder().build()?; let stream = scan.execute(Arc::new(engine))?.zip(expected_data); @@ -244,7 +244,7 @@ async fn stats() -> Result<(), Box> { storage.clone(), Arc::new(TokioBackgroundExecutor::new()), )); - let snapshot = Snapshot::builder_for(location).build(engine.as_ref())?; + let snapshot = Snapshot::builder_for(location).build(engine.as_ref()).await?; // The first file has id between 1 and 3; the second has id between 5 and 7. For each operator, // we validate the boundary values where we expect the set of matched files to change. @@ -420,7 +420,7 @@ fn read_with_scan_metadata( Ok(()) } -fn read_table_data( +async fn read_table_data( path: &str, select_cols: Option<&[&str]>, predicate: Option, @@ -435,7 +435,7 @@ fn read_table_data( Arc::new(TokioBackgroundExecutor::new()), )?); - let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref())?; + let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref()).await?; let read_schema = select_cols.map(|select_cols| { let table_schema = snapshot.schema(); @@ -458,7 +458,7 @@ fn read_table_data( } // util to take a Vec<&str> and call read_table_data with Vec -fn read_table_data_str( +async fn read_table_data_str( path: &str, select_cols: Option<&[&str]>, predicate: Option, @@ -469,11 +469,11 @@ fn read_table_data_str( select_cols, predicate, expected.into_iter().map(String::from).collect(), - ) + ).await } -#[test] -fn data() -> Result<(), Box> { +#[tokio::test] +async fn data() -> Result<(), Box> { let expected = vec![ "+--------+--------+---------+", "| letter | number | a_float |", @@ -486,13 +486,13 @@ fn data() -> Result<(), Box> { "| e | 5 | 5.5 |", "+--------+--------+---------+", ]; - read_table_data_str("./tests/data/basic_partitioned", None, None, expected)?; + read_table_data_str("./tests/data/basic_partitioned", None, None, expected).await?; Ok(()) } -#[test] -fn column_ordering() -> Result<(), Box> { +#[tokio::test] +async fn column_ordering() -> Result<(), Box> { let expected = vec![ "+---------+--------+--------+", "| a_float | letter | number |", @@ -510,13 +510,13 @@ fn column_ordering() -> Result<(), Box> { Some(&["a_float", "letter", "number"]), None, expected, - )?; + ).await?; Ok(()) } -#[test] -fn column_ordering_and_projection() -> Result<(), Box> { +#[tokio::test] +async fn column_ordering_and_projection() -> Result<(), Box> { let expected = vec![ "+---------+--------+", "| a_float | number |", @@ -534,7 +534,7 @@ fn column_ordering_and_projection() -> Result<(), Box> { Some(&["a_float", "number"]), None, 
expected, - )?; + ).await?; Ok(()) } @@ -576,8 +576,8 @@ fn table_for_letters(letters: &[char]) -> Vec { res } -#[test] -fn predicate_on_number() -> Result<(), Box> { +#[tokio::test] +async fn predicate_on_number() -> Result<(), Box> { let cases = vec![ ( column_expr!("number").lt(Expr::literal(4i64)), @@ -611,13 +611,13 @@ fn predicate_on_number() -> Result<(), Box> { Some(&["a_float", "number"]), Some(pred), expected, - )?; + ).await?; } Ok(()) } -#[test] -fn predicate_on_letter() -> Result<(), Box> { +#[tokio::test] +async fn predicate_on_letter() -> Result<(), Box> { // Test basic column pruning. Note that the actual predicate machinery is already well-tested, // so we're just testing wiring here. let null_row_table: Vec = vec![ @@ -674,8 +674,8 @@ fn predicate_on_letter() -> Result<(), Box> { Ok(()) } -#[test] -fn predicate_on_letter_and_number() -> Result<(), Box> { +#[tokio::test] +async fn predicate_on_letter_and_number() -> Result<(), Box> { // Partition skipping and file skipping are currently implemented separately. Mixing them in an // AND clause will evaulate each separately, but mixing them in an OR clause disables both. let full_table: Vec = vec![ @@ -734,8 +734,8 @@ fn predicate_on_letter_and_number() -> Result<(), Box> { Ok(()) } -#[test] -fn predicate_on_number_not() -> Result<(), Box> { +#[tokio::test] +async fn predicate_on_number_not() -> Result<(), Box> { let cases = vec![ ( Pred::not(column_expr!("number").lt(Expr::literal(4i64))), @@ -773,8 +773,8 @@ fn predicate_on_number_not() -> Result<(), Box> { Ok(()) } -#[test] -fn predicate_on_number_with_not_null() -> Result<(), Box> { +#[tokio::test] +async fn predicate_on_number_with_not_null() -> Result<(), Box> { let expected = vec![ "+---------+--------+", "| a_float | number |", @@ -791,24 +791,24 @@ fn predicate_on_number_with_not_null() -> Result<(), Box> column_expr!("number").lt(Expr::literal(3i64)), )), expected, - )?; + ).await?; Ok(()) } -#[test] -fn predicate_null() -> Result<(), Box> { +#[tokio::test] +async fn predicate_null() -> Result<(), Box> { let expected = vec![]; // number is never null read_table_data_str( "./tests/data/basic_partitioned", Some(&["a_float", "number"]), Some(column_expr!("number").is_null()), expected, - )?; + ).await?; Ok(()) } -#[test] -fn mixed_null() -> Result<(), Box> { +#[tokio::test] +async fn mixed_null() -> Result<(), Box> { let expected = vec![ "+------+--------------+", "| part | n |", @@ -830,12 +830,12 @@ fn mixed_null() -> Result<(), Box> { Some(&["part", "n"]), Some(column_expr!("n").is_null()), expected, - )?; + ).await?; Ok(()) } -#[test] -fn mixed_not_null() -> Result<(), Box> { +#[tokio::test] +async fn mixed_not_null() -> Result<(), Box> { let expected = vec![ "+------+--------------+", "| part | n |", @@ -857,12 +857,12 @@ fn mixed_not_null() -> Result<(), Box> { Some(&["part", "n"]), Some(column_expr!("n").is_not_null()), expected, - )?; + ).await?; Ok(()) } -#[test] -fn and_or_predicates() -> Result<(), Box> { +#[tokio::test] +async fn and_or_predicates() -> Result<(), Box> { let cases = vec![ ( Pred::and( @@ -904,8 +904,8 @@ fn and_or_predicates() -> Result<(), Box> { Ok(()) } -#[test] -fn not_and_or_predicates() -> Result<(), Box> { +#[tokio::test] +async fn not_and_or_predicates() -> Result<(), Box> { let cases = vec![ ( Pred::not(Pred::and( @@ -947,8 +947,8 @@ fn not_and_or_predicates() -> Result<(), Box> { Ok(()) } -#[test] -fn invalid_skips_none_predicates() -> Result<(), Box> { +#[tokio::test] +async fn invalid_skips_none_predicates() -> Result<(), Box> { 
let empty_struct = Expr::struct_from(Vec::::new()); let cases = vec![ (Pred::literal(false), table_for_numbers(vec![])), @@ -1000,8 +1000,8 @@ fn invalid_skips_none_predicates() -> Result<(), Box> { Ok(()) } -#[test] -fn with_predicate_and_removes() -> Result<(), Box> { +#[tokio::test] +async fn with_predicate_and_removes() -> Result<(), Box> { let expected = vec![ "+-------+", "| value |", @@ -1021,7 +1021,7 @@ fn with_predicate_and_removes() -> Result<(), Box> { None, Some(Pred::gt(column_expr!("value"), Expr::literal(3))), expected, - )?; + ).await?; Ok(()) } @@ -1059,7 +1059,7 @@ async fn predicate_on_non_nullable_partition_column() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box> { +#[tokio::test] +async fn short_dv() -> Result<(), Box> { let expected = vec![ "+----+-------+--------------------------+---------------------+", "| id | value | timestamp | rand |", @@ -1158,12 +1158,12 @@ fn short_dv() -> Result<(), Box> { "| 9 | 9 | 2023-05-31T18:58:33.633Z | 0.5175919190815845 |", "+----+-------+--------------------------+---------------------+", ]; - read_table_data_str("./tests/data/with-short-dv/", None, None, expected)?; + read_table_data_str("./tests/data/with-short-dv/", None, None, expected).await?; Ok(()) } -#[test] -fn basic_decimal() -> Result<(), Box> { +#[tokio::test] +async fn basic_decimal() -> Result<(), Box> { let expected = vec![ "+----------------+---------+--------------+------------------------+", "| part | col1 | col2 | col3 |", @@ -1174,12 +1174,12 @@ fn basic_decimal() -> Result<(), Box> { "| 2342222.23454 | 111.11 | 22222.22222 | 3333333333.3333333333 |", "+----------------+---------+--------------+------------------------+", ]; - read_table_data_str("./tests/data/basic-decimal-table/", None, None, expected)?; + read_table_data_str("./tests/data/basic-decimal-table/", None, None, expected).await?; Ok(()) } -#[test] -fn timestamp_ntz() -> Result<(), Box> { +#[tokio::test] +async fn timestamp_ntz() -> Result<(), Box> { let expected = vec![ "+----+----------------------------+----------------------------+", "| id | tsNtz | tsNtzPartition |", @@ -1200,12 +1200,12 @@ fn timestamp_ntz() -> Result<(), Box> { None, None, expected, - )?; + ).await?; Ok(()) } -#[test] -fn type_widening_basic() -> Result<(), Box> { +#[tokio::test] +async fn type_widening_basic() -> Result<(), Box> { let expected = vec![ "+---------------------+---------------------+--------------------+----------------+----------------+----------------+----------------------------+", "| byte_long | int_long | float_double | byte_double | short_double | int_double | date_timestamp_ntz |", @@ -1224,11 +1224,11 @@ fn type_widening_basic() -> Result<(), Box> { "date_timestamp_ntz", ]); - read_table_data_str("./tests/data/type-widening/", select_cols, None, expected) + read_table_data_str("./tests/data/type-widening/", select_cols, None, expected).await } -#[test] -fn type_widening_decimal() -> Result<(), Box> { +#[tokio::test] +async fn type_widening_decimal() -> Result<(), Box> { let expected = vec![ "+----------------------------+-------------------------------+--------------+---------------+--------------+----------------------+", "| decimal_decimal_same_scale | decimal_decimal_greater_scale | byte_decimal | short_decimal | int_decimal | long_decimal |", @@ -1245,12 +1245,12 @@ fn type_widening_decimal() -> Result<(), Box> { "int_decimal", "long_decimal", ]); - read_table_data_str("./tests/data/type-widening/", select_cols, None, expected) + read_table_data_str("./tests/data/type-widening/", 
select_cols, None, expected).await } // Verify that predicates over invalid/missing columns do not cause skipping. -#[test] -fn predicate_references_invalid_missing_column() -> Result<(), Box> { +#[tokio::test] +async fn predicate_references_invalid_missing_column() -> Result<(), Box> { // Attempted skipping over a logically valid but physically missing column. We should be able to // skip the data file because the missing column is inferred to be all-null. // @@ -1280,7 +1280,7 @@ fn predicate_references_invalid_missing_column() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box> { +#[tokio::test] +async fn timestamp_partitioned_table() -> Result<(), Box> { let expected = vec![ "+----+-----+---+----------------------+", "| id | x | s | time |", @@ -1322,11 +1322,11 @@ fn timestamp_partitioned_table() -> Result<(), Box> { let test_name = "timestamp-partitioned-table"; let test_dir = load_test_data("./tests/data", test_name).unwrap(); let test_path = test_dir.path().join(test_name); - read_table_data_str(test_path.to_str().unwrap(), None, None, expected) + read_table_data_str(test_path.to_str().unwrap(), None, None, expected).await } -#[test] -fn compacted_log_files_table() -> Result<(), Box> { +#[tokio::test] +async fn compacted_log_files_table() -> Result<(), Box> { let expected = vec![ "+----+--------------------+", "| id | comment |", @@ -1341,16 +1341,16 @@ fn compacted_log_files_table() -> Result<(), Box> { let test_name = "compacted-log-files-table"; let test_dir = load_test_data("./tests/data", test_name).unwrap(); let test_path = test_dir.path().join(test_name); - read_table_data_str(test_path.to_str().unwrap(), None, None, expected) + read_table_data_str(test_path.to_str().unwrap(), None, None, expected).await } -#[test] -fn unshredded_variant_table() -> Result<(), Box> { +#[tokio::test] +async fn unshredded_variant_table() -> Result<(), Box> { let expected = include!("data/unshredded-variant.expected.in"); let test_name = "unshredded-variant"; let test_dir = load_test_data("./tests/data", test_name).unwrap(); let test_path = test_dir.path().join(test_name); - read_table_data_str(test_path.to_str().unwrap(), None, None, expected) + read_table_data_str(test_path.to_str().unwrap(), None, None, expected).await } #[tokio::test] @@ -1414,7 +1414,7 @@ async fn test_row_index_metadata_column() -> Result<(), Box Result<(), Box>, data: Vec, ) -> DeltaResult { - let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?; + let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref()).await?; let mut txn = snapshot.transaction()?; // Write data out by spawning async tasks to simulate executors @@ -237,7 +237,7 @@ async fn test_row_tracking_append() -> DeltaResult<()> { )?), &table_url, engine, - )?; + ).await?; Ok(()) } @@ -315,7 +315,7 @@ async fn test_row_tracking_large_batch() -> DeltaResult<()> { )?), &table_url, engine, - )?; + ).await?; Ok(()) } @@ -377,7 +377,7 @@ async fn test_row_tracking_consecutive_transactions() -> DeltaResult<()> { )?), &table_url, engine, - )?; + ).await?; Ok(()) } @@ -510,7 +510,7 @@ async fn test_row_tracking_with_regular_and_empty_adds() -> DeltaResult<()> { )?), &table_url, engine, - )?; + ).await?; Ok(()) } @@ -553,7 +553,7 @@ async fn test_row_tracking_with_empty_adds() -> DeltaResult<()> { .await?; // Verify that the table is empty - let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref())?; + let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref()).await?; let scan = 
snapshot.scan_builder().build()?; let batches = read_scan(&scan, engine)?; @@ -575,7 +575,7 @@ async fn test_row_tracking_without_adds() -> DeltaResult<()> { let (table_url, engine, store) = create_row_tracking_table(&tmp_test_dir, "test_consecutive_commits", schema.clone()) .await?; - let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?; + let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref()).await?; let txn = snapshot.transaction()?; // Commit without adding any add files @@ -615,8 +615,8 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { let engine2 = engine; // Create two snapshots from the same initial state - let snapshot1 = Snapshot::builder_for(table_url.clone()).build(engine1.as_ref())?; - let snapshot2 = Snapshot::builder_for(table_url.clone()).build(engine2.as_ref())?; + let snapshot1 = Snapshot::builder_for(table_url.clone()).build(engine1.as_ref()).await?; + let snapshot2 = Snapshot::builder_for(table_url.clone()).build(engine2.as_ref()).await?; // Create two transactions from the same snapshot (simulating parallel transactions) let mut txn1 = snapshot1.transaction()?.with_engine_info("transaction 1"); diff --git a/kernel/tests/v2_checkpoints.rs b/kernel/tests/v2_checkpoints.rs index a3ae99b23..51da2c3e8 100644 --- a/kernel/tests/v2_checkpoints.rs +++ b/kernel/tests/v2_checkpoints.rs @@ -10,25 +10,25 @@ use test_utils::{load_test_data, DefaultEngineExtension}; use itertools::Itertools; use test_utils::read_scan; -fn read_v2_checkpoint_table(test_name: impl AsRef) -> DeltaResult> { +async fn read_v2_checkpoint_table(test_name: impl AsRef) -> DeltaResult> { let test_dir = load_test_data("tests/data", test_name.as_ref()).unwrap(); let test_path = test_dir.path().join(test_name.as_ref()); let engine = DefaultEngine::new_local(); let url = delta_kernel::try_parse_uri(test_path.to_str().expect("table path to string")).unwrap(); - let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap(); + let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).await.unwrap(); let scan = snapshot.scan_builder().build()?; let batches = read_scan(&scan, engine)?; Ok(batches) } -fn test_v2_checkpoint_with_table( +async fn test_v2_checkpoint_with_table( table_name: &str, mut expected_table: Vec, ) -> DeltaResult<()> { - let batches = read_v2_checkpoint_table(table_name)?; + let batches = read_v2_checkpoint_table(table_name).await?; sort_lines!(expected_table); assert_batches_sorted_eq!(expected_table, &batches); @@ -163,63 +163,63 @@ fn get_without_sidecars_table() -> Vec { /// - `V2 Checkpoint compat file equivalency to normal V2 Checkpoint` -> `v2_classic_checkpoint_parquet` /// - `last checkpoint contains correct schema for v1/v2 Checkpoints` -> `v2_checkpoints_json_with_last_checkpoint` /// - `last checkpoint contains correct schema for v1/v2 Checkpoints` -> `v2_checkpoints_parquet_with_last_checkpoint` -#[test] -fn v2_checkpoints_json_with_sidecars() -> DeltaResult<()> { +#[tokio::test] +async fn v2_checkpoints_json_with_sidecars() -> DeltaResult<()> { test_v2_checkpoint_with_table( "v2-checkpoints-json-with-sidecars", generate_sidecar_expected_data(), - ) + ).await } -#[test] -fn v2_checkpoints_parquet_with_sidecars() -> DeltaResult<()> { +#[tokio::test] +async fn v2_checkpoints_parquet_with_sidecars() -> DeltaResult<()> { test_v2_checkpoint_with_table( "v2-checkpoints-parquet-with-sidecars", generate_sidecar_expected_data(), - ) + ).await } -#[test] -fn 
v2_checkpoints_json_without_sidecars() -> DeltaResult<()> { +#[tokio::test] +async fn v2_checkpoints_json_without_sidecars() -> DeltaResult<()> { test_v2_checkpoint_with_table( "v2-checkpoints-json-without-sidecars", get_without_sidecars_table(), - ) + ).await } -#[test] -fn v2_checkpoints_parquet_without_sidecars() -> DeltaResult<()> { +#[tokio::test] +async fn v2_checkpoints_parquet_without_sidecars() -> DeltaResult<()> { test_v2_checkpoint_with_table( "v2-checkpoints-parquet-without-sidecars", get_without_sidecars_table(), - ) + ).await } -#[test] -fn v2_classic_checkpoint_json() -> DeltaResult<()> { - test_v2_checkpoint_with_table("v2-classic-checkpoint-json", get_classic_checkpoint_table()) +#[tokio::test] +async fn v2_classic_checkpoint_json() -> DeltaResult<()> { + test_v2_checkpoint_with_table("v2-classic-checkpoint-json", get_classic_checkpoint_table()).await } -#[test] -fn v2_classic_checkpoint_parquet() -> DeltaResult<()> { +#[tokio::test] +async fn v2_classic_checkpoint_parquet() -> DeltaResult<()> { test_v2_checkpoint_with_table( "v2-classic-checkpoint-parquet", get_classic_checkpoint_table(), - ) + ).await } -#[test] -fn v2_checkpoints_json_with_last_checkpoint() -> DeltaResult<()> { +#[tokio::test] +async fn v2_checkpoints_json_with_last_checkpoint() -> DeltaResult<()> { test_v2_checkpoint_with_table( "v2-checkpoints-json-with-last-checkpoint", get_simple_id_table(), - ) + ).await } -#[test] -fn v2_checkpoints_parquet_with_last_checkpoint() -> DeltaResult<()> { +#[tokio::test] +async fn v2_checkpoints_parquet_with_last_checkpoint() -> DeltaResult<()> { test_v2_checkpoint_with_table( "v2-checkpoints-parquet-with-last-checkpoint", get_simple_id_table(), - ) + ).await } diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index bd3da49e7..1e5baa912 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -60,7 +60,7 @@ async fn test_commit_info() -> Result<(), Box> { setup_test_tables(schema, &[], None, "test_table").await? { // create a transaction - let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine).await?; let txn = snapshot.transaction()?.with_engine_info("default engine"); // commit! @@ -143,7 +143,7 @@ async fn write_data_and_check_result_and_stats( engine: Arc>, expected_since_commit: u64, ) -> Result<(), Box> { - let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?; + let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref()).await?; let mut txn = snapshot.transaction()?; // create two new arrow record batches to append @@ -213,7 +213,7 @@ async fn test_commit_info_action() -> Result<(), Box> { for (table_url, engine, store, table_name) in setup_test_tables(schema.clone(), &[], None, "test_table").await? { - let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine).await?; let txn = snapshot.transaction()?.with_engine_info("default engine"); txn.commit(&engine)?; @@ -338,7 +338,7 @@ async fn test_append() -> Result<(), Box> { )?), &table_url, engine, - )?; + ).await?; } Ok(()) } @@ -356,7 +356,7 @@ async fn test_no_add_actions() -> Result<(), Box> { for (table_url, engine, store, table_name) in setup_test_tables(schema.clone(), &[], None, "test_table").await? 
{ - let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine).await?; let txn = snapshot.transaction()?.with_engine_info("default engine"); // Commit without adding any add files @@ -407,7 +407,7 @@ async fn test_append_twice() -> Result<(), Box> { )?), &table_url, engine, - )?; + ).await?; } Ok(()) } @@ -433,7 +433,7 @@ async fn test_append_partitioned() -> Result<(), Box> { for (table_url, engine, store, table_name) in setup_test_tables(table_schema.clone(), &[partition_col], None, "test_table").await? { - let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine).await?; let mut txn = snapshot.transaction()?.with_engine_info("default engine"); // create two new arrow record batches to append @@ -553,7 +553,7 @@ async fn test_append_partitioned() -> Result<(), Box> { )?), &table_url, engine, - )?; + ).await?; } Ok(()) } @@ -576,7 +576,7 @@ async fn test_append_invalid_schema() -> Result<(), Box> for (table_url, engine, _store, _table_name) in setup_test_tables(table_schema, &[], None, "test_table").await? { - let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine).await?; let txn = snapshot.transaction()?.with_engine_info("default engine"); // create two new arrow record batches to append @@ -634,7 +634,7 @@ async fn test_write_txn_actions() -> Result<(), Box> { setup_test_tables(schema, &[], None, "test_table").await? { // can't have duplicate app_id in same transaction - let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine).await?; assert!(matches!( snapshot .transaction()? @@ -644,7 +644,7 @@ async fn test_write_txn_actions() -> Result<(), Box> { Err(KernelError::Generic(msg)) if msg == "app_id app_id1 already exists in transaction" )); - let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine).await?; let txn = snapshot .transaction()? 
.with_engine_info("default engine") @@ -656,7 +656,7 @@ async fn test_write_txn_actions() -> Result<(), Box> { let snapshot = Snapshot::builder_for(table_url.clone()) .at_version(1) - .build(&engine)?; + .build(&engine).await?; assert_eq!( snapshot.clone().get_app_id_version("app_id1", &engine)?, Some(1) @@ -778,7 +778,7 @@ async fn test_append_timestamp_ntz() -> Result<(), Box> { ) .await?; - let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine).await?; let mut txn = snapshot.transaction()?.with_engine_info("default engine"); // Create Arrow data with TIMESTAMP_NTZ values including edge cases @@ -833,7 +833,7 @@ async fn test_append_timestamp_ntz() -> Result<(), Box> { assert!(parsed_commits[1].get("add").is_some()); // Verify the data can be read back correctly - test_read(&ArrowEngineData::new(data), &table_url, engine)?; + test_read(&ArrowEngineData::new(data), &table_url, engine).await?; Ok(()) } @@ -913,7 +913,7 @@ async fn test_append_variant() -> Result<(), Box> { ) .await?; - let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine).await?; let mut txn = snapshot.transaction()?; // First value corresponds to the variant value "1". Third value corresponds to the variant @@ -1079,7 +1079,7 @@ async fn test_append_variant() -> Result<(), Box> { ) .unwrap(); - test_read(&ArrowEngineData::new(expected_data), &table_url, engine)?; + test_read(&ArrowEngineData::new(expected_data), &table_url, engine).await?; Ok(()) } @@ -1123,7 +1123,7 @@ async fn test_shredded_variant_read_rejection() -> Result<(), Box Result<(), Box) -> DeltaResult, ) -> DeltaResult<()> { - let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref())?; + let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref()).await?; let scan = snapshot.scan_builder().build()?; let batches = read_scan(&scan, engine)?; let formatted = pretty_format_batches(&batches).unwrap().to_string(); diff --git a/test_table_changes.rs b/test_table_changes.rs new file mode 100644 index 000000000..0a0911151 --- /dev/null +++ b/test_table_changes.rs @@ -0,0 +1,50 @@ +use std::sync::Arc; +use delta_kernel::engine::sync::SyncEngine; +use delta_kernel::table_changes::TableChanges; + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("Testing table_changes async functionality..."); + + let path = "./tests/data/table-with-cdf"; + let engine = Arc::new(SyncEngine::new()); + let url = delta_kernel::try_parse_uri(path)?; + + println!("Creating TableChanges..."); + let table_changes = TableChanges::try_new(url, engine.as_ref(), 0, Some(1)).await?; + + println!("TableChanges created successfully!"); + println!("Start version: {}", table_changes.start_version()); + println!("End version: {}", table_changes.end_version()); + println!("Schema: {}", table_changes.schema()); + + println!("Creating scan builder..."); + let scan = table_changes.into_scan_builder().build()?; + + println!("Executing scan..."); + let mut results = scan.execute(engine).await?; + + let mut count = 0; + while let Some(result) = futures::StreamExt::next(&mut results).await { + match result { + Ok(_scan_result) => { + count += 1; + println!("Got scan result {}", count); + } + Err(e) => { + println!("Error in scan result: {}", e); + return Err(e.into()); + } + } + + // Limit to first few results to avoid too much output + if count >= 3 { + break; + } + } + + 
println!("Successfully processed {} scan results", count); + println!("table_changes async functionality test completed successfully!"); + + Ok(()) +} \ No newline at end of file