Skip to content

Conversation

brianjlai
Copy link
Contributor

@brianjlai brianjlai commented Aug 5, 2025

Re-adds two of the commits related to moving the connector builder to the concurrent CDK which were originally reverted in: #694

One additional fix that is included is that we now consume the remaining records in the queue in message_grouper.py because we noticed an issue where we would end up in a deadlock on the syncing thread if the amount of records retrieved exceeded the queue size which defaulted to 10,000 items

Summary by CodeRabbit

  • New Features

    • Introduced configurable concurrency and record limits for sources, allowing control over maximum records, slices, and streams during syncs.
    • Added support for injecting custom queues and improved message ordering in concurrent processing.
    • Enhanced logging and message repository integration for better visibility and debugging.
    • Added global record limit enforcement across partitions during data reads.
  • Bug Fixes

    • Improved handling and formatting of state messages for downstream processing.
    • Ensured correct message ordering and state serialization in concurrent scenarios.
  • Tests

    • Updated and added tests to validate new concurrency limits and message handling.
    • Removed outdated tests related to previous record limit implementations.
    • Adjusted tests to reflect changes in source creation signatures and cursor handling.
  • Chores

    • Refactored imports, removed unused code, and clarified parameter usage for improved maintainability.
    • Updated documentation comments to clarify future intentions and code behavior.

brianjlai and others added 3 commits August 5, 2025 09:45
…through the concurrent CDK by reworking message routing and order through the queue and message repositories (#688)
@brianjlai brianjlai requested review from lmossman and maxi297 August 5, 2025 22:58
@github-actions github-actions bot added the bug Something isn't working label Aug 5, 2025
Copy link

github-actions bot commented Aug 5, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@brian/remerge_concurrent_cdk_builder_changes#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch brian/remerge_concurrent_cdk_builder_changes

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

📝 Edit this welcome message.

Copy link
Contributor

coderabbitai bot commented Aug 5, 2025

📝 Walkthrough

Walkthrough

This update migrates the connector builder and related sources from using ManifestDeclarativeSource to ConcurrentDeclarativeSource, introducing concurrency and test limit controls. It adds queue injection, logging enhancements, and global record limits across partitions. Test suites are updated to align with these changes, and some record limit tests are removed or replaced.

Changes

Cohort / File(s) Change Summary
Connector Builder Handler & Main
airbyte_cdk/connector_builder/connector_builder_handler.py, airbyte_cdk/connector_builder/main.py
Switched to ConcurrentDeclarativeSource with concurrency and test limits, updated create_source to accept catalog and state, and adjusted function calls accordingly. Removed local TestLimits in favor of imported one.
Test Reader Helpers & Message Grouper
airbyte_cdk/connector_builder/test_reader/helpers.py, airbyte_cdk/connector_builder/test_reader/message_grouper.py
Added utility to convert AirbyteStateBlob to dict, updated slice handling to apply conversion, and changed message grouping to consume all messages irrespective of record limit.
Concurrent Source Infrastructure
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py, airbyte_cdk/sources/concurrent_source/concurrent_source.py
Modified partition processing to pass cursors, removed cursor closing logic from completion sentinel, enabled optional queue injection, improved queue consumption order, and extended queue item handling to yield AirbyteMessage.
Declarative Source Concurrency & Limits
airbyte_cdk/sources/declarative/concurrent_declarative_source.py, airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py, airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py, airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py
Introduced TestLimits dataclass and concurrency limits, updated ConcurrentDeclarativeSource constructor and stream grouping to enforce slice and record limits, added global record counter, and cleaned up imports and constants.
Concurrent Message Repository
airbyte_cdk/sources/message/concurrent_repository.py
Added ConcurrentMessageRepository class to synchronize message emission onto a shared queue, preserving message order across threads.
Concurrent Partition Reader & Types
airbyte_cdk/sources/streams/concurrent/partition_reader.py, airbyte_cdk/sources/streams/concurrent/partitions/types.py
Added PartitionLogger to log slices directly to message repository, updated PartitionReader to accept logger and cursor, and expanded QueueItem union to include AirbyteMessage.
Slice Logger Utility
airbyte_cdk/sources/utils/slice_logger.py
Added comment about future removal of SliceLogger in favor of direct message repository logging to avoid ordering issues.
Connector Builder & Source Tests
unit_tests/connector_builder/test_connector_builder_handler.py, unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py, unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py, unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py, unit_tests/sources/streams/concurrent/test_partition_reader.py, unit_tests/connector_builder/test_message_grouper.py, unit_tests/sources/declarative/retrievers/test_simple_retriever.py, unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py
Updated tests to use ConcurrentDeclarativeSource and TestLimits, added tests for max record limits, removed some record limit tests, adapted mocks and assertions for cursor handling and concurrency changes, and removed a test for exception handling on partition completion.

Sequence Diagram(s)

sequenceDiagram
    participant Main as main.py
    participant Handler as connector_builder_handler.py
    participant Source as ConcurrentDeclarativeSource
    participant Queue as Queue
    participant Processor as ConcurrentReadProcessor
    participant Partition as PartitionReader
    participant Repo as ConcurrentMessageRepository

    Main->>Handler: handle_request(config, catalog, state)
    Handler->>Handler: get_limits(config)
    Handler->>Source: create_source(config, limits, catalog, state)
    Source->>Queue: create internal queue
    Source->>Repo: wrap message repository with queue
    Source->>Processor: start partition processing
    Processor->>Partition: process_partition(partition, cursor)
    Partition->>Repo: emit messages
    Repo->>Queue: enqueue messages to main thread
    Processor->>Source: signal completion
    Source->>Handler: return results
    Handler->>Main: response
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • airbytehq/airbyte-python-cdk#39: Refactors ConcurrentDeclarativeSource and its stream grouping logic, which is also directly modified in this PR.
  • airbytehq/airbyte-python-cdk#694: Reverts concurrency-related changes to connector_builder_handler.py, directly overlapping with the modifications in this PR.
  • airbytehq/airbyte-python-cdk#455: Introduces TestLimits and updates connector_builder_handler.py to use ConcurrentDeclarativeSource and concurrency limits, strongly related to this PR.

Would you like to review these PRs together to ensure consistency and avoid regressions, wdyt?

Suggested labels

enhancement

Suggested reviewers

  • maxi297

Would you like to add any additional reviewers, or is this list sufficient for now, wdyt?

Note

⚡️ Unit Test Generation is now available in beta!

Learn more here, or try it out under "Finishing Touches" below.


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 77d2b59 and 4d2bb96.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (6 hunks)
  • unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: source-intercom
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch brian/remerge_concurrent_cdk_builder_changes

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai generate unit tests to generate unit tests for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (5)
airbyte_cdk/sources/message/concurrent_repository.py (1)

28-36: Consider handling potential queue blocking

Both emit_message and log_message immediately put all consumed messages into the queue. If the wrapped repository yields many messages and the queue is bounded (as mentioned in the PR description with default 10k items), these methods could block. Should we consider checking queue capacity or using put_nowait with appropriate error handling, wdyt?

 def emit_message(self, message: AirbyteMessage) -> None:
     self._decorated_message_repository.emit_message(message)
     for message in self._decorated_message_repository.consume_queue():
-        self._queue.put(message)
+        try:
+            self._queue.put_nowait(message)
+        except queue.Full:
+            # Log warning or handle gracefully
+            self._queue.put(message)  # Fall back to blocking put
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)

96-105: Consider adding validation for slice_limit parameter

The implementation looks good! The conditional wrapping with StreamSlicerTestReadDecorator cleanly handles the slice limiting functionality. However, should we add validation to ensure slice_limit is positive when provided? Something like:

 if slice_limit:
+    if slice_limit <= 0:
+        raise ValueError(f"slice_limit must be positive, got {slice_limit}")
     self._stream_slicer = cast(
         StreamSlicer,
         StreamSlicerTestReadDecorator(
             wrapped_slicer=stream_slicer,
             maximum_number_of_slices=slice_limit,
         ),
     )

This would prevent potential issues with invalid limit values. wdyt?

airbyte_cdk/sources/concurrent_source/concurrent_source.py (1)

97-101: Consider making the queue size configurable.

The hardcoded queue size of 10,000 is mentioned as arbitrary and might need adjustment based on the source. Would it make sense to add a configurable parameter for this, perhaps as part of the timeout_seconds parameter group? This would allow tuning per source without code changes. wdyt?

airbyte_cdk/connector_builder/test_reader/helpers.py (1)

725-742: Good solution for the AirbyteStateBlob serialization issue.

The function correctly handles the conversion from AirbyteStateBlob to a proper dictionary, which resolves the serialization problems with asdict(). The docstring clearly explains the issue being solved.

One minor suggestion: Consider using typing.overload to provide better type hints for the two different return types based on the input type, which would eliminate the need for some of the type ignores. wdyt?

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

333-335: Consider refactoring the repeated slice_limit pattern.

The slice_limit is applied identically in 4 places. Would it be cleaner to extract this into a helper method or property? Something like:

@property
def _slice_limit(self) -> Optional[int]:
    return self._limits.max_slices if self._limits else None

This would reduce duplication and make future changes easier. wdyt?

Also applies to: 367-367, 399-401, 463-463

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cfc271d and c7562b8.

📒 Files selected for processing (19)
  • airbyte_cdk/connector_builder/connector_builder_handler.py (3 hunks)
  • airbyte_cdk/connector_builder/main.py (1 hunks)
  • airbyte_cdk/connector_builder/test_reader/helpers.py (4 hunks)
  • airbyte_cdk/connector_builder/test_reader/message_grouper.py (1 hunks)
  • airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2 hunks)
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py (7 hunks)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (9 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1 hunks)
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (2 hunks)
  • airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (1 hunks)
  • airbyte_cdk/sources/message/concurrent_repository.py (1 hunks)
  • airbyte_cdk/sources/streams/concurrent/partition_reader.py (3 hunks)
  • airbyte_cdk/sources/streams/concurrent/partitions/types.py (2 hunks)
  • airbyte_cdk/sources/utils/slice_logger.py (1 hunks)
  • unit_tests/connector_builder/test_connector_builder_handler.py (16 hunks)
  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (2 hunks)
  • unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py (1 hunks)
  • unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (2 hunks)
  • unit_tests/sources/streams/concurrent/test_partition_reader.py (5 hunks)
🧰 Additional context used
🧠 Learnings (10)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
📚 Learning: the files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from ...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.

Applied to files:

  • airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py
  • airbyte_cdk/sources/utils/slice_logger.py
  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
  • airbyte_cdk/sources/streams/concurrent/partitions/types.py
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • airbyte_cdk/connector_builder/main.py
  • unit_tests/connector_builder/test_connector_builder_handler.py
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py
  • airbyte_cdk/connector_builder/test_reader/helpers.py
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
  • airbyte_cdk/connector_builder/connector_builder_handler.py
📚 Learning: when code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repositor...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15
Timestamp: 2024-11-15T00:59:08.154Z
Learning: When code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repository, avoid suggesting modifications to it during the import process.

Applied to files:

  • airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py
  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • unit_tests/connector_builder/test_connector_builder_handler.py
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py
  • airbyte_cdk/connector_builder/connector_builder_handler.py
📚 Learning: when modifying the `yamldeclarativesource` class in `airbyte_cdk/sources/declarative/yaml_declarativ...
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.

Applied to files:

  • airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py
  • airbyte_cdk/sources/utils/slice_logger.py
  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
  • unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py
  • airbyte_cdk/sources/streams/concurrent/partitions/types.py
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • airbyte_cdk/connector_builder/main.py
  • unit_tests/connector_builder/test_connector_builder_handler.py
  • airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py
  • airbyte_cdk/sources/streams/concurrent/partition_reader.py
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
  • airbyte_cdk/connector_builder/connector_builder_handler.py
📚 Learning: in the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict modu...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.

Applied to files:

  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • airbyte_cdk/connector_builder/connector_builder_handler.py
📚 Learning: in the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-g...
Learnt from: pnilan
PR: airbytehq/airbyte-python-cdk#0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.

Applied to files:

  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
📚 Learning: the custompageincrement class in unit_tests/source_declarative_manifest/resources/source_the_guardia...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.

Applied to files:

  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
  • unit_tests/connector_builder/test_connector_builder_handler.py
  • airbyte_cdk/connector_builder/connector_builder_handler.py
📚 Learning: in the project, the `run` function is defined in `airbyte_cdk/cli/source_declarative_manifest/_run.p...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: pyproject.toml:108-111
Timestamp: 2024-11-15T00:58:15.446Z
Learning: In the project, the `run` function is defined in `airbyte_cdk/cli/source_declarative_manifest/_run.py` and is imported into the module's `__init__.py`.

Applied to files:

  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
📚 Learning: in the typetransformer class, the data being transformed comes from api responses or source systems,...
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#221
File: airbyte_cdk/sources/utils/transform.py:0-0
Timestamp: 2025-01-16T00:50:39.069Z
Learning: In the TypeTransformer class, the data being transformed comes from API responses or source systems, so only standard JSON-serializable types are expected. The python_to_json mapping covers all expected types, and it's designed to fail fast (KeyError) on unexpected custom types rather than providing fallbacks.

Applied to files:

  • airbyte_cdk/connector_builder/test_reader/helpers.py
📚 Learning: copying files from `site-packages` in the dockerfile maintains compatibility with both the old file ...
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#90
File: Dockerfile:16-21
Timestamp: 2024-12-02T18:36:04.346Z
Learning: Copying files from `site-packages` in the Dockerfile maintains compatibility with both the old file structure that manifest-only connectors expect and the new package-based structure where SDM is part of the CDK.

Applied to files:

  • airbyte_cdk/connector_builder/connector_builder_handler.py
🧬 Code Graph Analysis (5)
airbyte_cdk/sources/streams/concurrent/partitions/types.py (4)
airbyte_cdk/models/airbyte_protocol.py (1)
  • AirbyteMessage (79-88)
airbyte_cdk/sources/types.py (1)
  • Record (21-72)
airbyte_cdk/sources/streams/concurrent/partitions/partition.py (1)
  • Partition (11-48)
airbyte_cdk/sources/concurrent_source/partition_generation_completed_sentinel.py (1)
  • PartitionGenerationCompletedSentinel (9-24)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (4)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
  • cursor (93-96)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1)
  • on_partition (89-105)
airbyte_cdk/sources/concurrent_source/thread_pool_manager.py (1)
  • submit (45-46)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)
  • process_partition (61-84)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (6)
airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
  • cursor (101-102)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
  • cursor (93-96)
airbyte_cdk/sources/utils/slice_logger.py (3)
  • should_log_slice_message (41-46)
  • should_log_slice_message (50-56)
  • should_log_slice_message (60-61)
airbyte_cdk/sources/message/concurrent_repository.py (2)
  • emit_message (28-31)
  • consume_queue (38-43)
airbyte_cdk/sources/concurrent_source/thread_pool_manager.py (1)
  • submit (45-46)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)
  • process_partition (61-84)
airbyte_cdk/sources/message/concurrent_repository.py (3)
airbyte_cdk/models/airbyte_protocol.py (1)
  • AirbyteMessage (79-88)
airbyte_cdk/connector_builder/models.py (1)
  • LogMessage (25-29)
airbyte_cdk/sources/message/repository.py (1)
  • MessageRepository (45-60)
airbyte_cdk/connector_builder/test_reader/helpers.py (2)
airbyte_cdk/models/airbyte_protocol.py (2)
  • AirbyteStateBlob (15-50)
  • AirbyteStateMessage (67-75)
unit_tests/connector_builder/test_message_grouper.py (1)
  • state_message (966-974)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-shopify
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (31)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

625-627: LGTM! Clean solution for the circular import constraint.

The constant definition with the explanatory comment is a good approach to handle the circular import issue while maintaining clarity about the intended source. The value of 5 aligns well with the concurrency limits being introduced across the codebase, wdyt?

airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (2)

7-7: LGTM! Clean import optimization.

The removal of unused type hints (Mapping, Optional, Union) from the typing import is a good cleanup. The remaining Any and Iterable types are still being used in the class.


10-10: LGTM! Removed unused StreamState import.

The StreamState import was removed since it's no longer used in this file. Good cleanup!

airbyte_cdk/sources/utils/slice_logger.py (1)

14-17: Excellent documentation for deprecation path.

This comment provides valuable context about the future direction of slice logging in the concurrent CDK. It clearly explains why direct message repository usage is preferred over logger-based approaches for maintaining message ordering, especially in connector builder scenarios.

unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (2)

13-13: LGTM! Updated import for new concurrent architecture.

The import change from ModelToComponentFactory to TestLimits aligns with the architectural shift to the concurrent CDK where source configuration is now handled through limits rather than component factory overrides.


360-360: LGTM! Correctly adapted to new ConcurrentDeclarativeSource API.

The change from component_factory=ModelToComponentFactory(disable_cache=True) to limits=TestLimits() maintains the same testing intent (avoiding caching between tests) while using the new concurrent declarative source configuration approach. The comment explaining the purpose is helpful too!

airbyte_cdk/connector_builder/main.py (2)

94-94: LGTM! Updated to new create_source API.

The addition of catalog=catalog, state=state parameters aligns with the updated create_source function signature that now returns a ConcurrentDeclarativeSource. Using keyword arguments makes the call more explicit and maintainable.


95-99: LGTM! Reasonable type ignore for orjson.

The type ignore comment for orjson.dumps is justified since the return type checking issue is with the serializer's interaction with orjson. Moving .decode() to its own line improves readability too.

airbyte_cdk/sources/streams/concurrent/partitions/types.py (2)

7-7: LGTM! Added AirbyteMessage import for queue type extension.

Adding the AirbyteMessage import supports expanding the QueueItem union to handle protocol messages directly in the concurrent queue system.


43-43: LGTM! Expanded QueueItem to handle AirbyteMessage instances.

Adding AirbyteMessage to the QueueItem union makes sense for the concurrent streaming architecture, allowing the queue to handle Airbyte protocol messages directly alongside records, partitions, and sentinels. This aligns well with the concurrent CDK changes mentioned in the PR objectives, wdyt?

airbyte_cdk/sources/message/concurrent_repository.py (1)

30-31: QueueItem Type Includes AirbyteMessage—No Change Needed

The QueueItem union (in airbyte_cdk/sources/streams/concurrent/partitions/types.py:37–44) explicitly lists AirbyteMessage. Pushing an AirbyteMessage into Queue[QueueItem] is therefore type-safe. No changes are needed here—wdyt?

airbyte_cdk/sources/streams/concurrent/partition_reader.py (2)

18-43: LGTM! Clean encapsulation of partition logging

The PartitionLogger class nicely encapsulates the slice logging logic. The comment suggests this might be refactored in the future to emit messages directly - is there a plan to track this potential improvement?


79-80: Good cursor lifecycle management

Nice addition of cursor observation for each record and explicit partition closure. This ensures proper tracking and cleanup of cursor state.

airbyte_cdk/connector_builder/connector_builder_handler.py (2)

73-78: Smart approach to enforce single-threaded processing

Good implementation of forcing concurrency to 1 for message ordering. The code handles both cases (existing and missing concurrency_level) correctly. This ensures deterministic message grouping for the connector builder.


34-43: Clean refactor to use imported TestLimits

Good move to use the imported TestLimits class with its default constants instead of maintaining a local definition. This reduces duplication and ensures consistency.

airbyte_cdk/connector_builder/test_reader/message_grouper.py (1)

98-108: Critical deadlock fix looks good!

Excellent fix for the deadlock issue. Continuing to consume messages from the queue even after hitting the record limit prevents the producer threads from blocking on a full queue. The comment clearly explains the rationale and alternative approaches considered. This is indeed the simplest and safest solution.

unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py (1)

52-57: Good improvement using keyword arguments

Nice refactor to use keyword arguments instead of positional ones. This makes the code more readable and less prone to errors when the constructor signature changes.

unit_tests/sources/streams/concurrent/test_partition_reader.py (3)

32-43: LGTM! Proper cursor initialization for test

The test correctly creates a FinalStateCursor instance with the required parameters and passes it to process_partition, aligning with the new method signature.


45-57: Good test coverage for cursor interactions

The test properly verifies that both cursor.observe and cursor.close_partition are called during successful partition processing. Using a mock cursor is appropriate here for verifying these interactions.


75-91: Excellent test coverage for cursor closing exceptions

This new test properly verifies that when cursor.close_partition raises an exception:

  1. Already processed records are preserved in the queue
  2. The exception is wrapped in a StreamThreadException
  3. A PartitionCompleteSentinel is still emitted

This ensures robust error handling in the partition processing flow.

unit_tests/connector_builder/test_connector_builder_handler.py (1)

903-905: Clear documentation of concurrent behavior

Good job documenting the expected behavior! The comment clearly explains that with concurrent processing, we expect 5 partition read attempts (one per partition) with no retries on 429 responses.

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2)

89-105: Clean cursor handling in partition processing

Good refactoring! Retrieving the cursor from the stream instance and passing it to the partition reader ensures proper cursor lifecycle management within the processing thread. This eliminates potential race conditions that could occur with shared cursor access.


107-127: Excellent simplification of completion handling

The removal of cursor closing and exception handling from this method is a great improvement! By moving these responsibilities to the partition reader thread, you've:

  1. Eliminated potential race conditions
  2. Simplified the completion logic
  3. Better separated concerns

The check for partition existence in the running set (line 119) properly handles cases where a partition might have already been removed due to errors.

unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (1)

179-185: Proper test verification of cursor passing

Good test update! The test correctly verifies that the cursor from the stream instance is passed to process_partition. This ensures proper cursor-stream association in the concurrent processing model.

airbyte_cdk/sources/concurrent_source/concurrent_source.py (4)

46-46: LGTM! Good addition for queue injection.

The optional queue parameter enables better testability and external control over queue behavior, which aligns well with the concurrent source architecture.

Also applies to: 63-69


110-118: LGTM! Proper queue propagation.

The queue is correctly passed to both PartitionEnqueuer and PartitionReader, and the PartitionLogger wrapper provides clean separation of logging concerns.

Also applies to: 126-126


150-155: Key fix for the deadlock issue!

This reordering of the loop condition is crucial - by checking queue.empty() before is_done(), we ensure all queued items are processed even if an exception occurs. This directly addresses the deadlock scenario described in the PR objectives where records exceeding the queue size could get stuck.


173-174: LGTM! Clean handling of AirbyteMessage queue items.

The direct yielding of AirbyteMessage items is appropriate since they're already in the final format.

airbyte_cdk/connector_builder/test_reader/helpers.py (1)

471-471: LGTM! Proper state conversion before serialization.

The state message is correctly converted using the new helper function before being included in the StreamReadSlices response.

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

70-82: Well-structured test limits configuration.

Good use of a dataclass to encapsulate test limits, and the __test__ = False is a nice touch to prevent pytest confusion.


98-131: LGTM! Clean integration of concurrency controls.

The constructor properly integrates the test limits, queue management, and message repository. The bounded queue with ConcurrentMessageRepository wrapper provides good concurrency control, and the limits are appropriately propagated to the component factory.

Also applies to: 139-142, 171-172

Copy link

github-actions bot commented Aug 5, 2025

PyTest Results (Fast)

3 694 tests   - 2   3 683 ✅  - 2   6m 29s ⏱️ +4s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 4d2bb96. ± Comparison against base commit e1664ec.

This pull request removes 6 and adds 4 tests. Note that renamed tests count towards both.
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_default_record_limit[test_create_request_no_record_limit]
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_default_record_limit[test_create_request_no_record_limit_n_records_exceed_max]
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_record_limit[test_create_request_record_limit_exceeds_max]
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_record_limit[test_create_request_with_record_limit]
unit_tests.sources.streams.concurrent.test_concurrent_read_processor.TestConcurrentReadProcessor ‑ test_given_exception_on_partition_complete_sentinel_then_yield_error_trace_message_and_stream_is_incomplete
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_when_process_partition_then_queue_records_and_exception_and_sentinel
unit_tests.sources.declarative.stream_slicers.test_declarative_partition_generator.StreamSlicerPartitionGeneratorTest ‑ test_max_records_reached_on_previous_partition
unit_tests.sources.declarative.stream_slicers.test_declarative_partition_generator.StreamSlicerPartitionGeneratorTest ‑ test_max_records_reached_stops_reading
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_from_close_slice_when_process_partition_then_queue_records_and_exception_and_sentinel
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_from_read_when_process_partition_then_queue_records_and_exception_and_sentinel

♻️ This comment has been updated with latest results.

Copy link

github-actions bot commented Aug 5, 2025

PyTest Results (Full)

3 697 tests   - 2   3 686 ✅  - 2   11m 54s ⏱️ +24s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 4d2bb96. ± Comparison against base commit e1664ec.

This pull request removes 6 and adds 4 tests. Note that renamed tests count towards both.
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_default_record_limit[test_create_request_no_record_limit]
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_default_record_limit[test_create_request_no_record_limit_n_records_exceed_max]
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_record_limit[test_create_request_record_limit_exceeds_max]
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_record_limit[test_create_request_with_record_limit]
unit_tests.sources.streams.concurrent.test_concurrent_read_processor.TestConcurrentReadProcessor ‑ test_given_exception_on_partition_complete_sentinel_then_yield_error_trace_message_and_stream_is_incomplete
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_when_process_partition_then_queue_records_and_exception_and_sentinel
unit_tests.sources.declarative.stream_slicers.test_declarative_partition_generator.StreamSlicerPartitionGeneratorTest ‑ test_max_records_reached_on_previous_partition
unit_tests.sources.declarative.stream_slicers.test_declarative_partition_generator.StreamSlicerPartitionGeneratorTest ‑ test_max_records_reached_stops_reading
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_from_close_slice_when_process_partition_then_queue_records_and_exception_and_sentinel
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_from_read_when_process_partition_then_queue_records_and_exception_and_sentinel

♻️ This comment has been updated with latest results.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (4)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)

18-22: Consider consolidating logging approaches

The comment mentions this is an intermediary approach that "should just be replaced by emitting messages directly onto the repository." Would it make sense to track this as a follow-up task to ensure we eventually consolidate the logging approaches, wdyt?

airbyte_cdk/sources/concurrent_source/concurrent_source.py (1)

101-105: Queue size configuration looks reasonable

The hardcoded queue size of 10,000 with the explanatory comment makes sense. However, since you mention it "might even need to be configurable depending on the source," should we consider making this configurable now, or would you prefer to wait until we have more data on actual usage patterns?

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

70-82: Consider renaming TestLimits for clarity

The TestLimits name might be confusing since it's not actually a test class. While the __test__ = False attribute prevents pytest issues, would it make sense to rename this to something like ConcurrencyLimits or ReadLimits to better reflect its purpose as a configuration dataclass, wdyt?


108-112: Duplicate queue size configuration

I notice this queue maxsize of 10,000 is duplicated from the ConcurrentSource. Should we consider extracting this to a shared constant to maintain consistency across both classes?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1034fd3 and 2580057.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py (7 hunks)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (9 hunks)
  • airbyte_cdk/sources/streams/concurrent/partition_reader.py (3 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
📚 Learning: when modifying the `yamldeclarativesource` class in `airbyte_cdk/sources/declarative/yaml_declarativ...
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.

Applied to files:

  • airbyte_cdk/sources/concurrent_source/concurrent_source.py
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: the files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from ...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.

Applied to files:

  • airbyte_cdk/sources/concurrent_source/concurrent_source.py
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: when code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repositor...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15
Timestamp: 2024-11-15T00:59:08.154Z
Learning: When code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repository, avoid suggesting modifications to it during the import process.

Applied to files:

  • airbyte_cdk/sources/concurrent_source/concurrent_source.py
🧬 Code Graph Analysis (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)
airbyte_cdk/sources/message/concurrent_repository.py (1)
  • ConcurrentMessageRepository (11-43)
airbyte_cdk/sources/message/repository.py (2)
  • InMemoryMessageRepository (74-95)
  • MessageRepository (45-60)
airbyte_cdk/sources/connector_state_manager.py (1)
  • ConnectorStateManager (32-161)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
  • ModelToComponentFactory (630-4140)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Check: source-shopify
🔇 Additional comments (6)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (2)

81-92: Record limit enforcement looks good

The per-partition record limit implementation is clean and aligns with the PR objectives for addressing the deadlock issue. The comment clearly explains that the limit applies to individual partitions rather than globally.


93-93: Cursor cleanup properly handled

Good addition of cursor.close_partition() to ensure proper cleanup after processing each partition.

airbyte_cdk/sources/concurrent_source/concurrent_source.py (2)

155-160: Good fix for preventing missed queue items

The reordering to check queue.empty() before concurrent_stream_processor.is_done() properly addresses the edge case where exceptions could cause remaining items to be missed. The comment clearly explains the reasoning.


178-179: Direct AirbyteMessage yielding properly handled

The addition of AirbyteMessage handling aligns with the expanded QueueItem union type mentioned in the summary.

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

124-124: Good use of ConcurrentMessageRepository wrapper

The ConcurrentMessageRepository wrapper ensures proper message ordering in concurrent scenarios, which is crucial for the connector builder's message grouping functionality mentioned in the PR objectives.


334-336: Consistent slice limit application

The slice limits are consistently applied across all partition generator types, which provides good control over concurrent processing. The inline comments about the technical necessity are helpful for future maintainers.

Also applies to: 368-368, 400-402, 464-464

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)

1571-1611: Comprehensive test for max_records limit!

This test effectively validates that the retriever respects the max_records limit by stopping at exactly 5 records when 7 are available. The mock setup and assertions look solid. One small question - should we also verify that _total_records_read is correctly updated to 5 after this test, wdyt? It might help ensure the counter state is properly maintained.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2580057 and d1548c0.

📒 Files selected for processing (8)
  • airbyte_cdk/connector_builder/test_reader/message_grouper.py (1 hunks)
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py (7 hunks)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (9 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4 hunks)
  • airbyte_cdk/sources/streams/concurrent/partition_reader.py (3 hunks)
  • unit_tests/connector_builder/test_message_grouper.py (0 hunks)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (2 hunks)
💤 Files with no reviewable changes (1)
  • unit_tests/connector_builder/test_message_grouper.py
🚧 Files skipped from review as they are similar to previous changes (5)
  • airbyte_cdk/connector_builder/test_reader/message_grouper.py
  • airbyte_cdk/sources/streams/concurrent/partition_reader.py
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
📚 Learning: the files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from ...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.

Applied to files:

  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py
📚 Learning: when modifying the `yamldeclarativesource` class in `airbyte_cdk/sources/declarative/yaml_declarativ...
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.

Applied to files:

  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py
🧬 Code Graph Analysis (1)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (3)
airbyte_cdk/sources/types.py (5)
  • Record (21-72)
  • data (35-36)
  • StreamSlice (75-169)
  • cursor_slice (107-112)
  • partition (99-104)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (5)
  • name (108-116)
  • name (119-121)
  • primary_key (292-294)
  • primary_key (297-299)
  • read_records (494-549)
unit_tests/sources/fixtures/source_test_fixture.py (1)
  • primary_key (98-99)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (7)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)

95-95: LGTM - Clean attribute addition!

The max_records attribute is well-typed and properly defaulted. Nice addition for controlling record limits in test scenarios, wdyt?


105-105: Good counter initialization!

Initializing _total_records_read to 0 in __post_init__ is the right approach for tracking cumulative records across calls.


507-510: Efficient early exit logic!

The early return when max_records is already reached prevents unnecessary processing. This should help avoid the deadlock scenarios mentioned in the PR objectives by stopping record retrieval early, wdyt?


540-545: Confirm single-threaded retriever use—no sync needed

Our search didn’t find any places where a SimpleRetriever instance is shared across threads, and CDK retrievers run per-stream in a single-threaded loop. That means the _total_records_read increment is safe without additional locks. Let me know if there’s a concurrent usage pattern I overlooked! wdyt?

unit_tests/sources/declarative/retrievers/test_simple_retriever.py (3)

14-21: Standard imports addition!

The additional imports from airbyte_cdk.models look appropriate for the test functionality. Clean and well-organized import structure.


1613-1648: Excellent edge case coverage!

This test perfectly validates the early exit behavior when the max_records limit has already been reached from previous reads. Setting _total_records_read = 5 manually is a good way to simulate the state. The assertion for 0 records returned is spot-on.


1650-1687: Great test for partial reading scenario!

This test nicely validates the incremental behavior where some records have already been read (_total_records_read = 2) and only the remaining allowed records (3 out of 5 available) are returned. The logic verification is solid. The test coverage for the max_records feature is quite comprehensive across these three test functions!

@brianjlai
Copy link
Contributor Author

Most of the changes to support limiting at the global level will be in airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🔭 Outside diff range comments (1)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)

72-94: Potential off-by-one error in record counting

The counter is incremented after yielding the record (line 93), but the check happens before processing (lines 73-76). This means when total_record_counter equals max_records_limit - 1, one more record will be yielded before the counter reaches the limit. Is this intentional, or should the increment happen before the yield to ensure exactly max_records_limit records? wdyt?

 def read(self) -> Iterable[Record]:
     for stream_data in self._retriever.read_records(self._json_schema, self._stream_slice):
         if self._max_records_limit:
             global total_record_counter
             if total_record_counter >= self._max_records_limit:
                 break
+            total_record_counter += 1

         if isinstance(stream_data, Mapping):
             record = (
                 stream_data
                 if isinstance(stream_data, Record)
                 else Record(
                     data=stream_data,
                     stream_name=self.stream_name(),
                     associated_slice=self._stream_slice,
                 )
             )
             yield record
         else:
             self._message_repository.emit_message(stream_data)
-
-        if self._max_records_limit:
-            total_record_counter += 1
🧹 Nitpick comments (3)
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (2)

1-1: Copyright year should be updated to 2025

The copyright notice shows 2024, but based on the current date (August 2025), it should be updated to 2025 to match the other files in this PR.

-# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

7-8: Consider a more robust approach for resetting global state in tests

While importing the module to reset total_record_counter works, it introduces tight coupling between tests and implementation details. Have you considered using a context manager or fixture to handle the global state reset more cleanly? This would make the tests more maintainable and less fragile to implementation changes. wdyt?

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

327-409: Consider extracting partition factory creation to reduce duplication

The pattern of creating DeclarativePartitionFactory with the same parameters (stream_name, json_schema, retriever, message_repository, max_records_limit) is repeated 5 times throughout this method. Would it be cleaner to extract this into a helper method to follow DRY principles? wdyt?

+def _create_partition_factory(
+    self,
+    stream: DeclarativeStream,
+    retriever: Retriever,
+) -> DeclarativePartitionFactory:
+    return DeclarativePartitionFactory(
+        stream_name=stream.name,
+        json_schema=stream.get_json_schema(),
+        retriever=retriever,
+        message_repository=self.message_repository,
+        max_records_limit=self._limits.max_records if self._limits else None,
+    )

 # Then use it in the method:
-partition_factory=DeclarativePartitionFactory(
-    stream_name=declarative_stream.name,
-    json_schema=declarative_stream.get_json_schema(),
-    retriever=retriever,
-    message_repository=self.message_repository,
-    max_records_limit=self._limits.max_records
-    if self._limits
-    else None,
-)
+partition_factory=self._create_partition_factory(declarative_stream, retriever)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d1548c0 and acc2e1f.

📒 Files selected for processing (5)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (9 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1 hunks)
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (6 hunks)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1 hunks)
  • unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (5 hunks)
✅ Files skipped from review due to trivial changes (2)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
📚 Learning: the custompageincrement class in unit_tests/source_declarative_manifest/resources/source_the_guardia...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.

Applied to files:

  • unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py
📚 Learning: when modifying the `yamldeclarativesource` class in `airbyte_cdk/sources/declarative/yaml_declarativ...
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.

Applied to files:

  • unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: the files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from ...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.

Applied to files:

  • unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-intercom
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: preview_docs
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (4)
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (2)

78-109: LGTM! Well-structured test for max records limit

The test effectively validates that the partition reader stops after reaching the configured maximum record limit. Good use of descriptive test data with F1 driver names! 🏎️


111-143: LGTM! Good coverage of limit persistence

The test properly verifies that the global record limit persists across multiple reads of the same partition, which is crucial for preventing unexpected behavior.

airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)

16-19: Thread safety concern with global record counter

I wasn’t able to find any concurrent calls to DeclarativePartition.read() in the codebase, so it’s unclear whether total_record_counter might be updated from multiple threads. Could you confirm if all test read operations run single-threaded? If there’s any chance of parallelism, we should guard increments with a threading.Lock (or use an atomic counter) to prevent race conditions—wdyt?

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

70-82: LGTM! Well-structured TestLimits dataclass

The dataclass provides clear defaults and the __test__ class variable prevents pytest confusion. Good defensive programming!

Copy link
Contributor

@maxi297 maxi297 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense. I only have one concern left but I'll approve

@brianjlai brianjlai merged commit addd443 into main Aug 7, 2025
25 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants