1 change: 1 addition & 0 deletions Cargo.toml
@@ -7,6 +7,7 @@ members = [
"kernel/examples/*",
"test-utils",
"feature-tests",
"mem-test",
"uc-client", # WIP: this is an experimental UC client for catalog-managed table work
]
# note that in addition to the members above, the workspace includes examples:
25 changes: 25 additions & 0 deletions mem-test/Cargo.toml
@@ -0,0 +1,25 @@
[package]
name = "mem-test"
edition.workspace = true
homepage.workspace = true
keywords.workspace = true
license.workspace = true
repository.workspace = true
readme.workspace = true
rust-version.workspace = true
version.workspace = true

# for cargo-release
[package.metadata.release]
release = false

[dependencies]
arrow = "56"
delta_kernel = { path = "../kernel", features = ["arrow", "default-engine-rustls"] }
dhat = "0.3"
object_store = "0.12.3"
rayon = "1.10"
serde_json = "1"
tempfile = "3"
tracing = "0.1"
url = "2"
3 changes: 3 additions & 0 deletions mem-test/src/lib.rs
@@ -0,0 +1,3 @@
//! Memory testing.
//!
//! Run all tests with: `cargo test -p mem-test -- --ignored --nocapture`
168 changes: 168 additions & 0 deletions mem-test/tests/dhat_large_table_data.rs
@@ -0,0 +1,168 @@
//! This tests our memory usage for reading tables with large data files.
//!
//! run with `cargo test -p mem-test dhat_large_table_data -- --ignored --nocapture`

use std::fs::{create_dir_all, File};
use std::io::Write;
use std::path::Path;
use std::sync::Arc;

use delta_kernel::arrow::array::{ArrayRef, Int64Array, StringArray};
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::parquet::arrow::ArrowWriter;
use delta_kernel::parquet::file::properties::WriterProperties;
use delta_kernel::Snapshot;

use arrow::compute::filter_record_batch;
use object_store::local::LocalFileSystem;
use serde_json::json;
use tempfile::tempdir;
use url::Url;

#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

const NUM_ROWS: u64 = 1_000_000;

/// write a 1M row parquet file containing roughly 1GB of data
fn write_large_parquet_to(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
let path = path.join("1.parquet");
let file = File::create(&path)?;

let i_col = Arc::new(Int64Array::from_iter_values(0..NUM_ROWS as i64)) as ArrayRef;
let s_col = (0..NUM_ROWS).map(|i| format!("val_{}_{}", i, "XYZ".repeat(350)));
let s_col = Arc::new(StringArray::from_iter_values(s_col)) as ArrayRef;
let rb = RecordBatch::try_from_iter(vec![("i", i_col.clone()), ("s", s_col.clone())])?;

let props = WriterProperties::builder().build();
let mut writer = ArrowWriter::try_new(file, rb.schema(), Some(props))?;
writer.write(&rb)?;
let parquet_metadata = writer.close()?;

// read to show file sizes
let metadata = std::fs::metadata(&path)?;
let file_size = metadata.len();
let total_row_group_size: i64 = parquet_metadata
.row_groups
.iter()
.map(|rg| rg.total_byte_size)
.sum();
println!("File size (compressed file size): {} bytes", file_size);
println!(
"Total size (uncompressed file size): {} bytes",
total_row_group_size
);

Ok(())
}

/// create a _delta_log/00000000000000000000.json commit with metadata, protocol, and a single
/// add action for the 1.parquet file written above
fn create_commit(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
let delta_log_path = path.join("_delta_log");
create_dir_all(&delta_log_path)?;
let commit_path = delta_log_path.join("00000000000000000000.json");
let mut file = File::create(&commit_path)?;

let actions = vec![
json!({
"metaData": {
"id": "00000000000000000000",
"format": {"provider": "parquet", "options": {}},
"schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"i\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"s\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
"partitionColumns": [],
"configuration": {}
}
}),
json!({
"protocol": {
"minReaderVersion": 1,
"minWriterVersion": 1
}
}),
json!({
"add": {
"path": "1.parquet",
"partitionValues": {},
"size": 1000000000,
"modificationTime": 0,
"dataChange": true
}
}),
];

for action in actions {
writeln!(file, "{}", action)?;
}

Ok(())
}

#[ignore = "mem-test - run manually"]
#[test]
fn test_dhat_large_table_data() -> Result<(), Box<dyn std::error::Error>> {
let dir = tempdir()?;
let table_path = dir.path();
let _profiler = dhat::Profiler::builder().testing().build();

// Step 1: Write the large parquet file
write_large_parquet_to(table_path)?;
let stats = dhat::HeapStats::get();
println!("Heap stats after writing parquet:\n{:?}", stats);

// Step 2: Create the Delta log
create_commit(table_path)?;

// Step 3: Create engine and snapshot
let store = Arc::new(LocalFileSystem::new());
let url = Url::from_directory_path(table_path).unwrap();
let engine = Arc::new(DefaultEngine::new(
store,
Arc::new(TokioBackgroundExecutor::new()),
));

let snapshot = Snapshot::builder_for(url)
.build(engine.as_ref())
.expect("Failed to create snapshot");

let stats = dhat::HeapStats::get();
println!("Heap stats after creating snapshot:\n{:?}", stats);

// Step 4: Build and execute scan
let scan = snapshot
.scan_builder()
.build()
.expect("Failed to build scan");

let stats = dhat::HeapStats::get();
println!("Heap stats after building scan:\n{:?}", stats);

// Step 5: Execute the scan and read data
let mut row_count = 0;
for scan_result in scan.execute(engine)? {
let scan_result = scan_result?;
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
.into();

let batch = if let Some(mask) = mask {
filter_record_batch(&record_batch, &mask.into())?
} else {
record_batch
};
row_count += batch.num_rows();
}

let stats = dhat::HeapStats::get();
println!("Heap stats after scan execution:\n{:?}", stats);
println!("Total rows read: {}", row_count);

Ok(())
}
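One optional variant worth noting (illustrative, not part of this PR): if a dhat-heap.json profile file is wanted for offline inspection rather than the printed HeapStats, the profiler can instead be built without testing mode, in which case dhat writes the profile to disk when the profiler is dropped and it can be opened in dhat's viewer (dh_view.html). A minimal sketch of that alternative setup:

// Illustrative alternative to the testing profiler above (not in this PR):
// a non-testing profiler writes `dhat-heap.json` when `_profiler` is dropped,
// which can then be loaded into dhat's dh_view.html viewer.
let _profiler = dhat::Profiler::new_heap();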
138 changes: 138 additions & 0 deletions mem-test/tests/dhat_large_table_log.rs
@@ -0,0 +1,138 @@
//! This tests our memory usage for reading tables with large/many log files.
//!
//! run with `cargo test -p mem-test dhat_large_table_log -- --ignored --nocapture`

use std::fs::{create_dir_all, File};
use std::io::Write;
use std::path::Path;
use std::sync::Arc;

use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::Snapshot;
use object_store::local::LocalFileSystem;
use serde_json::json;
use tempfile::tempdir;
use tracing::info;
use url::Url;

#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

const NUM_COMMITS: u64 = 1_000; // number of commit files to generate
const TOTAL_ACTIONS: u64 = 1_000_000; // total add/remove actions across all commits

/// Generate a delta log with NUM_COMMITS commit files containing TOTAL_ACTIONS add/remove actions in total
fn generate_delta_log(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
let delta_log_path = path.join("_delta_log");
create_dir_all(&delta_log_path)?;

let actions_per_commit = TOTAL_ACTIONS / NUM_COMMITS;
let mut current_file_id = 0u64;

for commit_id in 0..NUM_COMMITS {
let commit_filename = format!("{:020}.json", commit_id);
let commit_path = delta_log_path.join(&commit_filename);
let mut file = File::create(&commit_path)?;

let mut actions = Vec::new();

// Add metadata and protocol only for the first commit
if commit_id == 0 {
actions.push(json!({
"metaData": {
"id": format!("{:020}", commit_id),
"format": { "provider": "parquet", "options": {} },
"schemaString": r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#,
"partitionColumns": [],
"configuration": {}
}
}));
actions.push(json!({
"protocol": {
"minReaderVersion": 1,
"minWriterVersion": 1
}
}));
}

// Generate add/remove action pairs for this commit
for _ in 0..actions_per_commit / 2 {
// Add action
actions.push(json!({
"add": {
"path": format!("{}.parquet", current_file_id),
"partitionValues": {},
"size": 1000000,
"modificationTime": commit_id * 1000,
"dataChange": true
}
}));

// Remove action (remove a previous file if we have any)
if current_file_id > 0 {
actions.push(json!({
"remove": {
"path": format!("{}.parquet", current_file_id - 1),
"deletionTimestamp": commit_id * 1000 + 500,
"dataChange": true
}
}));
}

current_file_id += 1;
}

// Write actions to file
for action in actions {
writeln!(file, "{}", action)?;
}
}

info!(
"Generated {} commit files with {} total actions",
NUM_COMMITS, TOTAL_ACTIONS
);
Ok(())
}

#[ignore = "mem-test - run manually"]
#[test]
fn test_dhat_large_table_log() -> Result<(), Box<dyn std::error::Error>> {
Review comment (Collaborator):
oof

Failed to get latest snapshot: ObjectStore(Generic { store: "LocalFileSystem", source: UnableToOpenFile { source: Os { code: 24, kind: Uncategorized, message: "Too many open files" }, path: "/var/folders/cr/cl0ybfwx6kjd8bybfc2mr4rw0000gp/T/.tmpapqcBg/_delta_log/00000000000000000749.json" } })

Reply (Member, Author):
ah macos? i think might need to bump up ulimit

let dir = tempdir()?;
let table_path = dir.path();

info!(
"Generating delta log with {} commits and {} total actions",
NUM_COMMITS, TOTAL_ACTIONS
);
generate_delta_log(table_path)?;

let _profiler = dhat::Profiler::builder().testing().build();
let store = Arc::new(LocalFileSystem::new());
let url = Url::from_directory_path(table_path).unwrap();
let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));

let snapshot = Snapshot::builder_for(url)
.build(&engine)
.expect("Failed to get latest snapshot");

let stats = dhat::HeapStats::get();
println!("Heap stats after PM replay:\n{:?}", stats);

let scan = snapshot
.scan_builder()
.build()
.expect("Failed to build scan");
let scan_metadata = scan
.scan_metadata(&engine)
.expect("Failed to get scan metadata");
for res in scan_metadata {
let _scan_metadata = res.expect("Failed to read scan metadata");
}

let stats = dhat::HeapStats::get();
println!("Heap stats after Scan replay:\n{:?}", stats);

Ok(())
}
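On the "Too many open files" exchange above: one possible workaround, sketched here under stated assumptions and not part of this PR, is to raise the process's soft RLIMIT_NOFILE before replaying the ~1,000 commit files. The hypothetical helper below assumes a `libc` dev-dependency and a unix target; running `ulimit -n 4096` in the shell before the test is the simpler route the author suggests.

// Hypothetical helper (not part of this PR): raise the soft open-file limit so
// that replaying ~1,000 commit JSON files does not fail with EMFILE
// ("Too many open files"). Assumes a `libc` dev-dependency.
#[cfg(unix)]
fn raise_open_file_limit() -> std::io::Result<()> {
    unsafe {
        let mut lim = libc::rlimit { rlim_cur: 0, rlim_max: 0 };
        if libc::getrlimit(libc::RLIMIT_NOFILE, &mut lim) != 0 {
            return Err(std::io::Error::last_os_error());
        }
        // Stay well under macOS's OPEN_MAX cap while never exceeding the hard limit.
        lim.rlim_cur = std::cmp::min(lim.rlim_max, 4096);
        if libc::setrlimit(libc::RLIMIT_NOFILE, &lim) != 0 {
            return Err(std::io::Error::last_os_error());
        }
    }
    Ok(())
}

If used, it would be called at the top of test_dhat_large_table_log, before generate_delta_log and the snapshot build.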