Skip to content

Commit 5ea1ddb

Browse files
Revert "Fix checkpoint determinism: bound coalescing to checkpoint interval (#187)"
This reverts commit cd6943f.
1 parent cd6943f commit 5ea1ddb

1 file changed

Lines changed: 11 additions & 64 deletions

File tree

crates/etl/src/lib.rs

Lines changed: 11 additions & 64 deletions
Original file line number | Diff line number | Diff line change
@@ -244,26 +244,18 @@ fn coalesce_batches(batches: &[RecordBatch]) -> anyhow::Result<Vec<RecordBatch>>
244244
/// Removes and returns the next batch ID (greater than `after_batch_id`) that
245245
/// still has pending work for `table_name`.
246246
///
247-
/// When `max_batch_id` is `Some(max)`, candidates with a batch ID greater than
248-
/// `max` are not considered. This is used to prevent coalescing across
249-
/// checkpoint interval boundaries.
250-
///
251247
/// This is used by the ETL runner to coalesce very small reads across multiple
252248
/// source batch IDs for the same table while ensuring consumed IDs are not
253249
/// replayed in later steps.
254250
fn reserve_next_batch_id_for_table(
255251
work_state: &mut PipelineWorkState,
256252
table_name: &str,
257253
after_batch_id: u64,
258-
max_batch_id: Option<u64>,
259254
) -> Option<(u64, bool)> {
260255
let mut found: Option<(u64, bool)> = None;
261256
let start = after_batch_id.saturating_add(1);
262257

263258
for (candidate_batch_id, tables) in work_state.steps.range_mut(start..) {
264-
if max_batch_id.is_some_and(|max| *candidate_batch_id > max) {
265-
break;
266-
}
267259
if let Some(pos) = tables.iter().position(|t| t == table_name) {
268260
tables.remove(pos);
269261
found = Some((*candidate_batch_id, tables.is_empty()));
@@ -379,11 +371,6 @@ async fn read_logical_batch(
379371
/// reserving and reading subsequent batch IDs for that table until at least
380372
/// [`target_batch_rows()`] rows have been accumulated (or no further work exists).
381373
///
382-
/// When `max_batch_id` is `Some(max)`, coalescing will not reserve batch IDs
383-
/// beyond `max`. This prevents reads from crossing a checkpoint interval
384-
/// boundary, ensuring checkpoint results are deterministic regardless of
385-
/// the configured [`target_batch_rows()`] value.
386-
///
387374
/// Returns `(raw_batches, key_columns, table_finished, consumed_work_units, rows_read)` where
388375
/// `table_finished=true` means a read returned `None` and the table should be
389376
/// marked as fully consumed. `consumed_work_units` counts how many table+batch
@@ -395,7 +382,6 @@ async fn read_batches_until_min_rows(
395382
logical_steps_consumed: &StdArc<AtomicU64>,
396383
table_name: &str,
397384
start_batch_id: u64,
398-
max_batch_id: Option<u64>,
399385
) -> Result<(Vec<RecordBatch>, Vec<String>, bool, u64, u64), String> {
400386
let mut all_batches: Vec<RecordBatch> = Vec::new();
401387
let mut total_rows: usize = 0;
@@ -464,12 +450,7 @@ async fn read_batches_until_min_rows(
464450
{
465451
let reservation = {
466452
let mut state = work_state.lock().expect("work_state lock poisoned");
467-
reserve_next_batch_id_for_table(
468-
&mut state,
469-
table_name,
470-
reserve_cursor,
471-
max_batch_id,
472-
)
453+
reserve_next_batch_id_for_table(&mut state, table_name, reserve_cursor)
473454
};
474455

475456
let Some((next_batch_id, removed_step_entry)) = reservation else {
@@ -1633,16 +1614,6 @@ async fn run_pipeline(
16331614
})
16341615
};
16351616

1636-
// When running with a step budget (checkpoint mode), compute the highest
1637-
// batch ID that belongs to this checkpoint interval. Coalescing in
1638-
// read_batches_until_min_rows is allowed within the interval but will not
1639-
// reserve batch IDs beyond this boundary, ensuring checkpoint results are
1640-
// deterministic regardless of the configured target_batch_rows() value.
1641-
let checkpoint_max_batch_id: Option<u64> = step_limit.and_then(|limit| {
1642-
let state = work_state.lock().expect("work_state lock poisoned");
1643-
state.steps.keys().nth(limit.saturating_sub(1)).copied()
1644-
});
1645-
16461617
let mut outer_steps_processed: usize = 0;
16471618

16481619
loop {
@@ -1681,26 +1652,20 @@ async fn run_pipeline(
16811652
return PipelineState::Stopped(StopReason::Cancelled);
16821653
}
16831654

1684-
// Pop the next step from the shared work state. If the next batch ID
1685-
// is beyond the checkpoint interval boundary, do not pop it — pause
1686-
// instead so the checkpoint can be taken with exact data.
1655+
// Pop the next step from the shared work state.
16871656
let next_step = {
16881657
let mut state = work_state.lock().expect("work_state lock poisoned");
16891658
if let Some(entry) = state.steps.first_entry() {
16901659
let batch_id = *entry.key();
1691-
if checkpoint_max_batch_id.is_some_and(|max| batch_id > max) {
1692-
None
1693-
} else {
1694-
let tables = entry.remove();
1695-
// Filter out already-finished tables.
1696-
let total_tables = tables.len();
1697-
let active: Vec<String> = tables
1698-
.into_iter()
1699-
.filter(|t| !state.finished_tables.contains(t))
1700-
.collect();
1701-
let skipped = (total_tables - active.len()) as u64;
1702-
Some((batch_id, active, skipped))
1703-
}
1660+
let tables = entry.remove();
1661+
// Filter out already-finished tables.
1662+
let total_tables = tables.len();
1663+
let active: Vec<String> = tables
1664+
.into_iter()
1665+
.filter(|t| !state.finished_tables.contains(t))
1666+
.collect();
1667+
let skipped = (total_tables - active.len()) as u64;
1668+
Some((batch_id, active, skipped))
17041669
} else {
17051670
None
17061671
}
@@ -1720,23 +1685,6 @@ async fn run_pipeline(
17201685
}
17211686
(bid, tables)
17221687
}
1723-
None if checkpoint_max_batch_id.is_some() => {
1724-
// Reached the checkpoint interval boundary (or all work
1725-
// within the interval was consumed by coalescing). Pause so
1726-
// the checkpoint validation can run against exact data.
1727-
info!(
1728-
outer_steps_processed,
1729-
logical_steps_consumed = logical_steps_consumed.load(Ordering::Relaxed),
1730-
"Checkpoint interval boundary reached, pausing pipeline"
1731-
);
1732-
progress_logger.abort();
1733-
if let Err(e) = data_sink.flush().await {
1734-
return PipelineState::Stopped(StopReason::Error(format!(
1735-
"Failed to flush sink at checkpoint boundary: {e}"
1736-
)));
1737-
}
1738-
return PipelineState::Paused;
1739-
}
17401688
None => {
17411689
// No more work — pipeline is done.
17421690
break;
@@ -1775,7 +1723,6 @@ async fn run_pipeline(
17751723
&logical_steps_consumed,
17761724
&table_name,
17771725
batch_id,
1778-
checkpoint_max_batch_id,
17791726
)
17801727
.await
17811728
{

0 commit comments

Comments
 (0)