1 change: 1 addition & 0 deletions Cargo.toml
@@ -7,6 +7,7 @@ members = [
"kernel/examples/*",
"test-utils",
"feature-tests",
"mem-test",
"uc-client", # WIP: this is an experimental UC client for catalog-managed table work
]
# note that in addition to the members above, the workspace includes examples:
25 changes: 25 additions & 0 deletions mem-test/Cargo.toml
@@ -0,0 +1,25 @@
[package]
name = "mem-test"
edition.workspace = true
homepage.workspace = true
keywords.workspace = true
license.workspace = true
repository.workspace = true
readme.workspace = true
rust-version.workspace = true
version.workspace = true

# for cargo-release
[package.metadata.release]
release = false

[dependencies]
arrow = "56"
delta_kernel = { path = "../kernel", features = ["arrow", "default-engine-rustls"] }
dhat = "0.3"
object_store = "0.12.3"
rayon = "1.10"
serde_json = "1"
tempfile = "3"
tracing = "0.1"
url = "2"
3 changes: 3 additions & 0 deletions mem-test/src/lib.rs
@@ -0,0 +1,3 @@
//! Memory testing.
//!
//! Run all tests with: `cargo test -p mem-test -- --ignored --nocapture`
168 changes: 168 additions & 0 deletions mem-test/tests/dhat_large_table_data.rs
@@ -0,0 +1,168 @@
//! This tests our memory usage for reading tables with large data files.
//!
//! run with `cargo test -p mem-test dhat_large_table_data -- --ignored --nocapture`

use std::fs::{create_dir_all, File};
use std::io::Write;
use std::path::Path;
use std::sync::Arc;

use delta_kernel::arrow::array::{ArrayRef, Int64Array, StringArray};
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::parquet::arrow::ArrowWriter;
use delta_kernel::parquet::file::properties::WriterProperties;
use delta_kernel::Snapshot;

use arrow::compute::filter_record_batch;
use object_store::local::LocalFileSystem;
use serde_json::json;
use tempfile::tempdir;
use url::Url;

#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

const NUM_ROWS: u64 = 1_000_000;

/// Write a 1M-row parquet file that is ~1GB uncompressed (each row carries a ~1KB string)
fn write_large_parquet_to(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
let path = path.join("1.parquet");
let file = File::create(&path)?;

let i_col = Arc::new(Int64Array::from_iter_values(0..NUM_ROWS as i64)) as ArrayRef;
let s_col = (0..NUM_ROWS).map(|i| format!("val_{}_{}", i, "XYZ".repeat(350)));
let s_col = Arc::new(StringArray::from_iter_values(s_col)) as ArrayRef;
let rb = RecordBatch::try_from_iter(vec![("i", i_col.clone()), ("s", s_col.clone())])?;

let props = WriterProperties::builder().build();
let mut writer = ArrowWriter::try_new(file, rb.schema(), Some(props))?;
writer.write(&rb)?;
let parquet_metadata = writer.close()?;

// read back the metadata to report compressed (on-disk) vs. uncompressed (row-group) sizes
let metadata = std::fs::metadata(&path)?;
let file_size = metadata.len();
let total_row_group_size: i64 = parquet_metadata
.row_groups
.iter()
.map(|rg| rg.total_byte_size)
.sum();
println!("File size (compressed file size): {} bytes", file_size);
println!(
"Total size (uncompressed file size): {} bytes",
total_row_group_size
);

Ok(())
}

/// Create a _delta_log/00000000000000000000.json commit containing metadata, protocol, and a
/// single add action for the 1.parquet written above
fn create_commit(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
let delta_log_path = path.join("_delta_log");
create_dir_all(&delta_log_path)?;
let commit_path = delta_log_path.join("00000000000000000000.json");
let mut file = File::create(&commit_path)?;

let actions = vec![
json!({
"metaData": {
"id": "00000000000000000000",
"format": {"provider": "parquet", "options": {}},
"schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"i\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"s\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
"partitionColumns": [],
"configuration": {}
}
}),
json!({
"protocol": {
"minReaderVersion": 1,
"minWriterVersion": 1
}
}),
json!({
"add": {
"path": "1.parquet",
"partitionValues": {},
"size": 1000000000,
"modificationTime": 0,
"dataChange": true
}
}),
];

for action in actions {
writeln!(file, "{}", action)?;
}

Ok(())
}

#[ignore = "mem-test - run manually"]
#[test]
fn test_dhat_large_table_data() -> Result<(), Box<dyn std::error::Error>> {
let dir = tempdir()?;
let table_path = dir.path();
let _profiler = dhat::Profiler::builder().testing().build();

// Step 1: Write the large parquet file
write_large_parquet_to(table_path)?;
let stats = dhat::HeapStats::get();
println!("Heap stats after writing parquet:\n{:?}", stats);

// Step 2: Create the Delta log
create_commit(table_path)?;

// Step 3: Create engine and snapshot
let store = Arc::new(LocalFileSystem::new());
let url = Url::from_directory_path(table_path).unwrap();
let engine = Arc::new(DefaultEngine::new(
store,
Arc::new(TokioBackgroundExecutor::new()),
));

let snapshot = Snapshot::builder_for(url)
.build(engine.as_ref())
.expect("Failed to create snapshot");

let stats = dhat::HeapStats::get();
println!("Heap stats after creating snapshot:\n{:?}", stats);

// Step 4: Build and execute scan
let scan = snapshot
.scan_builder()
.build()
.expect("Failed to build scan");

let stats = dhat::HeapStats::get();
println!("Heap stats after building scan:\n{:?}", stats);

// Step 5: Execute the scan and read data
let mut row_count = 0;
for scan_result in scan.execute(engine)? {
let scan_result = scan_result?;
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
.into();

let batch = if let Some(mask) = mask {
filter_record_batch(&record_batch, &mask.into())?
} else {
record_batch
};
row_count += batch.num_rows();
}

let stats = dhat::HeapStats::get();
println!("Heap stats after scan execution:\n{:?}", stats);
println!("Total rows read: {}", row_count);

Ok(())
}
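The heap-stats printouts above are informational; since the profiler is built with `.testing()`, dhat's testing-mode assertions could also turn them into a hard bound. A minimal sketch, with an illustrative (not measured) 2GiB cap on peak heap usage:

// Hypothetical addition, not part of this test as written: fail if peak heap
// usage exceeds an assumed 2GiB budget. In testing mode, dhat::assert! prints
// a heap usage report when the condition is false.
let stats = dhat::HeapStats::get();
dhat::assert!(stats.max_bytes < 2 * 1024 * 1024 * 1024);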
138 changes: 138 additions & 0 deletions mem-test/tests/dhat_large_table_log.rs
@@ -0,0 +1,138 @@
//! This tests our memory usage for reading tables with large/many log files.
//!
//! run with `cargo test -p mem-test dhat_large_table_log -- --ignored --nocapture`

use std::fs::{create_dir_all, File};
use std::io::Write;
use std::path::Path;
use std::sync::Arc;

use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::Snapshot;
use object_store::local::LocalFileSystem;
use serde_json::json;
use tempfile::tempdir;
use tracing::info;
use url::Url;

#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

const NUM_COMMITS: u64 = 1_000; // number of commit files to generate
const TOTAL_ACTIONS: u64 = 1_000_000; // total add/remove actions across all commits

/// Generate a delta log with NUM_COMMITS commit files and roughly TOTAL_ACTIONS add/remove actions in total (written as add/remove pairs)
fn generate_delta_log(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
let delta_log_path = path.join("_delta_log");
create_dir_all(&delta_log_path)?;

let actions_per_commit = TOTAL_ACTIONS / NUM_COMMITS;
let mut current_file_id = 0u64;

for commit_id in 0..NUM_COMMITS {
let commit_filename = format!("{:020}.json", commit_id);
let commit_path = delta_log_path.join(&commit_filename);
let mut file = File::create(&commit_path)?;

let mut actions = Vec::new();

// Add metadata and protocol only for the first commit
if commit_id == 0 {
actions.push(json!({
"metaData": {
"id": format!("{:020}", commit_id),
"format": { "provider": "parquet", "options": {} },
"schemaString": r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#,
"partitionColumns": [],
"configuration": {}
}
}));
actions.push(json!({
"protocol": {
"minReaderVersion": 1,
"minWriterVersion": 1
}
}));
}

// Generate add/remove action pairs for this commit
for _ in 0..actions_per_commit / 2 {
// Add action
actions.push(json!({
"add": {
"path": format!("{}.parquet", current_file_id),
"partitionValues": {},
"size": 1000000,
"modificationTime": commit_id * 1000,
"dataChange": true
}
}));

// Remove action (remove a previous file if we have any)
if current_file_id > 0 {
actions.push(json!({
"remove": {
"path": format!("{}.parquet", current_file_id - 1),
"deletionTimestamp": commit_id * 1000 + 500,
"dataChange": true
}
}));
}

current_file_id += 1;
}

// Write actions to file
for action in actions {
writeln!(file, "{}", action)?;
}
}

info!(
"Generated {} commit files with {} total actions",
NUM_COMMITS, TOTAL_ACTIONS
);
Ok(())
}

#[ignore = "mem-test - run manually"]
#[test]
fn test_dhat_large_table_log() -> Result<(), Box<dyn std::error::Error>> {
Review comment (Collaborator):

oof

Failed to get latest snapshot: ObjectStore(Generic { store: "LocalFileSystem", source: UnableToOpenFile { source: Os { code: 24, kind: Uncategorized, message: "Too many open files" }, path: "/var/folders/cr/cl0ybfwx6kjd8bybfc2mr4rw0000gp/T/.tmpapqcBg/_delta_log/00000000000000000749.json" } })

Reply (Member, Author):

ah macos? i think might need to bump up ulimit
let dir = tempdir()?;
let table_path = dir.path();

info!(
"Generating delta log with {} commits and {} total actions",
NUM_COMMITS, TOTAL_ACTIONS
);
generate_delta_log(table_path)?;

let _profiler = dhat::Profiler::builder().testing().build();
let store = Arc::new(LocalFileSystem::new());
let url = Url::from_directory_path(table_path).unwrap();
let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));

let snapshot = Snapshot::builder_for(url)
.build(&engine)
.expect("Failed to get latest snapshot");

let stats = dhat::HeapStats::get();
println!("Heap stats after PM replay:\n{:?}", stats);

let scan = snapshot
.scan_builder()
.build()
.expect("Failed to build scan");
let scan_metadata = scan
.scan_metadata(&engine)
.expect("Failed to get scan metadata");
for res in scan_metadata {
let _scan_metadata = res.expect("Failed to read scan metadata");
}

let stats = dhat::HeapStats::get();
println!("Heap stats after Scan replay:\n{:?}", stats);

Ok(())
}
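On the "Too many open files" failure noted in the review thread above: one possible workaround, assuming the macOS default soft limit of 256 open files is the cause, is to raise the process limit before replaying the log, either via `ulimit -n 8192` in the shell or programmatically. A sketch using the `libc` crate (which would have to be added as a dev-dependency; the helper name is illustrative):

// Hypothetical helper, not part of this PR: raise the RLIMIT_NOFILE soft
// limit toward its hard limit so all 1,000 commit files can be opened.
fn raise_open_file_limit(target: libc::rlim_t) -> std::io::Result<()> {
    unsafe {
        let mut lim = libc::rlimit { rlim_cur: 0, rlim_max: 0 };
        if libc::getrlimit(libc::RLIMIT_NOFILE, &mut lim) != 0 {
            return Err(std::io::Error::last_os_error());
        }
        lim.rlim_cur = target.min(lim.rlim_max);
        if libc::setrlimit(libc::RLIMIT_NOFILE, &lim) != 0 {
            return Err(std::io::Error::last_os_error());
        }
        Ok(())
    }
}

Calling such a helper at the top of the test, before building the snapshot, would be an alternative to documenting a shell-level ulimit bump.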