Skip to content

Commit fc7c98c

Browse files
authored
feat(low code): Add GroupingPartitionRouter (#354)
1 parent 8be3833 commit fc7c98c

File tree

8 files changed

+950
-3
lines changed

8 files changed

+950
-3
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

+42
Original file line numberDiff line numberDiff line change
@@ -3260,12 +3260,14 @@ definitions:
32603260
- "$ref": "#/definitions/CustomPartitionRouter"
32613261
- "$ref": "#/definitions/ListPartitionRouter"
32623262
- "$ref": "#/definitions/SubstreamPartitionRouter"
3263+
- "$ref": "#/definitions/GroupingPartitionRouter"
32633264
- type: array
32643265
items:
32653266
anyOf:
32663267
- "$ref": "#/definitions/CustomPartitionRouter"
32673268
- "$ref": "#/definitions/ListPartitionRouter"
32683269
- "$ref": "#/definitions/SubstreamPartitionRouter"
3270+
- "$ref": "#/definitions/GroupingPartitionRouter"
32693271
decoder:
32703272
title: Decoder
32713273
description: Component decoding the response so records can be extracted.
@@ -3431,12 +3433,14 @@ definitions:
34313433
- "$ref": "#/definitions/CustomPartitionRouter"
34323434
- "$ref": "#/definitions/ListPartitionRouter"
34333435
- "$ref": "#/definitions/SubstreamPartitionRouter"
3436+
- "$ref": "#/definitions/GroupingPartitionRouter"
34343437
- type: array
34353438
items:
34363439
anyOf:
34373440
- "$ref": "#/definitions/CustomPartitionRouter"
34383441
- "$ref": "#/definitions/ListPartitionRouter"
34393442
- "$ref": "#/definitions/SubstreamPartitionRouter"
3443+
- "$ref": "#/definitions/GroupingPartitionRouter"
34403444
decoder:
34413445
title: Decoder
34423446
description: Component decoding the response so records can be extracted.
@@ -3553,6 +3557,44 @@ definitions:
35533557
$parameters:
35543558
type: object
35553559
additionalProperties: true
3560+
GroupingPartitionRouter:
3561+
title: Grouping Partition Router
3562+
description: >
3563+
A decorator on top of a partition router that groups partitions into batches of a specified size.
3564+
This is useful for APIs that support filtering by multiple partition keys in a single request.
3565+
Note that per-partition incremental syncs may not work as expected because the grouping
3566+
of partitions might change between syncs, potentially leading to inconsistent state tracking.
3567+
type: object
3568+
required:
3569+
- type
3570+
- group_size
3571+
- underlying_partition_router
3572+
properties:
3573+
type:
3574+
type: string
3575+
enum: [GroupingPartitionRouter]
3576+
group_size:
3577+
title: Group Size
3578+
description: The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.
3579+
type: integer
3580+
examples:
3581+
- 10
3582+
- 50
3583+
underlying_partition_router:
3584+
title: Underlying Partition Router
3585+
description: The partition router whose output will be grouped. This can be any valid partition router component.
3586+
anyOf:
3587+
- "$ref": "#/definitions/CustomPartitionRouter"
3588+
- "$ref": "#/definitions/ListPartitionRouter"
3589+
- "$ref": "#/definitions/SubstreamPartitionRouter"
3590+
deduplicate:
3591+
title: Deduplicate Partitions
3592+
description: If true, ensures that partitions are unique within each group by removing duplicates based on the partition key.
3593+
type: boolean
3594+
default: true
3595+
$parameters:
3596+
type: object
3597+
additionalProperties: true
35563598
WaitUntilTimeFromHeader:
35573599
title: Wait Until Time Defined In Response Header
35583600
description: Extract time at which we can retry the request from response header and wait for the difference between now and that time.

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ def __init__(
7979
connector_state_manager: ConnectorStateManager,
8080
connector_state_converter: AbstractStreamStateConverter,
8181
cursor_field: CursorField,
82+
use_global_cursor: bool = False,
8283
) -> None:
8384
self._global_cursor: Optional[StreamState] = {}
8485
self._stream_name = stream_name
@@ -106,7 +107,7 @@ def __init__(
106107
self._lookback_window: int = 0
107108
self._parent_state: Optional[StreamState] = None
108109
self._number_of_partitions: int = 0
109-
self._use_global_cursor: bool = False
110+
self._use_global_cursor: bool = use_global_cursor
110111
self._partition_serializer = PerPartitionKeySerializer()
111112
# Track the last time a state message was emitted
112113
self._last_emission_time: float = 0.0

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+41-2
Original file line numberDiff line numberDiff line change
@@ -2303,7 +2303,15 @@ class SimpleRetriever(BaseModel):
23032303
CustomPartitionRouter,
23042304
ListPartitionRouter,
23052305
SubstreamPartitionRouter,
2306-
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
2306+
GroupingPartitionRouter,
2307+
List[
2308+
Union[
2309+
CustomPartitionRouter,
2310+
ListPartitionRouter,
2311+
SubstreamPartitionRouter,
2312+
GroupingPartitionRouter,
2313+
]
2314+
],
23072315
]
23082316
] = Field(
23092317
[],
@@ -2385,7 +2393,15 @@ class AsyncRetriever(BaseModel):
23852393
CustomPartitionRouter,
23862394
ListPartitionRouter,
23872395
SubstreamPartitionRouter,
2388-
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
2396+
GroupingPartitionRouter,
2397+
List[
2398+
Union[
2399+
CustomPartitionRouter,
2400+
ListPartitionRouter,
2401+
SubstreamPartitionRouter,
2402+
GroupingPartitionRouter,
2403+
]
2404+
],
23892405
]
23902406
] = Field(
23912407
[],
@@ -2437,6 +2453,29 @@ class SubstreamPartitionRouter(BaseModel):
24372453
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
24382454

24392455

2456+
class GroupingPartitionRouter(BaseModel):
2457+
type: Literal["GroupingPartitionRouter"]
2458+
group_size: int = Field(
2459+
...,
2460+
description="The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.",
2461+
examples=[10, 50],
2462+
title="Group Size",
2463+
)
2464+
underlying_partition_router: Union[
2465+
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
2466+
] = Field(
2467+
...,
2468+
description="The partition router whose output will be grouped. This can be any valid partition router component.",
2469+
title="Underlying Partition Router",
2470+
)
2471+
deduplicate: Optional[bool] = Field(
2472+
True,
2473+
description="If true, ensures that partitions are unique within each group by removing duplicates based on the partition key.",
2474+
title="Deduplicate Partitions",
2475+
)
2476+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2477+
2478+
24402479
class HttpComponentsResolver(BaseModel):
24412480
type: Literal["HttpComponentsResolver"]
24422481
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+40
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,9 @@
227227
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
228228
FlattenFields as FlattenFieldsModel,
229229
)
230+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
231+
GroupingPartitionRouter as GroupingPartitionRouterModel,
232+
)
230233
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
231234
GzipDecoder as GzipDecoderModel,
232235
)
@@ -385,6 +388,7 @@
385388
)
386389
from airbyte_cdk.sources.declarative.partition_routers import (
387390
CartesianProductStreamSlicer,
391+
GroupingPartitionRouter,
388392
ListPartitionRouter,
389393
PartitionRouter,
390394
SinglePartitionRouter,
@@ -638,6 +642,7 @@ def _init_mappings(self) -> None:
638642
UnlimitedCallRatePolicyModel: self.create_unlimited_call_rate_policy,
639643
RateModel: self.create_rate,
640644
HttpRequestRegexMatcherModel: self.create_http_request_matcher,
645+
GroupingPartitionRouterModel: self.create_grouping_partition_router,
641646
}
642647

643648
# Needed for the case where we need to perform a second parse on the fields of a custom component
@@ -1355,6 +1360,9 @@ def create_concurrent_cursor_from_perpartition_cursor(
13551360
)
13561361
stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
13571362

1363+
# Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
1364+
use_global_cursor = isinstance(partition_router, GroupingPartitionRouter)
1365+
13581366
# Return the concurrent cursor and state converter
13591367
return ConcurrentPerPartitionCursor(
13601368
cursor_factory=cursor_factory,
@@ -1366,6 +1374,7 @@ def create_concurrent_cursor_from_perpartition_cursor(
13661374
connector_state_manager=state_manager,
13671375
connector_state_converter=connector_state_converter,
13681376
cursor_field=cursor_field,
1377+
use_global_cursor=use_global_cursor,
13691378
)
13701379

13711380
@staticmethod
@@ -3370,3 +3379,34 @@ def set_api_budget(self, component_definition: ComponentDefinition, config: Conf
33703379
self._api_budget = self.create_component(
33713380
model_type=HTTPAPIBudgetModel, component_definition=component_definition, config=config
33723381
)
3382+
3383+
def create_grouping_partition_router(
3384+
self, model: GroupingPartitionRouterModel, config: Config, **kwargs: Any
3385+
) -> GroupingPartitionRouter:
3386+
underlying_router = self._create_component_from_model(
3387+
model=model.underlying_partition_router, config=config
3388+
)
3389+
if model.group_size < 1:
3390+
raise ValueError(f"Group size must be greater than 0, got {model.group_size}")
3391+
3392+
# Request options in underlying partition routers are not supported for GroupingPartitionRouter
3393+
# because they are specific to individual partitions and cannot be aggregated or handled
3394+
# when grouping, potentially leading to incorrect API calls. Any request customization
3395+
# should be managed at the stream level through the requester's configuration.
3396+
if isinstance(underlying_router, SubstreamPartitionRouter):
3397+
if any(
3398+
parent_config.request_option
3399+
for parent_config in underlying_router.parent_stream_configs
3400+
):
3401+
raise ValueError("Request options are not supported for GroupingPartitionRouter.")
3402+
3403+
if isinstance(underlying_router, ListPartitionRouter):
3404+
if underlying_router.request_option:
3405+
raise ValueError("Request options are not supported for GroupingPartitionRouter.")
3406+
3407+
return GroupingPartitionRouter(
3408+
group_size=model.group_size,
3409+
underlying_partition_router=underlying_router,
3410+
deduplicate=model.deduplicate if model.deduplicate is not None else True,
3411+
config=config,
3412+
)

airbyte_cdk/sources/declarative/partition_routers/__init__.py

+4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import (
99
CartesianProductStreamSlicer,
1010
)
11+
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
12+
GroupingPartitionRouter,
13+
)
1114
from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import (
1215
ListPartitionRouter,
1316
)
@@ -22,6 +25,7 @@
2225
__all__ = [
2326
"AsyncJobPartitionRouter",
2427
"CartesianProductStreamSlicer",
28+
"GroupingPartitionRouter",
2529
"ListPartitionRouter",
2630
"SinglePartitionRouter",
2731
"SubstreamPartitionRouter",

0 commit comments

Comments
 (0)