@@ -244,18 +244,26 @@ fn coalesce_batches(batches: &[RecordBatch]) -> anyhow::Result<Vec<RecordBatch>>
244244/// Removes and returns the next batch ID (greater than `after_batch_id`) that
245245/// still has pending work for `table_name`.
246246///
247+ /// When `max_batch_id` is `Some(max)`, candidates with a batch ID greater than
248+ /// `max` are not considered. This is used to prevent coalescing across
249+ /// checkpoint interval boundaries.
250+ ///
247251/// This is used by the ETL runner to coalesce very small reads across multiple
248252/// source batch IDs for the same table while ensuring consumed IDs are not
249253/// replayed in later steps.
250254fn reserve_next_batch_id_for_table (
251255 work_state : & mut PipelineWorkState ,
252256 table_name : & str ,
253257 after_batch_id : u64 ,
258+ max_batch_id : Option < u64 > ,
254259) -> Option < ( u64 , bool ) > {
255260 let mut found: Option < ( u64 , bool ) > = None ;
256261 let start = after_batch_id. saturating_add ( 1 ) ;
257262
258263 for ( candidate_batch_id, tables) in work_state. steps . range_mut ( start..) {
264+ if max_batch_id. is_some_and ( |max| * candidate_batch_id > max) {
265+ break ;
266+ }
259267 if let Some ( pos) = tables. iter ( ) . position ( |t| t == table_name) {
260268 tables. remove ( pos) ;
261269 found = Some ( ( * candidate_batch_id, tables. is_empty ( ) ) ) ;
@@ -371,6 +379,11 @@ async fn read_logical_batch(
371379/// reserving and reading subsequent batch IDs for that table until at least
372380/// [`target_batch_rows()`] rows have been accumulated (or no further work exists).
373381///
382+ /// When `max_batch_id` is `Some(max)`, coalescing will not reserve batch IDs
383+ /// beyond `max`. This prevents reads from crossing a checkpoint interval
384+ /// boundary, ensuring checkpoint results are deterministic regardless of
385+ /// the configured [`target_batch_rows()`] value.
386+ ///
374387/// Returns `(raw_batches, key_columns, table_finished, consumed_work_units, rows_read)` where
375388/// `table_finished=true` means a read returned `None` and the table should be
376389/// marked as fully consumed. `consumed_work_units` counts how many table+batch
@@ -382,6 +395,7 @@ async fn read_batches_until_min_rows(
382395 logical_steps_consumed : & StdArc < AtomicU64 > ,
383396 table_name : & str ,
384397 start_batch_id : u64 ,
398+ max_batch_id : Option < u64 > ,
385399) -> Result < ( Vec < RecordBatch > , Vec < String > , bool , u64 , u64 ) , String > {
386400 let mut all_batches: Vec < RecordBatch > = Vec :: new ( ) ;
387401 let mut total_rows: usize = 0 ;
@@ -450,7 +464,12 @@ async fn read_batches_until_min_rows(
450464 {
451465 let reservation = {
452466 let mut state = work_state. lock ( ) . expect ( "work_state lock poisoned" ) ;
453- reserve_next_batch_id_for_table ( & mut state, table_name, reserve_cursor)
467+ reserve_next_batch_id_for_table (
468+ & mut state,
469+ table_name,
470+ reserve_cursor,
471+ max_batch_id,
472+ )
454473 } ;
455474
456475 let Some ( ( next_batch_id, removed_step_entry) ) = reservation else {
@@ -1614,6 +1633,16 @@ async fn run_pipeline(
16141633 } )
16151634 } ;
16161635
1636+ // When running with a step budget (checkpoint mode), compute the highest
1637+ // batch ID that belongs to this checkpoint interval. Coalescing in
1638+ // read_batches_until_min_rows is allowed within the interval but will not
1639+ // reserve batch IDs beyond this boundary, ensuring checkpoint results are
1640+ // deterministic regardless of the configured target_batch_rows() value.
1641+ let checkpoint_max_batch_id: Option < u64 > = step_limit. and_then ( |limit| {
1642+ let state = work_state. lock ( ) . expect ( "work_state lock poisoned" ) ;
1643+ state. steps . keys ( ) . nth ( limit. saturating_sub ( 1 ) ) . copied ( )
1644+ } ) ;
1645+
16171646 let mut outer_steps_processed: usize = 0 ;
16181647
16191648 loop {
@@ -1652,20 +1681,26 @@ async fn run_pipeline(
16521681 return PipelineState :: Stopped ( StopReason :: Cancelled ) ;
16531682 }
16541683
1655- // Pop the next step from the shared work state.
1684+ // Pop the next step from the shared work state. If the next batch ID
1685+ // is beyond the checkpoint interval boundary, do not pop it — pause
1686+ // instead so the checkpoint can be taken with exact data.
16561687 let next_step = {
16571688 let mut state = work_state. lock ( ) . expect ( "work_state lock poisoned" ) ;
16581689 if let Some ( entry) = state. steps . first_entry ( ) {
16591690 let batch_id = * entry. key ( ) ;
1660- let tables = entry. remove ( ) ;
1661- // Filter out already-finished tables.
1662- let total_tables = tables. len ( ) ;
1663- let active: Vec < String > = tables
1664- . into_iter ( )
1665- . filter ( |t| !state. finished_tables . contains ( t) )
1666- . collect ( ) ;
1667- let skipped = ( total_tables - active. len ( ) ) as u64 ;
1668- Some ( ( batch_id, active, skipped) )
1691+ if checkpoint_max_batch_id. is_some_and ( |max| batch_id > max) {
1692+ None
1693+ } else {
1694+ let tables = entry. remove ( ) ;
1695+ // Filter out already-finished tables.
1696+ let total_tables = tables. len ( ) ;
1697+ let active: Vec < String > = tables
1698+ . into_iter ( )
1699+ . filter ( |t| !state. finished_tables . contains ( t) )
1700+ . collect ( ) ;
1701+ let skipped = ( total_tables - active. len ( ) ) as u64 ;
1702+ Some ( ( batch_id, active, skipped) )
1703+ }
16691704 } else {
16701705 None
16711706 }
@@ -1685,6 +1720,23 @@ async fn run_pipeline(
16851720 }
16861721 ( bid, tables)
16871722 }
1723+ None if checkpoint_max_batch_id. is_some ( ) => {
1724+ // Reached the checkpoint interval boundary (or all work
1725+ // within the interval was consumed by coalescing). Pause so
1726+ // the checkpoint validation can run against exact data.
1727+ info ! (
1728+ outer_steps_processed,
1729+ logical_steps_consumed = logical_steps_consumed. load( Ordering :: Relaxed ) ,
1730+ "Checkpoint interval boundary reached, pausing pipeline"
1731+ ) ;
1732+ progress_logger. abort ( ) ;
1733+ if let Err ( e) = data_sink. flush ( ) . await {
1734+ return PipelineState :: Stopped ( StopReason :: Error ( format ! (
1735+ "Failed to flush sink at checkpoint boundary: {e}"
1736+ ) ) ) ;
1737+ }
1738+ return PipelineState :: Paused ;
1739+ }
16881740 None => {
16891741 // No more work — pipeline is done.
16901742 break ;
@@ -1723,6 +1775,7 @@ async fn run_pipeline(
17231775 & logical_steps_consumed,
17241776 & table_name,
17251777 batch_id,
1778+ checkpoint_max_batch_id,
17261779 )
17271780 . await
17281781 {
0 commit comments