Skip to content

Commit 61ea508

Browse files
committed
Reapply "Fix checkpoint determinism: bound coalescing to checkpoint interval (#187)"
This reverts commit 5ea1ddb.
1 parent 0336dea commit 61ea508

1 file changed

Lines changed: 64 additions & 11 deletions

File tree

crates/etl/src/lib.rs

Lines changed: 64 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -244,18 +244,26 @@ fn coalesce_batches(batches: &[RecordBatch]) -> anyhow::Result<Vec<RecordBatch>>
244244
/// Removes and returns the next batch ID (greater than `after_batch_id`) that
245245
/// still has pending work for `table_name`.
246246
///
247+
/// When `max_batch_id` is `Some(max)`, candidates with a batch ID greater than
248+
/// `max` are not considered. This is used to prevent coalescing across
249+
/// checkpoint interval boundaries.
250+
///
247251
/// This is used by the ETL runner to coalesce very small reads across multiple
248252
/// source batch IDs for the same table while ensuring consumed IDs are not
249253
/// replayed in later steps.
250254
fn reserve_next_batch_id_for_table(
251255
work_state: &mut PipelineWorkState,
252256
table_name: &str,
253257
after_batch_id: u64,
258+
max_batch_id: Option<u64>,
254259
) -> Option<(u64, bool)> {
255260
let mut found: Option<(u64, bool)> = None;
256261
let start = after_batch_id.saturating_add(1);
257262

258263
for (candidate_batch_id, tables) in work_state.steps.range_mut(start..) {
264+
if max_batch_id.is_some_and(|max| *candidate_batch_id > max) {
265+
break;
266+
}
259267
if let Some(pos) = tables.iter().position(|t| t == table_name) {
260268
tables.remove(pos);
261269
found = Some((*candidate_batch_id, tables.is_empty()));
@@ -371,6 +379,11 @@ async fn read_logical_batch(
371379
/// reserving and reading subsequent batch IDs for that table until at least
372380
/// [`target_batch_rows()`] rows have been accumulated (or no further work exists).
373381
///
382+
/// When `max_batch_id` is `Some(max)`, coalescing will not reserve batch IDs
383+
/// beyond `max`. This prevents reads from crossing a checkpoint interval
384+
/// boundary, ensuring checkpoint results are deterministic regardless of
385+
/// the configured [`target_batch_rows()`] value.
386+
///
374387
/// Returns `(raw_batches, key_columns, table_finished, consumed_work_units, rows_read)` where
375388
/// `table_finished=true` means a read returned `None` and the table should be
376389
/// marked as fully consumed. `consumed_work_units` counts how many table+batch
@@ -382,6 +395,7 @@ async fn read_batches_until_min_rows(
382395
logical_steps_consumed: &StdArc<AtomicU64>,
383396
table_name: &str,
384397
start_batch_id: u64,
398+
max_batch_id: Option<u64>,
385399
) -> Result<(Vec<RecordBatch>, Vec<String>, bool, u64, u64), String> {
386400
let mut all_batches: Vec<RecordBatch> = Vec::new();
387401
let mut total_rows: usize = 0;
@@ -450,7 +464,12 @@ async fn read_batches_until_min_rows(
450464
{
451465
let reservation = {
452466
let mut state = work_state.lock().expect("work_state lock poisoned");
453-
reserve_next_batch_id_for_table(&mut state, table_name, reserve_cursor)
467+
reserve_next_batch_id_for_table(
468+
&mut state,
469+
table_name,
470+
reserve_cursor,
471+
max_batch_id,
472+
)
454473
};
455474

456475
let Some((next_batch_id, removed_step_entry)) = reservation else {
@@ -1614,6 +1633,16 @@ async fn run_pipeline(
16141633
})
16151634
};
16161635

1636+
// When running with a step budget (checkpoint mode), compute the highest
1637+
// batch ID that belongs to this checkpoint interval. Coalescing in
1638+
// read_batches_until_min_rows is allowed within the interval but will not
1639+
// reserve batch IDs beyond this boundary, ensuring checkpoint results are
1640+
// deterministic regardless of the configured target_batch_rows() value.
1641+
let checkpoint_max_batch_id: Option<u64> = step_limit.and_then(|limit| {
1642+
let state = work_state.lock().expect("work_state lock poisoned");
1643+
state.steps.keys().nth(limit.saturating_sub(1)).copied()
1644+
});
1645+
16171646
let mut outer_steps_processed: usize = 0;
16181647

16191648
loop {
@@ -1652,20 +1681,26 @@ async fn run_pipeline(
16521681
return PipelineState::Stopped(StopReason::Cancelled);
16531682
}
16541683

1655-
// Pop the next step from the shared work state.
1684+
// Pop the next step from the shared work state. If the next batch ID
1685+
// is beyond the checkpoint interval boundary, do not pop it — pause
1686+
// instead so the checkpoint can be taken with exact data.
16561687
let next_step = {
16571688
let mut state = work_state.lock().expect("work_state lock poisoned");
16581689
if let Some(entry) = state.steps.first_entry() {
16591690
let batch_id = *entry.key();
1660-
let tables = entry.remove();
1661-
// Filter out already-finished tables.
1662-
let total_tables = tables.len();
1663-
let active: Vec<String> = tables
1664-
.into_iter()
1665-
.filter(|t| !state.finished_tables.contains(t))
1666-
.collect();
1667-
let skipped = (total_tables - active.len()) as u64;
1668-
Some((batch_id, active, skipped))
1691+
if checkpoint_max_batch_id.is_some_and(|max| batch_id > max) {
1692+
None
1693+
} else {
1694+
let tables = entry.remove();
1695+
// Filter out already-finished tables.
1696+
let total_tables = tables.len();
1697+
let active: Vec<String> = tables
1698+
.into_iter()
1699+
.filter(|t| !state.finished_tables.contains(t))
1700+
.collect();
1701+
let skipped = (total_tables - active.len()) as u64;
1702+
Some((batch_id, active, skipped))
1703+
}
16691704
} else {
16701705
None
16711706
}
@@ -1685,6 +1720,23 @@ async fn run_pipeline(
16851720
}
16861721
(bid, tables)
16871722
}
1723+
None if checkpoint_max_batch_id.is_some() => {
1724+
// Reached the checkpoint interval boundary (or all work
1725+
// within the interval was consumed by coalescing). Pause so
1726+
// the checkpoint validation can run against exact data.
1727+
info!(
1728+
outer_steps_processed,
1729+
logical_steps_consumed = logical_steps_consumed.load(Ordering::Relaxed),
1730+
"Checkpoint interval boundary reached, pausing pipeline"
1731+
);
1732+
progress_logger.abort();
1733+
if let Err(e) = data_sink.flush().await {
1734+
return PipelineState::Stopped(StopReason::Error(format!(
1735+
"Failed to flush sink at checkpoint boundary: {e}"
1736+
)));
1737+
}
1738+
return PipelineState::Paused;
1739+
}
16881740
None => {
16891741
// No more work — pipeline is done.
16901742
break;
@@ -1723,6 +1775,7 @@ async fn run_pipeline(
17231775
&logical_steps_consumed,
17241776
&table_name,
17251777
batch_id,
1778+
checkpoint_max_batch_id,
17261779
)
17271780
.await
17281781
{

0 commit comments

Comments
 (0)