diff --git a/Cargo.toml b/Cargo.toml
index b8a9c76d5..2161343cd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,6 +7,7 @@ members = [
     "kernel/examples/*",
     "test-utils",
     "feature-tests",
+    "mem-test",
     "uc-client", # WIP: this is an experimental UC client for catalog-managed table work
 ]
 # note that in addition to the members above, the workspace includes examples:
diff --git a/mem-test/Cargo.toml b/mem-test/Cargo.toml
new file mode 100644
index 000000000..81e3d2724
--- /dev/null
+++ b/mem-test/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+name = "mem-test"
+edition.workspace = true
+homepage.workspace = true
+keywords.workspace = true
+license.workspace = true
+repository.workspace = true
+readme.workspace = true
+rust-version.workspace = true
+version.workspace = true
+
+# for cargo-release
+[package.metadata.release]
+release = false
+
+[dependencies]
+arrow = "56"
+delta_kernel = { path = "../kernel", features = ["arrow", "default-engine-rustls"] }
+dhat = "0.3"
+object_store = "0.12.3"
+rayon = "1.10"
+serde_json = "1"
+tempfile = "3"
+tracing = "0.1"
+url = "2"
diff --git a/mem-test/src/lib.rs b/mem-test/src/lib.rs
new file mode 100644
index 000000000..0221b9195
--- /dev/null
+++ b/mem-test/src/lib.rs
@@ -0,0 +1,3 @@
+//! Memory testing.
+//!
+//! Run all tests with: `cargo test -p mem-test -- --ignored --nocapture`
diff --git a/mem-test/tests/dhat_large_table_data.rs b/mem-test/tests/dhat_large_table_data.rs
new file mode 100644
index 000000000..eabfa09ec
--- /dev/null
+++ b/mem-test/tests/dhat_large_table_data.rs
@@ -0,0 +1,168 @@
+//! This tests our memory usage for reading tables with large data files.
+//!
+//! run with `cargo test -p mem-test dhat_large_table_data -- --ignored --nocapture`
+
+use std::fs::{create_dir_all, File};
+use std::io::Write;
+use std::path::Path;
+use std::sync::Arc;
+
+use delta_kernel::arrow::array::{ArrayRef, Int64Array, StringArray};
+use delta_kernel::arrow::record_batch::RecordBatch;
+use delta_kernel::engine::arrow_data::ArrowEngineData;
+use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
+use delta_kernel::engine::default::DefaultEngine;
+use delta_kernel::parquet::arrow::ArrowWriter;
+use delta_kernel::parquet::file::properties::WriterProperties;
+use delta_kernel::Snapshot;
+
+use arrow::compute::filter_record_batch;
+use object_store::local::LocalFileSystem;
+use serde_json::json;
+use tempfile::tempdir;
+use url::Url;
+
+#[global_allocator]
+static ALLOC: dhat::Alloc = dhat::Alloc;
+
+const NUM_ROWS: u64 = 1_000_000;
+
+/// write a 1M row parquet file that is 1GB in size
+fn write_large_parquet_to(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
+    let path = path.join("1.parquet");
+    let file = File::create(&path)?;
+
+    let i_col = Arc::new(Int64Array::from_iter_values(0..NUM_ROWS as i64)) as ArrayRef;
+    let s_col = (0..NUM_ROWS).map(|i| format!("val_{}_{}", i, "XYZ".repeat(350)));
+    let s_col = Arc::new(StringArray::from_iter_values(s_col)) as ArrayRef;
+    let rb = RecordBatch::try_from_iter(vec![("i", i_col.clone()), ("s", s_col.clone())])?;
+
+    let props = WriterProperties::builder().build();
+    let mut writer = ArrowWriter::try_new(file, rb.schema(), Some(props))?;
+    writer.write(&rb)?;
+    let parquet_metadata = writer.close()?;
+
+    // read to show file sizes
+    let metadata = std::fs::metadata(&path)?;
+    let file_size = metadata.len();
+    let total_row_group_size: i64 = parquet_metadata
+        .row_groups
+        .iter()
+        .map(|rg| rg.total_byte_size)
+        .sum();
+    println!("File size (compressed file size): {} bytes", file_size);
+    println!(
+        "Total size (uncompressed file size): {} bytes",
+        total_row_group_size
+    );
+
+    Ok(())
+}
+
+/// create a _delta_log/00000000000000000000.json file with a single add file for our 1.parquet
+/// above
+fn create_commit(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
+    let delta_log_path = path.join("_delta_log");
+    create_dir_all(&delta_log_path)?;
+    let commit_path = delta_log_path.join("00000000000000000000.json");
+    let mut file = File::create(&commit_path)?;
+
+    let actions = vec![
+        json!({
+            "metaData": {
+                "id": "00000000000000000000",
+                "format": {"provider": "parquet", "options": {}},
+                "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"i\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"s\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
+                "partitionColumns": [],
+                "configuration": {}
+            }
+        }),
+        json!({
+            "protocol": {
+                "minReaderVersion": 1,
+                "minWriterVersion": 1
+            }
+        }),
+        json!({
+            "add": {
+                "path": "1.parquet",
+                "partitionValues": {},
+                "size": 1000000000,
+                "modificationTime": 0,
+                "dataChange": true
+            }
+        }),
+    ];
+
+    for action in actions {
+        writeln!(file, "{}", action)?;
+    }
+
+    Ok(())
+}
+
+#[ignore = "mem-test - run manually"]
+#[test]
+fn test_dhat_large_table_data() -> Result<(), Box<dyn std::error::Error>> {
+    let dir = tempdir()?;
+    let table_path = dir.path();
+    let _profiler = dhat::Profiler::builder().testing().build();
+
+    // Step 1: Write the large parquet file
+    write_large_parquet_to(table_path)?;
+    let stats = dhat::HeapStats::get();
+    println!("Heap stats after writing parquet:\n{:?}", stats);
+
+    // Step 2: Create the Delta log
+    create_commit(table_path)?;
+
+    // Step 3: Create engine and snapshot
+    let store = Arc::new(LocalFileSystem::new());
+    let url = Url::from_directory_path(table_path).unwrap();
+    let engine = Arc::new(DefaultEngine::new(
+        store,
+        Arc::new(TokioBackgroundExecutor::new()),
+    ));
+
+    let snapshot = Snapshot::builder_for(url)
+        .build(engine.as_ref())
+        .expect("Failed to create snapshot");
+
+    let stats = dhat::HeapStats::get();
+    println!("Heap stats after creating snapshot:\n{:?}", stats);
+
+    // Step 4: Build and execute scan
+    let scan = snapshot
+        .scan_builder()
+        .build()
+        .expect("Failed to build scan");
+
+    let stats = dhat::HeapStats::get();
+    println!("Heap stats after building scan:\n{:?}", stats);
+
+    // Step 5: Execute the scan and read data
+    let mut row_count = 0;
+    for scan_result in scan.execute(engine)? {
+        let scan_result = scan_result?;
+        let mask = scan_result.full_mask();
+        let data = scan_result.raw_data?;
+        let record_batch: RecordBatch = data
+            .into_any()
+            .downcast::<ArrowEngineData>()
+            .map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
+            .into();
+
+        let batch = if let Some(mask) = mask {
+            filter_record_batch(&record_batch, &mask.into())?
+        } else {
+            record_batch
+        };
+        row_count += batch.num_rows();
+    }
+
+    let stats = dhat::HeapStats::get();
+    println!("Heap stats after scan execution:\n{:?}", stats);
+    println!("Total rows read: {}", row_count);
+
+    Ok(())
+}
diff --git a/mem-test/tests/dhat_large_table_log.rs b/mem-test/tests/dhat_large_table_log.rs
new file mode 100644
index 000000000..a1186c8bf
--- /dev/null
+++ b/mem-test/tests/dhat_large_table_log.rs
@@ -0,0 +1,138 @@
+//! This tests our memory usage for reading tables with large/many log files.
+//!
+//! run with `cargo test -p mem-test dhat_large_table_log -- --ignored --nocapture`
+
+use std::fs::{create_dir_all, File};
+use std::io::Write;
+use std::path::Path;
+use std::sync::Arc;
+
+use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
+use delta_kernel::engine::default::DefaultEngine;
+use delta_kernel::Snapshot;
+use object_store::local::LocalFileSystem;
+use serde_json::json;
+use tempfile::tempdir;
+use tracing::info;
+use url::Url;
+
+#[global_allocator]
+static ALLOC: dhat::Alloc = dhat::Alloc;
+
+const NUM_COMMITS: u64 = 1_000; // number of commit files to generate
+const TOTAL_ACTIONS: u64 = 1_000_000; // total add/remove actions across all commits
+
+/// Generate a delta log with NUM_COMMITS commit files and TOTAL_ACTIONS add/remove action pairs
+fn generate_delta_log(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
+    let delta_log_path = path.join("_delta_log");
+    create_dir_all(&delta_log_path)?;
+
+    let actions_per_commit = TOTAL_ACTIONS / NUM_COMMITS;
+    let mut current_file_id = 0u64;
+
+    for commit_id in 0..NUM_COMMITS {
+        let commit_filename = format!("{:020}.json", commit_id);
+        let commit_path = delta_log_path.join(&commit_filename);
+        let mut file = File::create(&commit_path)?;
+
+        let mut actions = Vec::new();
+
+        // Add metadata and protocol only for the first commit
+        if commit_id == 0 {
+            actions.push(json!({
+                "metaData": {
+                    "id": format!("{:020}", commit_id),
+                    "format": { "provider": "parquet", "options": {} },
+                    "schemaString": r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#,
+                    "partitionColumns": [],
+                    "configuration": {}
+                }
+            }));
+            actions.push(json!({
+                "protocol": {
+                    "minReaderVersion": 1,
+                    "minWriterVersion": 1
+                }
+            }));
+        }
+
+        // Generate add/remove action pairs for this commit
+        for _ in 0..actions_per_commit / 2 {
+            // Add action
+            actions.push(json!({
+                "add": {
+                    "path": format!("{}.parquet", current_file_id),
+                    "partitionValues": {},
+                    "size": 1000000,
+                    "modificationTime": commit_id * 1000,
+                    "dataChange": true
+                }
+            }));
+
+            // Remove action (remove a previous file if we have any)
+            if current_file_id > 0 {
+                actions.push(json!({
+                    "remove": {
+                        "path": format!("{}.parquet", current_file_id - 1),
+                        "deletionTimestamp": commit_id * 1000 + 500,
+                        "dataChange": true
+                    }
+                }));
+            }
+
+            current_file_id += 1;
+        }
+
+        // Write actions to file
+        for action in actions {
+            writeln!(file, "{}", action)?;
+        }
+    }
+
+    info!(
+        "Generated {} commit files with {} total actions",
+        NUM_COMMITS, TOTAL_ACTIONS
+    );
+    Ok(())
+}
+
+#[ignore = "mem-test - run manually"]
+#[test]
+fn test_dhat_large_table_log() -> Result<(), Box<dyn std::error::Error>> {
+    let dir = tempdir()?;
+    let table_path = dir.path();
+
+    info!(
+        "Generating delta log with {} commits and {} total actions",
+        NUM_COMMITS, TOTAL_ACTIONS
+    );
+    generate_delta_log(table_path)?;
+
+    let _profiler = dhat::Profiler::builder().testing().build();
+    let store = Arc::new(LocalFileSystem::new());
+    let url = Url::from_directory_path(table_path).unwrap();
+    let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
+
+    let snapshot = Snapshot::builder_for(url)
+        .build(&engine)
+        .expect("Failed to get latest snapshot");
+
+    let stats = dhat::HeapStats::get();
+    println!("Heap stats after PM replay:\n{:?}", stats);
+
+    let scan = snapshot
+        .scan_builder()
+        .build()
+        .expect("Failed to build scan");
+    let scan_metadata = scan
+        .scan_metadata(&engine)
+        .expect("Failed to get scan metadata");
+    for res in scan_metadata {
+        let _scan_metadata = res.expect("Failed to read scan metadata");
+    }
+
+    let stats = dhat::HeapStats::get();
+    println!("Heap stats after Scan replay:\n{:?}", stats);
+
+    Ok(())
+}