Skip to content

Commit 17ef83e

Browse files
committed
estuary-cdk: coordinate scheduled backfills for multi-cursor incremental states
Previously, connector-initiated backfills only coordinated the backfill cutoff with the incremental cursor when state.inc was a single ResourceState.Incremental. When state.inc was a dict (multiple incremental tasks, e.g., REALTIME/LOOKBACK), the code would fall through to the else branch and wipe the entire state, resetting the incremental cursors to their initial values. This could cause the incremental cursors to jump ahead when a connector-initiated backfill began. If the backfill and incremental tasks always capture all fields, skipping the incremental cursor ahead would at worst cause a slight delay in when documents in the skipped portion were captured. However, if scheduled backfills do not capture all fields (like with formula field refreshes), fields for those skipped records could be permanently missed. This commit updates the CDK to handle the case where there are multiple incremental subtasks and single backfill task, and all incremental subtasks are using datetime cursors. The minimum incremental cursor across all incremental subtasks is now used as the cutoff for the single backfill task. The incremental cursors are left untouched, meaning they do not skip ahead and prevents the potential to skip data during a formula field refresh.
1 parent 1f1d5e2 commit 17ef83e

File tree

1 file changed

+32
-9
lines changed

1 file changed

+32
-9
lines changed

estuary-cdk/estuary_cdk/capture/common.py

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,26 @@ def validated(
555555
)
556556

557557

558+
def _get_min_incremental_cursor(state: ResourceState) -> datetime | None:
559+
"""Extract the minimum incremental cursor from a ResourceState.
560+
561+
Returns the minimum datetime cursor if a valid one can be determined, None otherwise.
562+
Handles both single incremental task (state.inc is ResourceState.Incremental)
563+
and multiple incremental tasks (state.inc is a dict).
564+
"""
565+
if isinstance(state.inc, ResourceState.Incremental):
566+
if isinstance(state.inc.cursor, datetime):
567+
return state.inc.cursor
568+
elif isinstance(state.inc, dict):
569+
min_cursor: datetime | None = None
570+
for inc_state in state.inc.values():
571+
if isinstance(inc_state, ResourceState.Incremental) and isinstance(inc_state.cursor, datetime):
572+
if min_cursor is None or inc_state.cursor < min_cursor:
573+
min_cursor = inc_state.cursor
574+
return min_cursor
575+
return None
576+
577+
558578
def open(
559579
open: request.Open[Any, _ResourceConfig, _ConnectorState],
560580
resolved_bindings: list[
@@ -619,18 +639,21 @@ async def _run(task: Task):
619639

620640
if should_initialize:
621641
if is_connector_initiated:
622-
# In the most commmon case of a single fetch_changes and a single fetch_pages,
623-
# coordinate the initialized backfill's cutoff with the current incremental state's cursor.
642+
# Attempt to coordinate the initialized backfill's cutoff with the current incremental
643+
# state's cursor(s), preserving the incremental cursor position(s).
644+
min_cursor = _get_min_incremental_cursor(state) if state else None
645+
initial_backfill_state = resource.initial_state.backfill
646+
647+
# Check if we can coordinate the backfill cutoff with the incremental cursor.
648+
# We currently only perform this coordination when there's a single backfill task.
624649
if (
625-
isinstance(resource.initial_state.backfill, ResourceState.Backfill) and
626-
isinstance(resource.initial_state.backfill.cutoff, datetime) and
627650
state and
628-
isinstance(state.inc, ResourceState.Incremental) and
629-
isinstance(state.inc.cursor, datetime)
651+
min_cursor is not None and
652+
isinstance(initial_backfill_state, ResourceState.Backfill) and
653+
isinstance(initial_backfill_state.cutoff, datetime)
630654
):
631-
initialized_backfill_state = resource.initial_state.backfill
632-
initialized_backfill_state.cutoff = state.inc.cursor
633-
state.backfill = initialized_backfill_state.model_copy(deep=True)
655+
state.backfill = initial_backfill_state.model_copy(deep=True)
656+
state.backfill.cutoff = min_cursor
634657
# In all other cases, wipe the state back to the initial state.
635658
else:
636659
state = resource.initial_state.model_copy(deep=True)

0 commit comments

Comments
 (0)