Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions tpchgen-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,9 @@ futures = "0.3.31"
num_cpus = "1.0"
log = "0.4.26"
env_logger = "0.11.7"

[dev-dependencies]
assert_cmd = "2.0"
predicates = "3.0"
tempfile = "3.20.0"
flate2 = "1.1.0"
201 changes: 201 additions & 0 deletions tpchgen-cli/tests/cli_integration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
use assert_cmd::Command;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use std::fs;
use std::fs::File;
use std::io::Read;
use std::path::Path;
use std::sync::Arc;
use tempfile::tempdir;
use tpchgen::generators::OrderGenerator;
use tpchgen_arrow::{OrderArrow, RecordBatchIterator};

/// Test TBL output for scale factor 0.001 using tpchgen-cli
#[test]
fn test_tpchgen_cli_tbl_scale_factor_0_001() {
// Create a temporary directory
let temp_dir = tempdir().expect("Failed to create temporary directory");

// Run the tpchgen-cli command
Command::cargo_bin("tpchgen-cli")
.expect("Binary not found")
.arg("--scale-factor")
.arg("0.001")
.arg("--output-dir")
.arg(temp_dir.path())
.assert()
.success();

// List of expected files
let expected_files = vec![
"customer.tbl",
"lineitem.tbl",
"nation.tbl",
"orders.tbl",
"part.tbl",
"partsupp.tbl",
"region.tbl",
"supplier.tbl",
];

// Verify that all expected files are created
for file in &expected_files {
let generated_file = temp_dir.path().join(file);
assert!(
generated_file.exists(),
"File {:?} does not exist",
generated_file
);
let generated_contents = fs::read(generated_file).expect("Failed to read generated file");
let generated_contents = String::from_utf8(generated_contents)
.expect("Failed to convert generated contents to string");

// load the reference file
let reference_file = format!("../tpchgen/data/sf-0.001/{}.gz", file);
let reference_contents = match read_gzipped_file_to_string(&reference_file) {
Ok(contents) => contents,
Err(e) => {
panic!("Failed to read reference file {reference_file}: {e}");
}
};

assert_eq!(
generated_contents, reference_contents,
"Contents of {:?} do not match reference",
file
);
}
}

/// Test generating the order table using --parts and --part options
#[test]
Comment thread
alamb marked this conversation as resolved.
fn test_tpchgen_cli_parts() {
// Create a temporary directory
let temp_dir = tempdir().expect("Failed to create temporary directory");

// generate 4 parts of the orders table with scale factor 0.001
// into directories /part1, /part2, /part3, /part4
let num_parts = 4;
for part in 1..=num_parts {
let part_dir = temp_dir.path().join(format!("part{part}"));
fs::create_dir(&part_dir).expect("Failed to create part directory");

// Run the tpchgen-cli command for each part
Command::cargo_bin("tpchgen-cli")
.expect("Binary not found")
.arg("--scale-factor")
.arg("0.001")
.arg("--output-dir")
.arg(&part_dir)
.arg("--parts")
.arg(num_parts.to_string())
.arg("--part")
.arg(part.to_string())
.arg("--tables")
.arg("orders")
.assert()
.success();
}

// printout the contents of the temp directory
println!(
"Temporary directory contents: {:?}",
fs::read_dir(temp_dir.path())
.expect("Failed to read temporary directory")
.map(|entry| entry.expect("Failed to read entry").file_name())
.collect::<Vec<_>>()
);

// Read the generated files into a single buffer and compare them
// to the contents of the reference file
let mut output_contents = Vec::new();
for part in 1..=4 {
let generated_file = temp_dir
.path()
.join(format!("part{part}"))
.join("orders.tbl");
assert!(
generated_file.exists(),
"File {:?} does not exist",
generated_file
);
let generated_contents =
fs::read_to_string(generated_file).expect("Failed to read generated file");
output_contents.append(&mut generated_contents.into_bytes());
}
let output_contents =
String::from_utf8(output_contents).expect("Failed to convert output contents to string");

// load the reference file
let reference_file = read_reference_file("orders", "0.001");
assert_eq!(output_contents, reference_file);
}

#[tokio::test]
async fn test_write_parquet_orders() {
// Run the CLI command to generate parquet data
let output_dir = tempdir().unwrap();
let output_path = output_dir.path().join("orders.parquet");
Command::cargo_bin("tpchgen-cli")
.expect("Binary not found")
.arg("--format")
.arg("parquet")
.arg("--tables")
.arg("orders")
.arg("--scale-factor")
.arg("0.001")
.arg("--output-dir")
.arg(output_dir.path())
.assert()
.success();

let batch_size = 4000;

// Create the reference Arrow data using OrderArrow
let generator = OrderGenerator::new(0.001, 1, 1);
let mut arrow_generator = OrderArrow::new(generator).with_batch_size(batch_size);

// Read the generated parquet file
let file = File::open(&output_path).expect("Failed to open parquet file");
let options = ArrowReaderOptions::new().with_schema(Arc::clone(arrow_generator.schema()));

let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
.expect("Failed to create ParquetRecordBatchReaderBuilder")
.with_batch_size(batch_size)
.build()
.expect("Failed to build ParquetRecordBatchReader");

// Compare the record batches
for batch in reader {
let parquet_batch = batch.expect("Failed to read record batch from parquet");
let arrow_batch = arrow_generator
.next()
.expect("Failed to generate record batch from OrderArrow");
assert_eq!(
parquet_batch, arrow_batch,
"Mismatch between parquet and arrow record batches"
);
}
}

fn read_gzipped_file_to_string<P: AsRef<Path>>(path: P) -> Result<String, std::io::Error> {
let file = File::open(path)?;
let mut decoder = flate2::read::GzDecoder::new(file);
let mut contents = Vec::new();
decoder.read_to_end(&mut contents)?;
let contents = String::from_utf8(contents)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
Ok(contents)
}

/// Reads the reference file for the specified table and scale factor.
///
/// example usage: `read_reference_file("orders", "0.001")`
fn read_reference_file(table_name: &str, scale_factor: &str) -> String {
let reference_file = format!("../tpchgen/data/sf-{scale_factor}/{table_name}.tbl.gz");
match read_gzipped_file_to_string(&reference_file) {
Ok(contents) => contents,
Err(e) => {
panic!("Failed to read reference file {reference_file}: {e}");
}
}
}
Loading