feat(connector builder): allow connector builder to process requests through the concurrent CDK by reworking message routing and order through the queue and message repositories #688
Conversation
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@brian/connector_builder_handler_use_concurrent#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch brian/connector_builder_handler_use_concurrent

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:
PyTest Results (Fast): 3 695 tests ±0, 3 684 ✅ ±0, 6m 30s ⏱️ +2s. Results for commit 1b3a595. ± Comparison against base commit d2262a5. This pull request removes 2 and adds 2 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results.
PyTest Results (Full): 3 698 tests ±0, 3 687 ✅ ±0, 11m 58s ⏱️ +16s. Results for commit 1b3a595. ± Comparison against base commit d2262a5. This pull request removes 2 and adds 2 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results.
📝 Walkthrough

This change overhauls the concurrency and state management mechanisms for declarative sources in the connector builder. It introduces explicit concurrency limits, replaces …

Changes
Sequence Diagram(s)

sequenceDiagram
participant Handler as connector_builder_handler
participant Source as ConcurrentDeclarativeSource
participant Factory as ModelToComponentFactory
participant Repo as ConcurrentMessageRepository
participant Queue as Queue
participant Processor as ConcurrentReadProcessor
participant PartitionReader as PartitionReader
participant Cursor as Cursor
Handler->>Source: create_source(config, limits, catalog, state)
Source->>Queue: Instantiate shared queue
Source->>Repo: Instantiate ConcurrentMessageRepository(queue)
Source->>Factory: Instantiate ModelToComponentFactory(..., message_repository=Repo, ...)
Source->>Processor: Create ConcurrentSource(queue=Queue, ...)
Handler->>Source: Start reading
Source->>Processor: read()
Processor->>PartitionReader: process_partition(partition, cursor)
PartitionReader->>Cursor: observe(record)
PartitionReader->>Queue: Put record
PartitionReader->>Cursor: close_partition(partition)
PartitionReader->>Queue: Put PartitionCompleteSentinel
Processor->>Handler: Yield AirbyteMessages from Queue
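The flow in the diagram above can be sketched as a minimal producer/consumer loop. All class and function names below are illustrative stand-ins, not the CDK's actual APIs: partition workers put records and a completion sentinel onto a shared bounded queue, and the main thread drains the queue in arrival order until every partition has signaled completion.

```python
import queue
import threading

class PartitionCompleteSentinel:
    """Illustrative stand-in for the CDK's partition-complete sentinel."""
    def __init__(self, partition_id):
        self.partition_id = partition_id

def read_partition(partition_id, records, out_queue):
    # Worker thread: emit each record, then signal completion.
    for record in records:
        out_queue.put((partition_id, record))
    out_queue.put(PartitionCompleteSentinel(partition_id))

def read_all(partitions, maxsize=10_000):
    # Main thread: start one worker per partition and drain the shared
    # queue in arrival order until every partition has completed.
    out_queue = queue.Queue(maxsize=maxsize)
    threads = [
        threading.Thread(target=read_partition, args=(pid, recs, out_queue))
        for pid, recs in partitions.items()
    ]
    for t in threads:
        t.start()
    remaining = len(threads)
    messages = []
    while remaining:
        item = out_queue.get()
        if isinstance(item, PartitionCompleteSentinel):
            remaining -= 1
        else:
            messages.append(item)
    for t in threads:
        t.join()
    return messages

messages = read_all({"users": ["u1", "u2"], "orders": ["o1"]})
```

Counting sentinels rather than joining threads first is what lets the main thread interleave message emission with partition processing, which is the ordering property the PR relies on.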
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
Suggested reviewers
Would you like me to break down the test changes more granularly for easier review, or is this summary sufficient for your needs, wdyt?

Note: ⚡️ Unit Test Generation is now available in beta! Learn more here, or try it out under "Finishing Touches" below.

📜 Recent review details

Configuration used: CodeRabbit UI

📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
Actionable comments posted: 1
🧹 Nitpick comments (8)
unit_tests/sources/declarative/extractors/test_response_to_file_extractor.py (1)

75-88: Prefer pytest.skip over commenting out entire tests

Fully commenting out the memory-usage test hides useful coverage and makes future re-enabling harder. Would adding @pytest.mark.skip(reason="memory-intensive – disabled for now") (or gating it behind an env flag) keep the test visible while excluding it from CI, wdyt?

airbyte_cdk/connector_builder/test_reader/helpers.py (1)
725-743: Helper mutates the input object in place – intentional?

convert_state_blob_to_mapping rewrites state_message.stream.stream_state directly. If callers reuse the same AirbyteStateMessage elsewhere, they now receive the mutated variant. Should we instead deep-copy the message (e.g. via copy.deepcopy) before mutation to avoid accidental side-effects, or is the in-place change guaranteed safe, wdyt?

airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (1)
23-25: Minor: guard against wrapped slicer exhaustion

islice(self.wrapped_slicer.stream_slices(), …) silently stops when the underlying iterator is shorter. If you need an explicit "truncated" flag for debugging, perhaps wrap with itertools.islice plus a length check, or log a warning. Probably fine as-is, just raising the idea, wdyt?

airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)
38-44: Docstring drift

The triple-quoted comment above still mentions "ThreadBasedConcurrentStream" but the type is now shared more widely. Update the wording to avoid confusion, wdyt?

unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py (1)
34-101: Keep dormant tests discoverable

Similar to the extractor test, consider @pytest.mark.skip or xfail instead of commenting out, to retain history, IDE discoverability and easy re-activation. Happy to craft the decorator if useful, wdyt?

unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1)
363-364: Consider cleaning up commented code.

The commented-out queue and message repository initialization suggests exploratory work. Should we remove these lines, or are they planned for future use, wdyt?
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
1456-1483: Minor inconsistency in default slice limit.

I noticed that line 1478 uses a hardcoded 5 instead of the MAX_SLICES constant defined above. For consistency, would it be better to use MAX_SLICES here as well, similar to how it's used in create_concurrent_cursor_from_incrementing_count_cursor? wdyt?

- maximum_number_of_slices=self._limit_slices_fetched or 5,
+ maximum_number_of_slices=self._limit_slices_fetched or MAX_SLICES,

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
106-114
: Queue initialization with bounded size is a good safety measure.The maxsize of 10,000 prevents unbounded memory growth. The detailed comment explains the reasoning well. However, since the comment mentions this might need to be configurable, should we add a TODO or make it configurable through TestLimits now to avoid future breaking changes? wdyt?
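Making the bound configurable could be sketched as below. Note this is a hypothetical extension: the max_queue_size field and the QueueLimits container are illustrative, not part of the CDK's actual TestLimits.

```python
import queue
from dataclasses import dataclass

@dataclass
class QueueLimits:
    """Hypothetical limits container; the CDK's TestLimits does not
    currently expose a queue-size field."""
    max_queue_size: int = 10_000

def make_shared_queue(limits: QueueLimits) -> "queue.Queue":
    # Bounded: once maxsize items are waiting, producers block on put(),
    # giving backpressure instead of unbounded memory growth.
    return queue.Queue(maxsize=limits.max_queue_size)

q = make_shared_queue(QueueLimits(max_queue_size=2))
q.put("a")
q.put("b")
```

Exposing the bound through the limits object avoids a breaking change later, since callers that omit the field keep today's default.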
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (18)
airbyte_cdk/connector_builder/connector_builder_handler.py (3 hunks)
airbyte_cdk/connector_builder/main.py (1 hunks)
airbyte_cdk/connector_builder/test_reader/helpers.py (4 hunks)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2 hunks)
airbyte_cdk/sources/concurrent_source/concurrent_source.py (6 hunks)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (5 hunks)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5 hunks)
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (1 hunks)
airbyte_cdk/sources/message/concurrent_repository.py (1 hunks)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (3 hunks)
airbyte_cdk/sources/streams/concurrent/partitions/types.py (2 hunks)
unit_tests/connector_builder/test_connector_builder_handler.py (16 hunks)
unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py (1 hunks)
unit_tests/sources/declarative/extractors/test_response_to_file_extractor.py (1 hunks)
unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (2 hunks)
unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py (1 hunks)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (2 hunks)
unit_tests/sources/streams/concurrent/test_partition_reader.py (3 hunks)
🧰 Additional context used
🧠 Learnings (9)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.
📚 Learning: the files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from ...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py
unit_tests/sources/streams/concurrent/test_partition_reader.py
unit_tests/connector_builder/test_connector_builder_handler.py
airbyte_cdk/connector_builder/main.py
airbyte_cdk/sources/streams/concurrent/partitions/types.py
airbyte_cdk/sources/concurrent_source/concurrent_source.py
airbyte_cdk/connector_builder/connector_builder_handler.py
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
airbyte_cdk/connector_builder/test_reader/helpers.py
📚 Learning: when code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repositor...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15
Timestamp: 2024-11-15T00:59:08.154Z
Learning: When code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repository, avoid suggesting modifications to it during the import process.
Applied to files:
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py
unit_tests/connector_builder/test_connector_builder_handler.py
airbyte_cdk/sources/concurrent_source/concurrent_source.py
airbyte_cdk/connector_builder/connector_builder_handler.py
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
📚 Learning: when modifying the `yamldeclarativesource` class in `airbyte_cdk/sources/declarative/yaml_declarativ...
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py
unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py
unit_tests/connector_builder/test_connector_builder_handler.py
airbyte_cdk/connector_builder/main.py
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
airbyte_cdk/sources/concurrent_source/concurrent_source.py
unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
airbyte_cdk/connector_builder/connector_builder_handler.py
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: in the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-g...
Learnt from: pnilan
PR: airbytehq/airbyte-python-cdk#0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.
Applied to files:
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
📚 Learning: the custompageincrement class in unit_tests/source_declarative_manifest/resources/source_the_guardia...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.
Applied to files:
unit_tests/connector_builder/test_connector_builder_handler.py
unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
airbyte_cdk/connector_builder/connector_builder_handler.py
📚 Learning: in the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict modu...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.
Applied to files:
airbyte_cdk/connector_builder/connector_builder_handler.py
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
📚 Learning: copying files from `site-packages` in the dockerfile maintains compatibility with both the old file ...
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#90
File: Dockerfile:16-21
Timestamp: 2024-12-02T18:36:04.346Z
Learning: Copying files from `site-packages` in the Dockerfile maintains compatibility with both the old file structure that manifest-only connectors expect and the new package-based structure where SDM is part of the CDK.
Applied to files:
airbyte_cdk/connector_builder/connector_builder_handler.py
📚 Learning: in the typetransformer class, the data being transformed comes from api responses or source systems,...
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#221
File: airbyte_cdk/sources/utils/transform.py:0-0
Timestamp: 2025-01-16T00:50:39.069Z
Learning: In the TypeTransformer class, the data being transformed comes from API responses or source systems, so only standard JSON-serializable types are expected. The python_to_json mapping covers all expected types, and it's designed to fail fast (KeyError) on unexpected custom types rather than providing fallbacks.
Applied to files:
airbyte_cdk/connector_builder/test_reader/helpers.py
🧬 Code Graph Analysis (5)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (5)
airbyte_cdk/sources/streams/concurrent/default_stream.py (1): cursor (101-102)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1): cursor (93-96)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1): on_partition (89-105)
airbyte_cdk/sources/concurrent_source/thread_pool_manager.py (1): submit (45-46)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (1): process_partition (56-79)
airbyte_cdk/sources/message/concurrent_repository.py (3)
airbyte_cdk/models/airbyte_protocol.py (1): AirbyteMessage (79-88)
airbyte_cdk/connector_builder/models.py (1): LogMessage (25-29)
airbyte_cdk/sources/message/repository.py (1): MessageRepository (45-60)
airbyte_cdk/connector_builder/connector_builder_handler.py (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2): ConcurrentDeclarativeSource (85-549), TestLimits (71-82)
unit_tests/connector_builder/test_connector_builder_handler.py (1): manifest_declarative_source (956-957)
airbyte_cdk/models/airbyte_protocol.py (1): AirbyteStateMessage (67-75)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
airbyte_cdk/sources/streams/concurrent/cursor.py (1): ConcurrentCursor (134-502)
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (1): StreamSlicerTestReadDecorator (14-28)
airbyte_cdk/connector_builder/test_reader/helpers.py (2)
airbyte_cdk/models/airbyte_protocol.py (2): AirbyteStateBlob (15-50), AirbyteStateMessage (67-75)
unit_tests/connector_builder/test_message_grouper.py (1): state_message (966-974)
🪛 GitHub Actions: Pytest (Fast)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
[error] 167-167: Exception while syncing stream stream_with_custom_requester: Bad request. Please check your request parameters.
[error] 167-167: Exception while syncing stream stream_with_custom_requester: Too many requests.
[error] 219-219: During the sync, the following streams did not sync successfully: stream_with_custom_requester: UserDefinedBackoffException('Too many requests.')
[error] 167-167: Exception while syncing stream stream_with_custom_requester: Invalid URL endpoint: '10.0.27.27'
belongs to a private network
[error] 219-219: During the sync, the following streams did not sync successfully: stream_with_custom_requester: AirbyteTracedException("Invalid URL endpoint: '10.0.27.27'
belongs to a private network")
[error] 167-167: Exception while syncing stream stream_with_custom_requester: Invalid Protocol Schema: The endpoint that data is being requested from is using an invalid or insecure protocol 'http'. Valid protocol schemes: https
[error] 219-219: During the sync, the following streams did not sync successfully: stream_with_custom_requester: MessageRepresentationAirbyteTracedErrors("'GET' request to 'http://unsecured.protocol/api/v1/v3/marketing/lists?a_param=10&page_size=2' failed with exception: 'Invalid Protocol Scheme: The endpoint that data is being requested from is using an invalid or insecure protocol 'http'. Valid protocol schemes: https'")
[error] 167-167: Exception while syncing stream stream_with_custom_requester: Invalid URL specified or DNS error occurred: The endpoint that data is being requested from is not a valid URL.
[error] 219-219: During the sync, the following streams did not sync successfully: stream_with_custom_requester: UserDefinedBackoffException('Invalid URL specified or DNS error occurred: The endpoint that data is being requested from is not a valid URL.')
[error] 167-167: Exception while syncing stream stream_with_custom_requester: Exception: Token refresh API response was missing access token access_token
[error] 219-219: During the sync, the following streams did not sync successfully: stream_with_custom_requester: Exception('Token refresh API response was missing access token access_token')
[error] 166-166: Encountered an error while checking availability of stream pokemon. Error: This is an intentional failure for testing purposes.
[error] 219-219: During the sync, the following streams did not sync successfully: pokemon: IntentionalException('This is an intentional failure for testing purposes.')
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
[error] 167-167: Exception while syncing stream stream_with_custom_requester: Bad request. Please check your request parameters.
[error] 167-167: Exception while syncing stream stream_with_custom_requester: Too many requests.
[error] 219-219: During the sync, the following streams did not sync successfully: stream_with_custom_requester: UserDefinedBackoffException('Too many requests.')
[error] 167-167: Exception while syncing stream stream_with_custom_requester: Invalid URL endpoint: '10.0.27.27'
belongs to a private network
[error] 219-219: During the sync, the following streams did not sync successfully: stream_with_custom_requester: AirbyteTracedException("Invalid URL endpoint: '10.0.27.27'
belongs to a private network")
[error] 167-167: Exception while syncing stream stream_with_custom_requester: Invalid Protocol Schema: The endpoint that data is being requested from is using an invalid or insecure protocol 'http'. Valid protocol schemes: https
[error] 219-219: During the sync, the following streams did not sync successfully: stream_with_custom_requester: MessageRepresentationAirbyteTracedErrors("'GET' request to 'http://unsecured.protocol/api/v1/v3/marketing/lists?a_param=10&page_size=2' failed with exception: 'Invalid Protocol Scheme: The endpoint that data is being requested from is using an invalid or insecure protocol 'http'. Valid protocol schemes: https'")
[error] 167-167: Exception while syncing stream stream_with_custom_requester: Invalid URL specified or DNS error occurred: The endpoint that data is being requested from is not a valid URL.
[error] 219-219: During the sync, the following streams did not sync successfully: stream_with_custom_requester: UserDefinedBackoffException('Invalid URL specified or DNS error occurred: The endpoint that data is being requested from is not a valid URL.')
[error] 167-167: Exception while syncing stream stream_with_custom_requester: Exception: Token refresh API response was missing access token access_token
[error] 219-219: During the sync, the following streams did not sync successfully: stream_with_custom_requester: Exception('Token refresh API response was missing access token access_token')
[error] 166-166: Encountered an error while checking availability of stream pokemon. Error: This is an intentional failure for testing purposes.
[error] 219-219: During the sync, the following streams did not sync successfully: pokemon: IntentionalException('This is an intentional failure for testing purposes.')
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Check: source-shopify
🔇 Additional comments (41)
airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)
7-8: Confirm AirbyteMessage overlap with Record

Record is already an AirbyteMessage wrapper in many parts of the CDK. Including both in the QueueItem union may create ambiguous overloads in type-narrowing code. Would narrowing Record's alias (or adding a typing.cast where needed) help static tools distinguish the two, wdyt?

unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py (1)
52-57: LGTM! Keyword arguments improve clarity.

The change from positional to keyword arguments in the ConcurrentSource constructor makes the code more readable and explicit. This aligns well with the updated constructor signature.

airbyte_cdk/connector_builder/main.py (2)
94-94: Good addition of required parameters.

Adding catalog and state parameters to the create_source call aligns with the updated function signature and enables proper concurrent processing with state management.

95-99: Appropriate type ignore comment.

The type: ignore[no-any-return] comment is well-justified since Serializer.dump() always returns an AirbyteMessage and the .decode() separation improves readability.

unit_tests/sources/streams/concurrent/test_partition_reader.py (3)
31-31: Good addition of PartitionLogger parameter.

Passing None for the PartitionLogger parameter keeps the test focused while adapting to the updated constructor signature.

34-39: Excellent use of real cursor for integration testing.

Using a FinalStateCursor with InMemoryMessageRepository provides better test coverage than a mock would, ensuring the cursor actually functions correctly in this scenario, wdyt?

57-58: Good verification of cursor lifecycle methods.

The assertions on cursor.observe and cursor.close_partition ensure that the partition reader properly manages cursor state during processing.

unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1)
371-371: Good update to new constructor signature.

Replacing component_factory=ModelToComponentFactory(disable_cache=True) with limits=TestLimits() aligns with the updated ConcurrentDeclarativeSource constructor and maintains test isolation.

airbyte_cdk/sources/concurrent_source/concurrent_source.py (4)
46-46: Good addition of queue injection capability.

Adding the optional queue parameter enables external queue management while maintaining backward compatibility with the default internal queue creation.

97-101: Reasonable bounded queue implementation.

The maxsize=10_000 provides a sensible default to prevent unbounded memory growth. The comment clearly explains the rationale and acknowledges it may need tuning based on usage patterns, wdyt?

115-118: Good integration of PartitionLogger.

Passing the PartitionLogger to PartitionReader enables proper logging integration while maintaining the separation of concerns between queue management and logging.

170-171: Appropriate extension of message handling.

Adding direct AirbyteMessage handling to _handle_item extends the concurrent source's capability to process pre-formed messages alongside records, which aligns with the broader message routing improvements.

unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (3)
179-185: Test update looks good for the new cursor parameter!

The addition of the expected_cursor retrieval and its inclusion in the submit call correctly reflects the updated PartitionReader.process_partition signature. This maintains proper test coverage for the new API, wdyt?

206-212: Consistent test update for cursor parameter!

Great consistency with the previous test method - the cursor retrieval and parameter passing follows the same pattern and correctly tests the updated API signature.

1-795: Cursor closing logic is covered in PartitionReader tests

I've verified that cursor.close_partition is asserted in unit_tests/sources/streams/concurrent/test_partition_reader.py (both in normal processing and exception scenarios). No additional tests are needed for this move; coverage remains intact.

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2)
98-105: Clean integration of cursor parameter into partition processing!

The cursor retrieval from the stream instance and its inclusion in the process_partition call looks solid. This nicely enables the partition reader to handle cursor operations directly, which should improve the separation of concerns, wdyt?

118-127: Cursor closing logic verified in PartitionReader.process_partition

I confirmed that process_partition in airbyte_cdk/sources/streams/concurrent/partition_reader.py calls cursor.close_partition(partition) inside its main try/except block, so any errors during close are caught and sent as a StreamThreadException. The cursor lifecycle and error handling look solid. Would you like to add any extra logging around the close call for additional visibility, or is this sufficient? wdyt?

unit_tests/connector_builder/test_connector_builder_handler.py (4)
56-59: LGTM on the import migration!

The import changes correctly replace ManifestDeclarativeSource with ConcurrentDeclarativeSource and add the TestLimits import. This aligns well with the PR's goal of introducing concurrency-aware processing.

533-535: Source instantiation looks good!

The migration to ConcurrentDeclarativeSource with the new signature (including catalog, config, state, and source_config parameters) appears correct and consistent with the concurrent processing changes.

890-891: Nice update to the create_source call!

The addition of catalog and state parameters to the create_source call is consistent with the new function signature mentioned in the AI summary. This ensures proper initialization of the concurrent source.

1278-1278: Question about the error expectation changes

I noticed the error expectations changed from "AirbyteTracedException" to "StreamThreadException" on these lines. This suggests the concurrent processing introduces different exception handling. Could you confirm this is the expected behavior change with the new concurrent architecture? The change makes sense given we're now dealing with threaded execution, but I want to make sure this aligns with your expectations, wdyt? Also applies to: 1284-1284
airbyte_cdk/sources/message/concurrent_repository.py (3)
11-22: Excellent documentation of the ordering problem!

The class documentation clearly explains why this wrapper is needed - the non-deterministic processing between main thread and partitions could cause incorrect message ordering, which is crucial for the connector builder's grouping logic.

28-31: Question about potential message duplication

I noticed both emit_message and log_message call consume_queue() on the decorated repository after forwarding the message. If the decorated repository buffers multiple messages internally, this could potentially put duplicate messages on the queue across multiple calls. Have you verified that the decorated InMemoryMessageRepository only returns newly added messages from consume_queue(), or does it return all buffered messages each time? If it's the latter, we might need to track what's already been forwarded, wdyt? Also applies to: 33-36

38-43: Smart design choice on consume_queue!

Returning an empty iterator makes perfect sense here since messages are immediately forwarded to the shared queue. The comment clearly explains why this method shouldn't be called in normal operation.
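The decorator pattern discussed above can be sketched as follows, under the assumption that the decorated repository's consume_queue drains only pending messages (the class names here are simplified stand-ins, not the CDK's actual implementations):

```python
import queue

class InMemoryRepo:
    """Simplified stand-in for the CDK's InMemoryMessageRepository:
    consume_queue() drains pending messages, so repeated calls never
    replay already-consumed ones."""
    def __init__(self):
        self._pending = []
    def emit_message(self, message):
        self._pending.append(message)
    def consume_queue(self):
        while self._pending:
            yield self._pending.pop(0)

class ConcurrentRepo:
    """Decorator that forwards each emitted message straight onto the
    shared queue, preserving arrival order across threads."""
    def __init__(self, shared_queue, decorated):
        self._queue = shared_queue
        self._decorated = decorated
    def emit_message(self, message):
        self._decorated.emit_message(message)
        # Drain whatever the decorated repository buffered and forward it.
        for msg in self._decorated.consume_queue():
            self._queue.put(msg)
    def consume_queue(self):
        # Nothing to consume: messages already went to the shared queue.
        return iter([])

shared = queue.Queue()
repo = ConcurrentRepo(shared, InMemoryRepo())
repo.emit_message("log-1")
repo.emit_message("log-2")
```

Because the inner repository drains on consume, each emit forwards exactly the newly buffered messages, which is the behavior the duplication question above hinges on.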
airbyte_cdk/sources/streams/concurrent/partition_reader.py (5)
18-37
: Nice encapsulation with PartitionLogger!The
PartitionLogger
class provides a clean way to inject logging dependencies and handles the slice logging logic appropriately. The conditional logging based onshould_log_slice_message
is a good optimization.
47-49
: Good integration of optional loggingThe optional
PartitionLogger
parameter allows for flexible usage while maintaining backward compatibility. Nice design choice!Also applies to: 54-54
56-56
: Question about cursor parameter additionThe addition of the
Cursor
parameter toprocess_partition
makes sense for the concurrency changes. I can see from the AI summary that this aligns with changes in the concurrent read processor. This breaking change seems necessary for proper cursor lifecycle management, wdyt?
`69-70`: **Clean logging integration.** The conditional logging of partitions is well-implemented and follows the established pattern of checking if logging should occur before emitting messages.
`74-75`: **Question about cursor lifecycle management.** I see that `cursor.observe(record)` is called for every record and `cursor.close_partition(partition)` is called after processing. A couple of questions:

- Have you considered the performance impact of calling `observe` for every record, especially for high-volume streams?
- I notice `close_partition` is only called in the success path (line 75) but not in the exception handler. Is this intentional, or should we also close the partition on failure to ensure proper cleanup, wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
`625-628`: **LGTM! Good approach to avoid circular imports.** The constant definition and explanation are clear. This provides a reasonable default for slice limiting.
`1537-1560`: **Consistent implementation of slice limiting for incrementing count cursor.** The implementation properly mirrors the datetime-based cursor approach, correctly using the `MAX_SLICES` constant for the default value.
`1473-1475`: **Type casting is appropriately documented.** The explicit type casting with explanatory comments is helpful. The mismatch between `ConcurrentCursor` and the low-code `StreamSlicer` interface is a known issue, and your comments clearly explain why this casting is necessary.

Also applies to: 1551-1553
airbyte_cdk/connector_builder/connector_builder_handler.py (6)
`6-7`: **LGTM! Import changes align with the concurrent source migration.** The removal of unused imports and the addition of `asdict` are appropriate for the new implementation.
`18-21`: **Good import restructuring for concurrent source support.** Importing `ConcurrentDeclarativeSource` and `TestLimits` from the concurrent module is the right approach.
`34-43`: **Proper use of the imported TestLimits class.** The function correctly instantiates `TestLimits` with appropriate defaults from class constants. The implementation maintains backward compatibility with the previous local dataclass structure.
`73-78`: **Smart enforcement of single-threaded processing for connector builder!** Setting `default_concurrency` to 1 ensures message ordering is preserved for the connector builder UI, which is crucial for grouping records correctly. The code handles both existing and missing `concurrency_level` configurations properly.
`80-87`: **Proper instantiation of ConcurrentDeclarativeSource.** All required parameters are correctly passed, including the new `limits` parameter and the `emit_connector_builder_messages=True` flag. The removal of the custom `ModelToComponentFactory` is appropriate since it's now handled internally by `ConcurrentDeclarativeSource`.
`65-70`: **create_source signature is correctly propagated across all call sites.** I verified that every caller, including all unit tests in `unit_tests/connector_builder/test_connector_builder_handler.py` and the main entry point in `airbyte_cdk/connector_builder/main.py`, now passes both `catalog` and `state`. There are no remaining references to the old signature.

Looks good to merge! wdyt?
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
`70-82`: **Well-designed TestLimits dataclass!** Good use of `ClassVar` for constants and the `__test__` flag to prevent pytest confusion. The default values seem reasonable for testing scenarios. Nice touch with the descriptive constant names.
`119-129`: **Excellent configuration of ModelToComponentFactory with concurrent support!** The setup properly:

- Wraps repositories with `ConcurrentMessageRepository` for ordered message handling
- Applies test limits when provided
- Disables retries and caching during testing (smart choice!)

This aligns perfectly with the PR objective to control message routing and ordering.
`165-165`: **Good addition of queue parameter to ConcurrentSource.** Passing the shared queue enables proper coordination between the concurrent source and message repositories, which is key to the PR's objective of controlling processing order.
A couple of questions but it seems overall very good
Resolved (outdated) review threads:

- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3 threads)
- unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1 thread)
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)
`18-42`: **Transitional solution looks good, but what's the timeline for the replacement?** The PartitionLogger implementation is clean and serves its purpose well as a bridge between SliceLogger and MessageRepository. The comment mentions this should be replaced by emitting messages directly onto the repository - do you have a timeline for this refactor? It would help avoid accumulating technical debt. wdyt?
🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
`88-106`: **Consider adding validation for the slice_limit parameter.** The implementation looks good for conditionally wrapping the stream slicer with a test decorator. However, should we validate that `slice_limit` is positive when provided? Also, I notice we're using `cast` here - is `StreamSlicerTestReadDecorator` guaranteed to implement all methods of the `StreamSlicer` interface, wdyt?

```diff
 def __init__(
     self,
     partition_factory: DeclarativePartitionFactory,
     stream_slicer: StreamSlicer,
     slice_limit: Optional[int] = None,
 ) -> None:
     self._partition_factory = partition_factory
     if slice_limit:
+        if slice_limit <= 0:
+            raise ValueError("slice_limit must be positive")
         self._stream_slicer = cast(
             StreamSlicer,
             StreamSlicerTestReadDecorator(
                 wrapped_slicer=stream_slicer,
                 maximum_number_of_slices=slice_limit,
             ),
         )
     else:
         self._stream_slicer = stream_slicer
```

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
`70-83`: **Consider using a different naming pattern to avoid pytest conflicts.** The `TestLimits` dataclass looks well-structured with sensible defaults. However, using `__test__ = False` to prevent pytest from treating this as a test class feels like a workaround. Would it make sense to rename this to something like `ConcurrencyLimits` or `ProcessingLimits` to avoid the naming conflict altogether, wdyt?

```diff
 @dataclass
-class TestLimits:
-    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name
+class ProcessingLimits:
     DEFAULT_MAX_PAGES_PER_SLICE: ClassVar[int] = 5
     DEFAULT_MAX_SLICES: ClassVar[int] = 5
     DEFAULT_MAX_RECORDS: ClassVar[int] = 100
     DEFAULT_MAX_STREAMS: ClassVar[int] = 100
     max_records: int = field(default=DEFAULT_MAX_RECORDS)
     max_pages_per_slice: int = field(default=DEFAULT_MAX_PAGES_PER_SLICE)
     max_slices: int = field(default=DEFAULT_MAX_SLICES)
     max_streams: int = field(default=DEFAULT_MAX_STREAMS)
```
`98-132`: **Queue size configuration and error handling.** The queue implementation with bounded size is a good approach to prevent memory issues. The comment indicates that 10,000 is arbitrary and might need to be configurable. Should we make this configurable through the `limits` parameter? Also, should we consider what happens when the queue is full - will producers block appropriately, wdyt?

Consider adding queue size to the limits configuration:

```diff
 @dataclass
 class TestLimits:
     __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name
     DEFAULT_MAX_PAGES_PER_SLICE: ClassVar[int] = 5
     DEFAULT_MAX_SLICES: ClassVar[int] = 5
     DEFAULT_MAX_RECORDS: ClassVar[int] = 100
     DEFAULT_MAX_STREAMS: ClassVar[int] = 100
+    DEFAULT_QUEUE_SIZE: ClassVar[int] = 10_000
     max_records: int = field(default=DEFAULT_MAX_RECORDS)
     max_pages_per_slice: int = field(default=DEFAULT_MAX_PAGES_PER_SLICE)
     max_slices: int = field(default=DEFAULT_MAX_SLICES)
     max_streams: int = field(default=DEFAULT_MAX_STREAMS)
+    queue_size: int = field(default=DEFAULT_QUEUE_SIZE)
```

Then use it in the constructor:

```diff
-        queue: Queue[QueueItem] = Queue(maxsize=10_000)
+        queue_size = limits.queue_size if limits else 10_000
+        queue: Queue[QueueItem] = Queue(maxsize=queue_size)
```
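To the blocking question above: Python's standard `queue.Queue` makes producers block on `put()` once `maxsize` is reached, while `put_nowait()` raises `queue.Full` instead. A quick demonstration:

```python
from queue import Full, Queue

q: Queue = Queue(maxsize=2)
q.put("a")
q.put("b")

# A blocking put() would now wait for a consumer; put_nowait() fails fast.
try:
    q.put_nowait("c")
except Full:
    print("queue full, producer would block on put()")

q.get()            # a consumer frees a slot...
q.put_nowait("c")  # ...and the producer can continue
print(q.qsize())   # 2
```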
`329-331`: **Consider extracting the slice_limit logic to reduce duplication.** I notice the same pattern `self._limits.max_slices if self._limits else None` is repeated in multiple places throughout `_group_streams`. Would it be cleaner to extract this to a property or method to reduce duplication, wdyt?

```diff
+    @property
+    def _slice_limit(self) -> Optional[int]:
+        """Get the slice limit from limits configuration if available."""
+        return self._limits.max_slices if self._limits else None
+
     def _group_streams(
         self, config: Mapping[str, Any]
     ) -> Tuple[List[AbstractStream], List[Stream]]:
```

Then replace all occurrences:

```diff
-            slice_limit=self._limits.max_slices
-            if self._limits
-            else None,  # technically not needed because create_declarative_stream() -> create_simple_retriever() will apply the decorator. But for consistency and depending how we build create_default_stream, this may be needed later
+            slice_limit=self._slice_limit,  # technically not needed because create_declarative_stream() -> create_simple_retriever() will apply the decorator. But for consistency and depending how we build create_default_stream, this may be needed later
```

Also applies to: 363-363, 395-397, 459-459
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (9 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1 hunks)
- airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (2 hunks)
- airbyte_cdk/sources/streams/concurrent/partition_reader.py (3 hunks)
- airbyte_cdk/sources/utils/slice_logger.py (1 hunks)
- unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (2 hunks)
✅ Files skipped from review due to trivial changes (2)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
- airbyte_cdk/sources/utils/slice_logger.py
🚧 Files skipped from review as they are similar to previous changes (1)
- unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
📚 Learning: when modifying the `yamldeclarativesource` class in `airbyte_cdk/sources/declarative/yaml_declarativ...
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: the files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from ...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-shopify
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Analyze (python)
🔇 Additional comments (9)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (3)
`5-5`: **LGTM on the new imports!** The added imports (Optional, MessageRepository, Cursor, SliceLogger) are all necessary for the new functionality and correctly placed.

Also applies to: 8-9, 15-15
`52-59`: **Nice backward-compatible approach with the optional PartitionLogger!** The constructor change maintains compatibility while enabling the new logging functionality. Clean implementation.
`74-81`: **Cursor integration looks solid, but should we handle close_partition exceptions?** The cursor lifecycle management (observe + close_partition) aligns perfectly with the PR objectives. One thought - what happens if `cursor.close_partition(partition)` throws an exception? Currently it would bubble up and not be caught by the try-catch, potentially leaving the partition in an inconsistent state.

Should we wrap the close_partition call in its own try-catch to ensure the PartitionCompleteSentinel is always queued? wdyt?
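One way to guarantee the sentinel is always queued, as the comment suggests. This is a hedged sketch with simplified stand-in types; the real `PartitionReader` and `PartitionCompleteSentinel` have different signatures:

```python
from queue import Queue
from typing import Any, Iterable, Optional


class PartitionCompleteSentinel:
    """Signals that a partition finished, optionally carrying a failure."""

    def __init__(self, partition: Any, exception: Optional[Exception] = None) -> None:
        self.partition = partition
        self.exception = exception


def process_partition(records: Iterable[Any], cursor: Any, queue: Queue) -> None:
    """Observe every record, close the partition, and always queue a sentinel."""
    try:
        for record in records:
            cursor.observe(record)  # the cursor sees each record before emission
            queue.put(record)
        try:
            cursor.close_partition(records)
        except Exception as exc:
            # A failing close must not swallow the completion signal.
            queue.put(PartitionCompleteSentinel(records, exception=exc))
            return
        queue.put(PartitionCompleteSentinel(records))
    except Exception as exc:
        queue.put(PartitionCompleteSentinel(records, exception=exc))
```

Even when `close_partition` raises, the consumer still receives a sentinel (with the exception attached) and can tear down cleanly instead of waiting forever.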
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (2)
`1-1`: **LGTM!** Standard copyright year update.
`3-8`: **LGTM!** The new imports are appropriate for the slice limiting functionality being added.
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)
`5-11`: **LGTM!** The new imports are necessary for the queue-based concurrent processing and message repository implementation.
`55-57`: **LGTM!** Good addition of the concurrent message repository imports for proper message ordering.
`67-68`: **LGTM!** Necessary import for the queue item type used in concurrent processing.
167-167
: LGTM!Good integration of the queue with the concurrent source to enable proper coordination of concurrent processing.
Following the discussions we had, I'm good with this change
…through the concurrent CDK by reworking message routing and order through the queue and message repositories (#688)
Problem
Swapping the existing `ManifestDeclarativeSource` for the `ConcurrentDeclarativeSource` in the `connector_builder_handler` and setting the thread count to 1 is not a substantial lift. The main issue, however, is that we can't control when we process records from the primary message queue versus processing the next partition of data to extract records. This in turn leads to incorrect grouping of records, because we might insert the next partition of records before we close the previous partition and emit a state message.
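To make the ordering problem concrete, here is a toy simulation (hypothetical message strings, not real Airbyte messages) of how grouping breaks when a record from the next partition lands on the queue before the previous partition's state message:

```python
from queue import Queue
from typing import List


def group_by_state(queue: Queue) -> List[List[str]]:
    """Group messages into (records..., state) chunks, the way the
    connector builder groups output per partition/slice."""
    groups, current = [], []
    while not queue.empty():
        msg = queue.get()
        current.append(msg)
        if msg.startswith("STATE"):  # a state message closes the current group
            groups.append(current)
            current = []
    if current:
        groups.append(current)
    return groups


# Correct ordering: partition 1 fully emitted (records + state) before partition 2.
ordered = Queue()
for msg in ["rec1a", "rec1b", "STATE-1", "rec2a", "STATE-2"]:
    ordered.put(msg)

# Uncontrolled interleaving: partition 2's record lands before partition 1's state.
interleaved = Queue()
for msg in ["rec1a", "rec2a", "rec1b", "STATE-1", "STATE-2"]:
    interleaved.put(msg)

print(group_by_state(ordered))      # groups line up with partitions
print(group_by_state(interleaved))  # rec2a is wrongly grouped with partition 1
```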
Implementation Details
The main changes included in this PR are:

- Introduced a `ConcurrentMessageRepository`, which basically wraps the main message queue that records are emitted on
- Passed the cursor from the `ConcurrentReadProcessor` to the `PartitionReader` so that we immediately put state messages onto the queue after finishing a partition
- Fixed serialization of `AirbyteStateBlob`: it is actually a dynamic-field dataclass, so it doesn't serialize into a dict properly and always ends up being `{}`. I unwrapped the object to a dict, which now renders the state value

todo: fixing the last few tests
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Tests