Skip to content

Commit d8b6222

Browse files
Fix ETL step counting, ADBC initialization, checkpoint validation, and convergence timing (#165)
- Fix ETL coalescing race condition: use outer_steps_processed instead of logical_steps_consumed for step limit check. Coalescing internally consumes steps from future batches, inflating logical_steps_consumed and causing the pipeline to stop one step early. - Add missing pipeline.initialize() call in ADBC sink path, ensuring batch 0 is loaded for all tables before the pipeline run starts. - Widen schema equivalence in checkpoint validation: allow Decimal128 with same scale but different precision (DuckDB vs DataFusion), and add reverse Decimal128 to Int64/Float64 matching. - Measure checkpoint convergence latency from the first passing query rather than from when the full validation sweep completes. The first pass indicates when the data was correctly ingested; the remaining queries confirm the result. Replace fixed 5s polling with watch channel notification. Checkpoint expected data in S3 regenerated for versions 1, 2, and 3.
1 parent b7fcee8 commit d8b6222

6 files changed

Lines changed: 85 additions & 15 deletions

File tree

crates/etl/src/lib.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1463,12 +1463,19 @@ async fn run_pipeline(
14631463
let mut outer_steps_processed: usize = 0;
14641464

14651465
loop {
1466-
// Check step budget.
1466+
// Check step budget using outer_steps_processed rather than
1467+
// logical_steps_consumed so that steps consumed internally by
1468+
// batch coalescing (read_batches_until_min_rows) do not count
1469+
// against the budget. logical_steps_consumed can be incremented
1470+
// when a coalescing reservation removes the last table from a
1471+
// future step, effectively "consuming" that step without the outer
1472+
// loop ever processing it for the remaining tables.
14671473
if let Some(limit) = step_limit
1468-
&& logical_steps_consumed.load(Ordering::Relaxed) >= limit as u64
1474+
&& outer_steps_processed >= limit
14691475
{
14701476
info!(
1471-
steps_processed = logical_steps_consumed.load(Ordering::Relaxed),
1477+
outer_steps_processed,
1478+
logical_steps_consumed = logical_steps_consumed.load(Ordering::Relaxed),
14721479
"Step limit reached, pausing pipeline"
14731480
);
14741481
progress_logger.abort();

crates/test-framework/src/queries/validation/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,10 @@ fn datatype_equivalent(expected_type: &DataType, actual_type: &DataType) -> bool
145145
(None, Some("UTC" | "+00:00")) | (Some("UTC" | "+00:00"), None)
146146
)
147147
}
148+
// Decimal128 with same scale but different precision (e.g. DuckDB returns
149+
// Decimal128(38,2) while DataFusion returns Decimal128(25,2)). The stringified
150+
// values are identical because scale controls the fractional digits.
151+
(DataType::Decimal128(_, s1), DataType::Decimal128(_, s2)) => s1 == s2,
148152
// Existing numeric and string type equivalences
149153
_ => matches!(
150154
(expected_type, actual_type),

crates/test-framework/src/spicetest/datasets/checkpoint_validation.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ limitations under the License.
2828
2929
use std::collections::HashMap;
3030
use std::sync::Arc;
31+
use std::time::Instant;
3132

3233
use arrow::array::RecordBatch;
3334

@@ -85,6 +86,12 @@ pub enum ValidationStatus {
8586
/// every validated query passing. Once set, stays `true` for the
8687
/// remainder of this validation window.
8788
converged: bool,
89+
/// The instant at which the first query passed in the iteration
90+
/// that ultimately converged. `None` until a pass is recorded in
91+
/// the current (non-converged) iteration. Reset at the start of
92+
/// each new iteration so that a failing iteration's timestamp is
93+
/// discarded.
94+
first_pass_instant: Option<Instant>,
8895
},
8996
}
9097

@@ -131,6 +138,18 @@ impl ValidationStatus {
131138
} => *completed_iterations,
132139
}
133140
}
141+
142+
/// Returns the [`Instant`] at which the first query passed in the
143+
/// converging iteration, or `None` if not yet available.
144+
#[must_use]
145+
pub fn first_pass_instant(&self) -> Option<Instant> {
146+
match self {
147+
ValidationStatus::Inactive => None,
148+
ValidationStatus::Active {
149+
first_pass_instant, ..
150+
} => *first_pass_instant,
151+
}
152+
}
134153
}
135154

136155
/// Handles for the load runner to interact with checkpoint validation.

crates/test-framework/src/spicetest/datasets/worker.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ struct CheckpointValidationState {
9797
/// Set to `true` once a complete iteration finishes with every query
9898
/// passing. Sticky — once set, stays `true` for the validation window.
9999
converged: bool,
100+
/// The instant at which the first query passed in the current
101+
/// (non-converged) iteration. Reset at each iteration boundary.
102+
first_pass_instant: Option<Instant>,
100103
}
101104

102105
impl CheckpointValidationState {
@@ -111,6 +114,7 @@ impl CheckpointValidationState {
111114
command_rx: handles.command_rx,
112115
current_iteration_results: HashMap::new(),
113116
converged: false,
117+
first_pass_instant: None,
114118
}
115119
}
116120

@@ -134,6 +138,7 @@ impl CheckpointValidationState {
134138
self.completed_iterations = 0;
135139
self.current_iteration_results.clear();
136140
self.converged = false;
141+
self.first_pass_instant = None;
137142
eprintln!(
138143
"Checkpoint validation enabled for checkpoint {}",
139144
checkpoint_idx
@@ -147,6 +152,7 @@ impl CheckpointValidationState {
147152
self.outcomes.clear();
148153
self.current_iteration_results.clear();
149154
self.converged = false;
155+
self.first_pass_instant = None;
150156
eprintln!("Checkpoint validation disabled");
151157
let _ = self.status_tx.send(ValidationStatus::Inactive);
152158
true
@@ -182,6 +188,9 @@ impl CheckpointValidationState {
182188
if !self.converged {
183189
self.current_iteration_results
184190
.insert(Arc::clone(query_name), true);
191+
if self.first_pass_instant.is_none() {
192+
self.first_pass_instant = Some(Instant::now());
193+
}
185194
}
186195
}
187196
Ok(QueryValidationResult::Fail(reason)) => {
@@ -261,11 +270,17 @@ impl CheckpointValidationState {
261270
let all_passed = self.current_iteration_results.values().all(|&v| v);
262271
if all_passed {
263272
self.converged = true;
273+
// first_pass_instant is preserved — it holds the instant
274+
// the first query passed (possibly in an earlier iteration)
275+
// indicating when the data first became correct.
264276
eprintln!(
265277
"Checkpoint {} validation converged after {} iterations",
266278
self.checkpoint_idx, self.completed_iterations
267279
);
268280
}
281+
// Do NOT reset first_pass_instant on failure — it marks
282+
// when the data first became queryable, even if a mixed
283+
// iteration didn't fully converge yet.
269284
}
270285
self.current_iteration_results.clear();
271286

@@ -278,6 +293,7 @@ impl CheckpointValidationState {
278293
outcomes: self.outcomes.values().cloned().collect(),
279294
completed_iterations: self.completed_iterations,
280295
converged: self.converged,
296+
first_pass_instant: self.first_pass_instant,
281297
};
282298
let _ = self.status_tx.send(status);
283299
}

src/commands/load/mod.rs

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ pub(crate) async fn run(
492492
// Always create validation channels so we can track query-set iteration
493493
// completions. When --validate-results is enabled with checkpoint data,
494494
// these channels are also used for checkpoint-based results validation.
495-
let (validation_controller, validation_worker_handles) = create_validation_channels();
495+
let (mut validation_controller, validation_worker_handles) = create_validation_channels();
496496
test_builder = test_builder.with_checkpoint_validation(validation_worker_handles);
497497

498498
let has_checkpoint_validation =
@@ -581,6 +581,10 @@ pub(crate) async fn run(
581581
);
582582

583583
let etl_pause_time = tokio::time::Instant::now();
584+
// Capture std::time::Instant at the same point so
585+
// we can compare against the worker's
586+
// first_pass_instant (which uses std::time::Instant).
587+
let etl_pause_time_std = std::time::Instant::now();
584588

585589
// Tell worker 0 to start validating.
586590
let _ = validation_controller.command_tx.send(Some(
@@ -593,16 +597,28 @@ pub(crate) async fn run(
593597
// Poll the validation status until convergence
594598
// (a complete iteration where every query passes)
595599
// or until the timeout is reached.
596-
const POLL_INTERVAL: Duration = Duration::from_secs(5);
597600
const MAX_WAIT: Duration = Duration::from_secs(600);
601+
let deadline =
602+
tokio::time::Instant::now() + MAX_WAIT;
598603
let mut timed_out = false;
599-
let mut interrupted = false;
604+
let interrupted = false;
600605
loop {
601606
let status =
602607
validation_controller.status_rx.borrow().clone();
603608
if status.converged() {
604-
let latency_ms =
605-
etl_pause_time.elapsed().as_secs_f64() * 1000.0;
609+
// Use the instant the first query passed
610+
// as the latency reference point. This
611+
// measures when the data was fully ingested
612+
// (first correct answer), not when the full
613+
// validation sweep finished.
614+
let latency_ms = status
615+
.first_pass_instant()
616+
.map_or_else(
617+
|| etl_pause_time_std.elapsed(),
618+
|fpi| fpi.duration_since(etl_pause_time_std),
619+
)
620+
.as_secs_f64()
621+
* 1000.0;
606622
println!(
607623
"Checkpoint {} converged in {:.1}s ({} iterations)",
608624
checkpoint_idx,
@@ -617,12 +633,17 @@ pub(crate) async fn run(
617633
timed_out = true;
618634
break;
619635
}
620-
tokio::select! {
621-
_ = tokio::time::sleep(POLL_INTERVAL) => {}
622-
_ = signal::ctrl_c() => {
623-
interrupted = true;
624-
break;
625-
}
636+
// Wait for the worker to publish a new status
637+
// update rather than polling on a fixed interval.
638+
if tokio::time::timeout_at(
639+
deadline,
640+
validation_controller.status_rx.changed(),
641+
)
642+
.await
643+
.is_err()
644+
{
645+
timed_out = true;
646+
break;
626647
}
627648
}
628649

@@ -633,6 +654,7 @@ pub(crate) async fn run(
633654
outcomes,
634655
completed_iterations: iters,
635656
converged,
657+
..
636658
} = &status
637659
{
638660
let total_pass: usize =

src/main.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,14 +231,16 @@ async fn run_benchmark(
231231
target_db_schema,
232232
)?);
233233

234-
let pipeline = ETLPipeline::new(
234+
let mut pipeline = ETLPipeline::new(
235235
dataset_source,
236236
&generation_config,
237237
Arc::clone(&data_source),
238238
target_sink,
239239
&mutations,
240240
)?;
241241

242+
pipeline.initialize().await?;
243+
242244
(setup_response, pipeline)
243245
}
244246
};

0 commit comments

Comments
 (0)