-
Notifications
You must be signed in to change notification settings - Fork 33
Expand file tree
/
Copy pathcommon.py
More file actions
1149 lines (941 loc) · 41.6 KB
/
common.py
File metadata and controls
1149 lines (941 loc) · 41.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import abc
import asyncio
import functools
from enum import Enum, StrEnum
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
import inspect
from logging import Logger
from typing import (
Any,
AsyncGenerator,
Awaitable,
Callable,
cast,
ClassVar,
Generic,
Iterable,
Literal,
TypeVar,
Tuple,
)
from pydantic import AwareDatetime, BaseModel, Field, NonNegativeInt
from ..cron import next_fire
from ..utils import json_merge_patch
from ..flow import (
AccessToken,
BaseOAuth2Credentials,
CaptureBinding,
ClientCredentialsOAuth2Credentials,
OAuth2TokenFlowSpec,
OAuth2RotatingTokenSpec,
AuthorizationCodeFlowOAuth2Credentials,
LongLivedClientCredentialsOAuth2Credentials,
ResourceOwnerPasswordOAuth2Credentials,
RotatingOAuth2Credentials,
OAuth2Spec,
ValidationError,
BasicAuth,
)
from ..pydantic_polyfill import GenericModel
from . import Task, request, response
LogCursor = Tuple[str | int] | AwareDatetime | NonNegativeInt
"""LogCursor is a cursor into a logical log of changes.
The two predominant strategies for accessing logs are:
a) fetching entities which were created / updated / deleted since a given datetime.
b) fetching changes by their offset in a sequential log (Kafka partition or Gazette journal).
Note that `str` cannot be added to this type union, as it makes parsing states ambiguous, however
the tuple type allows for str or integer values to be used
"""
# Marker key identifying dict-based cursors.
# Fetch functions may yield plain dicts both as captured documents and as
# structured PageCursors; without this marker the runtime could not tell
# whether a yielded dict is a document to capture or a PageCursor for
# checkpointing progress.
CURSOR_MARKER = "__flow_cursor__"


def is_cursor_dict(item: dict) -> bool:
    """Return True if `item` carries the cursor marker and is thus a cursor dict."""
    return item.get(CURSOR_MARKER) is True


def make_cursor_dict(data: dict) -> dict:
    """Return a new dict containing `data` plus the cursor marker.

    Call this first with the complete state of a resource's `PageCursor`, and
    on subsequent calls with only the changes to that state. The runtime
    applies JSON merge patches, so checkpointing just the patch (rather than
    the entire dictionary):
    - Keeps checkpoints relatively small
    - Reduces the impact checkpoints have on the recovery log size
    - Enables efficient incremental state updates via JSON merge patches

    Example:
    ```python
    # Initial full state - use make_cursor_dict for the complete state
    initial_full_page_cursor = make_cursor_dict({"board1": {"items": 10}, "board2": {"items": 20}})
    yield initial_full_page_cursor
    # do some processing for a single invocation of fetch_page
    # yield a patch for processed boards - just the changes, not the full state
    completed_boards = {"board1": None} # marks board1 as completed
    yield completed_boards
    ```

    Args:
        data: The dictionary to mark as a cursor. For initial state, this
            should be the complete cursor state; for subsequent calls, only
            the changes to be merged.
    """
    marked = {CURSOR_MARKER: True}
    marked.update(data)
    return marked


def pop_cursor_marker(cursor: dict) -> dict:
    """Strip the cursor marker (if present) from `cursor` in place and return it."""
    if CURSOR_MARKER in cursor:
        del cursor[CURSOR_MARKER]
    return cursor
PageCursor = str | int | dict | None
"""PageCursor is a cursor into a paged result set.
These cursors are predominantly an opaque string or an internal offset integer.
When a dict is used, it represents a structured cursor that supports JSON merge patches
for efficient incremental updates.
None means "begin a new iteration" in a request context,
and "no pages remain" in a response context.
"""
class Triggers(Enum):
    """Trigger kinds a connector may act upon.

    BACKFILL presumably corresponds to the re-backfill requests carried in
    ConnectorState.backfillRequests — TODO confirm against callers outside
    this module.
    """

    BACKFILL = "BACKFILL"
@dataclass
class SourcedSchema:
    """
    SourcedSchema encapsulates a source-defined schema for a specific binding.

    SourcedSchemas are used to inform the runtime about types & metadata that
    aren't easily inferred by inspecting specific documents. The runtime
    widens inferred schemas to accommodate the current inferred schema along
    with any SourcedSchemas.
    """

    # The source-defined JSON schema fragment.
    value: dict[str, Any]
class BaseDocument(BaseModel):
    """Base class for captured documents, carrying Flow `_meta` metadata."""

    class Meta(BaseModel):
        """Document metadata, serialized under the `_meta` alias."""

        op: Literal["c", "u", "d"] = Field(
            default="u",
            description="Operation type (c: Create, u: Update, d: Delete)",
        )
        row_id: int = Field(
            default=-1,
            description="Row ID of the Document, counting up from zero, or -1 if not known",
        )

    meta_: Meta = Field(
        # Default is an "update" operation with unknown row_id.
        default_factory=lambda: BaseDocument.Meta(op="u"),
        alias="_meta",
        description="Document metadata",
    )


_BaseDocument = TypeVar("_BaseDocument", bound=BaseDocument)
class BaseResourceConfig(abc.ABC, BaseModel, extra="forbid"):
    """
    BaseResourceConfig is an abstract base class for ResourceConfig classes.
    """

    # JSON pointers of the config fields which compose the resource path.
    PATH_POINTERS: ClassVar[list[str]]

    meta_: dict | None = Field(
        default=None, alias="_meta", title="Meta",
    )

    @abc.abstractmethod
    def path(self) -> list[str]:
        """Return the resource path components of this configuration."""
        raise NotImplementedError()


_BaseResourceConfig = TypeVar("_BaseResourceConfig", bound=BaseResourceConfig)
class ResourceConfig(BaseResourceConfig):
    """ResourceConfig is a common resource configuration shape."""

    PATH_POINTERS: ClassVar[list[str]] = ["/name"]

    name: str = Field(description="Name of this resource")
    interval: timedelta = Field(
        default=timedelta(), description="Interval between updates for this resource"
    )

    # NOTE(johnny): If we need a namespace, introduce an ExtResourceConfig (?)
    # which adds a `namespace` field like:
    # namespace: str | None = Field(
    #     default=None, description="Enclosing schema namespace of this resource"
    # )

    def path(self) -> list[str]:
        # The resource path is simply its name.
        return [self.name]


_ResourceConfig = TypeVar("_ResourceConfig", bound=ResourceConfig)
# Validates a standard five-field cron expression
# (minute, hour, day-of-month, month, day-of-week), where each field accepts
# single values, ranges, steps, and comma-separated lists.
# The empty string is also accepted, signifying "no schedule".
CRON_REGEX = (
    r"^"
    r"((?:[0-5]?\d(?:-[0-5]?\d)?|\*(?:/[0-5]?\d)?)(?:,(?:[0-5]?\d(?:-[0-5]?\d)?|\*(?:/[0-5]?\d)?))*)\s+"  # minute
    r"((?:[01]?\d|2[0-3]|(?:[01]?\d|2[0-3])-(?:[01]?\d|2[0-3])|\*(?:/[01]?\d|/2[0-3])?)(?:,(?:[01]?\d|2[0-3]|(?:[01]?\d|2[0-3])-(?:[01]?\d|2[0-3])|\*(?:/[01]?\d|/2[0-3])?))*)\s+"  # hour
    r"((?:0?[1-9]|[12]\d|3[01]|(?:0?[1-9]|[12]\d|3[01])-(?:0?[1-9]|[12]\d|3[01])|\*(?:/[0-9]|/1[0-9]|/2[0-9]|/3[01])?)(?:,(?:0?[1-9]|[12]\d|3[01]|(?:0?[1-9]|[12]\d|3[01])-(?:0?[1-9]|[12]\d|3[01])|\*(?:/[0-9]|/1[0-9]|/2[0-9]|/3[01])?))*)\s+"  # day of month
    r"((?:[1-9]|1[0-2]|(?:[1-9]|1[0-2])-(?:[1-9]|1[0-2])|\*(?:/[1-9]|/1[0-2])?)(?:,(?:[1-9]|1[0-2]|(?:[1-9]|1[0-2])-(?:[1-9]|1[0-2])|\*(?:/[1-9]|/1[0-2])?))*)\s+"  # month
    r"((?:[0-6]|(?:[0-6])-(?:[0-6])|\*(?:/[0-6])?)(?:,(?:[0-6]|(?:[0-6])-(?:[0-6])|\*(?:/[0-6])?))*)"  # day of week
    r"$|^$"  # Empty string to signify no schedule
)
class ResourceConfigWithSchedule(ResourceConfig):
    """ResourceConfig extended with an optional cron schedule for re-backfills."""

    schedule: str = Field(
        default="",
        title="Schedule",
        description="Schedule to automatically rebackfill this binding. Accepts a cron expression.",
        pattern=CRON_REGEX,
    )
async def scheduled_stop(task: Task, future_dt: datetime) -> None:
sleep_duration = future_dt - datetime.now(tz=UTC)
await asyncio.sleep(sleep_duration.total_seconds())
task.stopping.event.set()
class BaseResourceState(abc.ABC, BaseModel, extra="forbid"):
    """
    BaseResourceState is an abstract base class for ResourceState classes.
    """

    pass


_BaseResourceState = TypeVar("_BaseResourceState", bound=BaseResourceState)
class ResourceState(BaseResourceState, BaseModel, extra="forbid"):
    """ResourceState composes separate incremental, backfill, and snapshot states.

    Inner states can be updated independently, so long as sibling states are left unset.
    The Flow runtime will merge-patch partial checkpoint states into an aggregated state.
    """

    class Incremental(BaseModel, extra="forbid"):
        """Partial state of a resource which is being incrementally captured"""

        cursor: LogCursor = Field(
            description="Cursor of the last-synced document in the logical log"
        )

    class Backfill(BaseModel, extra="forbid"):
        """Partial state of a resource which is being backfilled"""

        cutoff: LogCursor = Field(
            description="LogCursor at which incremental replication began"
        )
        next_page: PageCursor = Field(
            description="PageCursor of the next page to fetch", default=None
        )

    class Snapshot(BaseModel, extra="forbid"):
        """Partial state of a resource for which periodic snapshots are taken"""

        updated_at: AwareDatetime = Field(description="Time of the last snapshot")
        last_count: int = Field(
            description="Number of documents captured from this resource by the last snapshot"
        )
        last_digest: str = Field(
            description="The xxh3_128 hex digest of documents of this resource in the last snapshot"
        )

    # Dict-valued `inc` / `backfill` states key sub-states by subtask ID
    # (see open_binding's dictionary-valued fetch functions).
    inc: Incremental | dict[str, Incremental | None] | None = Field(
        default=None, description="Incremental capture progress"
    )
    backfill: Backfill | dict[str, Backfill | None] | None = Field(
        default=None,
        description="Backfill progress, or None if no backfill is occurring",
    )
    snapshot: Snapshot | None = Field(default=None, description="Snapshot progress")
    last_initialized: datetime | None = Field(
        default=None, description="The last time this state was initialized."
    )
    is_connector_initiated: bool = Field(
        default=False, description="Indicates if this backfill was initiated by the connector.",
    )


_ResourceState = TypeVar("_ResourceState", bound=ResourceState)
class ConnectorState(GenericModel, Generic[_BaseResourceState], extra="forbid"):
    """ConnectorState represents a number of ResourceStates, keyed by binding state key."""

    bindingStateV1: dict[str, _BaseResourceState | None] = {}
    # Pending re-backfill requests, also keyed by binding state key.
    backfillRequests: dict[str, bool | None] = {}
    # A refresh token that's more recent than the one in the connector's spec. It's used when
    # a connector requires periodically rotating refresh tokens, otherwise it's None.
    refresh_token: str | None = None


_ConnectorState = TypeVar("_ConnectorState", bound=ConnectorState)
@dataclass
class AssociatedDocument(Generic[_BaseDocument]):
    """
    Emitting AssociatedDocument allows you to represent capturing document for other bindings.

    You might use this if your data model requires you to load "child" documents when capturing
    a "parent" document, instead of independently loading the child data stream.
    """

    # The document to capture.
    doc: _BaseDocument
    # Index of the binding under which `doc` is captured.
    binding: int
FetchSnapshotFn = Callable[[Logger], AsyncGenerator[_BaseDocument | dict, None]]
"""
FetchSnapshotFn is a function which fetches a complete snapshot of a resource.
Snapshot resources are typically "small" -- they fit easily on disk -- and are
gathered in a single shot. Its content is digested to determine if its
changed since the last snapshot. If it hasn't, the snapshot is discarded and
not emitted by the connector.
"""
FetchPageFn = Callable[
[Logger, PageCursor, LogCursor],
AsyncGenerator[_BaseDocument | dict | AssociatedDocument | PageCursor, None],
]
"""
FetchPageFn fetches available checkpoints since the provided last PageCursor.
It will typically fetch just one page, though it may fetch multiple pages.
The argument PageCursor is None if a new iteration is being started.
Otherwise it is the last PageCursor yielded by FetchPageFn.
The argument LogCursor is the "cutoff" log position at which incremental
replication started, and should be used to suppress documents which were
modified at-or-after the cutoff, as such documents are
already observed through incremental replication.
Checkpoints consist of a yielded sequence of documents followed by a
non-None PageCursor, which checkpoints those preceding documents,
or by simply returning if the iteration is complete.
It's an error if FetchPageFn yields a PageCursor of None.
Instead, mark the end of the sequence by yielding documents and then
returning without yielding a final PageCursor.
"""
RecurringFetchPageFn = Callable[
[Logger, PageCursor, LogCursor, bool],
AsyncGenerator[_BaseDocument | dict | AssociatedDocument | PageCursor, None],
]
"""
RecurringFetchPagesFn fetches available checkpoints since the provided last PageCursor.
It will typically fetch just one page, though it may fetch multiple pages.
RecurringFetchPagesFn is intended to start new iterations on some cadence,
often based on a schedule set in ResourceConfigWithSchedule.
The argument PageCursor is None if a new iteration is being started.
Otherwise it is the last PageCursor yielded by RecurringFetchPagesFn.
The argument LogCursor is the "cutoff" log position at which incremental
replication started, and should be used to suppress documents which were
modified at-or-after the cutoff, as such documents are
already observed through incremental replication.
The boolean argument signals if this iteration/backfill was connector-initiated.
It is True if this iteration was connector-initiated, and False otherwise.
Checkpoints consist of a yielded sequence of documents followed by a
non-None PageCursor, which checkpoints those preceding documents,
or by simply returning if the iteration is complete.
It's an error if RecurringFetchPagesFn yields a PageCursor of None.
Instead, mark the end of the sequence by yielding documents and then
returning without yielding a final PageCursor.
"""
FetchChangesFn = Callable[
[Logger, LogCursor],
AsyncGenerator[_BaseDocument | dict | AssociatedDocument | LogCursor, None],
]
"""
FetchChangesFn fetches available checkpoints since the provided last LogCursor.
Checkpoints consist of a yielded sequence of documents followed by a LogCursor,
where the LogCursor checkpoints those preceding documents.
Yielded LogCursors MUST be strictly increasing relative to the argument
LogCursor and also to previously yielded LogCursors.
It's an error if FetchChangesFn yields documents, and then returns without
yielding a final LogCursor. NOTE(johnny): if needed, we could extend the
contract to allow an explicit "roll back" sentinel.
FetchChangesFn yields until no further checkpoints are readily available,
and then returns. If no checkpoints are available at all,
it yields nothing and returns.
Implementations may block for brief periods to await checkpoints, such as while
awaiting a server response, but MUST NOT block forever as it prevents the
connector from exiting.
Implementations MAY return early, such as if it's convenient to fetch only
a next page of recent changes. If an implementation yields any checkpoints,
then it is immediately re-invoked.
Otherwise if it returns without yielding a checkpoint, then
`ResourceConfig.interval` is respected between invocations.
Implementations SHOULD NOT sleep or implement their own coarse rate limit
(use `ResourceConfig.interval`).
"""
def is_recurring_fetch_page_fn(fn: FetchPageFn | RecurringFetchPageFn, log: Logger, page: PageCursor, cutoff: LogCursor, is_connector_initiated: bool) -> bool:
    """Return True when `fn`'s signature can bind the full RecurringFetchPageFn
    argument list (log, page, cutoff, is_connector_initiated), False otherwise."""
    candidate_args = (log, page, cutoff, is_connector_initiated)
    try:
        inspect.signature(fn).bind(*candidate_args)
    except TypeError:
        # The signature rejects the four-argument form: treat as a plain FetchPageFn.
        return False
    return True
class ReductionStrategy(StrEnum):
    """Reduction strategies which may be set on a discovered collection schema
    (emitted under the schema's `reduce.strategy` property)."""

    APPEND = "append"
    FIRST_WRITE_WINS = "firstWriteWins"
    LAST_WRITE_WINS = "lastWriteWins"
    MERGE = "merge"
    MINIMIZE = "minimize"
    MAXIMIZE = "maximize"
    SET = "set"
    SUM = "sum"
@dataclass
class Resource(Generic[_BaseDocument, _BaseResourceConfig, _BaseResourceState]):
    """Resource is a high-level description of an available capture resource,
    encapsulating metadata for catalog discovery as well as a capability
    to open() the resource for capture."""

    @dataclass
    class FixedSchema:
        """
        FixedSchema encapsulates a prior JSON schema which should be used
        as the model schema, rather than dynamically generating a schema.
        """

        value: dict

    # Recommended name of this resource.
    name: str
    # Composite key of the resource's collection, as JSON pointers.
    key: list[str]
    # Either a document model class (used to generate a JSON schema), or a FixedSchema.
    model: type[_BaseDocument] | FixedSchema
    # Callable which opens the resource for capture, invoked with the binding,
    # its index, its current ResourceState, the Task, and all resolved bindings.
    open: Callable[
        [
            CaptureBinding[_BaseResourceConfig],
            int,
            ResourceState,
            Task,
            list[
                tuple[
                    CaptureBinding[_ResourceConfig],
                    "Resource[_BaseDocument, _ResourceConfig, _ResourceState]",
                ]
            ],
        ],
        None,
    ]
    # State used when this resource's binding is first initialized (or re-initialized).
    initial_state: _BaseResourceState
    # Configuration recommended during discovery.
    initial_config: _BaseResourceConfig
    # Whether to set `x-infer-schema` on the discovered schema.
    schema_inference: bool
    reduction_strategy: ReductionStrategy | None = None
    # Whether the discovered binding is disabled by default.
    disable: bool = False
def discovered(
    resources: list["Resource[_BaseDocument, _BaseResourceConfig, _BaseResourceState]"],
) -> response.Discovered[_BaseResourceConfig]:
    """Map `resources` into a Discovered response, one binding per resource."""

    def _to_binding(resource) -> response.DiscoveredBinding:
        # A FixedSchema is used verbatim; otherwise serialize the model's JSON schema.
        if isinstance(resource.model, Resource.FixedSchema):
            schema = resource.model.value
        else:
            schema = resource.model.model_json_schema(mode="serialization")

        if resource.schema_inference:
            schema["x-infer-schema"] = True

        if resource.reduction_strategy:
            schema["reduce"] = {"strategy": resource.reduction_strategy}

        return response.DiscoveredBinding(
            documentSchema=schema,
            key=resource.key,
            recommendedName=resource.name,
            resourceConfig=resource.initial_config,
            disable=resource.disable,
        )

    return response.Discovered(bindings=[_to_binding(r) for r in resources])
_ResolvableBinding = TypeVar(
    "_ResolvableBinding", bound=CaptureBinding | request.ValidateBinding
)
"""_ResolvableBinding is either a CaptureBinding or a request.ValidateBinding"""
def resolve_bindings(
    bindings: list[_ResolvableBinding],
    resources: list[Resource[Any, _BaseResourceConfig, Any]],
    resource_term="Resource",
) -> list[tuple[_ResolvableBinding, Resource[Any, _BaseResourceConfig, Any]]]:
    """Pair each binding with the first resource whose configured path matches.

    Raises:
        ValidationError: listing every binding path which matched no resource.
    """
    resolved: list[
        tuple[_ResolvableBinding, Resource[Any, _BaseResourceConfig, Any]]
    ] = []
    errors: list[str] = []

    for binding in bindings:
        path = binding.resourceConfig.path()
        # First resource whose path matches this binding, or None.
        match = next(
            (r for r in resources if r.initial_config.path() == path), None
        )
        if match is None:
            errors.append(f"{resource_term} '{'.'.join(path)}' was not found.")
        else:
            resolved.append((binding, match))

    if errors:
        raise ValidationError(errors)

    return resolved
def validated(
    resolved_bindings: list[
        tuple[
            request.ValidateBinding[_BaseResourceConfig],
            Resource[Any, _BaseResourceConfig, Any],
        ]
    ],
) -> response.Validated:
    """Build a Validated response which echoes each resolved binding's resource path."""
    validated_bindings = []
    for binding, _resource in resolved_bindings:
        validated_bindings.append(
            response.ValidatedBinding(resourcePath=binding.resourceConfig.path())
        )
    return response.Validated(bindings=validated_bindings)
def open(
    open: request.Open[Any, _ResourceConfig, _ConnectorState],
    resolved_bindings: list[
        tuple[
            CaptureBinding[_ResourceConfig],
            Resource[_BaseDocument, _ResourceConfig, _ResourceState],
        ]
    ],
) -> tuple[response.Opened, Callable[[Task], Awaitable[None]]]:
    """Build the Opened response plus a runner coroutine which opens every
    resolved binding, initializing or re-initializing each binding's state
    as needed (first open, requested re-backfill, or scheduled re-backfill)."""

    async def _run(task: Task):
        # State keys whose checkpoints are being cleared due to a backfill trigger.
        backfill_requests = []
        if open.state.backfillRequests is not None:
            for stateKey in open.state.backfillRequests.keys():
                task.log.info(
                    "clearing checkpoint due to backfill trigger",
                    {"stateKey": stateKey},
                )
                backfill_requests.append(stateKey)
                # Clear both the binding's state and its pending request marker.
                task.checkpoint(
                    ConnectorState(
                        bindingStateV1={stateKey: None},
                        backfillRequests={stateKey: None},
                    )
                )

        soonest_future_scheduled_initialization: datetime | None = None
        NOW = datetime.now(tz=UTC)

        for index, (binding, resource) in enumerate(resolved_bindings):
            state: _ResourceState | None = open.state.bindingStateV1.get(
                binding.stateKey
            )
            # Initialize when there is no prior state, or a backfill was requested.
            should_initialize = state is None or binding.stateKey in backfill_requests
            is_connector_initiated = False

            if state:
                if state.last_initialized is None:
                    # Older state predating `last_initialized`: stamp and checkpoint it now.
                    state.last_initialized = NOW
                    task.checkpoint(
                        ConnectorState(bindingStateV1={binding.stateKey: state})
                    )

                # Scheduled re-initialization applies only when no backfill is in progress.
                if not state.backfill and isinstance(binding.resourceConfig, ResourceConfigWithSchedule):
                    cron_schedule = binding.resourceConfig.schedule
                    # A fire time between the last initialization and now was missed.
                    missed_scheduled_initialization = next_fire(
                        cron_schedule, state.last_initialized, NOW
                    )
                    future_scheduled_initialization = next_fire(
                        cron_schedule, NOW,
                    )

                    if missed_scheduled_initialization:
                        should_initialize = True
                        is_connector_initiated = True

                    if future_scheduled_initialization:
                        if not soonest_future_scheduled_initialization:
                            soonest_future_scheduled_initialization = future_scheduled_initialization
                        else:
                            soonest_future_scheduled_initialization = min(soonest_future_scheduled_initialization, future_scheduled_initialization)

            if should_initialize:
                if is_connector_initiated:
                    # In the most common case of a single fetch_changes and a single fetch_pages,
                    # coordinate the initialized backfill's cutoff with the current incremental state's cursor.
                    if (
                        isinstance(resource.initial_state.backfill, ResourceState.Backfill) and
                        isinstance(resource.initial_state.backfill.cutoff, datetime) and
                        state and
                        isinstance(state.inc, ResourceState.Incremental) and
                        isinstance(state.inc.cursor, datetime)
                    ):
                        # NOTE(review): this assigns into resource.initial_state.backfill
                        # in place before copying — confirm the shared initial_state is
                        # safe to mutate here.
                        initialized_backfill_state = resource.initial_state.backfill
                        initialized_backfill_state.cutoff = state.inc.cursor
                        state.backfill = initialized_backfill_state.model_copy(deep=True)
                    # In all other cases, wipe the state back to the initial state.
                    else:
                        state = resource.initial_state.model_copy(deep=True)

                    state.is_connector_initiated = True
                else:
                    state = resource.initial_state.model_copy(deep=True)

                state.last_initialized = NOW

                # Checkpoint the binding's initialized state prior to any processing.
                task.checkpoint(
                    ConnectorState(
                        bindingStateV1={binding.stateKey: state},
                    )
                )

            resource.open(
                binding,
                index,
                state,
                task,
                resolved_bindings,
            )

        if soonest_future_scheduled_initialization:
            # Gracefully exit to ensure relatively close adherence to any bindings'
            # re-initialization schedules.
            asyncio.create_task(scheduled_stop(task, soonest_future_scheduled_initialization))

    return (response.Opened(explicitAcknowledgements=False), _run)
def open_binding(
    binding: CaptureBinding[_ResourceConfig],
    binding_index: int,
    resource_state: _ResourceState,
    task: Task,
    fetch_changes: FetchChangesFn[_BaseDocument]
    | dict[str, FetchChangesFn[_BaseDocument]]
    | None = None,
    fetch_page: FetchPageFn[_BaseDocument]
    | RecurringFetchPageFn[_BaseDocument]
    | dict[str, FetchPageFn[_BaseDocument]]
    | None = None,
    fetch_snapshot: FetchSnapshotFn[_BaseDocument] | None = None,
    tombstone: _BaseDocument | None = None,
):
    """
    open_binding() is intended to be called by closures set as Resource.open Callables.

    It does 'heavy lifting' to actually capture a binding, spawning child tasks
    for incremental capture (fetch_changes), backfill (fetch_page), and periodic
    snapshots (fetch_snapshot), as provided.

    When fetch_changes, fetch_page, or fetch_snapshot are provided as dictionaries,
    each function will be run as a separate subtask with its own independent state.
    The dictionary keys are used as subtask IDs and are used to store and retrieve
    the state for each subtask in state.inc, state.backfill, or state.snapshot.
    """

    task.connector_status.inc_binding_count()
    # Child tasks are named after the binding's resource path.
    prefix = ".".join(binding.resourceConfig.path())

    if fetch_changes:

        async def incremental_closure(
            task: Task,
            fetch_changes: FetchChangesFn[_BaseDocument],
            state: ResourceState.Incremental,
            subtask_id: str | None = None,
        ):
            # Each spawned subtask must receive a single, concrete Incremental state.
            assert state and not isinstance(state, dict)
            await _binding_incremental_task(
                binding,
                binding_index,
                fetch_changes,
                state,
                task,
                subtask_id,
            )

        if isinstance(fetch_changes, dict):
            # Dict-valued fetch_changes: one incremental subtask per entry,
            # keyed into the dict-valued resource_state.inc.
            assert resource_state.inc and isinstance(resource_state.inc, dict)
            for subtask_id, subtask_fetch_changes in fetch_changes.items():
                inc_state = resource_state.inc.get(subtask_id)
                assert inc_state
                task.spawn_child(
                    f"{prefix}.incremental.{subtask_id}",
                    functools.partial(
                        incremental_closure,
                        fetch_changes=subtask_fetch_changes,
                        state=inc_state,
                        subtask_id=subtask_id,
                    ),
                )
        else:
            assert resource_state.inc and not isinstance(resource_state.inc, dict)
            task.spawn_child(
                f"{prefix}.incremental",
                functools.partial(
                    incremental_closure,
                    fetch_changes=fetch_changes,
                    state=resource_state.inc,
                ),
            )

    # Backfill tasks are spawned only while backfill state remains.
    if fetch_page and resource_state.backfill:

        async def backfill_closure(
            task: Task,
            fetch_page: FetchPageFn[_BaseDocument] | RecurringFetchPageFn,
            state: ResourceState.Backfill,
            subtask_id: str | None = None,
        ):
            assert state and not isinstance(state, dict)
            # Track this binding as actively backfilling for status reporting.
            task.connector_status.inc_backfilling(binding_index)
            await _binding_backfill_task(
                binding,
                binding_index,
                fetch_page,
                state,
                resource_state.last_initialized,
                resource_state.is_connector_initiated,
                task,
                subtask_id,
            )
            task.connector_status.dec_backfilling(binding_index)

        if isinstance(fetch_page, dict):
            assert resource_state.backfill and isinstance(resource_state.backfill, dict)
            for subtask_id, subtask_fetch_page in fetch_page.items():
                backfill_state = resource_state.backfill.get(subtask_id)
                if not backfill_state:
                    # A missing / None entry means this subtask's backfill is done.
                    continue
                task.spawn_child(
                    f"{prefix}.backfill.{subtask_id}",
                    functools.partial(
                        backfill_closure,
                        fetch_page=subtask_fetch_page,
                        state=backfill_state,
                        subtask_id=subtask_id,
                    ),
                )
        else:
            assert resource_state.backfill and not isinstance(resource_state.backfill, dict)
            task.spawn_child(
                f"{prefix}.backfill",
                functools.partial(
                    backfill_closure,
                    fetch_page=fetch_page,
                    state=resource_state.backfill,
                ),
            )

    if fetch_snapshot:

        async def closure(task: Task):
            # Snapshots require a tombstone document for emitting deletions.
            assert tombstone
            await _binding_snapshot_task(
                binding,
                binding_index,
                fetch_snapshot,
                resource_state.snapshot,
                task,
                tombstone,
            )

        task.spawn_child(f"{prefix}.snapshot", closure)
async def _binding_snapshot_task(
    binding: CaptureBinding[_ResourceConfig],
    binding_index: int,
    fetch_snapshot: FetchSnapshotFn[_BaseDocument],
    state: ResourceState.Snapshot | None,
    task: Task,
    tombstone: _BaseDocument,
):
    """Snapshot the content of a resource at a regular interval."""
    if not state:
        # No prior snapshot state: start at the Unix epoch so the first
        # snapshot is taken immediately.
        state = ResourceState.Snapshot(
            updated_at=datetime.fromtimestamp(0, tz=UTC),
            last_count=0,
            last_digest="",
        )

    connector_state = ConnectorState(
        bindingStateV1={binding.stateKey: ResourceState(snapshot=state)}
    )

    while True:
        # Yield to the event loop to prevent starvation.
        # Note that wait_for does *not* yield if sleep_for has already elapsed.
        await asyncio.sleep(0)

        next_sync = state.updated_at + binding.resourceConfig.interval
        sleep_for = next_sync - datetime.now(tz=UTC)

        task.log.debug(
            "awaiting next snapshot",
            {"sleep_for": sleep_for, "next": next_sync},
        )

        try:
            if not task.stopping.event.is_set():
                await asyncio.wait_for(
                    task.stopping.event.wait(), timeout=sleep_for.total_seconds()
                )
            # The stop event fired before the next snapshot was due: exit cleanly.
            task.log.debug(f"periodic snapshot is idle and is yielding to stop")
            return
        except asyncio.TimeoutError:
            # `sleep_for` elapsed.
            state.updated_at = datetime.now(tz=UTC)

        count = 0
        async for doc in fetch_snapshot(task.log):
            # Rows within the previous snapshot's count are updates ("u");
            # rows beyond it are creations ("c").
            if isinstance(doc, dict):
                # NOTE(review): dict documents are given the "meta_" key here,
                # not the "_meta" alias used by BaseDocument — confirm downstream
                # serialization expects this key.
                doc["meta_"] = {
                    "op": "u" if count < state.last_count else "c",
                    "row_id": count,
                }
            else:
                doc.meta_ = BaseDocument.Meta(
                    op="u" if count < state.last_count else "c", row_id=count
                )
            task.captured(binding_index, doc)
            count += 1

        digest = task.pending_digest()
        task.log.debug(
            "polled snapshot",
            {
                "count": count,
                "digest": digest,
                "last_count": state.last_count,
                "last_digest": state.last_digest,
            },
        )

        if digest != state.last_digest:
            # Content changed. Emit deletion tombstones for rows which existed
            # in the prior snapshot but not in this one.
            for del_id in range(count, state.last_count):
                tombstone.meta_ = BaseDocument.Meta(op="d", row_id=del_id)
                task.captured(binding_index, tombstone)

            state.last_count = count
            state.last_digest = digest
        else:
            # Suppress all captured documents, as they're unchanged.
            task.reset()

        task.checkpoint(connector_state)
async def _binding_backfill_task(
binding: CaptureBinding[_ResourceConfig],
binding_index: int,
fetch_page: FetchPageFn[_BaseDocument] | RecurringFetchPageFn[_BaseDocument],
state: ResourceState.Backfill,
last_initialized: datetime | None,
is_connector_initiated: bool,
task: Task,
subtask_id: str | None = None,
):
def _initialize_connector_state(state: ResourceState.Backfill) -> ConnectorState:
if subtask_id is not None:
connector_state = ConnectorState(
bindingStateV1={
binding.stateKey: ResourceState(backfill={subtask_id: state})
}
)
else:
connector_state = ConnectorState(
bindingStateV1={binding.stateKey: ResourceState(backfill=state)}
)
return connector_state
connector_state = _initialize_connector_state(state)
if state.next_page is not None:
task.log.info("resuming backfill", {"state": state, "subtask_id": subtask_id})
else:
task.log.info("beginning backfill", {"state": state, "subtask_id": subtask_id})
while True:
# Yield to the event loop to prevent starvation.
await asyncio.sleep(0)
if task.stopping.event.is_set():
task.log.debug("backfill is yielding to stop", {"subtask_id": subtask_id})
return
# Track if fetch_page returns without having yielded a PageCursor.
done = True
# Distinguish between FetchPageFn and RecurringFetchPageFn to provide the correct arguments.
if is_recurring_fetch_page_fn(fetch_page, task.log, state.next_page, state.cutoff, is_connector_initiated):
fn = cast(RecurringFetchPageFn, fetch_page)
pages = fn(task.log, state.next_page, state.cutoff, is_connector_initiated)
else:
fn = cast(FetchPageFn, fetch_page)
pages = fn(task.log, state.next_page, state.cutoff)
async for item in pages:
if isinstance(item, BaseDocument) or (
isinstance(item, dict) and not is_cursor_dict(item)
):
task.captured(binding_index, item)
done = True
elif isinstance(item, AssociatedDocument):
task.captured(item.binding, item.doc)
done = True
elif item is None:
raise RuntimeError(
"Implementation error: FetchPageFn yielded PageCursor None. To represent end-of-sequence, yield documents and return without a final PageCursor."
)
else:
if isinstance(item, dict) and is_cursor_dict(item):
# For dict-based cursors, create a separate state to checkpoint with just the item
# This ensures the Flow runtime gets the merge patch, not the merged result.
pop_cursor_marker(item)
if state.next_page is None:
state.next_page = item
state_to_checkpoint = connector_state
elif isinstance(state.next_page, dict):
# Perform JSON merge patch on the in-memory state.
json_merge_patch(state.next_page, item)
# Only emit a checkpoint containing the patch.
checkpoint_state = ResourceState.Backfill(
cutoff=state.cutoff, next_page=item
)
state_to_checkpoint = _initialize_connector_state(
checkpoint_state
)
else:
raise RuntimeError(
f"Implementation error: dictionary PageCursor was yielded but the previous PageCursor was a {type(state.next_page)}"
)
else:
state.next_page = item
state_to_checkpoint = connector_state
task.checkpoint(state_to_checkpoint)
done = False
if done:
break
if subtask_id is not None:
task.checkpoint(
ConnectorState(
bindingStateV1={
binding.stateKey: ResourceState(backfill={subtask_id: None})
}
)
)
else:
task.checkpoint(