Skip to content

Commit 3ab33ee

Browse files
authored
Automatically create multiple files with single --part command (#175)
* refactor plan generation into PartitionOutputPlan
* Automatically parallelize multiple part generation
* Clarify output
* fix: new clippy
* Update metadata size for new arrow
1 parent 8ee6d16 commit 3ab33ee

5 files changed

Lines changed: 891 additions & 273 deletions

File tree

tpchgen-cli/src/main.rs

Lines changed: 64 additions & 227 deletions
Original file line numberDiff line numberDiff line change
@@ -4,55 +4,21 @@
44
//! API wise to the original dbgen tool, as in we use the same command line flags
55
//! and arguments.
66
//!
7-
//! ```
8-
//! USAGE:
9-
//! tpchgen-cli [OPTIONS]
10-
//!
11-
//! OPTIONS:
12-
//! -h, --help Prints help information
13-
//! -V, --version Prints version information
14-
//! -s, --scale-factor <FACTOR> Scale factor for the data generation (default: 1)
15-
//! -T, --tables <TABLES> Comma-separated list of tables to generate (default: all)
16-
//! -f, --format <FORMAT> Output format: tbl, csv, or parquet (default: tbl)
17-
//! -o, --output-dir <DIR> Output directory (default: current directory)
18-
//! -p, --parts <N> Number of parts to split generation into (default: 1)
19-
//! --part <N> Which part to generate (1-based, default: 1)
20-
//! -n, --num-threads <N> Number of threads to use (default: number of CPUs)
21-
//! -c, --parquet-compression <C> Parquet compression codec, e.g., SNAPPY, ZSTD(1), UNCOMPRESSED (default: SNAPPY)
22-
//! --parquet-row-group-size <N> Number of rows per row group in Parquet files (default: 1048576)
23-
//! -v, --verbose Verbose output
24-
//! --stdout Write output to stdout instead of files
25-
//!```
26-
//!
27-
//! # Logging:
28-
//! Use the `-v` flag or `RUST_LOG` environment variable to control logging output.
29-
//!
30-
//! `-v` sets the log level to `info` and ignores the `RUST_LOG` environment variable.
31-
//!
32-
//! # Examples
33-
//! ```
34-
//! # see all info output
35-
//! tpchgen-cli -s 1 -v
36-
//!
37-
//! # same thing using RUST_LOG
38-
//! RUST_LOG=info tpchgen-cli -s 1
39-
//!
40-
//! # see all debug output
41-
//! RUST_LOG=debug tpchgen -s 1
42-
//! ```
7+
//! See the documentation on [`Cli`] for more information on the command line
438
mod csv;
449
mod generate;
10+
mod output_plan;
4511
mod parquet;
4612
mod plan;
13+
mod runner;
4714
mod statistics;
4815
mod tbl;
4916

50-
use crate::csv::*;
51-
use crate::generate::{generate_in_chunks, Sink, Source};
17+
use crate::generate::Sink;
18+
use crate::output_plan::OutputPlanGenerator;
5219
use crate::parquet::*;
5320
use crate::plan::{GenerationPlan, DEFAULT_PARQUET_ROW_GROUP_BYTES};
5421
use crate::statistics::WriteStatistics;
55-
use crate::tbl::*;
5622
use ::parquet::basic::Compression;
5723
use clap::builder::TypedValueParser;
5824
use clap::{Parser, ValueEnum};
@@ -64,20 +30,39 @@ use std::path::PathBuf;
6430
use std::str::FromStr;
6531
use std::time::Instant;
6632
use tpchgen::distribution::Distributions;
67-
use tpchgen::generators::{
68-
CustomerGenerator, LineItemGenerator, NationGenerator, OrderGenerator, PartGenerator,
69-
PartSuppGenerator, RegionGenerator, SupplierGenerator,
70-
};
7133
use tpchgen::text::TextPool;
72-
use tpchgen_arrow::{
73-
CustomerArrow, LineItemArrow, NationArrow, OrderArrow, PartArrow, PartSuppArrow,
74-
RecordBatchIterator, RegionArrow, SupplierArrow,
75-
};
7634

7735
#[derive(Parser)]
7836
#[command(name = "tpchgen")]
7937
#[command(version)]
80-
#[command(about = "TPC-H Data Generator", long_about = None)]
38+
#[command(
39+
// -h output
40+
about = "TPC-H Data Generator",
41+
// --help output
42+
long_about = r#"
43+
TPC-H Data Generator (https://github.com/clflushopt/tpchgen-rs)
44+
45+
By default each table is written to a single file named <output_dir>/<table>.<format>
46+
47+
If `--part` option is specified, each table is written to a subdirectory in
48+
multiple files named <output_dir>/<table>/<table>.<part>.<format>
49+
50+
Examples
51+
52+
# Generate all tables at scale factor 1 (1GB) in TBL format to /tmp/tpch directory:
53+
54+
tpchgen-cli -s 1 --output-dir=/tmp/tpch
55+
56+
# Generate the lineitem table at scale factor 100 in 10 Apache Parquet files to
57+
# /tmp/tpch/lineitem
58+
59+
tpchgen-cli -s 100 --tables=lineitem --format=parquet --parts=10 --output-dir=/tmp/tpch
60+
61+
# Generate scale factor one in current directory, seeing debug output
62+
63+
RUST_LOG=debug tpchgen-cli -s 1
64+
"#
65+
)]
8166
struct Cli {
8267
/// Scale factor to create
8368
#[arg(short, long, default_value_t = 1.)]
@@ -91,13 +76,11 @@ struct Cli {
9176
#[arg(short = 'T', long = "tables", value_delimiter = ',', value_parser = TableValueParser)]
9277
tables: Option<Vec<Table>>,
9378

94-
/// Number of part(itions) to generate (manual parallel generation)
79+
/// Number of part(itions) to generate. If not specified creates a single file per table
9580
#[arg(short, long)]
9681
parts: Option<i32>,
9782

98-
/// Which part(ition) to generate (1-based)
99-
///
100-
/// If not specified, generates all parts
83+
/// Which part(ition) to generate (1-based). If not specified, generates all parts
10184
#[arg(long)]
10285
part: Option<i32>,
10386

@@ -126,6 +109,9 @@ struct Cli {
126109
parquet_compression: Compression,
127110

128111
/// Verbose output
112+
///
113+
/// When specified, sets the log level to `info` and ignores the `RUST_LOG`
114+
/// environment variable. When not specified, uses `RUST_LOG`
129115
#[arg(short, long, default_value_t = false)]
130116
verbose: bool,
131117

@@ -136,11 +122,11 @@ struct Cli {
136122
/// Target size in row group bytes in Parquet files
137123
///
138124
/// Row groups are the typical unit of parallel processing and compression
139-
/// in Parquet. With many query engines, smaller row groups enable better
125+
/// with many query engines. Therefore, smaller row groups enable better
140126
/// parallelism and lower peak memory use but may reduce compression
141127
/// efficiency.
142128
///
143-
/// Note: parquet files are limited to 32k row groups, so at high scale
129+
/// Note: Parquet files are limited to 32k row groups, so at high scale
144130
/// factors, the row group size may be increased to keep the number of row
145131
/// groups under this limit.
146132
///
@@ -259,46 +245,6 @@ async fn main() -> io::Result<()> {
259245
cli.main().await
260246
}
261247

262-
/// macro to create a Cli function for generating a table
263-
///
264-
/// Arguments:
265-
/// $FUN_NAME: name of the function to create
266-
/// $TABLE: The [`Table`] to generate
267-
/// $GENERATOR: The generator type to use
268-
/// $TBL_SOURCE: The [`Source`] type to use for TBL format
269-
/// $CSV_SOURCE: The [`Source`] type to use for CSV format
270-
/// $PARQUET_SOURCE: The [`RecordBatchIterator`] type to use for Parquet format
271-
macro_rules! define_generate {
272-
($FUN_NAME:ident, $TABLE:expr, $GENERATOR:ident, $TBL_SOURCE:ty, $CSV_SOURCE:ty, $PARQUET_SOURCE:ty) => {
273-
async fn $FUN_NAME(&self) -> io::Result<()> {
274-
let filename = self.output_filename($TABLE);
275-
let plan = GenerationPlan::try_new(
276-
&$TABLE,
277-
self.format,
278-
self.scale_factor,
279-
self.part,
280-
self.parts,
281-
self.parquet_row_group_bytes,
282-
)
283-
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
284-
let scale_factor = self.scale_factor;
285-
info!("Writing table {} (SF={scale_factor}) to {filename}", $TABLE);
286-
debug!("Plan: {plan}");
287-
let gens = plan
288-
.into_iter()
289-
.map(move |(part, num_parts)| $GENERATOR::new(scale_factor, part, num_parts));
290-
match self.format {
291-
OutputFormat::Tbl => self.go(&filename, gens.map(<$TBL_SOURCE>::new)).await,
292-
OutputFormat::Csv => self.go(&filename, gens.map(<$CSV_SOURCE>::new)).await,
293-
OutputFormat::Parquet => {
294-
self.go_parquet(&filename, gens.map(<$PARQUET_SOURCE>::new))
295-
.await
296-
}
297-
}
298-
}
299-
};
300-
}
301-
302248
impl Cli {
303249
/// Main function to run the generation
304250
async fn main(self) -> io::Result<()> {
@@ -332,15 +278,6 @@ impl Cli {
332278
]
333279
};
334280

335-
// force the creation of the distributions and text pool to so it doesn't
336-
// get charged to the first table
337-
let start = Instant::now();
338-
debug!("Creating distributions and text pool");
339-
Distributions::static_default();
340-
TextPool::get_or_init_default();
341-
let elapsed = start.elapsed();
342-
info!("Created static distributions and text pools in {elapsed:?}");
343-
344281
// Warn if parquet specific options are set but not generating parquet
345282
if self.format != OutputFormat::Parquet {
346283
if self.parquet_compression != Compression::SNAPPY {
@@ -355,136 +292,36 @@ impl Cli {
355292
}
356293
}
357294

358-
// Generate each table
295+
// Determine what files to generate
296+
let mut output_plan_generator = OutputPlanGenerator::new(
297+
self.format,
298+
self.scale_factor,
299+
self.parquet_compression,
300+
self.parquet_row_group_bytes,
301+
self.stdout,
302+
self.output_dir.clone(),
303+
);
304+
359305
for table in tables {
360-
match table {
361-
Table::Nation => self.generate_nation().await?,
362-
Table::Region => self.generate_region().await?,
363-
Table::Part => self.generate_part().await?,
364-
Table::Supplier => self.generate_supplier().await?,
365-
Table::Partsupp => self.generate_partsupp().await?,
366-
Table::Customer => self.generate_customer().await?,
367-
Table::Orders => self.generate_orders().await?,
368-
Table::Lineitem => self.generate_lineitem().await?,
369-
}
306+
output_plan_generator.generate_plans(table, self.part, self.parts)?;
370307
}
308+
let output_plans = output_plan_generator.build();
309+
310+
// force the creation of the distributions and text pool so it doesn't
311+
// get charged to the first table
312+
let start = Instant::now();
313+
debug!("Creating distributions and text pool");
314+
Distributions::static_default();
315+
TextPool::get_or_init_default();
316+
let elapsed = start.elapsed();
317+
info!("Created static distributions and text pools in {elapsed:?}");
371318

319+
// Run
320+
let runner = runner::PlanRunner::new(output_plans, self.num_threads);
321+
runner.run().await?;
372322
info!("Generation complete!");
373323
Ok(())
374324
}
375-
376-
define_generate!(
377-
generate_nation,
378-
Table::Nation,
379-
NationGenerator,
380-
NationTblSource,
381-
NationCsvSource,
382-
NationArrow
383-
);
384-
define_generate!(
385-
generate_region,
386-
Table::Region,
387-
RegionGenerator,
388-
RegionTblSource,
389-
RegionCsvSource,
390-
RegionArrow
391-
);
392-
define_generate!(
393-
generate_part,
394-
Table::Part,
395-
PartGenerator,
396-
PartTblSource,
397-
PartCsvSource,
398-
PartArrow
399-
);
400-
define_generate!(
401-
generate_supplier,
402-
Table::Supplier,
403-
SupplierGenerator,
404-
SupplierTblSource,
405-
SupplierCsvSource,
406-
SupplierArrow
407-
);
408-
define_generate!(
409-
generate_partsupp,
410-
Table::Partsupp,
411-
PartSuppGenerator,
412-
PartSuppTblSource,
413-
PartSuppCsvSource,
414-
PartSuppArrow
415-
);
416-
define_generate!(
417-
generate_customer,
418-
Table::Customer,
419-
CustomerGenerator,
420-
CustomerTblSource,
421-
CustomerCsvSource,
422-
CustomerArrow
423-
);
424-
define_generate!(
425-
generate_orders,
426-
Table::Orders,
427-
OrderGenerator,
428-
OrderTblSource,
429-
OrderCsvSource,
430-
OrderArrow
431-
);
432-
define_generate!(
433-
generate_lineitem,
434-
Table::Lineitem,
435-
LineItemGenerator,
436-
LineItemTblSource,
437-
LineItemCsvSource,
438-
LineItemArrow
439-
);
440-
441-
/// return the output filename for the given table
442-
fn output_filename(&self, table: Table) -> String {
443-
let extension = match self.format {
444-
OutputFormat::Tbl => "tbl",
445-
OutputFormat::Csv => "csv",
446-
OutputFormat::Parquet => "parquet",
447-
};
448-
format!("{}.{extension}", table.name())
449-
}
450-
451-
/// return a file for writing the given filename in the output directory
452-
fn new_output_file(&self, filename: &str) -> io::Result<File> {
453-
let path = self.output_dir.join(filename);
454-
File::create(path)
455-
}
456-
457-
/// Generates the output file from the sources
458-
async fn go<I>(&self, filename: &str, sources: I) -> Result<(), io::Error>
459-
where
460-
I: Iterator<Item: Source> + 'static,
461-
{
462-
// Since generate_in_chunks already buffers, there is no need to buffer again
463-
if self.stdout {
464-
let sink = WriterSink::new(io::stdout());
465-
generate_in_chunks(sink, sources, self.num_threads).await
466-
} else {
467-
let sink = WriterSink::new(self.new_output_file(filename)?);
468-
generate_in_chunks(sink, sources, self.num_threads).await
469-
}
470-
}
471-
472-
/// Generates an output parquet file from the sources
473-
async fn go_parquet<I>(&self, filename: &str, sources: I) -> Result<(), io::Error>
474-
where
475-
I: Iterator<Item: RecordBatchIterator> + 'static,
476-
{
477-
if self.stdout {
478-
// write to stdout
479-
let writer = BufWriter::with_capacity(32 * 1024 * 1024, io::stdout()); // 32MB buffer
480-
generate_parquet(writer, sources, self.num_threads, self.parquet_compression).await
481-
} else {
482-
// write to a file
483-
let file = self.new_output_file(filename)?;
484-
let writer = BufWriter::with_capacity(32 * 1024 * 1024, file); // 32MB buffer
485-
generate_parquet(writer, sources, self.num_threads, self.parquet_compression).await
486-
}
487-
}
488325
}
489326

490327
impl IntoSize for BufWriter<Stdout> {

0 commit comments

Comments
 (0)