Skip to content

Commit abef702

Browse files
authored
Merge pull request #6 from OrlovEvgeny/feat/add_spinner
feat: add spinner progress indicator to all commands
2 parents 8f11c6a + 719c1f5 commit abef702

File tree

20 files changed

+231
-129
lines changed

20 files changed

+231
-129
lines changed

src/commands/cat.rs

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
use crate::cli::CatArgs;
2-
use crate::input::resolve_inputs_with_config;
2+
use crate::input::resolve_inputs_report;
33
use crate::output::OutputConfig;
44
use arrow::array::{ArrayRef, RecordBatch, StringArray, UInt32Array, new_null_array};
55
use arrow::compute::{SortColumn, SortOptions, concat_batches, lexsort_to_indices, take};
66
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
7-
use indicatif::{ProgressBar, ProgressStyle};
87
use parquet::arrow::ArrowWriter;
98
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
109
use std::fs::File;
1110
use std::sync::Arc;
1211

1312
pub fn execute(args: &CatArgs, output: &mut OutputConfig) -> miette::Result<()> {
14-
let sources = resolve_inputs_with_config(&args.files, &output.cloud_config)?;
13+
let sp = output.spinner("Loading");
14+
let sources = resolve_inputs_report(&args.files, &output.cloud_config, &mut |msg| {
15+
sp.set_message(msg);
16+
})?;
17+
sp.finish_and_clear();
1518
let out_path = output
1619
.output_path()
1720
.ok_or_else(|| miette::miette!("cat requires an output file (-o <path>)"))?
@@ -23,7 +26,6 @@ pub fn execute(args: &CatArgs, output: &mut OutputConfig) -> miette::Result<()>
2326
}
2427

2528
let output_schema: SchemaRef = if args.union_by_name {
26-
// unified schema across all files, preserving column order
2729
let mut fields: Vec<Arc<Field>> = Vec::new();
2830
let mut seen_names: Vec<String> = Vec::new();
2931

@@ -44,7 +46,6 @@ pub fn execute(args: &CatArgs, output: &mut OutputConfig) -> miette::Result<()>
4446

4547
Arc::new(Schema::new(fields))
4648
} else {
47-
// Use schema from first file
4849
let first_file = File::open(sources[0].path())
4950
.map_err(|e| miette::miette!("cannot open '{}': {}", sources[0].display_name(), e))?;
5051
let first_builder = ParquetRecordBatchReaderBuilder::try_new(first_file)
@@ -63,17 +64,7 @@ pub fn execute(args: &CatArgs, output: &mut OutputConfig) -> miette::Result<()>
6364
let mut all_batches: Vec<RecordBatch> = Vec::new();
6465
let mut total_rows: usize = 0;
6566

66-
let spinner = if output.is_tty && !output.quiet && sources.len() > 1 {
67-
let pb = ProgressBar::new(sources.len() as u64);
68-
pb.set_style(
69-
ProgressStyle::default_bar()
70-
.template(" Concatenating [{bar:20}] {pos}/{len} files")
71-
.unwrap(),
72-
);
73-
Some(pb)
74-
} else {
75-
None
76-
};
67+
let progress = output.progress_bar("Concatenating", sources.len() as u64);
7768

7869
for source in &sources {
7970
let file = File::open(source.path())
@@ -115,13 +106,9 @@ pub fn execute(args: &CatArgs, output: &mut OutputConfig) -> miette::Result<()>
115106

116107
all_batches.push(final_batch);
117108
}
118-
if let Some(ref pb) = spinner {
119-
pb.inc(1);
120-
}
121-
}
122-
if let Some(ref pb) = spinner {
123-
pb.finish_and_clear();
109+
progress.inc(1);
124110
}
111+
drop(progress);
125112

126113
let write_batches: Vec<RecordBatch> = if let Some(ref sort_by) = args.sort_by {
127114
if all_batches.is_empty() {
@@ -189,7 +176,6 @@ pub fn execute(args: &CatArgs, output: &mut OutputConfig) -> miette::Result<()>
189176
Ok(())
190177
}
191178

192-
/// Align a batch to a target schema by reordering columns and filling missing ones with nulls.
193179
fn align_batch_to_schema(
194180
batch: &RecordBatch,
195181
file_schema: &SchemaRef,
@@ -207,7 +193,6 @@ fn align_batch_to_schema(
207193
{
208194
columns.push(batch.column(idx).clone());
209195
} else {
210-
// Column not present in this file — fill with nulls
211196
columns.push(new_null_array(target_field.data_type(), num_rows));
212197
}
213198
}
@@ -216,7 +201,6 @@ fn align_batch_to_schema(
216201
.map_err(|e| miette::miette!("failed to build aligned batch: {}", e))
217202
}
218203

219-
/// Add a `_filename` column to a batch.
220204
fn add_filename_column(
221205
batch: &RecordBatch,
222206
filename: &str,

src/commands/check.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::cli::CheckArgs;
22
use crate::error::PqError;
33
use crate::input::cloud::is_cloud_url;
4-
use crate::input::{resolve_inputs, resolve_inputs_with_config};
4+
use crate::input::{resolve_inputs, resolve_inputs_report};
55
use crate::output::{OutputConfig, OutputFormat};
66
use crate::parquet_ext::metadata;
77
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -75,7 +75,12 @@ fn parse_size_bytes(s: &str) -> Option<u64> {
7575

7676
pub fn execute(args: &CheckArgs, output: &mut OutputConfig) -> miette::Result<()> {
7777
let sources = if args.files.iter().any(|f| is_cloud_url(f)) {
78-
resolve_inputs_with_config(&args.files, &output.cloud_config)?
78+
let sp = output.spinner("Loading");
79+
let s = resolve_inputs_report(&args.files, &output.cloud_config, &mut |msg| {
80+
sp.set_message(msg);
81+
})?;
82+
sp.finish_and_clear();
83+
s
7984
} else {
8085
resolve_inputs(&args.files)?
8186
};
@@ -214,7 +219,10 @@ pub fn execute(args: &CheckArgs, output: &mut OutputConfig) -> miette::Result<()
214219

215220
if args.read_data {
216221
let expected_rows = metadata::total_rows(meta);
217-
match validate_data(source.path()) {
222+
let sp = output.spinner("Validating data pages");
223+
let validate_result = validate_data(source.path());
224+
sp.finish_and_clear();
225+
match validate_result {
218226
Ok(actual_rows) => {
219227
checks.push(CheckItem {
220228
name: "All pages decompressible".to_string(),

src/commands/compact.rs

Lines changed: 10 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use crate::cli::CompactArgs;
22
use crate::error::PqError;
3-
use crate::input::resolve_inputs_with_config;
3+
use crate::input::resolve_inputs_report;
44
use crate::output::{OutputConfig, OutputFormat};
55
use crate::parquet_ext::metadata;
66
use arrow::record_batch::RecordBatch;
7-
use indicatif::{ProgressBar, ProgressStyle};
87
use parquet::arrow::ArrowWriter;
98
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
109
use parquet::basic::Compression;
@@ -60,25 +59,12 @@ fn resolve_compression(args: &CompactArgs) -> Compression {
6059
}
6160
}
6261

63-
/// Create an optional progress bar. Returns `None` when stdout is not a TTY
64-
/// (piped output) so that machine-readable output stays clean.
65-
fn make_progress_bar(output: &OutputConfig, total: u64) -> Option<ProgressBar> {
66-
if !output.is_tty || output.quiet {
67-
return None;
68-
}
69-
70-
let pb = ProgressBar::new(total);
71-
pb.set_style(
72-
ProgressStyle::default_bar()
73-
.template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} groups ({eta})")
74-
.unwrap_or_else(|_| ProgressStyle::default_bar())
75-
.progress_chars("##-"),
76-
);
77-
Some(pb)
78-
}
79-
8062
pub fn execute(args: &CompactArgs, output: &mut OutputConfig) -> miette::Result<()> {
81-
let sources = resolve_inputs_with_config(&args.files, &output.cloud_config)?;
63+
let sp = output.spinner("Loading");
64+
let sources = resolve_inputs_report(&args.files, &output.cloud_config, &mut |msg| {
65+
sp.set_message(msg);
66+
})?;
67+
sp.finish_and_clear();
8268
if sources.is_empty() {
8369
return Err(miette::miette!("no files to compact"));
8470
}
@@ -253,8 +239,7 @@ pub fn execute(args: &CompactArgs, output: &mut OutputConfig) -> miette::Result<
253239
.map_err(|e| miette::miette!("cannot create output directory '{}': {}", out_dir, e))?;
254240
}
255241

256-
// ── Progress bar ──────────────────────────────────────────────────
257-
let progress = make_progress_bar(output, groups.len() as u64);
242+
let progress = output.progress_bar("Compacting", groups.len() as u64);
258243

259244
// ── Execute compaction ────────────────────────────────────────────
260245
let mut succeeded: usize = 0;
@@ -285,9 +270,7 @@ pub fn execute(args: &CompactArgs, output: &mut OutputConfig) -> miette::Result<
285270
}
286271
}
287272
succeeded += 1;
288-
if let Some(ref pb) = progress {
289-
pb.inc(1);
290-
}
273+
progress.inc(1);
291274
writeln!(
292275
output.writer,
293276
" Group {}: {} files -> {}",
@@ -299,18 +282,14 @@ pub fn execute(args: &CompactArgs, output: &mut OutputConfig) -> miette::Result<
299282
}
300283
Err(e) => {
301284
failed += 1;
302-
if let Some(ref pb) = progress {
303-
pb.inc(1);
304-
}
285+
progress.inc(1);
305286
writeln!(output.writer, " Group {}: FAILED — {}", group_idx + 1, e)
306287
.map_err(|e| miette::miette!("{}", e))?;
307288
}
308289
}
309290
}
310291

311-
if let Some(ref pb) = progress {
312-
pb.finish_and_clear();
313-
}
292+
drop(progress);
314293

315294
writeln!(
316295
output.writer,

src/commands/convert.rs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use arrow::csv as arrow_csv;
44
use arrow::datatypes::Schema;
55
use arrow::ipc;
66
use arrow::json as arrow_json;
7-
use indicatif::{ProgressBar, ProgressStyle};
87
use parquet::arrow::ArrowWriter;
98
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
109
use parquet::basic::Compression;
@@ -66,24 +65,10 @@ pub fn execute(args: &ConvertArgs, output: &mut OutputConfig) -> miette::Result<
6665
} else {
6766
let target = crate::input::cloud::resolve_output_path(&out_path)?;
6867
let input = &args.files[0];
69-
let spinner = if output.is_tty && !output.quiet {
70-
let pb = ProgressBar::new_spinner();
71-
pb.set_style(
72-
ProgressStyle::default_spinner()
73-
.template("{spinner} Converting {msg}...")
74-
.unwrap(),
75-
);
76-
pb.set_message(input.clone());
77-
pb.enable_steady_tick(std::time::Duration::from_millis(100));
78-
Some(pb)
79-
} else {
80-
None
81-
};
68+
let sp = output.spinner("Converting");
8269
convert_single(input, target.local_path(), args)?;
8370
target.finalize(&output.cloud_config)?;
84-
if let Some(pb) = spinner {
85-
pb.finish_and_clear();
86-
}
71+
sp.finish_and_clear();
8772
eprintln!("Converted {} → {}", input, out_path);
8873
}
8974

src/commands/count.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::cli::CountArgs;
2-
use crate::input::resolve_inputs_with_config;
2+
use crate::input::resolve_inputs_report;
33
use crate::output::{OutputConfig, OutputFormat};
44
use crate::parquet_ext::metadata;
55
use num_format::{Locale, ToFormattedString};
@@ -12,7 +12,11 @@ struct CountResult {
1212
}
1313

1414
pub fn execute(args: &CountArgs, output: &mut OutputConfig) -> miette::Result<()> {
15-
let sources = resolve_inputs_with_config(&args.files, &output.cloud_config)?;
15+
let sp = output.spinner("Loading");
16+
let sources = resolve_inputs_report(&args.files, &output.cloud_config, &mut |msg| {
17+
sp.set_message(msg);
18+
})?;
19+
sp.finish_and_clear();
1620
let single_file = sources.len() == 1;
1721

1822
let mut results: Vec<CountResult> = Vec::new();

src/commands/diff.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::cli::DiffArgs;
2-
use crate::input::resolve_inputs_with_config;
2+
use crate::input::resolve_inputs_report;
33
use crate::output::table;
44
use crate::output::{OutputConfig, OutputFormat};
55
use crate::parquet_ext::metadata;
@@ -56,7 +56,11 @@ struct DataChange {
5656
}
5757

5858
pub fn execute(args: &DiffArgs, output: &mut OutputConfig) -> miette::Result<()> {
59-
let sources = resolve_inputs_with_config(&args.files, &output.cloud_config)?;
59+
let sp = output.spinner("Loading");
60+
let sources = resolve_inputs_report(&args.files, &output.cloud_config, &mut |msg| {
61+
sp.set_message(msg);
62+
})?;
63+
sp.finish_and_clear();
6064
if sources.len() < 2 {
6165
return Err(miette::miette!(
6266
"diff requires exactly 2 files (got {})",
@@ -149,7 +153,10 @@ pub fn execute(args: &DiffArgs, output: &mut OutputConfig) -> miette::Result<()>
149153
};
150154

151155
let data_diff = if args.data {
152-
Some(compute_data_diff(args, &sources)?)
156+
let sp = output.spinner("Comparing data");
157+
let result = compute_data_diff(args, &sources)?;
158+
sp.finish_and_clear();
159+
Some(result)
153160
} else {
154161
None
155162
};

src/commands/head.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::cli::HeadArgs;
22
use crate::input::column_selector::resolve_projection;
3-
use crate::input::resolve_inputs_with_config;
3+
use crate::input::resolve_inputs_report;
44
use crate::output::{OutputConfig, OutputFormat};
55
use arrow::array::{Array, ArrayRef, AsArray, StringArray};
66
use arrow::datatypes::DataType;
@@ -14,7 +14,11 @@ use std::path::Path;
1414
use std::sync::Arc;
1515

1616
pub fn execute(args: &HeadArgs, output: &mut OutputConfig) -> miette::Result<()> {
17-
let sources = resolve_inputs_with_config(&args.files, &output.cloud_config)?;
17+
let sp = output.spinner("Loading");
18+
let sources = resolve_inputs_report(&args.files, &output.cloud_config, &mut |msg| {
19+
sp.set_message(msg);
20+
})?;
21+
sp.finish_and_clear();
1822

1923
for source in &sources {
2024
let file = File::open(source.path())

src/commands/inspect.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::cli::InspectArgs;
2-
use crate::input::resolve_inputs_with_config;
2+
use crate::input::resolve_inputs_report;
33
use crate::output::table;
44
use crate::output::{OutputConfig, OutputFormat};
55
use crate::parquet_ext::metadata;
@@ -35,7 +35,11 @@ struct SchemaColumn {
3535
}
3636

3737
pub fn execute(args: &InspectArgs, output: &mut OutputConfig) -> miette::Result<()> {
38-
let sources = resolve_inputs_with_config(&args.files, &output.cloud_config)?;
38+
let sp = output.spinner("Loading");
39+
let sources = resolve_inputs_report(&args.files, &output.cloud_config, &mut |msg| {
40+
sp.set_message(msg);
41+
})?;
42+
sp.finish_and_clear();
3943

4044
if args.raw {
4145
for source in &sources {
@@ -302,6 +306,7 @@ fn execute_multi_table(
302306
let mut mismatch_count: usize = 0;
303307
let mut first_num_columns: Option<usize> = None;
304308

309+
let progress = output.progress_bar("Inspecting", sources.len() as u64);
305310
for source in sources {
306311
let meta = metadata::read_metadata(source.path())?;
307312
let file_size = source.file_size();
@@ -341,7 +346,9 @@ fn execute_multi_table(
341346
compression,
342347
created_by,
343348
]);
349+
progress.inc(1);
344350
}
351+
drop(progress);
345352

346353
writeln!(
347354
output.writer,

0 commit comments

Comments
 (0)