chore: non-incremental stream instantiated as defaultstream #691
👋 Greetings, Airbyte Team Member!
Here are some helpful tips and reminders for your convenience.
Testing This CDK Version
You can test this version of the CDK using the following:
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@maxi297/bland_stream_instantiated_as_defaultstream#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch maxi297/bland_stream_instantiated_as_defaultstream
Helpful Resources
PR Slash Commands
Airbyte Maintainers can execute the following slash commands on your PR:
/autofix
PyTest Results (Fast)
3 694 tests (-2), 3 683 ✅ (-2), 6m 37s ⏱️ (+9s)
Results for commit dc61e45. ± Comparison against base commit 3244eec.
This pull request removes 3 and adds 1 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results.
/autofix
PyTest Results (Full)
3 697 tests (-2), 3 686 ✅ (-2), 9m 53s ⏱️ (-1m 48s)
Results for commit dc61e45. ± Comparison against base commit 3244eec.
This pull request removes 3 and adds 1 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results.
…ed' into maxi297/availability_strategy_to_support_abstract_stream
…m' into maxi297/bland_stream_instantiated_as_defaultstream
…m' into maxi297/bland_stream_instantiated_as_defaultstream
📝 Walkthrough
Widen stream typing to accept AbstractStream/DefaultStream, introduce SchemaLoader-based on-demand schema with caching, route partitioned streams through DefaultStream (removing resumable full-refresh plumbing), update partition factory/partition signatures, and adapt multiple call sites and tests.
Sequence Diagram(s)
sequenceDiagram
participant Caller
participant ConcurrentSource as ConcurrentDeclarativeSource
participant Factory as ModelToComponentFactory
participant Stream as DefaultStream/DeclarativeStream
participant PartGen as PartitionGenerator
participant Partition as DeclarativePartition
participant SchemaLoader
participant Retriever
Caller->>ConcurrentSource: streams(config)
ConcurrentSource->>Factory: create_declarative_stream(model, config, is_parent)
alt partitioned → DefaultStream
Factory-->>ConcurrentSource: DefaultStream(partition_generator, ...)
else
Factory-->>ConcurrentSource: DeclarativeStream(...)
end
Caller->>Stream: read(...)
Stream->>PartGen: generate_partitions()
PartGen-->>Stream: partitions[]
loop for each partition
Stream->>Partition: read()
Partition->>SchemaLoader: get_json_schema()
SchemaLoader-->>Partition: schema
Partition->>Retriever: read_records(schema, slice)
Retriever-->>Partition: records
Partition-->>Stream: records
end
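The read path in the diagram can be approximated with a few toy classes. This is only a sketch under stated assumptions: `ToyStream`, `ToyPartition`, `ToyRetriever`, `CountingSchemaLoader`, and `read_all` are hypothetical stand-ins invented for illustration, not the CDK's classes, and the record shape is made up.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator, List, Mapping


class CountingSchemaLoader:
    """Hypothetical loader; counts how often the schema is requested."""

    def __init__(self) -> None:
        self.calls = 0

    def get_json_schema(self) -> Mapping[str, Any]:
        self.calls += 1
        return {"type": "object", "properties": {"id": {"type": "integer"}}}


class ToyRetriever:
    """Hypothetical retriever; yields two fake records per slice."""

    def read_records(
        self, records_schema: Mapping[str, Any], stream_slice: Mapping[str, Any]
    ) -> Iterator[Dict[str, Any]]:
        for record_id in range(2):
            yield {"id": record_id, "slice": stream_slice["name"]}


@dataclass
class ToyPartition:
    schema_loader: CountingSchemaLoader
    retriever: ToyRetriever
    stream_slice: Mapping[str, Any]

    def read(self) -> Iterator[Dict[str, Any]]:
        # The schema is resolved when the partition is read, not when it is built.
        schema = self.schema_loader.get_json_schema()
        yield from self.retriever.read_records(schema, self.stream_slice)


class ToyStream:
    def __init__(self, partitions: List[ToyPartition]) -> None:
        self._partitions = partitions

    def generate_partitions(self) -> Iterable[ToyPartition]:
        return iter(self._partitions)


def read_all(stream: ToyStream) -> List[Dict[str, Any]]:
    # Mirrors the "loop for each partition" block of the diagram.
    return [record for partition in stream.generate_partitions() for record in partition.read()]


loader = CountingSchemaLoader()
retriever = ToyRetriever()
stream = ToyStream([ToyPartition(loader, retriever, {"name": name}) for name in ("a", "b")])
```

In this sketch the uncached loader is hit once per partition read, which is the kind of repeated work a caching wrapper around the loader would avoid.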
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Would you like to mark this as a breaking change in typing (streams return type and DeclarativePartitionFactory/DeclarativePartition signatures) to alert downstream implementers, wdyt?
Actionable comments posted: 3
🔭 Outside diff range comments (2)
airbyte_cdk/sources/streams/concurrent/adapters.py (1)
84-104: Defer schema evaluation across all DefaultStream instantiations
DefaultStream now supports lazy schema via a zero-arg callable. Rather than materializing the schema at construction, could we pass something like:
    json_schema=lambda: stream.get_json_schema()
to defer the work until `read`/`as_airbyte_stream`? Wdyt?
Instances to update:
- airbyte_cdk/sources/streams/concurrent/adapters.py: line 98
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py: lines 386, 426, 482
- airbyte_cdk/sources/file_based/stream/concurrent/adapters.py: line 98
For example, change:
    - json_schema=stream.get_json_schema(),
    + json_schema=lambda: stream.get_json_schema(),
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
334-346: Add a public `schema_loader` accessor on `DeclarativeStream`.
I didn’t find an existing public getter for the loader, so let’s add something like this in `airbyte_cdk/sources/declarative/declarative_stream.py`:
    @property
    def schema_loader(self) -> SchemaLoader:
        """Expose the underlying SchemaLoader without reaching into a private field."""
        return self._schema_loader
Then update all of the following sites in `concurrent_declarative_source.py` to use `.schema_loader` instead of `._schema_loader`:
- Line 334
- Line 371
- Line 405
- Line 469
If you’d rather keep the loader fully encapsulated, another option is to pass a callable (e.g. `declarative_stream.get_json_schema`) into `DefaultStream` for its `json_schema` parameter and let the partition factory be the only code that ever deals with the loader itself. What do you think?
🧹 Nitpick comments (9)
unit_tests/sources/declarative/resolvers/test_config_components_resolver.py (1)
386-387: Reduce test brittleness by avoiding a deep private-attribute chain
This assertion reaches through several private attrs. It works, but it’s fragile against future refactors. Would you consider introducing a small helper in this test that resolves the requester parameters across both legacy and new shapes, then using it here, wdyt?
Add this helper near the top of the file (e.g., after imports):
    def get_requester_parameters(stream):
        # Try known shapes to locate the requester and return its parameters
        candidate_paths = [
            ("_stream_partition_generator", "_partition_factory", "_retriever", "requester"),  # new structure
            ("retriever", "requester"),  # legacy structure
        ]
        for path in candidate_paths:
            obj = stream
            for attr in path:
                obj = getattr(obj, attr, None)
                if obj is None:
                    break
            else:
                params = getattr(obj, "_parameters", None)
                if params is not None:
                    return params
        raise AssertionError(f"Could not resolve requester parameters for stream {getattr(stream, 'name', '<unknown>')}")
And update the assertion to use it:
    - stream._stream_partition_generator._partition_factory._retriever.requester._parameters
    - == expected_conditional_params[stream.name]
    + get_requester_parameters(stream) == expected_conditional_params[stream.name]
unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py (1)
96-101: LGTM. Optional: derive partition count dynamically to make the test resilient
The partition-based iteration looks good. To avoid hard-coding 4, would you consider computing the number of partitions and asserting counter == lines_in_response * num_partitions, wdyt?
Example:
    num_partitions = sum(1 for _ in stream.generate_partitions())
    counter = 0
    for partition in stream.generate_partitions():
        for _ in partition.read():
            counter += 1
    assert counter == lines_in_response * num_partitions
airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
24-25: Question: should cursor_field support nested paths?
cursor_field is Optional[str]. If we ever need multi-hop paths (e.g., ["data", "updated_at"]), should we expand this to Optional[Union[str, List[str]]] and handle default_cursor_field accordingly, or are we intentionally constraining DefaultStream given it targets non-incremental streams, wdyt?
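To make the nested-path question concrete, here is a minimal sketch of what accepting either a single key or a list of keys could look like. `normalize_cursor_field` and `extract_cursor_value` are hypothetical helpers written for this illustration; they are not part of DefaultStream or the CDK.

```python
from typing import Any, List, Mapping, Optional, Union

# Hypothetical widened type: a single key, a path of keys, or nothing.
CursorField = Optional[Union[str, List[str]]]


def normalize_cursor_field(cursor_field: CursorField) -> List[str]:
    """Turn any accepted form into a list path (empty when no cursor field is set)."""
    if cursor_field is None:
        return []
    if isinstance(cursor_field, str):
        return [cursor_field]
    return list(cursor_field)


def extract_cursor_value(record: Mapping[str, Any], cursor_field: CursorField) -> Any:
    """Walk the path through nested mappings; return None if any hop is missing."""
    path = normalize_cursor_field(cursor_field)
    if not path:
        return None
    current: Any = record
    for key in path:
        if not isinstance(current, Mapping) or key not in current:
            return None
        current = current[key]
    return current
```

Normalizing to a list at the boundary keeps the rest of the code on a single shape, at the cost of a small conversion for the common single-key case.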
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (2)
23-33: Tighten typing: avoid the type ignore in caching decorator
Nice addition. We can drop the type: ignore with a minor tweak. Would you like to adjust as below, wdyt?
      class SchemaLoaderCachingDecorator(SchemaLoader):
          def __init__(self, schema_loader: SchemaLoader):
              self._decorated = schema_loader
              self._loaded_schema: Optional[Mapping[str, Any]] = None

    -     def get_json_schema(self) -> Mapping[str, Any]:
    -         if self._loaded_schema is None:
    -             self._loaded_schema = self._decorated.get_json_schema()
    -
    -         return self._loaded_schema  # type: ignore  # at that point, we assume the schema will be populated
    +     def get_json_schema(self) -> Mapping[str, Any]:
    +         if self._loaded_schema is None:
    +             self._loaded_schema = self._decorated.get_json_schema()
    +         loaded = self._loaded_schema
    +         assert loaded is not None
    +         return loaded
28-33: Consider returning a shallow copy to prevent accidental mutation of cached schema
If any downstream step mutates the schema in-place, future reads will observe mutated state. Do we want to return a shallow copy (e.g., return dict(loaded)) to isolate callers, or is sharing-by-reference intentional for performance, wdyt?
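The trade-off behind the shallow-copy suggestion can be seen in a small standalone sketch. `CountingLoader` and `CopyingSchemaCache` are hypothetical names used only here; the real decorator in this PR is `SchemaLoaderCachingDecorator`, and this sketch does not claim to match its implementation.

```python
from typing import Any, Dict, Mapping, Optional


class CountingLoader:
    """Hypothetical loader; counts loads so the caching effect is observable."""

    def __init__(self) -> None:
        self.calls = 0

    def get_json_schema(self) -> Mapping[str, Any]:
        self.calls += 1
        return {"type": "object", "properties": {"id": {"type": "integer"}}}


class CopyingSchemaCache:
    """Caches the first loaded schema and hands out a shallow copy on every call."""

    def __init__(self, loader: CountingLoader) -> None:
        self._decorated = loader
        self._loaded_schema: Optional[Mapping[str, Any]] = None

    def get_json_schema(self) -> Dict[str, Any]:
        if self._loaded_schema is None:
            self._loaded_schema = self._decorated.get_json_schema()
        # dict(...) copies only the top level; nested mappings remain shared.
        return dict(self._loaded_schema)
```

A shallow copy protects top-level keys only: a caller that mutates a nested mapping such as `properties` would still affect the cached schema, so full isolation would need `copy.deepcopy` at a higher cost per call.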
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
23-23: Test updates to use InlineSchemaLoader look good
The InlineSchemaLoader import and the _EMPTY_SCHEMA_LOADER constant align the tests with the new API. If isolation ever matters, we could construct a fresh InlineSchemaLoader per test, but the shared empty one is fine here.
Also applies to: 35-36
unit_tests/connector_builder/test_connector_builder_handler.py (1)
788-793: Enable builder messages in test_config_update
Passing emit_connector_builder_messages=True here makes sense to exercise the config update path. Would you consider asserting that at least one connector-builder message is emitted (e.g., by inspecting the message repository or the response payload) to guard against regressions, wdyt?
unit_tests/sources/streams/concurrent/test_default_stream.py (1)
48-68: Support callable json_schema in DefaultStream tests
Nice addition validating that DefaultStream.get_json_schema handles a callable. Would you also add a quick test that the callable is invoked on each call (no caching) and that as_airbyte_stream() reflects the callable-returned schema (e.g., using a Mock to count invocations), wdyt?
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
386-386: Defer schema evaluation by passing a callable to DefaultStream
DefaultStream now supports json_schema as a Callable, which lets us defer schema loading to read-time and aligns with the PR objective. Would you pass the bound method instead of eagerly calling it, wdyt?
Apply this change in each DefaultStream construction:
    - json_schema=declarative_stream.get_json_schema(),
    + json_schema=declarative_stream.get_json_schema,
Also applies to: 426-426, 482-482
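The difference between calling `get_json_schema()` at construction and passing the bound method can be shown with a small sketch. `RecordingStream` and `ToyDefaultStream` are hypothetical stand-ins; the only behavior assumed here is the one described in this review, namely that `json_schema` may be either a mapping or a zero-argument callable.

```python
from typing import Any, Callable, Mapping, Union

SchemaOrProvider = Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]]


class RecordingStream:
    """Hypothetical source stream; counts schema loads."""

    def __init__(self) -> None:
        self.schema_loads = 0

    def get_json_schema(self) -> Mapping[str, Any]:
        self.schema_loads += 1
        return {"type": "object"}


class ToyDefaultStream:
    """Hypothetical stand-in, not the CDK's DefaultStream."""

    def __init__(self, json_schema: SchemaOrProvider) -> None:
        self._json_schema = json_schema

    def get_json_schema(self) -> Mapping[str, Any]:
        # Resolve lazily when given a callable; otherwise return the stored mapping.
        if callable(self._json_schema):
            return self._json_schema()
        return self._json_schema


# Eager: the schema is loaded while the stream is being constructed.
eager_source = RecordingStream()
eager = ToyDefaultStream(json_schema=eager_source.get_json_schema())

# Deferred: only the bound method is stored; nothing is loaded yet.
lazy_source = RecordingStream()
lazy = ToyDefaultStream(json_schema=lazy_source.get_json_schema)
```

Passing the bound method and wrapping the call in a lambda defer the work equally; the bound method simply avoids the extra closure.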
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (15)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (7 hunks)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py (3 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (14 hunks)
- airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (5 hunks)
- airbyte_cdk/sources/streams/concurrent/adapters.py (2 hunks)
- airbyte_cdk/sources/streams/concurrent/default_stream.py (3 hunks)
- unit_tests/connector_builder/test_connector_builder_handler.py (1 hunks)
- unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py (1 hunks)
- unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (16 hunks)
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (14 hunks)
- unit_tests/sources/declarative/resolvers/test_config_components_resolver.py (1 hunks)
- unit_tests/sources/declarative/retrievers/test_simple_retriever.py (0 hunks)
- unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (4 hunks)
- unit_tests/sources/declarative/test_manifest_declarative_source.py (8 hunks)
- unit_tests/sources/streams/concurrent/test_default_stream.py (1 hunks)
💤 Files with no reviewable changes (1)
- unit_tests/sources/declarative/retrievers/test_simple_retriever.py
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
unit_tests/sources/declarative/test_manifest_declarative_source.py
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
unit_tests/sources/declarative/test_manifest_declarative_source.py
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.
Applied to files:
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
🧬 Code Graph Analysis (10)
unit_tests/sources/declarative/resolvers/test_config_components_resolver.py (3)
- unit_tests/connector_builder/test_connector_builder_handler.py (1): name (827-828)
- airbyte_cdk/sources/streams/concurrent/adapters.py (1): name (180-181)
- airbyte_cdk/sources/streams/concurrent/default_stream.py (1): name (44-45)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (7)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1): streams (231-240)
- unit_tests/connector_builder/test_connector_builder_handler.py (1): streams (831-832)
- airbyte_cdk/sources/abstract_source.py (1): streams (74-79)
- unit_tests/sources/test_source.py (2): streams (58-61), streams (148-150)
- unit_tests/sources/test_abstract_source.py (1): streams (84-87)
- airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1): AbstractStream (21-96)
- airbyte_cdk/sources/streams/core.py (1): Stream (118-703)
unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py (4)
- airbyte_cdk/sources/types.py (1): partition (99-104)
- airbyte_cdk/sources/streams/concurrent/default_stream.py (1): generate_partitions (40-41)
- airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1): read (84-111)
- airbyte_cdk/sources/streams/concurrent/adapters.py (2): read (138-147), read (259-303)
unit_tests/sources/streams/concurrent/test_default_stream.py (1)
- airbyte_cdk/sources/streams/concurrent/default_stream.py (2): DefaultStream (17-123), get_json_schema (55-56)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (5)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py (1): streams (301-338)
- airbyte_cdk/sources/abstract_source.py (1): streams (74-79)
- airbyte_cdk/sources/streams/core.py (1): Stream (118-703)
- airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1): AbstractStream (21-96)
- unit_tests/sources/declarative/test_declarative_stream.py (1): _schema_loader (243-246)
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (1)
- airbyte_cdk/sources/types.py (2): Record (21-72), StreamSlice (75-169)
airbyte_cdk/sources/streams/concurrent/default_stream.py (8)
- airbyte_cdk/sources/streams/concurrent/abstract_stream.py (3): as_airbyte_stream (74-77), name (54-57), get_json_schema (68-71)
- airbyte_cdk/sources/streams/concurrent/adapters.py (3): as_airbyte_stream (208-209), name (180-181), get_json_schema (201-202)
- airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py (2): name (186-187), get_json_schema (146-150)
- airbyte_cdk/sources/file_based/stream/concurrent/adapters.py (2): name (139-140), get_json_schema (152-153)
- unit_tests/sources/declarative/test_concurrent_declarative_source.py (2): name (625-626), get_json_schema (658-659)
- unit_tests/sources/test_source_read.py (2): name (45-46), get_json_schema (75-76)
- airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1): get_json_schema (28-32)
- unit_tests/sources/streams/test_stream_read.py (1): get_json_schema (80-81)
unit_tests/sources/declarative/test_manifest_declarative_source.py (5)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3): ConcurrentDeclarativeSource (88-574), streams (231-240), read (183-221)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2): DeclarativeStream (2426-2495), SimpleRetriever (2808-2862)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py (3): ManifestDeclarativeSource (98-616), streams (301-338), read (416-424)
- airbyte_cdk/sources/streams/concurrent/default_stream.py (2): DefaultStream (17-123), name (44-45)
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1): get_retriever (4686-4691)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (4)
- airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1): SchemaLoaderCachingDecorator (23-32)
- airbyte_cdk/sources/streams/concurrent/default_stream.py (2): DefaultStream (17-123), name (44-45)
- airbyte_cdk/sources/declarative/schema/json_file_schema_loader.py (1): _get_json_filepath (69-70)
- airbyte_cdk/sources/declarative/schema/composite_schema_loader.py (1): CompositeSchemaLoader (12-31)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (7)
- airbyte_cdk/sources/declarative/schema/schema_loader.py (1): SchemaLoader (11-17)
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1): get_json_schema (2685-2687)
- unit_tests/sources/declarative/test_concurrent_declarative_source.py (2): get_json_schema (658-659), read_records (632-656)
- unit_tests/sources/declarative/test_declarative_stream.py (1): _schema_loader (243-246)
- airbyte_cdk/sources/types.py (1): StreamSlice (75-169)
- airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1): read_records (492-534)
- airbyte_cdk/sources/declarative/retrievers/async_retriever.py (1): read_records (107-124)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-intercom
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (28)
unit_tests/sources/declarative/resolvers/test_config_components_resolver.py (1)
386-387: LGTM: assertion updated to match new stream structure
The new path correctly reflects the retriever being under the partition generator’s factory. No semantic change, and the check still validates the expected parameters per stream.
airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
22-22: Dynamic schema support via callable looks good
Allowing a callable for json_schema and resolving it lazily is aligned with the migration. The as_airbyte_stream() copy via dict(get_json_schema()) also avoids unintended mutation. Looks good to me.
Also applies to: 55-57, 61-61
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
49-51: Migration to schema_loader across factory/partition/read looks consistent
The wiring through DeclarativePartitionFactory → DeclarativePartition → retriever.read_records(records_schema, slice) is coherent and leverages caching at the loader level. LGTM.
Also applies to: 58-63, 69-83, 89-91
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
3623-3623: Callsite migrations to schema_loader=_EMPTY_SCHEMA_LOADER are consistent
All DeclarativePartition(...) callsites now pass schema_loader instead of json_schema, matching the production signature. LGTM.
Also applies to: 3707-3707, 3803-3803, 3893-3893, 3967-3967, 4172-4172, 4184-4184, 4196-4196, 4340-4340, 4351-4351
airbyte_cdk/sources/declarative/manifest_declarative_source.py (2)
11-11: Import updates look good
Adding Union and importing AbstractStream aligns with the mixed return type during the migration. LGTM.
Also applies to: 69-69
301-306: streams() now returns List[Union[Stream, AbstractStream]]
The migration note and widened return type are clear and pragmatic for the transition. LGTM.
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
8-19: Typing/import updates
Importing Union (and related typing cleanups) is consistent with returning mixed stream types. LGTM.
231-231: streams() widened to List[Union[Stream, AbstractStream]]
This aligns with the migration plan and the upstream changes. Given the AbstractSource base still advertises List[Stream], you’ve included type: ignore elsewhere to smooth this, which looks fine for the transition. Please keep an eye on call sites and remove the ignore once the migration is complete, wdyt?
261-264: Early path for AbstractStream
Good call to short-circuit AbstractStream instances into the concurrent set. This avoids accidental downconversion or double-wrapping. LGTM.
unit_tests/sources/declarative/test_manifest_declarative_source.py (5)
31-33: Import structure looks good!
The new imports for `ConcurrentDeclarativeSource`, `DefaultStream`, and the `get_retriever` helper are well-organized and align with the migration strategy.
Also applies to: 40-41
288-289: Stream type assertions correctly updated
The assertions have been properly updated to expect `DefaultStream` instances, consistent with the migration from `DeclarativeStream`.
Also applies to: 297-297, 301-301, 303-303
2187-2203: Retriever access properly abstracted
Good use of the `get_retriever` helper to handle the structural differences between stream types. The assertions correctly validate caching behavior for parent streams, wdyt?
1826-1827: Pagination call signatures consistently updated
The changes from `{"states": []}` to `{}` in the call parameters appear to reflect the new state handling approach in the concurrent streaming implementation. All test cases have been updated consistently with this pattern.
Also applies to: 1915-1917, 2023-2024
2218-2221: Test helper correctly updated for concurrent source
The `_run_read` function properly instantiates `ConcurrentDeclarativeSource` with the new constructor signature (catalog, config, state, manifest).
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (7)
160-162: New imports properly added
The imports for `SchemaLoaderCachingDecorator` and `DefaultStream` are correctly placed and necessary for the updated test implementations.
Also applies to: 174-174
1769-1796: Test properly adapted to DefaultStream structure
The test correctly navigates the new DefaultStream internal structure and uses the `get_schema_loader` helper for better abstraction. The assertions maintain the same test coverage, wdyt?
2191-2193: Transformation tests correctly migrated
Tests maintain their validation logic while properly using the `get_retriever` helper to access transformations through the new DefaultStream structure.
Also applies to: 2219-2224, 2382-2383
2430-2431: Parametrized test elegantly handles both stream types
The test is well-structured to validate both the router type and stream type, with proper conditional logic to handle the structural differences. This approach ensures good test coverage during the migration period.
Also applies to: 2444-2445, 2455-2456, 2474-2475, 2500-2501, 2507-2508, 2545-2553
2731-2734: Custom retriever test properly adapted
The test correctly verifies that custom retrievers are properly instantiated within the DefaultStream structure through the partition factory chain.
4678-4691: Well-designed helper functions for test abstraction
The `get_schema_loader` and `get_retriever` helpers provide clean abstractions over the structural differences between stream types. The unwrapping of the `SchemaLoaderCachingDecorator` in `get_schema_loader` is particularly elegant. These will make future test maintenance easier, wdyt?
4678-4683: Schema caching properly validated
The assertion that schema loaders are wrapped in `SchemaLoaderCachingDecorator` ensures the caching behavior is consistently applied to all DefaultStream instances, which aligns with the PR's caching strategy for dynamic schema loaders.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (7)
2127-2157: Good refactor to always return a PartitionRouter
The change to always return a `PartitionRouter` (defaulting to `SinglePartitionRouter` when no partition router is defined) simplifies the interface and eliminates potential null checks downstream. This makes the code more predictable and maintainable.
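The pattern praised here, returning a do-nothing router instead of `None`, can be sketched in isolation. All names below (`ToyPartitionRouter`, `ToySinglePartitionRouter`, `ToyListRouter`, `build_router`, `needs_per_partition_cursor`) are hypothetical, and the sketch assumes the single-partition router yields exactly one empty slice.

```python
from typing import Any, Dict, Iterator, List, Optional


class ToyPartitionRouter:
    """Hypothetical base class for routers that produce stream slices."""

    def stream_slices(self) -> Iterator[Dict[str, Any]]:
        raise NotImplementedError


class ToySinglePartitionRouter(ToyPartitionRouter):
    """Null object: a single empty slice, so callers can always iterate."""

    def stream_slices(self) -> Iterator[Dict[str, Any]]:
        yield {}


class ToyListRouter(ToyPartitionRouter):
    """Router that emits one slice per configured value."""

    def __init__(self, values: List[str]) -> None:
        self._values = values

    def stream_slices(self) -> Iterator[Dict[str, Any]]:
        for value in self._values:
            yield {"partition": value}


def build_router(values: Optional[List[str]]) -> ToyPartitionRouter:
    # Always return a router; the "nothing configured" case gets the null object.
    if not values:
        return ToySinglePartitionRouter()
    return ToyListRouter(values)


def needs_per_partition_cursor(router: ToyPartitionRouter) -> bool:
    # Per-partition state is only worth tracking when there is real partitioning.
    return not isinstance(router, ToySinglePartitionRouter)
```

Because `build_router` never returns `None`, downstream code can iterate `stream_slices()` unconditionally and reserve the `isinstance` check for the one decision that actually depends on partitioning.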
2174-2176: Improved cursor creation logic
Good improvement to the cursor creation condition. By checking for `not isinstance(stream_slicer, SinglePartitionRouter)`, you're correctly limiting per-partition cursors to scenarios with actual partitioning, avoiding unnecessary overhead for single-slice cases.
2328-2328: Consistent slicer return value
Good change to return `stream_slicer` instead of `None`. This maintains consistency with the new pattern where we always have a slicer available.
2622-2622: Consistent use of partition router builder
The update to use `_build_stream_slicer_from_partition_router` for the stream_slicer parameter maintains consistency with the new partition router pattern.
2974-2979: Correct parent stream flag propagation
Good addition of the `is_parent=True` parameter when creating parent streams. This ensures parent streams are excluded from the DefaultStream migration path, which is the intended behavior according to the PR description.
3893-3893: Consistent partition router usage in HTTP components resolver
The update maintains consistency with the new partition router pattern across the codebase.
2075-2075: Clarify the WIP comment status
The comment indicates this implementation might change once the Connector Builder PR is ready. Could you clarify if this creates any coordination concerns with the Connector Builder team? Should we add any feature flags or version checks to ensure smooth rollout? wdyt?
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- airbyte_cdk/connector_builder/test_reader/reader.py (1 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (14 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/connector_builder/test_reader/reader.py
🧰 Additional context used
🧬 Code Graph Analysis (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (7)
- airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (4): DeclarativePartitionFactory (35-63), StreamSlicerPartitionGenerator (123-146), stream_name (116-117), get_json_schema (28-32)
- airbyte_cdk/sources/streams/concurrent/abstract_stream.py (4): AbstractStream (21-96), cursor (87-90), get_json_schema (68-71), cursor_field (61-65)
- airbyte_cdk/sources/streams/concurrent/default_stream.py (5): cursor (92-93), DefaultStream (17-123), name (44-45), get_json_schema (55-56), cursor_field (52-53)
- airbyte_cdk/sources/streams/concurrent/cursor.py (3): Cursor (50-86), FinalStateCursor (89-131), cursor_field (184-185)
- airbyte_cdk/sources/declarative/partition_routers/partition_router.py (1): PartitionRouter (14-62)
- airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (1): StreamSlicerTestReadDecorator (14-28)
- airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py (1): SinglePartitionRouter (13-63)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Check: source-shopify
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (9)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9)
10-10
: LGTM!The new imports are properly organized and necessary for the DefaultStream instantiation path and concurrent streaming support.
Also applies to: 547-550, 602-619
1937-1939
: Signature change looks good!The
is_parent
parameter with a default value maintains backward compatibility while enabling the distinction needed for the migration strategy. The expanded return type properly reflects that this method can now return either stream type.
2110-2117
: Good refactoring of cursor field extraction!Moving this logic outside the DefaultStream branch ensures consistent handling for DeclarativeStream creation.
2138-2159
: Excellent use of the null object pattern!Returning
SinglePartitionRouter
instead ofNone
eliminates nullable checks downstream and provides a cleaner, more consistent API.
2176-2178: Smart optimization for cursor creation!
The SinglePartitionRouter check prevents unnecessary per-partition cursor overhead for simple streams. This should improve performance for non-partitioned streams.
Also applies to: 2251-2255
2330-2330: Consistent non-nullable return!
Aligns with the pattern established in `_build_stream_slicer_from_partition_router`.
2624-2624: Consistent partition router usage across components!
Good to see the same pattern applied to dynamic schema loaders and HTTP component resolvers.
Also applies to: 3895-3895
2977-2980: Proper parent stream identification!
Correctly passes `is_parent=True` to ensure parent streams aren't migrated to DefaultStream prematurely.
2072-2108: Add tests to cover the new DefaultStream migration path
We didn’t find any existing tests exercising `DefaultStream(..., partition_generator=…)`, so it would be great to:
- Add a unit test that calls `create_declarative_stream` for a non-incremental stream with only a `PartitionRouter` and asserts you get a `DefaultStream` with the right `partition_generator`, `cursor=None`, and `FinalStateCursor`.
- Verify that existing connectors (those using only `DeclarativeStream`) still instantiate and behave as before under `create_declarative_stream`.
- Cover edge cases like streams with both a partition router and a cursor once the FSM for cursors is implemented.
Would you like help outlining or bootstrapping those tests? wdyt?
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/connector_builder/test_reader/reader.py (1)
230-230: Nit: close the backtick in the error message
Tiny formatting fix that improves error readability. Shall we adjust, wdyt?
- raise ValueError(f"Unknown type for cursor field `{field}")
+ raise ValueError(f"Unknown type for cursor field `{field}`")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/connector_builder/test_reader/reader.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
airbyte_cdk/connector_builder/test_reader/reader.py (1)
unit_tests/connector_builder/test_connector_builder_handler.py (1)
primary_key (827-828)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Check: source-shopify
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (2)
airbyte_cdk/connector_builder/test_reader/reader.py (2)
128-131: We need to inspect how `_cursor_field_to_nested_and_composite_field` handles its input to confirm whether guarding against bound methods is required. Could you run a quick search for its definition? For example:
rg -n "def _cursor_field_to_nested_and_composite_field" -A5 -B5 airbyte_cdk/connector_builder/test_reader/reader.py
This will show its signature and docstring so we can verify if passing a callable instead of its result could lead to issues. wdyt?
123-127: Unnecessary primary_key guard
All `Stream` implementations in the repo declare `primary_key` with `@property`, so `stream.primary_key` always returns the intended value (never a bound method) and the `else stream._primary_key` path is never taken. We can simplify this to:
schema_inferrer = SchemaInferrer(
    self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None,
    …
)
wdyt?
Likely an incorrect or invalid review comment.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
27-44: Docstring vs Implementation Mismatch: Switch to a `retriever_factory` for Thread Safety
It looks like the constructor’s signature currently takes a single `Retriever` instance, but the docstring clearly describes a factory to produce one retriever per thread/partition. Sharing a single retriever risks race conditions in components like `DefaultPaginator`.
Could we refactor this class to accept a `retriever_factory: Callable[[], Retriever]` instead of a `Retriever`? For example:
• In `DeclarativePartitionFactory.__init__` (lines 27–44):
- Change parameter `retriever: Retriever` → `retriever_factory: Callable[[], Retriever]`
- Assign to `self._retriever_factory` instead of `self._retriever`
• In `create()`:
- Pass `retriever_factory=self._retriever_factory` to `DeclarativePartition`
• In `DeclarativePartition.__init__`:
- Update signature to accept `retriever_factory` and store it
• In `DeclarativePartition.read()`:
- Instantiate a fresh retriever via `retriever = self._retriever_factory()` before calling `read_records()`
Minimal diff sketch:
-from typing import Any, Iterable, Mapping, Optional
+from typing import Any, Callable, Iterable, Mapping, Optional
 class DeclarativePartitionFactory:
-    def __init__(..., retriever: Retriever, ...):
+    def __init__(..., retriever_factory: Callable[[], Retriever], ...):
         """
         The DeclarativePartitionFactory takes a retriever_factory (not a retriever directly). ...
         """
         self._schema_loader = SchemaLoaderCachingDecorator(schema_loader)
-        self._retriever = retriever
+        self._retriever_factory = retriever_factory
         ...
     def create(self, stream_slice: StreamSlice) -> Partition:
-        return DeclarativePartition(..., retriever=self._retriever, ...)
+        return DeclarativePartition(..., retriever_factory=self._retriever_factory, ...)
And similarly update `DeclarativePartition` to call `self._retriever_factory()` inside `read()`.
Does this sound like a reasonable refactor to align implementation with the docstring and ensure thread-safe retriever usage? wdyt?
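The factory idea in this comment can be sketched in isolation. The classes below (`HypotheticalRetriever`, `HypotheticalPartition`) and the `read_concurrently` helper are illustrative stand-ins, not the CDK's `Retriever` or `DeclarativePartition`; the sketch only assumes that a retriever carries mutable state that should not be shared between threads.

```python
import threading
from typing import Callable, Iterable, List


class HypotheticalRetriever:
    """Stand-in for a stateful retriever (for example, one tracking pagination)."""

    def __init__(self) -> None:
        self.pages_read = 0  # mutable state that must not be shared across threads

    def read_records(self, partition_id: int) -> Iterable[str]:
        for page in range(3):
            self.pages_read += 1
            yield f"partition-{partition_id}-page-{page}"


class HypotheticalPartition:
    """Holds a factory instead of a retriever instance."""

    def __init__(
        self, retriever_factory: Callable[[], HypotheticalRetriever], partition_id: int
    ) -> None:
        self._retriever_factory = retriever_factory
        self._partition_id = partition_id

    def read(self) -> List[str]:
        # A fresh retriever per read means no state is shared between threads.
        retriever = self._retriever_factory()
        return list(retriever.read_records(self._partition_id))


def read_concurrently(partition_count: int) -> List[List[str]]:
    partitions = [
        HypotheticalPartition(HypotheticalRetriever, i) for i in range(partition_count)
    ]
    results: List[List[str]] = [[] for _ in partitions]

    def worker(index: int) -> None:
        # Each thread writes only to its own slot in the results list.
        results[index] = partitions[index].read()

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(partition_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results
```

Passing the class itself as the factory works here because calling it with no arguments builds a new instance; a real factory would more likely be a closure or `functools.partial` that captures the construction arguments.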
🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)
280-284: Avoid accessing private _schema_loader; prefer a public accessor or a safe fallback
You’re reaching into declarative_stream._schema_loader (private) to pass to DeclarativePartitionFactory. Could we either:
- add/use a public accessor on DeclarativeStream (e.g., declarative_stream.schema_loader), or
- defensively fall back to a callable based on get_json_schema if the loader isn’t exposed (still defers load)?
This would remove the type: ignore and reduce coupling, wdyt?
For example (if a public accessor exists):
- schema_loader=declarative_stream._schema_loader, # type: ignore
+ schema_loader=declarative_stream.schema_loader,
If an accessor doesn’t exist yet, would you like me to add one to DeclarativeStream to expose the loader cleanly?
Also applies to: 311-315, 341-345, 401-405
321-334: Pass a lazy schema provider to DefaultStream to enable on-read loading
DefaultStream now accepts a callable for schema (per PR objective). To keep schema loading consistent with the loader-based approach (and avoid eager loads during grouping), shall we pass a callable instead of a realized schema, wdyt?
Proposed change:
- json_schema=declarative_stream.get_json_schema(),
+ json_schema=declarative_stream._schema_loader.get_json_schema, # type: ignore # temporary until public accessor is available
If you’d prefer not to access the private attribute, we can wire this through a public accessor once added.
356-365: Apply the same lazy schema pattern here
Mirror the change for the non-incremental concurrent stream to keep behavior consistent and defer schema loading until read, wdyt?
- json_schema=declarative_stream.get_json_schema(),
+ json_schema=declarative_stream._schema_loader.get_json_schema, # type: ignore
410-419: And here as well: pass a callable for json_schema
For the per-partition cursor case, prefer a callable to preserve laziness and avoid unnecessary schema loads up front. OK to apply the same change, wdyt?
- json_schema=declarative_stream.get_json_schema(),
+ json_schema=declarative_stream._schema_loader.get_json_schema, # type: ignore
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
15-25: SchemaLoaderCachingDecorator: consider thread-safety and typing cleanup
- Minor typing nit: you can avoid type: ignore by returning a cast or by narrowing with an assert, wdyt?
- Optional: since this runs in concurrent contexts, a tiny race could call get_json_schema twice on first access. It’s benign, but if you want to be strict, we could guard with a lock. Not a blocker.
Typing-only tweak:
-from typing import Any, Iterable, Mapping, Optional
+from typing import Any, Iterable, Mapping, Optional, cast
...
- return self._loaded_schema # type: ignore # at that point, we assume the schema will be populated
+ return cast(Mapping[str, Any], self._loaded_schema)
Thread-safety (if desired):
+import threading
...
 class SchemaLoaderCachingDecorator(SchemaLoader):
     def __init__(self, schema_loader: SchemaLoader):
         self._decorated = schema_loader
         self._loaded_schema: Optional[Mapping[str, Any]] = None
+        self._lock = threading.Lock()
     def get_json_schema(self) -> Mapping[str, Any]:
-        if self._loaded_schema is None:
-            self._loaded_schema = self._decorated.get_json_schema()
-        return self._loaded_schema
+        if self._loaded_schema is None:
+            with self._lock:
+                if self._loaded_schema is None:
+                    self._loaded_schema = self._decorated.get_json_schema()
+        return cast(Mapping[str, Any], self._loaded_schema)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (7 hunks)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (14 hunks)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (3 hunks)
unit_tests/connector_builder/test_connector_builder_handler.py (6 hunks)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (11 hunks)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (0 hunks)
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (5 hunks)
💤 Files with no reviewable changes (1)
- unit_tests/sources/declarative/retrievers/test_simple_retriever.py
🚧 Files skipped from review as they are similar to previous changes (3)
- unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
- unit_tests/connector_builder/test_connector_builder_handler.py
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧬 Code Graph Analysis (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
streams (301-338)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (2)
AbstractStream (21-96)
name (54-57)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
stream_name (92-93)
airbyte_cdk/sources/declarative/declarative_stream.py (2)
name (93-97)
name (100-102)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (3)
airbyte_cdk/sources/declarative/schema/schema_loader.py (1)
SchemaLoader (11-17)
airbyte_cdk/sources/types.py (1)
StreamSlice (75-169)
airbyte_cdk/sources/declarative/retrievers/retriever.py (1)
Retriever (13-54)
(13-54)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
[error] 5-5: poetry run ruff check . failed: Ruff I001 - Import block is un-sorted or un-formatted. Organize imports.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Check: source-shopify
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (7)
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (3)
9-9: Importing InlineSchemaLoader is the right move
Switching to InlineSchemaLoader here aligns the test with the new loader-based API. LGTM, wdyt?
18-18: Good fix: single, correctly constructed _SCHEMA_LOADER
Defining _SCHEMA_LOADER = InlineSchemaLoader({"type": "object", "properties": {}}, {}) resolves the earlier nested-loader concern and keeps types consistent across tests. Nice catch, wdyt?
37-41: Consistently passing schema_loader to DeclarativePartitionFactory is correct
Replacing the old json_schema usage with _SCHEMA_LOADER across all factory instantiations matches the production change to loader-based schema resolution. This also future-proofs tests if schema loading is deferred. Looks good to me, wdyt?
Also applies to: 51-56, 69-74
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
7-7: Fix Ruff I001: organize imports
CI flagged “Import block is un-sorted or un-formatted.” Would you run ruff’s import auto-formatter (or apply isort) to sort/organize this file’s imports, wdyt?
You can auto-fix locally:
- poetry run ruff check --select I --fix airbyte_cdk/sources/declarative/concurrent_declarative_source.py
178-188: Type widening for streams() looks good
Returning List[Union[Stream, AbstractStream]] matches the migration path and keeps AbstractSource call sites unblocked. This is a reasonable interim step. LGTM, wdyt?
208-211: Early exit for already-concurrent streams is clean
Detecting AbstractStream and appending directly avoids double-wrapping and unnecessary inspection. Looks correct and keeps grouping logic simple, wdyt?
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
71-75: Loader-based schema at read time: aligned with PR goals
Reading with self._schema_loader.get_json_schema() at iteration time matches the deferred schema loading plan and plays well with dynamic schema loaders and caching. LGTM, wdyt?
responded to your one open question, but lgtm!
What
We want to move away from DeclarativeStream to DefaultStream. To kick off the migration, we started with what we thought was the easiest case, to see whether we would break a lot of things. This PR migrates all non-incremental streams (excluding streams used within SubstreamPartitionRouter).
Schema stuff
I'm a bit confused about all the schema stuff. There are three problems, and I'll share my understanding of each below.
Why is the schema loaded
Cases:
During read: When is the schema loaded
The schema is only loaded when the read method is called. This was not possible with the current DefaultStream implementation so I changed the init to take either a Callable or the actual schema.
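As a rough illustration of an init that takes "either a Callable or the actual schema", here is a minimal sketch. `HypotheticalStream` is a stand-in written for this example and is not the actual `DefaultStream` signature; it only assumes that the schema is a mapping and that a callable provider takes no arguments.

```python
from typing import Any, Callable, Mapping, Union

JsonSchema = Mapping[str, Any]


class HypotheticalStream:
    """Stand-in stream that accepts a schema or a zero-argument schema provider."""

    def __init__(self, json_schema: Union[JsonSchema, Callable[[], JsonSchema]]) -> None:
        # Nothing is loaded here; a callable is simply stored for later.
        self._json_schema = json_schema

    def get_json_schema(self) -> JsonSchema:
        # Resolve lazily: only call the provider when the schema is requested.
        if callable(self._json_schema):
            return self._json_schema()
        return self._json_schema
```

Constructing the stream with a provider does no work up front, so a source with many streams would only pay the loading cost for the streams that are actually read. This version calls the provider on every request; whether to cache the result is a separate question.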
Caching of the schema
This one is tricky to me. I see a couple of places where the schema is cached. Excluding file based from the discussion for now, there is:
`DefaultStream.get_json_schema`, which also has a cache (see `DefaultStream` above)
So, do I think there is value in having caching? Yes, specifically for dynamic schema loaders. I have tested that in the low-code framework (using source-airtable, making SinglePartitionRouter yield two empty StreamSlice and checking if `DynamicSchemaLoader.get_json_schema` is called multiple times): the schema was fetched once per slice, which is not good. That being said, we have a problem with our current implementation: we get the schema even though we don't read the stream, which might slow down the source instantiation.
All that being said, I don't think we need to cache everything. InlineSchemaLoader, for example, is already in memory. So I'm a bit afraid, in terms of memory, of caching the schemas of all the streams. Given a read with all the streams selected in a source like stripe, which has > 40 streams, it might be non-negligible. However, we already have that and it does not seem to have caused big issues, so I've added the caching in the new solution but removed it from DefaultStream and StreamFacade.
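The once-per-slice behavior and the effect of a caching wrapper can be reproduced with a small sketch. The names here (`HypotheticalDynamicLoader`, `HypotheticalCachingLoader`, `read_slices`) are made up for illustration and are not the CDK classes; the sketch assumes a read loop that asks for the schema once per slice.

```python
from typing import Any, Dict, List, Mapping, Optional


class HypotheticalDynamicLoader:
    """Stand-in for an expensive loader, for example one that calls an API."""

    def __init__(self) -> None:
        self.call_count = 0

    def get_json_schema(self) -> Mapping[str, Any]:
        self.call_count += 1
        return {"type": "object", "properties": {}}


class HypotheticalCachingLoader:
    """Wraps a loader and remembers its first result (not thread-safe)."""

    def __init__(self, decorated: HypotheticalDynamicLoader) -> None:
        self._decorated = decorated
        self._loaded_schema: Optional[Mapping[str, Any]] = None

    def get_json_schema(self) -> Mapping[str, Any]:
        # Nothing is loaded at construction time; the first call populates the cache.
        if self._loaded_schema is None:
            self._loaded_schema = self._decorated.get_json_schema()
        return self._loaded_schema


def read_slices(loader: Any, slices: List[Dict[str, Any]]) -> int:
    """Ask for the schema once per slice, as a read loop might; return slices read."""
    for _ in slices:
        loader.get_json_schema()
    return len(slices)
```

Reading two slices through the bare loader calls it twice, while reading them through the wrapper calls the underlying loader once. The memory cost is one retained schema per wrapped loader, which is the trade-off discussed above.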
Summary by CodeRabbit
New Features
Performance
Refactor
Tests