chore: incremental without partition router as defaultstream #697
…ed' into maxi297/availability_strategy_to_support_abstract_stream
…m' into maxi297/bland_stream_instantiated_as_defaultstream
…m' into maxi297/bland_stream_instantiated_as_defaultstream
👋 Greetings, Airbyte Team Member!
Here are some helpful tips and reminders for your convenience.
Testing This CDK Version
You can test this version of the CDK using the following:
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@maxi297/incremental_without_partition_router_as_defaultstream#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch maxi297/incremental_without_partition_router_as_defaultstream
Helpful Resources
PR Slash Commands
Airbyte Maintainers can execute the following slash commands on your PR:
…o maxi297/incremental_without_partition_router_as_defaultstream
@brianjlai is there a reason why airbyte-python-cdk/unit_tests/connector_builder/test_property_chunking.py Lines 208 to 210 in addd443
probably an oversight, but this is addressed in https://github.com/airbytehq/airbyte-python-cdk/pull/704/files#diff-a0cf4a1d832d23490722285fffb5964b6ea08d67941348be673a8fa3dec97768R221 which moves all the relevant tests to |
📝 Walkthrough
Migrates declarative stream construction to be concurrency-aware: removes legacy incremental cursor imports, introduces a ConcurrentStreamSlicer alias, builds combined slicers and a concurrent cursor, wires retrievers and DefaultStream with cursor-derived fields, and updates tests to use DefaultStream and new access helpers.
Sequence Diagram(s)
sequenceDiagram
autonumber
actor Config as DeclarativeModel
participant Factory as ModelToComponentFactory
participant Slicer as ConcurrentStreamSlicer
participant Cursor as ConcurrentCursor
participant Retriever as SimpleRetriever
participant Stream as DefaultStream
Config->>Factory: create_declarative_stream(model)
Note right of Factory: merge slicers & build partition router
Factory->>Slicer: _merge_stream_slicers(...) -> combined_slicers
Factory->>Factory: _build_stream_slicer_from_partition_router(...)
Factory->>Factory: _build_concurrent_cursor(model, partition_router)
Factory->>Cursor: instantiate concurrent cursor (if applicable)
Factory->>Retriever: create(..., stream_slicer=combined_slicers, stop_condition_cursor=Cursor?)
Note right of Retriever #lightblue: Retriever uses cursor for stop-condition/pagination
Factory->>Stream: DefaultStream(retriever, cursor=Cursor?)
Stream->>Stream: derive cursor_field from Cursor (if present)
alt client-side filtering enabled
Stream->>Cursor: client-side filter operations
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 1
🧹 Nitpick comments (9)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (7)
613-615: Alias for concurrent StreamSlicer is fine, but watch downstream typing.
You use ConcurrentStreamSlicer later as a variable annotation that sometimes holds a declarative PartitionRouter. This will trip static typing unless we adjust the annotation/casts there. See my suggestion on Lines 2070–2088 and 2099 to make mypy happy, wdyt?
2000-2009: Build partition_router once (reuse when possible) to avoid divergence.
You compute combined_slicers (which may already embed the partition router) and then build partition_router again for the concurrent cursor. Reusing the already-built instance avoids creating multiple routers with potentially distinct state/parameters and reduces overhead. Would you consider reusing combined_slicers when it's a PartitionRouter and only building if needed, wdyt?
Proposed minimal change:
- combined_slicers = self._merge_stream_slicers(model=model, config=config)
- partition_router = self._build_stream_slicer_from_partition_router(
-     model.retriever, config, stream_name=model.name
- )
- concurrent_cursor = self._build_concurrent_cursor(model, partition_router, config)
+ combined_slicers = self._merge_stream_slicers(model=model, config=config)
+ partition_router = (
+     combined_slicers
+     if isinstance(combined_slicers, PartitionRouter)
+     else self._build_stream_slicer_from_partition_router(
+         model.retriever, config, stream_name=model.name
+     )
+ )
+ concurrent_cursor = self._build_concurrent_cursor(
+     model, cast(PartitionRouter, partition_router), config
+ )
2016-2021: Good wiring for stop-condition and client-side incremental.
Passing the concurrent cursor down to paginator (stop condition) and record filtering is the right abstraction. Consider a tiny typing polish to avoid mypy friction (optional):
- stop_condition_cursor=concurrent_cursor
+ stop_condition_cursor=cast(Cursor, concurrent_cursor)
  if self._is_stop_condition_on_cursor(model)
  else None,
This matches the SimpleRetriever signature expecting Optional[Cursor]. Wdyt?
2054-2065: Broaden the cursor check to include all concurrent cursors.
The condition only checks for ConcurrentCursor, but _build_concurrent_cursor can also return ConcurrentPerPartitionCursor (still a Cursor). Would you switch to Cursor to be future-proof and clearer, wdyt?
- isinstance(combined_slicers, PartitionRouter)
- or isinstance(concurrent_cursor, ConcurrentCursor)
+ isinstance(combined_slicers, PartitionRouter)
+ or isinstance(concurrent_cursor, Cursor)
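To illustrate the point about broadening the check, here is a minimal sketch using stand-in classes rather than the real CDK types. It assumes, as the comment above states, that ConcurrentPerPartitionCursor is a Cursor but not a ConcurrentCursor; the helper function names are hypothetical.

```python
# Stand-in classes: these are NOT the real airbyte_cdk types, only a
# hierarchy assumed from the review comment above.
class Cursor:
    """Stand-in for the concurrent Cursor base class."""


class ConcurrentCursor(Cursor):
    """Stand-in for the plain concurrent cursor."""


class ConcurrentPerPartitionCursor(Cursor):
    """Stand-in for the per-partition cursor (assumed sibling of ConcurrentCursor)."""


def is_concurrent_narrow(cursor: object) -> bool:
    # Mirrors the current check: only one concrete class is accepted.
    return isinstance(cursor, ConcurrentCursor)


def is_concurrent_broad(cursor: object) -> bool:
    # Mirrors the suggested check: any Cursor subclass is accepted.
    return isinstance(cursor, Cursor)


per_partition = ConcurrentPerPartitionCursor()
print(is_concurrent_narrow(per_partition))  # False
print(is_concurrent_broad(per_partition))   # True
```

Under that assumed hierarchy, the narrow check silently skips the per-partition cursor while the broad check accepts it.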
2070-2088: Fix mixed-type annotation and decorator cast to satisfy static typing.
stream_slicer is sometimes a concurrent cursor and sometimes a declarative PartitionRouter. Annotating it as ConcurrentStreamSlicer alone is inaccurate and will upset mypy. Shall we annotate as a union and cast at the decorator call site, wdyt?
- stream_slicer: ConcurrentStreamSlicer = (
+ stream_slicer: Union[PartitionRouter, ConcurrentStreamSlicer] = (
      concurrent_cursor if concurrent_cursor else SinglePartitionRouter(parameters={})
  )
And when wrapping for test reads, cast the wrapped_slicer too:
- stream_slicer=cast(
-     StreamSlicer,
-     StreamSlicerTestReadDecorator(
-         wrapped_slicer=stream_slicer,
-         maximum_number_of_slices=self._limit_slices_fetched or 5,
-     ),
- ),
+ stream_slicer=cast(
+     StreamSlicer,
+     StreamSlicerTestReadDecorator(
+         wrapped_slicer=cast(StreamSlicer, stream_slicer),
+         maximum_number_of_slices=self._limit_slices_fetched or 5,
+     ),
+ ),
Also applies to: 2099-2102
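As a reminder of what the casts suggested above do and do not do, here is a small sketch with stand-in classes (not the real CDK types; pick_slicer is a hypothetical helper). typing.cast only informs the type checker: at runtime it returns its argument unchanged, so the Union annotation is what keeps the declaration honest.

```python
from typing import Optional, Union, cast


class ConcurrentStreamSlicer:
    """Stand-in for the concurrent StreamSlicer alias."""


class PartitionRouter:
    """Stand-in for the declarative PartitionRouter."""


def pick_slicer(
    cursor: Optional[ConcurrentStreamSlicer],
) -> Union[PartitionRouter, ConcurrentStreamSlicer]:
    # The Union return type records that either kind of object may come back.
    return cursor if cursor else PartitionRouter()


slicer = pick_slicer(None)
# cast() satisfies the type checker but performs no conversion or validation.
narrowed = cast(ConcurrentStreamSlicer, slicer)
print(narrowed is slicer)        # True
print(type(narrowed).__name__)   # PartitionRouter
```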
2110-2115: Preserve cursor_field when FinalStateCursor is used (fallback to model).
When the active cursor doesn't expose cursor_field (e.g., FinalStateCursor), we currently pass "". To keep behavior closer to pre-migration DeclarativeStream (and to aid consumers/UI), would you fallback to the model's cursor_field if available, wdyt?
- cursor_field=cursor.cursor_field.cursor_field_key
- if hasattr(cursor, "cursor_field")
- else "",  # FIXME we should have the cursor field has part of the interface of cursor,
+ cursor_field=(
+     getattr(getattr(cursor, "cursor_field", None), "cursor_field_key", None)
+     or (
+         model.incremental_sync.cursor_field
+         if model.incremental_sync
+         and hasattr(model.incremental_sync, "cursor_field")
+         else ""
+     )
+ ),
Also, the logger name change is noted as a breaking change. Do we want a follow-up todo or release note to call this out explicitly, wdyt?
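The fallback chain proposed above can be exercised in isolation. The sketch below uses SimpleNamespace objects as stand-ins for the cursor and the incremental_sync model; resolve_cursor_field is a hypothetical helper name, not a CDK function, and the field names are made up for illustration.

```python
from types import SimpleNamespace
from typing import Any, Optional


def resolve_cursor_field(cursor: Any, incremental_sync: Optional[Any]) -> str:
    """Prefer the cursor's own field key, then the model's, then an empty string."""
    # Nested getattr tolerates cursors that expose no cursor_field at all.
    key = getattr(getattr(cursor, "cursor_field", None), "cursor_field_key", None)
    if key:
        return key
    if incremental_sync is not None and hasattr(incremental_sync, "cursor_field"):
        return incremental_sync.cursor_field
    return ""


# Stand-ins: one cursor exposing a field key, one exposing nothing.
concurrent_like = SimpleNamespace(
    cursor_field=SimpleNamespace(cursor_field_key="updated_at")
)
final_state_like = SimpleNamespace()
model_sync = SimpleNamespace(cursor_field="created_at")

print(resolve_cursor_field(concurrent_like, model_sync))   # updated_at
print(resolve_cursor_field(final_state_like, model_sync))  # created_at
print(repr(resolve_cursor_field(final_state_like, None)))  # ''
```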
2292-2310: Prefer isinstance over type equality for model checks.
Using isinstance is more idiomatic and resilient to any future subclassing. Would you consider this small cleanup, wdyt?
- elif model.incremental_sync:
-     if type(model.incremental_sync) == IncrementingCountCursorModel:
+ elif model.incremental_sync:
+     if isinstance(model.incremental_sync, IncrementingCountCursorModel):
          return self.create_concurrent_cursor_from_incrementing_count_cursor(
              model_type=IncrementingCountCursorModel,
              component_definition=model.incremental_sync.__dict__,
              stream_name=model.name or "",
              stream_namespace=None,
              config=config or {},
              stream_state_migrations=state_transformations,
          )
-     elif type(model.incremental_sync) == DatetimeBasedCursorModel:
+     elif isinstance(model.incremental_sync, DatetimeBasedCursorModel):
          return self.create_concurrent_cursor_from_datetime_based_cursor(
              model_type=type(model.incremental_sync),
              component_definition=model.incremental_sync.__dict__,
              stream_name=model.name or "",
              stream_namespace=None,
              config=config or {},
              stream_state_migrations=state_transformations,
              attempt_to_create_cursor_if_not_provided=True,
          )
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2)
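Regarding the isinstance suggestion for lines 2292-2310 of model_to_component_factory.py, the difference only shows up once a subclass exists. The sketch below uses a stand-in base class and a hypothetical subclass (CustomDatetimeCursorModel is invented for illustration; no such class is claimed to exist in the CDK).

```python
class DatetimeBasedCursorModel:
    """Stand-in for the generated model class."""


class CustomDatetimeCursorModel(DatetimeBasedCursorModel):
    """Hypothetical subclass, used only to show the behavioral difference."""


def matches_by_type_equality(model: object) -> bool:
    # Exact-type comparison: subclasses do not match.
    return type(model) == DatetimeBasedCursorModel


def matches_by_isinstance(model: object) -> bool:
    # Subclass-aware comparison.
    return isinstance(model, DatetimeBasedCursorModel)


custom = CustomDatetimeCursorModel()
print(matches_by_type_equality(custom))  # False
print(matches_by_isinstance(custom))     # True
```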
353-356: Shift to DefaultStream + accessor helpers is correct; consider consolidating helpers to avoid private-attr coupling
The assertions reflect the new wiring: DefaultStream for incremental-without-partition-router, stream.cursor_field exposure, and accessing retriever/paginator/requester via the new helpers. This all aligns with the PR intent and the concurrent model.
Would you consider moving get_schema_loader/get_retriever into a shared test utility (e.g., unit_tests/test_utils/streams.py) so multiple test modules don't re-encode long private attribute chains, reducing fragility if internals change again, wdyt? You could then import those helpers here and in connector_builder tests to keep things DRY.
Also applies to: 357-360, 361-371, 372-383, 386-405, 406-411, 412-433, 436-439
1192-1198: Use identity for boolean assertion
Minor nit: prefer identity for booleans. Would you switch to is True for clarity, wdyt?
- assert get_retriever(stream).record_selector.transform_before_filtering == True
+ assert get_retriever(stream).record_selector.transform_before_filtering is True
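The nit above matters because equality with True also passes for other values that compare equal to it, such as the integer 1. A minimal sketch, with hypothetical helper names:

```python
def passes_equality_check(value: object) -> bool:
    # Equivalent of `assert value == True`: accepts anything equal to True.
    return value == True  # noqa: E712


def passes_identity_check(value: object) -> bool:
    # Equivalent of `assert value is True`: accepts only the bool singleton.
    return value is True


truthy_int = 1
print(passes_equality_check(truthy_int))  # True
print(passes_identity_check(truthy_int))  # False
print(passes_identity_check(True))        # True
```

With the identity form, a test would fail if the attribute ever held 1 instead of the boolean True.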
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (7 hunks)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (8 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.
Applied to files:
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
🧬 Code Graph Analysis (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)
- airbyte_cdk/sources/declarative/requesters/requester.py (1): HttpMethod (18-26)
- airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py (1): StreamSlicer (27-39)
- airbyte_cdk/sources/declarative/partition_routers/partition_router.py (1): PartitionRouter (14-62)
- airbyte_cdk/sources/streams/concurrent/cursor.py (4): ConcurrentCursor (134-502), Cursor (50-86), FinalStateCursor (89-131), cursor_field (184-185)
- airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py (1): SinglePartitionRouter (13-63)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (4)
- airbyte_cdk/sources/streams/concurrent/default_stream.py (3): DefaultStream (17-123), name (44-45), cursor_field (52-53)
- unit_tests/connector_builder/test_connector_builder_handler.py (4): name (835-836), cursor_field (831-832), get_retriever (447-452), primary_key (827-828)
- airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (5): name (106-114), name (117-119), SimpleRetriever (53-602), primary_key (290-292), primary_key (295-297)
- airbyte_cdk/sources/declarative/schema/json_file_schema_loader.py (1): _get_json_filepath (69-70)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Check: source-shopify
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
504-504: Import narrowing looks good.
Switching to import only HttpMethod is correct; no usages of Requester remain in this file. LGTM.
2138-2151: Helper predicates read cleanly.
The intent behind stop-condition-on-cursor and client-side filtering reads well and centralizes the logic. LGTM.
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (4)
1109-1112: LGTM: Data feed pagination strategy check is accurate
Asserting StopConditionPaginationStrategyDecorator via get_retriever(stream) matches the intended behavior for data-feed incremental streams. Looks good to me.
4125-4153: LGTM: QueryProperties correctly surfaced via retriever and excluded from request options provider
Good assertions around:
- retriever.additional_query_properties construction
- property_chunking and merge strategy types/values
- query_properties_key presence while excluding the "fields" param from the RequestOptionsProvider
This tracks the new responsibility split cleanly.
4226-4251: LGTM: PropertiesFromEndpoint wiring through retriever is validated end-to-end
Verifying the nested retriever/requester under PropertiesFromEndpoint and the absence of always_include_properties is on point. Nice coverage of both construction and values.
4315-4335: LGTM: Requester-level PropertiesFromEndpoint path validated without chunking
This covers the alternative placement (requester.query_properties) well and confirms no property_chunking is attached while still validating property list resolution.
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
613-615: Two StreamSlicer classes in scope: consider clarifying the alias or downstream variable names.
Having both declarative StreamSlicer and concurrent StreamSlicer in scope can be confusing. Would you consider either:
- renaming the alias to ConcurrentPartitionSlicer, or
- renaming the local variable used later (e.g., stream_slicer at Lines 2070–2088) to effective_stream_slicer for readability, wdyt?
2054-2065: DefaultStream gating omits the "both partition router and cursor" case.
This block enables DefaultStream when either a partition router or a concurrent cursor exists, but the initialization below prefers one or the other due to an elif chain. For streams that have both, do you want to pass both the partition router (as slicer) and the concurrent cursor to DefaultStream now (instead of only the slicer), wdyt?
2070-2109: Initialize DefaultStream with both slicer and cursor when available; prefer None for missing cursor_field.
Right now the elif chain prioritizes the slicer over the cursor; this prevents passing the concurrent cursor when both exist. Also, the cursor_field fallback uses an empty string; DefaultStream accepts Optional[str], so None better communicates absence.
Would you consider this refactor to always set both when available and improve readability (also renames the local variable to avoid confusion with the two StreamSlicer types), wdyt?
- stream_name = model.name or ""
- stream_slicer: ConcurrentStreamSlicer = (
-     concurrent_cursor if concurrent_cursor else SinglePartitionRouter(parameters={})
- )
- cursor: Cursor = FinalStateCursor(stream_name, None, self._message_repository)
- if isinstance(retriever, AsyncRetriever):
-     # The AsyncRetriever only ever worked with a cursor from the concurrent package. Hence, the method
-     # `_build_incremental_cursor` which we would usually think would return only declarative stuff has a
-     # special clause and return a concurrent cursor. This stream slicer is passed to AsyncRetriever when
-     # built because the async retriever has a specific partition router which relies on this stream slicer.
-     # We can't re-use `concurrent_cursor` because it is a different instance than the one passed in
-     # AsyncJobPartitionRouter.
-     stream_slicer = retriever.stream_slicer
-     if isinstance(combined_slicers, Cursor):
-         cursor = combined_slicers
- elif isinstance(combined_slicers, PartitionRouter):
-     stream_slicer = combined_slicers
- elif concurrent_cursor:
-     cursor = concurrent_cursor
+ stream_name = model.name or ""
+ # Use a dedicated name to avoid confusion with the two StreamSlicer types in scope.
+ effective_stream_slicer: ConcurrentStreamSlicer = SinglePartitionRouter(parameters={})
+ cursor: Cursor = FinalStateCursor(stream_name, None, self._message_repository)
+
+ if isinstance(retriever, AsyncRetriever):
+     # AsyncRetriever has its own slicer wired via AsyncJobPartitionRouter
+     effective_stream_slicer = retriever.stream_slicer
+     # If combined_slicers produced a concurrent cursor (e.g., datetime-based), use it
+     if isinstance(combined_slicers, Cursor):
+         cursor = combined_slicers
+ elif isinstance(combined_slicers, PartitionRouter):
+     # When a partition router exists, prefer it for slicing.
+     effective_stream_slicer = combined_slicers
+
+ # If a concurrent cursor exists, pass it irrespective of slicer selection.
+ if concurrent_cursor:
+     cursor = concurrent_cursor
@@
- stream_slicer=stream_slicer,
+ stream_slicer=effective_stream_slicer,
@@
- cursor_field=cursor.cursor_field.cursor_field_key
- if hasattr(cursor, "cursor_field")
- else "",  # FIXME we should have the cursor field has part of the interface of cursor,
+ cursor_field=(
+     cursor.cursor_field.cursor_field_key if hasattr(cursor, "cursor_field") else None
+ ),  # Prefer None over empty string
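The effect of the elif chain described above can be reproduced with a simplified sketch. It uses stand-in dataclasses instead of the real CDK types and leaves out the AsyncRetriever branch; all names (RouterStub, CursorStub, select_with_elif, select_independently) are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional, Tuple, Union


@dataclass(frozen=True)
class RouterStub:
    """Stand-in for a PartitionRouter."""
    name: str


@dataclass(frozen=True)
class CursorStub:
    """Stand-in for a concurrent cursor."""
    name: str


Slicer = Union[RouterStub, CursorStub]
DEFAULT_ROUTER = RouterStub("single_partition")  # plays SinglePartitionRouter
DEFAULT_CURSOR = CursorStub("final_state")       # plays FinalStateCursor


def select_with_elif(
    combined: Optional[Slicer], concurrent_cursor: Optional[CursorStub]
) -> Tuple[Slicer, CursorStub]:
    # Mirrors the current shape: the router branch shadows the cursor branch.
    slicer: Slicer = concurrent_cursor if concurrent_cursor else DEFAULT_ROUTER
    cursor = DEFAULT_CURSOR
    if isinstance(combined, RouterStub):
        slicer = combined
    elif concurrent_cursor:
        cursor = concurrent_cursor
    return slicer, cursor


def select_independently(
    combined: Optional[Slicer], concurrent_cursor: Optional[CursorStub]
) -> Tuple[Slicer, CursorStub]:
    # Mirrors the suggested shape: slicer and cursor are chosen separately.
    slicer: Slicer = DEFAULT_ROUTER
    cursor = DEFAULT_CURSOR
    if isinstance(combined, RouterStub):
        slicer = combined
    if concurrent_cursor:
        cursor = concurrent_cursor
    return slicer, cursor


router = RouterStub("parent_stream_router")
dt_cursor = CursorStub("datetime_cursor")
print(select_with_elif(router, dt_cursor)[1].name)      # final_state
print(select_independently(router, dt_cursor)[1].name)  # datetime_cursor
```

In this simplified model, a stream with both a router and a cursor keeps the default cursor under the elif chain and receives the real cursor once the two choices are made independently.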
📜 Review details
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.
Applied to files:
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
🧬 Code Graph Analysis (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
- airbyte_cdk/sources/streams/concurrent/default_stream.py (4): name (44-45), cursor (92-93), DefaultStream (17-123), cursor_field (52-53)
- airbyte_cdk/sources/streams/concurrent/cursor.py (4): ConcurrentCursor (134-502), Cursor (50-86), FinalStateCursor (89-131), cursor_field (184-185)
- airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py (1): SinglePartitionRouter (13-63)
- airbyte_cdk/sources/streams/concurrent/helpers.py (1): get_primary_key_from_stream (8-28)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: destination-motherduck
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: preview_docs
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
2000-2009: Nice: centralizing concurrent cursor and slicer assembly.
The split into combined_slicers, partition_router, and concurrent_cursor makes the wiring easier to reason about and extend. This aligns with the PR goal for incremental streams without partition routers. LGTM.
2016-2021: Stop condition and client-side filtering correctly hook into the concurrent cursor; verify expected runtime behavior.
Plumbing stop_condition_cursor and client_side_incremental_sync={"cursor": concurrent_cursor} looks right for data-feeds and client-side incremental filtering. Do we have test coverage that exercises both branches (is_data_feed and is_client_side_incremental) using DefaultStream paths, wdyt?
If helpful, I can sketch tests asserting:
- paginator wrapping with StopConditionPaginationStrategyDecorator when is_data_feed is true,
- ClientSideIncrementalRecordFilterDecorator is used when is_client_side_incremental is true.
2132-2145: Good separation of concerns with helpers.
The *_is_* helpers make the decision logic explicit and testable. Nice.
504-504: Verified: no lingering Requester in model_to_component_factory.py
I searched the updated file and can confirm there are no remaining Requester references after swapping in HttpMethod. Downstream modules still import and use Requester where needed, but this change is isolated and safe. wdyt?
What
Following #691, this tackles streams that are incremental but do not have partition routing.
Summary by CodeRabbit
New Features
Refactor
Tests