From 4b5693336787048cac1effd670024fd4b22e395e Mon Sep 17 00:00:00 2001
From: Zach Schuermann
Date: Thu, 5 Jun 2025 15:34:27 -0700
Subject: [PATCH 1/3] very wip. probably should do separate crate

---
 kernel/Cargo.toml                     |   2 +
 kernel/tests/dhat_large_table_data.rs | 102 ++++++++++++++++++++++++++
 kernel/tests/dhat_large_table_log.rs  |   0
 3 files changed, 104 insertions(+)
 create mode 100644 kernel/tests/dhat_large_table_data.rs
 create mode 100644 kernel/tests/dhat_large_table_log.rs

diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml
index 820f33bb5..831265560 100644
--- a/kernel/Cargo.toml
+++ b/kernel/Cargo.toml
@@ -141,6 +141,8 @@ hdfs-native-object-store = { version = "0.15.0" }
 hdfs-native = "0.12.2"
 walkdir = { version = "2.5.0" }
 async-trait = "0.1" # only used for our custom SlowGetStore ObjectStore implementation
+dhat = "0.3" # for DHAT tests
+rayon = "1.10" # for DHAT tests
 paste = "1.0"
 test-log = { version = "0.2", default-features = false, features = ["trace"] }
 tempfile = "3"
diff --git a/kernel/tests/dhat_large_table_data.rs b/kernel/tests/dhat_large_table_data.rs
new file mode 100644
index 000000000..c12e7e1fe
--- /dev/null
+++ b/kernel/tests/dhat_large_table_data.rs
@@ -0,0 +1,102 @@
+//! This tests our memory usage for reading tables with large data files.
+
+use std::fs::File;
+use std::path::Path;
+use std::sync::Arc;
+
+use delta_kernel::arrow::array::{ArrayRef, Int64Array, StringArray};
+use delta_kernel::arrow::record_batch::RecordBatch;
+use delta_kernel::parquet::arrow::ArrowWriter;
+use delta_kernel::parquet::file::properties::WriterProperties;
+
+use serde_json::json;
+use tempfile::tempdir;
+use tracing::info;
+
+#[global_allocator]
+static ALLOC: dhat::Alloc = dhat::Alloc;
+
+const NUM_ROWS: u64 = 1_000_000; // need 100M
+
+/// write a 1M row parquet file that is 1GB in size
+fn write_large_parquet_to(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
+    let path = path.join("1.parquet");
+    let file = File::create(&path)?;
+
+    let i_col = Arc::new(Int64Array::from_iter_values(0..NUM_ROWS as i64)) as ArrayRef;
+    let s_col = (0..NUM_ROWS).map(|i| format!("val_{}_{}", i, "XYZ".repeat(350)));
+    let s_col = Arc::new(StringArray::from_iter_values(s_col)) as ArrayRef;
+    let rb = RecordBatch::try_from_iter(vec![("i", i_col.clone()), ("s", s_col.clone())])?;
+
+    let props = WriterProperties::builder().build();
+    let mut writer = ArrowWriter::try_new(file, rb.schema(), Some(props))?;
+    writer.write(&rb)?;
+    let parquet_metadata = writer.close()?;
+
+    // read to show file sizes
+    let metadata = std::fs::metadata(&path)?;
+    let file_size = metadata.len();
+    let total_row_group_size: i64 = parquet_metadata
+        .row_groups
+        .iter()
+        .map(|rg| rg.total_byte_size)
+        .sum();
+    println!("File size (compressed file size): {} bytes", file_size);
+    println!(
+        "Total size (uncompressed file size): {} bytes",
+        total_row_group_size
+    );
+
+    Ok(())
+}
+
+/// create a _delta_log/00000000000000000000.json file with a single add file for our 1.parquet
+/// above
+fn create_commit(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
+    let path = path.join("_delta_log/00000000000000000000.json");
+    let mut file = File::create(&path)?;
+
+    let actions = vec![
+        json!({
+            "metadata": {
+                "id": "00000000000000000000",
+                "format": "parquet",
+                "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"i\",\"type\":{\"type\":\"long\"},\"nullable\":true,\"metadata\":{}},{\"name\":\"s\",\"type\":{\"type\":\"string\"},\"nullable\":true,\"metadata\":{}}]}",
+                "partitionColumns": [],
+                "configuration": {}
+            }
+        }),
+        json!({
+            "protocol": {
+                "minReaderVersion": 1,
+                "minWriterVersion": 1
+            }
+        }),
+        json!({
+            "add": {
+                "path": "1.parquet",
+                "partitionValues": {},
+                "size": 1000000000,
+                "modificationTime": 0,
+                "dataChange": true
+            }
+        }),
+    ];
+
+    file.write_all(actions.join("\n").as_bytes())?;
+
+    Ok(())
+}
+
+#[test]
+fn test_dhat_large_table_data() -> Result<(), Box<dyn std::error::Error>> {
+    let dir = tempdir()?;
+    let table_path = dir.path();
+    let _profiler = dhat::Profiler::builder().testing().build();
+    write_large_parquet_to(table_path)?;
+
+    let stats = dhat::HeapStats::get();
+    println!("Heap stats after scan::execute: {:?}", stats);
+
+    Ok(())
+}
diff --git a/kernel/tests/dhat_large_table_log.rs b/kernel/tests/dhat_large_table_log.rs
new file mode 100644
index 000000000..e69de29bb

From a076d03691d54bd57f1d4b3cce1c98eb4e7644ca Mon Sep 17 00:00:00 2001
From: Zach Schuermann
Date: Thu, 12 Jun 2025 14:50:48 -0700
Subject: [PATCH 2/3] log test

---
 kernel/tests/dhat_large_table_data.rs |   6 +-
 kernel/tests/dhat_large_table_log.rs  | 141 ++++++++++++++++++++++++++
 2 files changed, 145 insertions(+), 2 deletions(-)

diff --git a/kernel/tests/dhat_large_table_data.rs b/kernel/tests/dhat_large_table_data.rs
index c12e7e1fe..671a9f498 100644
--- a/kernel/tests/dhat_large_table_data.rs
+++ b/kernel/tests/dhat_large_table_data.rs
@@ -1,6 +1,7 @@
 //! This tests our memory usage for reading tables with large data files.
 
 use std::fs::File;
+use std::io::Write;
 use std::path::Path;
 use std::sync::Arc;
 
@@ -11,7 +12,6 @@ use delta_kernel::parquet::file::properties::WriterProperties;
 
 use serde_json::json;
 use tempfile::tempdir;
-use tracing::info;
 
 #[global_allocator]
 static ALLOC: dhat::Alloc = dhat::Alloc;
@@ -83,7 +83,9 @@ fn create_commit(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
         }),
     ];
 
-    file.write_all(actions.join("\n").as_bytes())?;
+    for action in actions {
+        writeln!(file, "{}", action)?;
+    }
 
     Ok(())
 }
diff --git a/kernel/tests/dhat_large_table_log.rs b/kernel/tests/dhat_large_table_log.rs
index e69de29bb..ae82967f2 100644
--- a/kernel/tests/dhat_large_table_log.rs
+++ b/kernel/tests/dhat_large_table_log.rs
@@ -0,0 +1,141 @@
+//! This tests our memory usage for reading tables with large/many log files.
+
+use std::fs::{create_dir_all, File};
+use std::io::Write;
+use std::path::Path;
+use std::sync::Arc;
+
+use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
+use delta_kernel::engine::default::DefaultEngine;
+use delta_kernel::object_store::local::LocalFileSystem;
+
+use delta_kernel::Table;
+use serde_json::json;
+use tempfile::tempdir;
+use tracing::info;
+use url::Url;
+
+#[global_allocator]
+static ALLOC: dhat::Alloc = dhat::Alloc;
+
+const NUM_COMMITS: u64 = 1_000; // number of commit files to generate
+const TOTAL_ACTIONS: u64 = 1_000_000; // total add/remove actions across all commits
+
+/// Generate a delta log with NUM_COMMITS commit files and TOTAL_ACTIONS add/remove action pairs
+fn generate_delta_log(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
+    let delta_log_path = path.join("_delta_log");
+    create_dir_all(&delta_log_path)?;
+
+    let actions_per_commit = TOTAL_ACTIONS / NUM_COMMITS;
+    let mut current_file_id = 0u64;
+
+    for commit_id in 0..NUM_COMMITS {
+        let commit_filename = format!("{:020}.json", commit_id);
+        let commit_path = delta_log_path.join(&commit_filename);
+        let mut file = File::create(&commit_path)?;
+
+        let mut actions = Vec::new();
+
+        // Add metadata and protocol only for the first commit
+        if commit_id == 0 {
+            actions.push(json!({
+                "metaData": {
+                    "id": format!("{:020}", commit_id),
+                    "format": { "provider": "parquet", "options": {} },
+                    "schemaString": r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#,
+                    "partitionColumns": [],
+                    "configuration": {}
+                }
+            }));
+            actions.push(json!({
+                "protocol": {
+                    "minReaderVersion": 1,
+                    "minWriterVersion": 1
+                }
+            }));
+        }
+
+        // Generate add/remove action pairs for this commit
+        for _ in 0..actions_per_commit / 2 {
+            // Add action
+            actions.push(json!({
+                "add": {
+                    "path": format!("{}.parquet", current_file_id),
+                    "partitionValues": {},
+                    "size": 1000000,
+                    "modificationTime": commit_id * 1000,
+                    "dataChange": true
+                }
+            }));
+
+            // Remove action (remove a previous file if we have any)
+            if current_file_id > 0 {
+                actions.push(json!({
+                    "remove": {
+                        "path": format!("{}.parquet", current_file_id - 1),
+                        "deletionTimestamp": commit_id * 1000 + 500,
+                        "dataChange": true
+                    }
+                }));
+            }
+
+            current_file_id += 1;
+        }
+
+        // Write actions to file
+        for action in actions {
+            writeln!(file, "{}", action)?;
+        }
+    }
+
+    info!(
+        "Generated {} commit files with {} total actions",
+        NUM_COMMITS, TOTAL_ACTIONS
+    );
+    Ok(())
+}
+
+#[test]
+fn test_dhat_large_table_log() -> Result<(), Box<dyn std::error::Error>> {
+    let dir = tempdir()?;
+    let table_path = dir.path();
+
+    info!(
+        "Generating delta log with {} commits and {} total actions",
+        NUM_COMMITS, TOTAL_ACTIONS
+    );
+    generate_delta_log(table_path)?;
+
+    let _profiler = dhat::Profiler::builder().testing().build();
+    let store = Arc::new(LocalFileSystem::new());
+    let url = "file:///".to_owned() + table_path.to_str().unwrap();
+    let url = Url::parse(&url).unwrap();
+    let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
+
+    let table = Table::try_from_uri(&url).expect("Failed to create Delta Table");
+    let snapshot = table
+        .snapshot(&engine, None)
+        .expect("Failed to get latest snapshot");
+
+    let stats = dhat::HeapStats::get();
+    println!("Heap stats after PM replay: {:?}", stats);
+
+    let scan = snapshot
+        .into_scan_builder()
+        .build()
+        .expect("Failed to build scan");
+    let scan_metadata = scan
+        .scan_metadata(&engine)
+        .expect("Failed to get scan metadata");
+    for res in scan_metadata {
+        let _scan_metadata = res.expect("Failed to read scan metadata");
+        // scan_metadata.visit_scan_files((), |_, file, _, _, _, _, _| {
+        //     // do something
+        // })?;
+    }
+
+    let stats = dhat::HeapStats::get();
+    println!("Heap stats after Scan replay: {:?}", stats);
+
+    Ok(())
+}

From 94e7a21c4bb401e3d06cabd1db0291da58f4fe70 Mon Sep 17 00:00:00 2001
From: Zach Schuermann
Date: Thu, 18 Sep 2025 14:45:10 -0700
Subject: [PATCH 3/3] clean it up

---
 Cargo.toml                         |  1 +
 kernel/Cargo.toml                  |  2 -
 mem-test/Cargo.toml                | 25 ++++++
 mem-test/src/lib.rs                |  3 +
 .../tests/dhat_large_table_data.rs | 80 +++++++++++++++++--
 .../tests/dhat_large_table_log.rs  | 25 +++---
 6 files changed, 112 insertions(+), 24 deletions(-)
 create mode 100644 mem-test/Cargo.toml
 create mode 100644 mem-test/src/lib.rs
 rename {kernel => mem-test}/tests/dhat_large_table_data.rs (51%)
 rename {kernel => mem-test}/tests/dhat_large_table_log.rs (87%)

diff --git a/Cargo.toml b/Cargo.toml
index 28510ed6b..a6bac8d97 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,6 +7,7 @@ members = [
     "kernel/examples/*",
     "test-utils",
     "feature-tests",
+    "mem-test",
    "uc-client", # WIP: this is an experimental UC client for catalog-managed table work
 ]
 # note that in addition to the members above, the workspace includes examples:
diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml
index 831265560..820f33bb5 100644
--- a/kernel/Cargo.toml
+++ b/kernel/Cargo.toml
@@ -141,8 +141,6 @@ hdfs-native-object-store = { version = "0.15.0" }
 hdfs-native = "0.12.2"
 walkdir = { version = "2.5.0" }
 async-trait = "0.1" # only used for our custom SlowGetStore ObjectStore implementation
-dhat = "0.3" # for DHAT tests
-rayon = "1.10" # for DHAT tests
 paste = "1.0"
 test-log = { version = "0.2", default-features = false, features = ["trace"] }
 tempfile = "3"
diff --git a/mem-test/Cargo.toml b/mem-test/Cargo.toml
new file mode 100644
index 000000000..81e3d2724
--- /dev/null
+++ b/mem-test/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+name = "mem-test"
+edition.workspace = true
+homepage.workspace = true
+keywords.workspace = true
+license.workspace = true
+repository.workspace = true
+readme.workspace = true
+rust-version.workspace = true
+version.workspace = true
+
+# for cargo-release
+[package.metadata.release]
+release = false
+
+[dependencies]
+arrow = "56"
+delta_kernel = { path = "../kernel", features = ["arrow", "default-engine-rustls"] }
+dhat = "0.3"
+object_store = "0.12.3"
+rayon = "1.10"
+serde_json = "1"
+tempfile = "3"
+tracing = "0.1"
+url = "2"
diff --git a/mem-test/src/lib.rs b/mem-test/src/lib.rs
new file mode 100644
index 000000000..0221b9195
--- /dev/null
+++ b/mem-test/src/lib.rs
@@ -0,0 +1,3 @@
+//! Memory testing.
+//!
+//! Run all tests with: `cargo test -p mem-test -- --ignored --nocapture`
diff --git a/kernel/tests/dhat_large_table_data.rs b/mem-test/tests/dhat_large_table_data.rs
similarity index 51%
rename from kernel/tests/dhat_large_table_data.rs
rename to mem-test/tests/dhat_large_table_data.rs
index 671a9f498..eabfa09ec 100644
--- a/kernel/tests/dhat_large_table_data.rs
+++ b/mem-test/tests/dhat_large_table_data.rs
@@ -1,22 +1,31 @@
 //! This tests our memory usage for reading tables with large data files.
+//!
+//! run with `cargo test -p mem-test dhat_large_table_data -- --ignored --nocapture`
 
-use std::fs::File;
+use std::fs::{create_dir_all, File};
 use std::io::Write;
 use std::path::Path;
 use std::sync::Arc;
 
 use delta_kernel::arrow::array::{ArrayRef, Int64Array, StringArray};
 use delta_kernel::arrow::record_batch::RecordBatch;
+use delta_kernel::engine::arrow_data::ArrowEngineData;
+use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
+use delta_kernel::engine::default::DefaultEngine;
 use delta_kernel::parquet::arrow::ArrowWriter;
 use delta_kernel::parquet::file::properties::WriterProperties;
+use delta_kernel::Snapshot;
 
+use arrow::compute::filter_record_batch;
+use object_store::local::LocalFileSystem;
 use serde_json::json;
 use tempfile::tempdir;
+use url::Url;
 
 #[global_allocator]
 static ALLOC: dhat::Alloc = dhat::Alloc;
 
-const NUM_ROWS: u64 = 1_000_000; // need 100M
+const NUM_ROWS: u64 = 1_000_000;
 
 /// write a 1M row parquet file that is 1GB in size
 fn write_large_parquet_to(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
@@ -53,15 +62,17 @@ fn write_large_parquet_to(path: &Path) -> Result<(), Box<dyn std::error::Error>
 /// create a _delta_log/00000000000000000000.json file with a single add file for our 1.parquet
 /// above
 fn create_commit(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
-    let path = path.join("_delta_log/00000000000000000000.json");
-    let mut file = File::create(&path)?;
+    let delta_log_path = path.join("_delta_log");
+    create_dir_all(&delta_log_path)?;
+    let commit_path = delta_log_path.join("00000000000000000000.json");
+    let mut file = File::create(&commit_path)?;
 
     let actions = vec![
         json!({
-            "metadata": {
+            "metaData": {
                 "id": "00000000000000000000",
-                "format": "parquet",
-                "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"i\",\"type\":{\"type\":\"long\"},\"nullable\":true,\"metadata\":{}},{\"name\":\"s\",\"type\":{\"type\":\"string\"},\"nullable\":true,\"metadata\":{}}]}",
+                "format": {"provider": "parquet", "options": {}},
+                "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"i\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"s\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
                 "partitionColumns": [],
                 "configuration": {}
             }
         }),
@@ -90,15 +101,68 @@ fn create_commit(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
     Ok(())
 }
 
+#[ignore = "mem-test - run manually"]
 #[test]
 fn test_dhat_large_table_data() -> Result<(), Box<dyn std::error::Error>> {
     let dir = tempdir()?;
     let table_path = dir.path();
     let _profiler = dhat::Profiler::builder().testing().build();
+
+    // Step 1: Write the large parquet file
     write_large_parquet_to(table_path)?;
+    let stats = dhat::HeapStats::get();
+    println!("Heap stats after writing parquet:\n{:?}", stats);
+
+    // Step 2: Create the Delta log
+    create_commit(table_path)?;
+
+    // Step 3: Create engine and snapshot
+    let store = Arc::new(LocalFileSystem::new());
+    let url = Url::from_directory_path(table_path).unwrap();
+    let engine = Arc::new(DefaultEngine::new(
+        store,
+        Arc::new(TokioBackgroundExecutor::new()),
+    ));
+
+    let snapshot = Snapshot::builder_for(url)
+        .build(engine.as_ref())
+        .expect("Failed to create snapshot");
+
+    let stats = dhat::HeapStats::get();
+    println!("Heap stats after creating snapshot:\n{:?}", stats);
+
+    // Step 4: Build and execute scan
+    let scan = snapshot
+        .scan_builder()
+        .build()
+        .expect("Failed to build scan");
+
+    let stats = dhat::HeapStats::get();
+    println!("Heap stats after building scan:\n{:?}", stats);
+
+    // Step 5: Execute the scan and read data
+    let mut row_count = 0;
+    for scan_result in scan.execute(engine)? {
+        let scan_result = scan_result?;
+        let mask = scan_result.full_mask();
+        let data = scan_result.raw_data?;
+        let record_batch: RecordBatch = data
+            .into_any()
+            .downcast::<ArrowEngineData>()
+            .map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
+            .into();
+
+        let batch = if let Some(mask) = mask {
+            filter_record_batch(&record_batch, &mask.into())?
+        } else {
+            record_batch
+        };
+        row_count += batch.num_rows();
+    }
 
     let stats = dhat::HeapStats::get();
-    println!("Heap stats after scan::execute: {:?}", stats);
+    println!("Heap stats after scan execution:\n{:?}", stats);
+    println!("Total rows read: {}", row_count);
 
     Ok(())
 }
diff --git a/kernel/tests/dhat_large_table_log.rs b/mem-test/tests/dhat_large_table_log.rs
similarity index 87%
rename from kernel/tests/dhat_large_table_log.rs
rename to mem-test/tests/dhat_large_table_log.rs
index ae82967f2..a1186c8bf 100644
--- a/kernel/tests/dhat_large_table_log.rs
+++ b/mem-test/tests/dhat_large_table_log.rs
@@ -1,4 +1,6 @@
 //! This tests our memory usage for reading tables with large/many log files.
+//!
+//! run with `cargo test -p mem-test dhat_large_table_log -- --ignored --nocapture`
 
 use std::fs::{create_dir_all, File};
 use std::io::Write;
@@ -7,9 +9,8 @@ use std::sync::Arc;
 
 use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
 use delta_kernel::engine::default::DefaultEngine;
-use delta_kernel::object_store::local::LocalFileSystem;
-
-use delta_kernel::Table;
+use delta_kernel::Snapshot;
+use object_store::local::LocalFileSystem;
 use serde_json::json;
 use tempfile::tempdir;
 use tracing::info;
@@ -95,6 +96,7 @@ fn generate_delta_log(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
     Ok(())
 }
 
+#[ignore = "mem-test - run manually"]
 #[test]
 fn test_dhat_large_table_log() -> Result<(), Box<dyn std::error::Error>> {
     let dir = tempdir()?;
     let table_path = dir.path();
@@ -108,20 +110,18 @@ fn test_dhat_large_table_log() -> Result<(), Box<dyn std::error::Error>> {
 
     let _profiler = dhat::Profiler::builder().testing().build();
     let store = Arc::new(LocalFileSystem::new());
-    let url = "file:///".to_owned() + table_path.to_str().unwrap();
-    let url = Url::parse(&url).unwrap();
+    let url = Url::from_directory_path(table_path).unwrap();
     let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
 
-    let table = Table::try_from_uri(&url).expect("Failed to create Delta Table");
-    let snapshot = table
-        .snapshot(&engine, None)
+    let snapshot = Snapshot::builder_for(url)
+        .build(&engine)
         .expect("Failed to get latest snapshot");
 
     let stats = dhat::HeapStats::get();
-    println!("Heap stats after PM replay: {:?}", stats);
+    println!("Heap stats after PM replay:\n{:?}", stats);
 
     let scan = snapshot
-        .into_scan_builder()
+        .scan_builder()
         .build()
         .expect("Failed to build scan");
     let scan_metadata = scan
@@ -129,13 +129,10 @@ fn test_dhat_large_table_log() -> Result<(), Box<dyn std::error::Error>> {
         .expect("Failed to get scan metadata");
     for res in scan_metadata {
        let _scan_metadata = res.expect("Failed to read scan metadata");
-        // scan_metadata.visit_scan_files((), |_, file, _, _, _, _, _| {
-        //     // do something
-        // })?;
     }
 
     let stats = dhat::HeapStats::get();
-    println!("Heap stats after Scan replay: {:?}", stats);
+    println!("Heap stats after Scan replay:\n{:?}", stats);
 
     Ok(())
 }
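
Note (not part of the patches above): both tests only print dhat::HeapStats and never assert on them. If a hard regression gate is wanted later, dhat's testing mode can assert on those stats directly. A minimal sketch, assuming the same `#[global_allocator] static ALLOC: dhat::Alloc = dhat::Alloc;` setup used in the tests above; the test name and the 256 MiB budget are placeholders, not measured baselines:

#[ignore = "mem-test - run manually"]
#[test]
fn test_peak_heap_budget() {
    // Testing-mode profiler, same pattern as the tests in the patches above.
    let _profiler = dhat::Profiler::builder().testing().build();

    // ... exercise the code under test here (e.g. snapshot creation + scan replay) ...

    let stats = dhat::HeapStats::get();
    const PEAK_BUDGET_BYTES: usize = 256 * 1024 * 1024; // placeholder budget, not a measured baseline
    // dhat::assert! works like assert! but also saves the heap profile on failure.
    dhat::assert!(
        stats.max_bytes < PEAK_BUDGET_BYTES,
        "peak heap usage {} bytes exceeded budget {} bytes",
        stats.max_bytes,
        PEAK_BUDGET_BYTES
    );
}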