Skip to content

Commit c61a672

Browse files
committed
refactor
1 parent a917c54 commit c61a672

8 files changed

Lines changed: 300 additions & 124 deletions

File tree

tpchgen-cli/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ for PART in `seq 2 3`; do
7272
done
7373
```
7474

75+
By default `tpchgen-cli` shows a per-table progress bar on stderr while data
76+
is generated. Pass `--no-progress` to disable it (it is also disabled
77+
automatically when `--quiet` is set, when `--stdout` is used, or when stderr
78+
is not a terminal, e.g. in CI logs).
79+
7580
## Performance
7681

7782
| Scale Factor | `tpchgen-cli` | DuckDB | DuckDB (proprietary) |

tpchgen-cli/bin/main.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
use clap::builder::TypedValueParser;
1111
use clap::{ArgAction, Parser};
1212
use log::{info, LevelFilter};
13-
use std::io;
13+
use std::io::{self, IsTerminal};
1414
use std::path::PathBuf;
1515
use std::str::FromStr;
1616
use tpchgen_cli::{
@@ -120,8 +120,11 @@ struct CommonArgs {
120120
#[arg(long, default_value_t = false)]
121121
stdout: bool,
122122

123-
/// Disable progress bars during data generation
124-
#[arg(long = "no-progress", action = ArgAction::SetFalse, default_value_t = true)]
123+
/// Disable progress bars during data generation.
124+
///
125+
/// Progress bars are on by default; pass `--no-progress` to turn them off.
126+
/// Also suppressed by `--quiet`, by `--stdout`, or when stderr is not a terminal.
127+
#[arg(long = "no-progress", action = ArgAction::SetFalse)]
125128
progress: bool,
126129
}
127130

tpchgen-cli/src/generate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub trait Sink: Send {
5050
/// G: Generator
5151
/// I: Iterator<Item = G>
5252
/// S: Sink that writes buffers somewhere
53-
pub async fn generate_in_chunks<G, I, S>(
53+
pub(crate) async fn generate_in_chunks<G, I, S>(
5454
mut sink: S,
5555
sources: I,
5656
num_threads: usize,

tpchgen-cli/src/lib.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub mod generate;
3131
pub mod output_plan;
3232
pub mod parquet;
3333
pub mod plan;
34-
pub mod progress;
34+
pub(crate) mod progress;
3535
pub mod runner;
3636
pub mod statistics;
3737
pub mod tbl;
@@ -255,7 +255,9 @@ impl Default for GeneratorConfig {
255255
part: None,
256256
stdout: false,
257257
csv_delimiter: ',',
258-
show_progress: true,
258+
// Off by default for library users. The CLI turns it on explicitly
259+
// unless progress is disabled (e.g. with `--no-progress`).
260+
show_progress: false,
259261
}
260262
}
261263
}
@@ -342,7 +344,7 @@ impl TpchGenerator {
342344
/// ```
343345
pub async fn generate(self) -> io::Result<()> {
344346
use crate::output_plan::OutputPlanGenerator;
345-
use crate::runner::PlanRunner;
347+
use crate::runner::{build_progress_tracker, PlanRunner};
346348
use log::info;
347349
use std::time::Instant;
348350
use tpchgen::distribution::Distributions;
@@ -396,7 +398,10 @@ impl TpchGenerator {
396398
info!("Created static distributions and text pools in {elapsed:?}");
397399

398400
// Run
399-
let runner = PlanRunner::new(output_plans, config.num_threads, config.show_progress);
401+
let tracker = config
402+
.show_progress
403+
.then(|| build_progress_tracker(&output_plans));
404+
let runner = PlanRunner::with_tracker(output_plans, config.num_threads, tracker);
400405
runner.run().await?;
401406
info!("Generation complete!");
402407
Ok(())

tpchgen-cli/src/parquet.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub trait IntoSize {
2929
///
3030
/// Note the input is an iterator of [`RecordBatchIterator`]; the batches
3131
/// produced by each iterator are encoded as their own row group.
32-
pub async fn generate_parquet<W: Write + Send + IntoSize + 'static, I>(
32+
pub(crate) async fn generate_parquet<W: Write + Send + IntoSize + 'static, I>(
3333
writer: W,
3434
iter_iter: I,
3535
num_threads: usize,

tpchgen-cli/src/progress.rs

Lines changed: 105 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,101 @@
1-
//! Progress tracking for table generation
1+
//! Progress tracking for table generation.
2+
//!
3+
//! # Design
4+
//!
5+
//! [`ProgressTracker`] owns one [`ProgressBar`] per table, grouped under a
6+
//! shared [`MultiProgress`]. It is intentionally minimal:
7+
//!
8+
//! - **Clone-cheap.** The tracker is a single [`Arc`], so each worker task
9+
//! gets its own clone without contention. There is no `Mutex` — the
10+
//! [`ProgressBar`] type from `indicatif` is already internally
11+
//! thread-safe, so external locking would only add lock contention on
12+
//! the per-chunk hot path.
13+
//! - **Pre-sized at construction.** Total chunk counts are fixed up front
14+
//! via the `IntoIterator` API so worker tasks never need to mutate the
15+
//! table map.
16+
//! - **Cached style.** The progress-bar template is built once via
17+
//! [`OnceLock`] and cloned per bar.
18+
//! - **Unknown-table no-op.** [`ProgressTracker::increment`] silently does
19+
//! nothing if the table was not registered. This keeps the skip-existing
20+
//! path safe against trackers built for a different set of plans.
21+
//!
22+
//! The type is `pub(crate)` while the public-trait design is worked out
23+
//! upstream (see issue #233).
224
325
use crate::Table;
426
use indicatif::{MultiProgress, ProgressBar, ProgressFinish, ProgressStyle};
527
use std::collections::HashMap;
6-
use std::sync::{Arc, Mutex};
28+
use std::sync::{Arc, OnceLock};
729

8-
/// Tracks progress for all tables being generated
30+
/// Tracks progress for all tables being generated.
31+
///
32+
/// Cheap to clone (single `Arc`). All updates are routed to per-table
33+
/// [`ProgressBar`]s, which are themselves internally thread-safe, so no
34+
/// external locking is needed.
35+
///
36+
/// Internal type. See issue #233 for the public-trait follow-up.
937
#[derive(Clone, Debug)]
10-
pub struct ProgressTracker {
38+
pub(crate) struct ProgressTracker {
1139
inner: Arc<ProgressTrackerInner>,
1240
}
1341

1442
#[derive(Debug)]
1543
struct ProgressTrackerInner {
16-
tables: Mutex<HashMap<Table, ProgressBar>>,
44+
tables: HashMap<Table, ProgressBar>,
1745
// MultiProgress must be kept alive to manage the registered progress bars
1846
_multi_progress: MultiProgress,
1947
}
2048

49+
fn bar_style() -> &'static ProgressStyle {
50+
static STYLE: OnceLock<ProgressStyle> = OnceLock::new();
51+
STYLE.get_or_init(|| {
52+
ProgressStyle::default_bar()
53+
.template("{msg:10} [{bar:28}] Progress: {percent:>3}%")
54+
.expect("static progress bar template is valid")
55+
.progress_chars("█▓░")
56+
})
57+
}
58+
2159
impl ProgressTracker {
2260
/// Create a new progress tracker for the given tables.
2361
///
2462
/// Each entry is `(table, total_chunks)`. Progress is incremented one
2563
/// unit per generated chunk via [`ProgressTracker::increment`].
26-
pub fn new(tables: Vec<(Table, u64)>) -> Self {
64+
pub(crate) fn new<I: IntoIterator<Item = (Table, u64)>>(tables: I) -> Self {
2765
let multi_progress = MultiProgress::new();
2866
let mut table_map = HashMap::new();
2967

3068
for (table, total_chunks) in tables {
3169
let pb = multi_progress.add(ProgressBar::new(total_chunks));
32-
pb.set_style(
33-
ProgressStyle::default_bar()
34-
.template("{msg:10} [{bar:28}] Progress: {percent:>3}%")
35-
.unwrap()
36-
.progress_chars("█▓░"),
37-
);
38-
pb.set_message(format!("{}", table));
70+
pb.set_style(bar_style().clone());
71+
pb.set_message(table.to_string());
3972
let pb = pb.with_finish(ProgressFinish::AndLeave);
4073
table_map.insert(table, pb);
4174
}
4275

4376
Self {
4477
inner: Arc::new(ProgressTrackerInner {
45-
tables: Mutex::new(table_map),
78+
tables: table_map,
4679
_multi_progress: multi_progress,
4780
}),
4881
}
4982
}
5083

51-
pub fn increment(&self, table: Table, chunks: u64) {
52-
let tables = self.inner.tables.lock().unwrap();
53-
if let Some(pb) = tables.get(&table) {
84+
/// Advance the progress bar for `table` by `chunks` units.
85+
pub(crate) fn increment(&self, table: Table, chunks: u64) {
86+
if let Some(pb) = self.inner.tables.get(&table) {
5487
pb.inc(chunks);
5588
}
5689
}
5790

58-
pub fn finish(&self, table: Table) {
59-
let tables = self.inner.tables.lock().unwrap();
60-
if let Some(pb) = tables.get(&table) {
91+
/// Mark every bar as finished. Each bar uses
92+
/// [`ProgressFinish::AndLeave`] so dropping the tracker also finalizes
93+
/// them, but calling this explicitly at the end of a run guarantees
94+
/// the terminal reflects the final state before the caller returns —
95+
/// useful for tests that inspect captured stderr and for short runs
96+
/// that might exit before the drop is observed.
97+
pub(crate) fn finish(&self) {
98+
for pb in self.inner.tables.values() {
6199
pb.finish();
62100
}
63101
}
@@ -69,10 +107,7 @@ mod tests {
69107

70108
#[test]
71109
fn test_progress_tracker_creation() {
72-
let tracker = ProgressTracker::new(vec![
73-
(Table::Lineitem, 60),
74-
(Table::Orders, 15),
75-
]);
110+
let tracker = ProgressTracker::new(vec![(Table::Lineitem, 60), (Table::Orders, 15)]);
76111
tracker.increment(Table::Lineitem, 1);
77112
}
78113

@@ -83,4 +118,50 @@ mod tests {
83118
tracker.increment(Table::Customer, 1);
84119
}
85120
}
121+
122+
#[test]
123+
fn test_progress_tracker_reaches_total() {
124+
let tracker = ProgressTracker::new(vec![(Table::Orders, 5), (Table::Lineitem, 8)]);
125+
for _ in 0..5 {
126+
tracker.increment(Table::Orders, 1);
127+
}
128+
tracker.increment(Table::Lineitem, 8);
129+
130+
let bars = &tracker.inner.tables;
131+
assert_eq!(bars[&Table::Orders].position(), 5);
132+
assert_eq!(bars[&Table::Lineitem].position(), 8);
133+
}
134+
135+
#[test]
136+
fn test_progress_tracker_ignores_unknown_table() {
137+
// Incrementing a table not registered at construction time is a no-op,
138+
// not a panic. This matters for the skip-existing path which may run
139+
// against a tracker built for a different set of plans.
140+
let tracker = ProgressTracker::new(vec![(Table::Orders, 1)]);
141+
tracker.increment(Table::Lineitem, 1);
142+
assert_eq!(tracker.inner.tables[&Table::Orders].position(), 0);
143+
}
144+
145+
#[test]
146+
fn test_progress_tracker_is_clone_cheap() {
147+
// ProgressTracker should be clone-cheap so it can be passed to each
148+
// worker task without contention. Clones must share the same inner.
149+
let tracker = ProgressTracker::new(vec![(Table::Region, 4)]);
150+
let clone = tracker.clone();
151+
tracker.increment(Table::Region, 2);
152+
clone.increment(Table::Region, 1);
153+
assert_eq!(tracker.inner.tables[&Table::Region].position(), 3);
154+
}
155+
156+
#[test]
157+
fn test_progress_tracker_finish_marks_bars_finished() {
158+
// After finish(), each bar should report itself finished so callers
159+
// (and tests reading captured output) see a deterministic end state.
160+
let tracker = ProgressTracker::new(vec![(Table::Nation, 3), (Table::Region, 2)]);
161+
tracker.increment(Table::Nation, 1);
162+
tracker.finish();
163+
for pb in tracker.inner.tables.values() {
164+
assert!(pb.is_finished());
165+
}
166+
}
86167
}

0 commit comments

Comments
 (0)