Skip to content

Commit db65dd0

Browse files
authored
Merge branch 'trunk' into peasee/260218-refactor-duplicated-s3-sources
2 parents b5942e5 + f57d3b6 commit db65dd0

3 files changed

Lines changed: 108 additions & 17 deletions

File tree

crates/etl/src/lib.rs

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@ use data_generation::dataset::tpch::TpchDataset;
2424
use data_generation::source::Source;
2525
use data_generation::target::Target;
2626
use std::collections::{BTreeMap, HashSet};
27+
use std::sync::Arc as StdArc;
28+
use std::sync::atomic::{AtomicU64, Ordering};
29+
use std::time::Instant;
2730
use system_adapter_protocol::{DatasetConfig as ProtocolDatasetConfig, EtlType};
2831
use tokio::sync::watch;
2932
use tokio::task::{JoinHandle, JoinSet};
3033
use tokio_util::sync::CancellationToken;
31-
use tracing::{error, info, warn};
34+
use tracing::{debug, error, info, warn};
3235

3336
type DynSource = Arc<dyn Source>;
3437
type DynTarget = Arc<dyn Target>;
@@ -239,7 +242,7 @@ impl ETLPipeline {
239242
.map_err(|e| format!("write {table_name} batch {first_batch_id}: {e}"))?;
240243
}
241244

242-
info!(
245+
debug!(
243246
table = %table_name,
244247
batch_id = first_batch_id,
245248
"Initial batch processed"
@@ -363,6 +366,46 @@ async fn run_pipeline(
363366
let total_batches = work.len();
364367
info!(total_steps, total_batches, "ETL pipeline started");
365368

369+
// Shared progress counters for periodic logging.
370+
let steps_completed = StdArc::new(AtomicU64::new(0));
371+
let batches_processed = StdArc::new(AtomicU64::new(0));
372+
let tables_finished = StdArc::new(AtomicU64::new(0));
373+
let pipeline_start = Instant::now();
374+
375+
// Spawn periodic progress logger (every 5 seconds).
376+
let progress_logger = {
377+
let steps_completed = StdArc::clone(&steps_completed);
378+
let batches_processed = StdArc::clone(&batches_processed);
379+
let tables_finished = StdArc::clone(&tables_finished);
380+
let cancel = cancel.clone();
381+
tokio::spawn(async move {
382+
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
383+
loop {
384+
tokio::select! {
385+
_ = interval.tick() => {
386+
let elapsed = pipeline_start.elapsed();
387+
let secs = elapsed.as_secs_f64();
388+
if secs < 0.001 {
389+
continue;
390+
}
391+
let steps_done = steps_completed.load(Ordering::Relaxed);
392+
let batches_done = batches_processed.load(Ordering::Relaxed);
393+
let tables_done = tables_finished.load(Ordering::Relaxed);
394+
info!(
395+
elapsed_secs = format!("{secs:.1}"),
396+
steps = format!("{steps_done}/{total_steps}"),
397+
batches = format!("{batches_done}/{total_batches}"),
398+
tables_finished = tables_done,
399+
batches_per_sec = format!("{:.1}", batches_done as f64 / secs),
400+
"ETL progress"
401+
);
402+
}
403+
() = cancel.cancelled() => break,
404+
}
405+
}
406+
})
407+
};
408+
366409
// Tables whose data has been fully consumed (source returned `None`).
367410
let mut finished_tables: HashSet<String> = HashSet::new();
368411

@@ -394,7 +437,7 @@ async fn run_pipeline(
394437
let read_result = match source.read_batch(&table_name, batch_id).await {
395438
Ok(Some(r)) => r,
396439
Ok(None) => {
397-
info!(
440+
debug!(
398441
table = %table_name,
399442
batch_id,
400443
"No more batches for table, marking as finished"
@@ -439,7 +482,7 @@ async fn run_pipeline(
439482
}
440483
}
441484

442-
info!(
485+
debug!(
443486
table = %table_name,
444487
batch_id,
445488
"Table batch processed"
@@ -449,29 +492,43 @@ async fn run_pipeline(
449492
}
450493

451494
// Collect results from all concurrent table tasks in this step.
495+
let mut step_batch_count: u64 = 0;
452496
while let Some(result) = join_set.join_next().await {
453497
match result {
454498
Ok(Ok((table_name, is_finished))) => {
499+
step_batch_count += 1;
455500
if is_finished {
456501
finished_tables.insert(table_name);
502+
tables_finished.fetch_add(1, Ordering::Relaxed);
457503
}
458504
}
459505
Ok(Err(err_msg)) => {
506+
progress_logger.abort();
460507
return StopReason::Error(err_msg);
461508
}
462509
Err(e) => {
510+
progress_logger.abort();
463511
return StopReason::Error(format!("Task panicked: {e}"));
464512
}
465513
}
466514
}
467515

468-
info!(
516+
steps_completed.fetch_add(1, Ordering::Relaxed);
517+
batches_processed.fetch_add(step_batch_count, Ordering::Relaxed);
518+
519+
debug!(
469520
batch_id,
470521
progress = format!("{}/{}", step_idx + 1, total_steps),
471522
"Step completed"
472523
);
473524
}
474525

475-
info!("ETL pipeline completed successfully");
526+
progress_logger.abort();
527+
info!(
528+
elapsed = ?pipeline_start.elapsed(),
529+
steps = total_steps,
530+
batches = total_batches,
531+
"ETL pipeline completed successfully"
532+
);
476533
StopReason::Completed
477534
}

src/commands/load/mod.rs

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ limitations under the License.
1616
#![allow(dead_code)]
1717

1818
use crate::{args::CommonArgs, commands::adbc_executor, scenario::Scenario};
19-
use etl::ETLPipeline;
19+
use etl::{ETLPipeline, PipelineState, StopReason};
2020
use std::sync::Arc;
2121
use std::time::Duration;
2222
use system_adapter_protocol::MetricsResponse;
@@ -188,23 +188,59 @@ pub(crate) async fn run(
188188

189189
// --- Start the ETL pipeline (remaining batches) ---
190190
tracing::info!("Starting ETL pipeline (remaining batches)...");
191+
let mut etl_state_rx = etl_pipeline.state_watch();
191192
etl_pipeline.start()?;
192193

193194
let test_future = throughput_test.wait();
194195
tokio::pin!(test_future);
195-
let test = match tokio::select! {
196-
res = &mut test_future => res,
197-
_ = signal::ctrl_c() => {
198-
println!("Interrupt received, stopping benchmark...");
199-
shutdown_token.cancel();
200-
test_future.await
196+
197+
// Wait for ETL pipeline completion, then cancel the test.
198+
// If interrupted (ctrl-c), cancel both the test and the ETL pipeline.
199+
let etl_error: Option<String> = loop {
200+
tokio::select! {
201+
// ETL state changed — check if stopped
202+
_ = etl_state_rx.changed() => {
203+
let state = etl_state_rx.borrow_and_update().clone();
204+
match state {
205+
PipelineState::Stopped(StopReason::Completed) => {
206+
println!("ETL pipeline completed, stopping benchmark...");
207+
shutdown_token.cancel();
208+
break None;
209+
}
210+
PipelineState::Stopped(StopReason::Error(ref e)) => {
211+
eprintln!("ETL pipeline failed: {e}");
212+
shutdown_token.cancel();
213+
break Some(e.clone());
214+
}
215+
PipelineState::Stopped(StopReason::Cancelled) => {
216+
println!("ETL pipeline was cancelled, stopping benchmark...");
217+
shutdown_token.cancel();
218+
break None;
219+
}
220+
_ => { /* still running, keep waiting */ }
221+
}
222+
}
223+
// ctrl-c: stop everything
224+
_ = signal::ctrl_c() => {
225+
println!("Interrupt received, stopping benchmark...");
226+
shutdown_token.cancel();
227+
etl_pipeline.cancel();
228+
break None;
229+
}
201230
}
202-
} {
231+
};
232+
233+
let test = match test_future.await {
203234
Ok(test) => test,
204235
Err(e) => {
205236
return Err(e);
206237
}
207238
};
239+
240+
// Propagate ETL error after collecting the test result
241+
if let Some(etl_err) = etl_error {
242+
return Err(anyhow::anyhow!("ETL pipeline failed: {etl_err}"));
243+
}
208244
test.get_query_durations().statistical_set()?;
209245

210246
// Get all query durations for overall statistics before ending the test

src/scenario.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,7 @@ impl Scenario {
4242

4343
pub fn end_condition(&self) -> test_framework::spicetest::datasets::EndCondition {
4444
match self {
45-
Scenario::TPCH => test_framework::spicetest::datasets::EndCondition::Duration(
46-
std::time::Duration::from_secs(60),
47-
),
45+
Scenario::TPCH => test_framework::spicetest::datasets::EndCondition::Unlimited,
4846
}
4947
}
5048
}

0 commit comments

Comments
 (0)