Skip to content

Commit a2c724a

Browse files
committed
logging
1 parent 7fcb53b commit a2c724a

File tree

2 files changed

+18
-4
lines changed

2 files changed

+18
-4
lines changed

source-stripe-native/source_stripe_native/api.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ async def fetch_incremental(
109109
# Return early and sleep if the end date is before the log cursor
110110
# or the difference between the end date and log cursor is less than
111111
# the minimum incremental interval.
112+
log.info("fetch_incremental early return", {"stream": cls.NAME, "log_cursor": log_cursor, "end": end, "end_minus_cursor": str(end - log_cursor) if end >= log_cursor else "negative"})
112113
return
113114

114115
parameters["created[lte]"] = int(end.timestamp())
@@ -185,10 +186,13 @@ async def fetch_incremental(
185186
break
186187

187188
if max_ts != log_cursor:
188-
yield max_ts + timedelta(milliseconds=1) # startTimestamp is inclusive.
189+
cursor_to_yield = max_ts + timedelta(milliseconds=1)
190+
log.info("fetch_incremental yielding cursor", {"stream": cls.NAME, "branch": "max_ts+1ms", "log_cursor": log_cursor, "max_ts": max_ts, "end": end, "cursor_to_yield": cursor_to_yield})
191+
yield cursor_to_yield # startTimestamp is inclusive.
189192
else:
190193
# No events were found. Advance the cursor to avoid re-processing the same
191194
# time window. This is safe because the early return guard ensures end > log_cursor.
195+
log.info("fetch_incremental yielding cursor", {"stream": cls.NAME, "branch": "end", "log_cursor": log_cursor, "max_ts": max_ts, "end": end, "cursor_to_yield": end})
192196
yield end
193197

194198

@@ -387,10 +391,13 @@ async def fetch_incremental_substreams(
387391
else:
388392
break
389393
if max_ts != log_cursor:
390-
yield max_ts + timedelta(milliseconds=1) # startTimestamp is inclusive.
394+
cursor_to_yield = max_ts + timedelta(milliseconds=1)
395+
log.info("fetch_incremental_substreams yielding cursor", {"stream": cls_child.NAME, "branch": "max_ts+1ms", "log_cursor": log_cursor, "max_ts": max_ts, "end": end, "cursor_to_yield": cursor_to_yield})
396+
yield cursor_to_yield # startTimestamp is inclusive.
391397
else:
392398
# No events were found. Advance the cursor to avoid re-processing the same
393399
# time window. This is safe because the early return guard ensures end > log_cursor.
400+
log.info("fetch_incremental_substreams yielding cursor", {"stream": cls_child.NAME, "branch": "end", "log_cursor": log_cursor, "max_ts": max_ts, "end": end, "cursor_to_yield": end})
394401
yield end
395402

396403

@@ -693,12 +700,15 @@ async def fetch_incremental_no_events(
693700
break
694701

695702
if max_ts != log_cursor:
696-
yield max_ts + timedelta(milliseconds=1) # startTimestamp is inclusive.
703+
cursor_to_yield = max_ts + timedelta(milliseconds=1)
704+
log.info("fetch_incremental_no_events yielding cursor", {"stream": cls.NAME, "branch": "max_ts+1ms", "log_cursor": log_cursor, "max_ts": max_ts, "end": end, "cursor_to_yield": cursor_to_yield})
705+
yield cursor_to_yield # startTimestamp is inclusive.
697706
elif end > log_cursor:
698707
# No new documents were found. Advance the cursor to `end` (now - LAG) to avoid
699708
# re-processing the same time window. This function polls the list endpoint directly
700709
# (not the Events API), so documents are filtered by their `created` timestamp.
701710
# Only yield if `end` is actually ahead of the current cursor.
711+
log.info("fetch_incremental_no_events yielding cursor", {"stream": cls.NAME, "branch": "end", "log_cursor": log_cursor, "max_ts": max_ts, "end": end, "cursor_to_yield": end})
702712
yield end
703713
# Otherwise, don't yield a cursor - the cursor is already ahead of `end`,
704714
# so let the task sleep and try again later.
@@ -807,10 +817,13 @@ async def fetch_incremental_usage_records(
807817
break
808818

809819
if max_ts != log_cursor:
810-
yield max_ts + timedelta(milliseconds=1) # startTimestamp is inclusive.
820+
cursor_to_yield = max_ts + timedelta(milliseconds=1)
821+
log.info("fetch_incremental_usage_records yielding cursor", {"stream": cls_child.NAME, "branch": "max_ts+1ms", "log_cursor": log_cursor, "max_ts": max_ts, "end": end, "cursor_to_yield": cursor_to_yield})
822+
yield cursor_to_yield # startTimestamp is inclusive.
811823
else:
812824
# No events were found. Advance the cursor to avoid re-processing the same
813825
# time window. This is safe because the early return guard ensures end > log_cursor.
826+
log.info("fetch_incremental_usage_records yielding cursor", {"stream": cls_child.NAME, "branch": "end", "log_cursor": log_cursor, "max_ts": max_ts, "end": end, "cursor_to_yield": end})
814827
yield end
815828

816829

source-stripe-native/source_stripe_native/priority_capture.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,7 @@ async def _binding_incremental_task_with_work_item(
588588
)
589589

590590
if not is_larger:
591+
task.log.error("cursor not advancing", {"yielded": item, "state_cursor": state.cursor, "subtask_id": work_item.account_id, "stateKey": binding.stateKey})
591592
raise RuntimeError(
592593
f"Implementation error: FetchChangesFn yielded LogCursor {item} which is not greater than the last LogCursor {state.cursor}",
593594
)

0 commit comments

Comments
 (0)