
Conversation

maxi297 (Contributor) commented Aug 11, 2025

…s (#699)"

This reverts commit addd443.

What

It appears that tests have been failing on main (example and example) since we merged the Connector Builder changes that use ConcurrentDeclarativeSource. The tests were passing on the PR, and they pass for me locally on main, so I assume there is some flakiness here. I tried to understand the diff, but the *** notation does not make much sense to me. I won't have time to look into this before going on PTO, so if this turns out to be a blocker, we can skip the tests until @brianjlai and I are back from vacation. Based on the two examples above, I suspect the whole test_concurrent_declarative_source.py file is at risk.

Summary by CodeRabbit

  • New Features
    • Introduced default test-read limits (records, pages per slice, slices, streams).
    • Streamlined source creation based on manifest with configurable limiting.
  • Bug Fixes
    • Enforced hard record-limit during test reads to prevent over-processing.
    • Improved error handling and cleanup when concurrent partitions complete or fail.
  • Refactor
    • Simplified concurrency flow and partition processing; removed legacy queue and limit mechanisms.
    • Consolidated configuration through a component factory for more consistent behavior.
  • Tests
    • Updated tests to reflect new source creation flow and limits.
    • Added coverage for record-limit enforcement and concurrent completion/error scenarios.


👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@revert_concurrent_changes#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch revert_concurrent_changes

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment



PyTest Results (Fast)

3 696 tests  +2   3 685 ✅ +2   6m 29s ⏱️ -3s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit a82f584. ± Comparison against base commit addd443.

This pull request removes 4 and adds 6 tests. Note that renamed tests count towards both.
unit_tests.sources.declarative.stream_slicers.test_declarative_partition_generator.StreamSlicerPartitionGeneratorTest ‑ test_max_records_reached_on_previous_partition
unit_tests.sources.declarative.stream_slicers.test_declarative_partition_generator.StreamSlicerPartitionGeneratorTest ‑ test_max_records_reached_stops_reading
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_from_close_slice_when_process_partition_then_queue_records_and_exception_and_sentinel
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_from_read_when_process_partition_then_queue_records_and_exception_and_sentinel
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_default_record_limit[test_create_request_no_record_limit]
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_default_record_limit[test_create_request_no_record_limit_n_records_exceed_max]
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_record_limit[test_create_request_record_limit_exceeds_max]
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_record_limit[test_create_request_with_record_limit]
unit_tests.sources.streams.concurrent.test_concurrent_read_processor.TestConcurrentReadProcessor ‑ test_given_exception_on_partition_complete_sentinel_then_yield_error_trace_message_and_stream_is_incomplete
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_when_process_partition_then_queue_records_and_exception_and_sentinel

Contributor

coderabbitai bot commented Aug 11, 2025

📝 Walkthrough

Walkthrough

Refactors connector builder to create ManifestDeclarativeSource via ModelToComponentFactory with new TestLimits defaults. Simplifies/consolidates concurrent reading: removes external queue, PartitionLogger, cursor argument from PartitionReader, and ConcurrentMessageRepository. Drops per-slice/page/record limits in declarative partitioning. Updates tests and imports accordingly.
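
For orientation, here is a minimal sketch of the create_source flow described above, using the import paths referenced later in this review. The keyword arguments, the limit attribute names, and the "__injected_declarative_manifest" config key are illustrative assumptions, not verified signatures.

# Sketch only: keyword arguments and limit attribute names are assumed for illustration.
from typing import Any, Mapping

from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)


def create_source_sketch(config: Mapping[str, Any], limits: Any) -> ManifestDeclarativeSource:
    # The factory carries the test-read limits and disables retries/cache so
    # connector-builder reads stay deterministic.
    factory = ModelToComponentFactory(
        emit_connector_builder_messages=True,
        limit_pages_fetched_per_slice=limits.max_pages_per_slice,  # assumed attribute names
        limit_slices_fetched=limits.max_slices,
        disable_retries=True,
        disable_cache=True,
    )
    return ManifestDeclarativeSource(
        source_config=config["__injected_declarative_manifest"],  # assumed config key
        component_factory=factory,
    )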

Changes

Cohort / File(s) Summary
Connector Builder API shift
airbyte_cdk/connector_builder/connector_builder_handler.py, airbyte_cdk/connector_builder/main.py
Adds TestLimits dataclass and default constants; updates get_limits; create_source now returns ManifestDeclarativeSource using ModelToComponentFactory; removes catalog/state and concurrent source path; minor serialization refactor.
Message grouping limits
airbyte_cdk/connector_builder/test_reader/message_grouper.py
Enforces record hard limit in get_message_groups loop condition.
Test reader helpers cleanup
airbyte_cdk/connector_builder/test_reader/helpers.py
Removes convert_state_blob_to_mapping; adjusts state handling; drops related imports.
Concurrent source internals
airbyte_cdk/sources/concurrent_source/concurrent_source.py, .../concurrent_read_processor.py, .../streams/concurrent/partition_reader.py, .../streams/concurrent/partitions/types.py
Localizes queue to read(); removes external queue API and PartitionLogger; PartitionReader.process_partition drops cursor param; updates completion/error handling; removes AirbyteMessage from QueueItem union; centralizes cursor close/error in processor.
Declarative source refactor
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Switches to component_factory-based construction; removes limits/TestLimits, queue wiring, and limit params; updates DeclarativePartitionFactory calls to simplified signature.
Component factory cleanup
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Removes MAX_SLICES constant.
Partition generator simplification
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py, .../stream_slicers/stream_slicer_test_read_decorator.py
Removes max-record/slice-limit mechanics and test decorator; simplifies constructors (factory/partition/generator); adjusts typing imports.
Message repository removal
airbyte_cdk/sources/message/concurrent_repository.py
Deletes ConcurrentMessageRepository class/file.
Minor doc cleanup
airbyte_cdk/sources/utils/slice_logger.py
Removes comment block; no functional change.
Unit tests: connector builder
unit_tests/connector_builder/test_connector_builder_handler.py, unit_tests/connector_builder/test_message_grouper.py
Update to ManifestDeclarativeSource and new defaults; add record-limit tests; adjust mocks/fixtures and call patterns.
Unit tests: declarative stream slicers/retrievers
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py, unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py, unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py, unit_tests/sources/declarative/retrievers/test_simple_retriever.py
Align with removed max-record limits and new constructors; migrate to ModelToComponentFactory where needed; drop unused imports/tests.
Unit tests: concurrent runtime
unit_tests/sources/streams/concurrent/test_partition_reader.py, unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py, unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py
Update for PartitionReader API and completion semantics; add cursor close/error assertions; adjust ConcurrentSource construction to new signatures.

Sequence Diagram(s)

sequenceDiagram
  participant Client as Connector Builder
  participant Handler as connector_builder_handler
  participant Factory as ModelToComponentFactory
  participant Source as ManifestDeclarativeSource

  Client->>Handler: handle_request(config)
  Handler->>Handler: get_limits(config) -> TestLimits
  Handler->>Factory: instantiate (disable retries/cache, limits)
  Handler->>Source: create(source_config, component_factory=Factory)
  Source-->>Client: read grouped messages
sequenceDiagram
  participant Main as ConcurrentSource.read
  participant Enq as PartitionEnqueuer
  participant Reader as PartitionReader
  participant Proc as ConcurrentReadProcessor

  Main->>Main: create local Queue
  Main->>Enq: start generating partitions -> Queue
  loop per partition
    Main->>Reader: submit process_partition(partition)
  end
  Reader-->>Main: records, then PartitionCompleteSentinel
  Proc->>Main: on_partition_complete_sentinel (close cursor, emit done/error)
  Main-->>Main: consume Queue until done
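
For readers newer to the concurrent flow, a self-contained toy version of the second diagram follows: worker threads feed records into a local queue and always signal completion with a sentinel (errors are surfaced in-band), while the main thread drains the queue. These are simplified stand-ins, not the real CDK classes.

# Toy illustration of the diagrammed flow; not the actual ConcurrentSource code.
import threading
from queue import Queue


class PartitionCompleteSentinel:
    def __init__(self, partition):
        self.partition = partition


def process_partition(partition, queue):
    # Worker thread: enqueue records, then always signal completion.
    try:
        for record in partition["records"]:
            queue.put(record)
    except Exception as exc:
        queue.put(exc)  # surface errors in-band so the main thread is not left waiting
    finally:
        queue.put(PartitionCompleteSentinel(partition))


def read(partitions):
    queue = Queue(maxsize=10_000)  # local, per-read queue as in the refactor
    threads = [threading.Thread(target=process_partition, args=(p, queue)) for p in partitions]
    for t in threads:
        t.start()
    remaining = len(partitions)
    while remaining:
        item = queue.get()
        if isinstance(item, PartitionCompleteSentinel):
            remaining -= 1  # the real processor closes the cursor / emits status here
        elif isinstance(item, Exception):
            yield f"error from partition: {item}"
        else:
            yield item
    for t in threads:
        t.join()


partitions = [{"records": [f"record-{i}-{j}" for j in range(3)]} for i in range(2)]
for message in read(partitions):
    print(message)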

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

chore

Suggested reviewers

  • brianjlai
  • lmossman
  • natikgadzhi

Would you like to also add a short migration note for downstream users about the removed ConcurrentMessageRepository and updated PartitionReader/ConcurrentSource signatures, wdyt?


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🔭 Outside diff range comments (2)
airbyte_cdk/sources/concurrent_source/concurrent_source.py (2)

134-147: Critical: read() can block indefinitely; timeout_seconds is no longer honored

The loop uses a blocking queue.get() without a timeout, so if workers deadlock or fail to emit sentinels, the main thread can hang indefinitely. This also makes the timeout_seconds parameter (documented in init) effectively unused. Could we restore timeout semantics by using queue.get(timeout=self._timeout_seconds) and handling Empty, wdyt?

@@
-    def _consume_from_queue(
+    def _consume_from_queue(
         self,
         queue: Queue[QueueItem],
         concurrent_stream_processor: ConcurrentReadProcessor,
     ) -> Iterable[AirbyteMessage]:
-        while airbyte_message_or_record_or_exception := queue.get():
-            yield from self._handle_item(
-                airbyte_message_or_record_or_exception,
-                concurrent_stream_processor,
-            )
-            if concurrent_stream_processor.is_done() and queue.empty():
-                # all partitions were generated and processed. we're done here
-                break
+        while True:
+            try:
+                item = queue.get(timeout=self._timeout_seconds)
+            except Empty:
+                self._logger.warning(
+                    f"No items received from worker threads for {self._timeout_seconds}s; stopping read()."
+                )
+                break
+            yield from self._handle_item(item, concurrent_stream_processor)
+            if concurrent_stream_processor.is_done() and queue.empty():
+                # all partitions were generated and processed. we're done here
+                break

And import Empty:

-from queue import Queue
+from queue import Queue, Empty

84-86: Enforce timeout_seconds on queue.get to match the docstring

The docstring promises that if no record is read within timeout_seconds, the source will stop reading—but in
airbyte_cdk/sources/concurrent_source/concurrent_source.py (around line 139) we still call

while airbyte_message_or_record_or_exception := queue.get():
    …

with no timeout. Could we update it along these lines?

  • Wrap the call in a try/except to apply the timeout:
    from queue import Empty
    
    try:
        item = queue.get(timeout=self._timeout_seconds)
    except Empty:
        break
    while item:
        yield from self._handle_item(item)
        …
  • Make sure any loop around queue.get() stops cleanly on timeout.
  • Update the concurrent-queue unit tests (test_partition_enqueuer.py/test_partition_reader.py) so they either mock get(timeout=…) or catch/ignore the Empty exception as needed.

This will realign implementation with the parameter docs—wdyt?

🧹 Nitpick comments (4)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (2)

27-38: Clarify docstring wording and fix typos for accuracy

Would you be open to tightening the docstring to reflect that we enqueue records and then a completion sentinel, and fix the typo “when”→“went”, plus make the threading note explicit, wdyt?

 def process_partition(self, partition: Partition) -> None:
-    """
-    Process a partition and put the records in the output queue.
-    When all the partitions are added to the queue, a sentinel is added to the queue to indicate that all the partitions have been generated.
-
-    If an exception is encountered, the exception will be caught and put in the queue. This is very important because if we don't, the
-    main thread will have no way to know that something when wrong and will wait until the timeout is reached
-
-    This method is meant to be called from a thread.
-    :param partition: The partition to read data from
-    :return: None
-    """
+    """
+    Read a partition and put its records onto the output queue.
+    When all records from the partition are enqueued, a completion sentinel is added to signal partition completion.
+
+    If an exception is encountered, it is caught and put on the queue so the main thread is notified; otherwise it would wait until the timeout is reached.
+
+    This method is intended to be called from a worker thread.
+    :param partition: The partition to read data from.
+    :return: None
+    """

39-45: Always emit the completion sentinel; current logic looks good—tiny readability tweak?

The try/except ensures we always enqueue a completion sentinel (success or failure). For readability, would you consider using False directly instead of not self._IS_SUCCESSFUL, wdyt?

-        except Exception as e:
-            self._queue.put(StreamThreadException(e, partition.stream_name()))
-            self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))
+        except Exception as e:
+            self._queue.put(StreamThreadException(e, partition.stream_name()))
+            self._queue.put(PartitionCompleteSentinel(partition, False))
airbyte_cdk/sources/concurrent_source/concurrent_source.py (2)

100-105: Nit: comment grammar and clarity around queue maxsize

Would you mind tightening this comment for clarity (typos and subject/verb agreement), wdyt?

-        # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less
-        # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
-        # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
-        # information and might even need to be configurable depending on the source
+        # We set a maxsize so the main thread is forced to process items when the queue grows.
+        # This assumes there are fewer threads generating partitions than the total number of workers; otherwise, generator threads could
+        # fill the queue. The 10_000 limit is an initial heuristic and may need adjustment or configurability per source.

31-35: Nit: typo in class docstring

Tiny typo: “w ere” → “were”. Want to patch it while we’re here, wdyt?

-    The read is done when all partitions for all streams w ere generated and read.
+    The read is done when all partitions for all streams were generated and read.
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between addd443 and a82f584.

📒 Files selected for processing (23)
  • airbyte_cdk/connector_builder/connector_builder_handler.py (3 hunks)
  • airbyte_cdk/connector_builder/main.py (1 hunks)
  • airbyte_cdk/connector_builder/test_reader/helpers.py (2 hunks)
  • airbyte_cdk/connector_builder/test_reader/message_grouper.py (1 hunks)
  • airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2 hunks)
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py (5 hunks)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (6 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (0 hunks)
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (3 hunks)
  • airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (1 hunks)
  • airbyte_cdk/sources/message/concurrent_repository.py (0 hunks)
  • airbyte_cdk/sources/streams/concurrent/partition_reader.py (2 hunks)
  • airbyte_cdk/sources/streams/concurrent/partitions/types.py (1 hunks)
  • airbyte_cdk/sources/utils/slice_logger.py (0 hunks)
  • unit_tests/connector_builder/test_connector_builder_handler.py (17 hunks)
  • unit_tests/connector_builder/test_message_grouper.py (1 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (9 hunks)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1 hunks)
  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (2 hunks)
  • unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (4 hunks)
  • unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py (1 hunks)
  • unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (5 hunks)
  • unit_tests/sources/streams/concurrent/test_partition_reader.py (3 hunks)
💤 Files with no reviewable changes (3)
  • airbyte_cdk/sources/utils/slice_logger.py
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • airbyte_cdk/sources/message/concurrent_repository.py
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.

Applied to files:

  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧬 Code Graph Analysis (16)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)
airbyte_cdk/models/airbyte_protocol.py (1)
  • AirbyteMessage (79-88)
unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py (1)
unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_source_builder.py (2)
  • streams (65-79)
  • NeverLogSliceLogger (174-176)
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (2)
airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py (1)
  • StreamSlicer (27-39)
airbyte_cdk/sources/types.py (1)
  • StreamSlice (75-169)
airbyte_cdk/sources/streams/concurrent/partitions/types.py (3)
airbyte_cdk/sources/types.py (1)
  • Record (21-72)
airbyte_cdk/sources/streams/concurrent/partitions/partition.py (1)
  • Partition (11-48)
airbyte_cdk/sources/concurrent_source/partition_generation_completed_sentinel.py (1)
  • PartitionGenerationCompletedSentinel (9-24)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (9)
airbyte_cdk/sources/concurrent_source/thread_pool_manager.py (1)
  • submit (45-46)
airbyte_cdk/sources/streams/concurrent/partitions/partition.py (1)
  • stream_name (36-41)
airbyte_cdk/sources/concurrent_source/stream_thread_exception.py (2)
  • stream_name (12-13)
  • exception (16-17)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (2)
  • cursor (87-90)
  • name (54-57)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py (1)
  • close_partition (50-51)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py (1)
  • close_partition (75-80)
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
  • close_partition (63-67)
  • close_partition (114-115)
  • close_partition (239-247)
airbyte_cdk/sources/file_based/stream/concurrent/cursor/abstract_concurrent_file_based_cursor.py (1)
  • close_partition (33-33)
airbyte_cdk/utils/traced_exception.py (3)
  • AirbyteTracedException (25-145)
  • from_exception (103-121)
  • as_sanitized_airbyte_message (123-145)
unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
  • ModelToComponentFactory (626-4136)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (2)
airbyte_cdk/sources/types.py (1)
  • partition (99-104)
airbyte_cdk/sources/streams/concurrent/partitions/partition.py (1)
  • Partition (11-48)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
  • DeclarativePartition (42-81)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
  • close_partition (159-184)
airbyte_cdk/connector_builder/main.py (1)
airbyte_cdk/connector_builder/connector_builder_handler.py (1)
  • create_source (78-93)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (4)
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
  • close_partition (63-67)
  • close_partition (114-115)
  • close_partition (239-247)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2)
  • on_partition (89-102)
  • on_partition_complete_sentinel (104-134)
airbyte_cdk/sources/concurrent_source/partition_generation_completed_sentinel.py (1)
  • PartitionGenerationCompletedSentinel (9-24)
airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)
  • PartitionCompleteSentinel (14-30)
unit_tests/connector_builder/test_message_grouper.py (2)
airbyte_cdk/connector_builder/test_reader/reader.py (2)
  • TestReader (39-459)
  • run_test_read (84-157)
unit_tests/connector_builder/utils.py (1)
  • create_configured_catalog (26-27)
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (1)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
  • DeclarativePartitionFactory (14-39)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (3)
airbyte_cdk/sources/declarative/async_job/job_orchestrator.py (1)
  • stream_slice (88-89)
airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py (1)
  • StreamSlicer (27-39)
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py (1)
  • StreamSlicer (15-25)
unit_tests/sources/streams/concurrent/test_partition_reader.py (4)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (2)
  • PartitionReader (14-45)
  • process_partition (27-45)
unit_tests/sources/streams/concurrent/test_partition_enqueuer.py (1)
  • _consume_queue (106-113)
airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)
  • PartitionCompleteSentinel (14-30)
airbyte_cdk/sources/concurrent_source/stream_thread_exception.py (1)
  • exception (16-17)
airbyte_cdk/sources/concurrent_source/concurrent_source.py (4)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)
  • PartitionReader (14-45)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2)
  • ConcurrentReadProcessor (30-255)
  • is_done (210-235)
airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py (1)
  • PartitionEnqueuer (16-64)
airbyte_cdk/sources/concurrent_source/thread_pool_manager.py (1)
  • is_done (81-82)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (6)
unit_tests/connector_builder/test_connector_builder_handler.py (1)
  • name (817-818)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (2)
  • name (54-57)
  • get_json_schema (68-71)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (2)
  • name (625-626)
  • get_json_schema (658-659)
unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py (1)
  • message_repository (91-92)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
  • message_repository (268-269)
airbyte_cdk/sources/abstract_source.py (1)
  • message_repository (314-315)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Analyze (python)
🔇 Additional comments (42)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)

14-14: Import cleanup confirmed — safe to merge.

Running the rg scan across unit_tests shows many valid uses of AirbyteRecordMessage elsewhere, so removing it from test_simple_retriever.py is isolated and won’t introduce breakages. Approving this import change here, wdyt?

airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (2)

7-7: LGTM: broadened typing imports for alignment

Broadening typing imports to align with updated type definitions across the PR looks good; no runtime impact here. Thanks for keeping the types consistent, wdyt?


10-10: Optional Enhancement: support unlimited slices via None and guard against negative limits?

I searched all instantiations of StreamSlicerTestReadDecorator (in tests and in model_to_component_factory.py) and found that every call passes a positive integer—no negative values or None usages detected. That means we can safely update the decorator’s signature to accept Optional[int] = None and add a runtime check, for example:

 from dataclasses import dataclass
 from itertools import islice
 from typing import Any, Iterable, Optional
 from airbyte_cdk.sources.types import StreamSlice

 @dataclass
 class StreamSlicerTestReadDecorator(StreamSlicer):
     wrapped_slicer: StreamSlicer
-    maximum_number_of_slices: int
+    maximum_number_of_slices: Optional[int] = None  # None ⇒ no limit

     def stream_slices(self) -> Iterable[StreamSlice]:
+        if self.maximum_number_of_slices is None:
+            return self.wrapped_slicer.stream_slices()
+        if self.maximum_number_of_slices < 0:
+            raise ValueError("maximum_number_of_slices must be non-negative or None")
         return islice(self.wrapped_slicer.stream_slices(), self.maximum_number_of_slices)

This preserves existing behavior (all current callers pass a positive int) while enabling an “unlimited” mode. wdyt?

airbyte_cdk/connector_builder/test_reader/helpers.py (1)

8-8: LGTM! Clean simplification of state handling.

The removal of the Union import and the direct assignment of latest_state_message as a dict aligns well with the broader simplification effort. This eliminates unnecessary state conversion boilerplate, wdyt?

Also applies to: 469-469

airbyte_cdk/connector_builder/test_reader/message_grouper.py (1)

98-98: Nice simplification of the loop condition!

The cleaner while loop condition maintains the same behavior while improving readability. The precedence is clear: check the limit first, then try to get the next message.
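
To make that ordering concrete, here is a small illustrative loop that checks the hard record limit before pulling the next message; the names and structure are simplified and do not mirror the actual get_message_groups implementation.

# Illustrative only: the hard limit is evaluated first, so no extra message is
# pulled once the limit is reached.
from typing import Any, Iterator


def group_up_to_limit(messages: Iterator[Any], record_limit: int) -> Iterator[Any]:
    records_count = 0
    while records_count < record_limit and (message := next(messages, None)) is not None:
        records_count += 1  # the real code only counts record-type messages
        yield message


print(list(group_up_to_limit(iter(range(10)), record_limit=3)))  # [0, 1, 2]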

unit_tests/connector_builder/test_message_grouper.py (1)

310-428: Excellent test coverage for record limit behavior!

The parameterized tests effectively cover both edge cases and normal flow:

  • Validation that exceeding max_record_limit raises ValueError
  • Proper counting of records across pages
  • Correct test_read_limit_reached flag behavior

The test structure is clean and assertions are comprehensive.

airbyte_cdk/connector_builder/main.py (1)

94-99: Good alignment with the updated create_source signature!

The switch to positional arguments correctly matches the new create_source(config, limits) signature. The formatting change with the type ignore comment on the decode() line is cleaner too.

airbyte_cdk/connector_builder/connector_builder_handler.py (2)

38-46: Nice dataclass design with pytest compatibility!

The __test__: ClassVar[bool] = False is a thoughtful addition to prevent pytest from mistaking this for a test class. The default values provide sensible limits for testing scenarios.
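
For reference, the opt-out pattern looks like the following minimal sketch; the limit fields and their defaults are assumptions for illustration, not the exact TestLimits definition.

# pytest collects classes whose names start with "Test"; setting the class-level
# __test__ attribute to False opts the dataclass out of collection.
from dataclasses import dataclass
from typing import ClassVar


@dataclass
class TestLimits:
    __test__: ClassVar[bool] = False  # not a test class, despite the name
    max_records: int = 100            # assumed field names and defaults
    max_pages_per_slice: int = 5
    max_slices: int = 5
    max_streams: int = 100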


78-93: Clean refactor to ManifestDeclarativeSource!

The switch from ConcurrentDeclarativeSource to ManifestDeclarativeSource with ModelToComponentFactory is well-executed. The component factory configuration properly enforces test limits and disables retries/cache for predictable test behavior. This should resolve the flakiness issues mentioned in the PR description, wdyt?

unit_tests/connector_builder/test_connector_builder_handler.py (9)

19-27: Import changes look good!

The new imports for the DEFAULT constants and the switch to ManifestDeclarativeSource align well with the refactoring objective. The constants provide clear defaults for test limits.


533-537: Source instantiation updated correctly

The change from ConcurrentDeclarativeSource to ManifestDeclarativeSource with source_config=MANIFEST follows the new pattern nicely.


682-688: Mock class naming is consistent

Good job updating the mock class name to MockManifestDeclarativeSource to reflect the new source type being tested.


820-842: Test mock properly updated

The mock class for error testing has been correctly renamed to MockManifestDeclarativeSource, maintaining consistency throughout the test file.


891-896: ConfiguredAirbyteCatalogSerializer usage looks good

The catalog serialization and mock assertion update to assert_called_once() are appropriate changes. Just wondering - should we verify the actual call arguments to ensure the catalog is being passed correctly? wdyt?


973-983: Default constants properly integrated

Excellent use of the imported DEFAULT constants instead of hardcoded values. This makes the tests more maintainable and consistent with the production defaults.


1010-1016: Source creation and validation correct

The create_source function call and subsequent isinstance check for ManifestDeclarativeSource properly validate the source creation. The assertions on the internal constructor properties ensure the limits are applied correctly.


1269-1276: Error type expectations remain unchanged

The expected error strings "AirbyteTracedException" and "InvalidSchema" are still valid for the refactored code, which is good for maintaining backward compatibility in error handling.


1392-1393: Dynamic stream manifest handling updated

The ManifestDeclarativeSource instantiation with DYNAMIC_STREAM_MANIFEST shows that dynamic streams are properly supported in the new implementation.

unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py (1)

52-54: LGTM! Constructor signature update is consistent with the API changes.

The change from keyword arguments to positional arguments aligns with the broader refactoring of ConcurrentSource's constructor signature throughout the codebase.

airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)

36-38: Clean architectural simplification by removing AirbyteMessage from QueueItem.

This change effectively narrows the queue's responsibility to focus solely on partition-related items and exceptions, which improves type safety and makes the concurrent processing flow clearer. The removal of AirbyteMessage from the union aligns well with the broader refactoring to simplify concurrent stream processing, wdyt?
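
As a hedged sketch, a narrowed queue-item union could read roughly as below; the member list is inferred from the code-graph references in this review and may not match the final file exactly.

# Sketch of a narrowed QueueItem union; members are inferred, not verified.
from typing import Union

from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
    PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
from airbyte_cdk.sources.types import Record

QueueItem = Union[
    Record,
    Partition,
    PartitionCompleteSentinel,
    PartitionGenerationCompletedSentinel,
    Exception,  # worker-thread errors are signaled in-band via the queue
]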

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2)

102-102: Good separation of concerns by removing cursor from partition processing.

The simplified interface makes PartitionReader focused solely on reading data, while cursor management remains in the processor where it logically belongs.


115-134: Excellent error handling pattern with try/except/finally!

This change brings several improvements:

  • Ensures partition cleanup happens even when cursor.close_partition raises an exception
  • Properly flags exceptions for stream status tracking
  • Guarantees message queue draining in all scenarios
  • The sentinel.is_successful check prevents unnecessary operations on failed partitions

The error handling is now more robust and centralized. Nice work!
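
A standalone illustration of that try/except/finally shape follows; the helper names and fake cursor are hypothetical and only demonstrate the cleanup guarantee, not the processor's actual code.

# Demonstrates the cleanup guarantee: close on success, convert errors into
# messages, and always remove the partition from the running set in finally.
from typing import Iterable, Set


class FakeCursor:
    def close_partition(self, partition: str) -> None:
        if partition == "bad":
            raise RuntimeError("close_partition failed")


def on_partition_complete(partition: str, successful: bool, cursor: FakeCursor,
                          running: Set[str]) -> Iterable[str]:
    try:
        if successful:
            cursor.close_partition(partition)  # may raise
            yield f"closed {partition}"
    except Exception as exc:
        yield f"error trace for {partition}: {exc}"  # stream would be flagged incomplete
    finally:
        running.discard(partition)  # cleanup happens even when close_partition raises
        yield f"drained pending messages after {partition}"


running = {"good", "bad"}
print(list(on_partition_complete("good", True, FakeCursor(), running)))
print(list(on_partition_complete("bad", True, FakeCursor(), running)))
print(running)  # set(): both partitions removed despite the error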

unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (4)

182-183: Test correctly updated to match the new process_partition signature.

Good to see the test assertions properly reflecting the removal of the cursor parameter.


256-256: Good addition of close_partition assertion.

This ensures the partition is properly closed when messages are emitted from the repository, wdyt?


307-354: Excellent test coverage for error handling in close_partition!

This new test case thoroughly validates the error handling path when cursor.close_partition raises an exception. It properly verifies:

  • Error trace message is yielded
  • Stream status is marked as INCOMPLETE
  • The partition is still removed from the running set (via the finally block)

Great addition to ensure robustness of the error handling flow!


740-740: Important edge case coverage.

Good to verify that close_partition is skipped when the partition processing was not successful. This prevents unnecessary operations on failed partitions.

unit_tests/sources/streams/concurrent/test_partition_reader.py (3)

1-3: Copyright header addition.

Good to see the copyright header being added to maintain consistency across the codebase.


29-29: Constructor correctly simplified.

The test properly reflects the simplified PartitionReader API that only requires a queue parameter.


49-62: Good adaptation of exception handling test.

The test correctly validates the new in-band error signaling mechanism where exceptions are communicated via StreamThreadException in the queue, followed by a PartitionCompleteSentinel. This ensures proper error propagation in the concurrent processing flow.
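
A hedged sketch of such a test is shown below; it assumes the simplified PartitionReader(queue) constructor and process_partition(partition) signature described in this review, and mocks the partition rather than using the real test helpers.

# Assumes PartitionReader(queue) and process_partition(partition) as described above.
from queue import Queue
from unittest.mock import Mock

from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel


def test_read_error_is_signaled_in_band() -> None:
    queue: Queue = Queue()
    partition = Mock()
    partition.stream_name.return_value = "a_stream"
    partition.read.side_effect = ValueError("boom")

    PartitionReader(queue).process_partition(partition)

    assert isinstance(queue.get(), StreamThreadException)
    assert isinstance(queue.get(), PartitionCompleteSentinel)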

airbyte_cdk/sources/concurrent_source/concurrent_source.py (1)

62-68: Local per-read queue wiring LGTM

Creating a local Queue per read() and wiring PartitionEnqueuer/PartitionReader to it improves encapsulation and avoids shared-state leaks. Looks good to me.

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)

3617-3618: LGTM! Consistent API updates across all test cases.

The updates to use the new 5-argument positional form for DeclarativePartition are consistent throughout the test file. This aligns well with the removal of max_records_limit from the partition generator, wdyt?

unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (2)

9-11: Clean import structure!

The direct import of DeclarativePartitionFactory improves readability. Also, nice cleanup removing the unused Record import.

Also applies to: 14-14


35-40: Consistent factory instantiation pattern.

All DeclarativePartitionFactory instantiations correctly use the new 4-argument signature. The removal of the max-records tests aligns with the broader simplification of the partition generation logic.

Also applies to: 50-55, 68-73

unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (2)

11-16: Clean migration to ModelToComponentFactory pattern.

The removal of TestLimits and introduction of ModelToComponentFactory aligns with the broader architectural shift towards factory-based configuration.


361-363: Great documentation for the cache disabling!

The comment clearly explains why disable_cache=True is necessary. This helps prevent subtle test interdependencies that could be hard to debug. Would it make sense to consider making this the default for test environments, wdyt?

airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (3)

14-39: Elegant simplification of the factory pattern!

The removal of max_records_limit streamlines the factory interface nicely. The switch to positional arguments in the create method (lines 33-39) is consistent with the constructor changes. Great job keeping the thread-safety documentation intact - it's valuable context, wdyt?


42-73: Clean removal of record limiting logic.

The read() method is now much simpler without the max_records gating. This should improve performance by removing unnecessary counter checks on each record. The unconditional processing aligns well with moving limit handling to a higher level in the architecture.


84-94: Streamlined partition generator without test decorators.

Nice simplification! Removing the StreamSlicerTestReadDecorator wrapper and slice_limit parameter makes the flow more direct and easier to understand. The generator now has a single responsibility - creating partitions from stream slices.

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)

6-6: Clean import list after Queue removal.

The removal of Queue from imports reflects the architectural shift away from manual queue management.


73-99: Well-structured factory-based initialization!

The introduction of component_factory parameter with a sensible default (lines 85-90) provides good flexibility. The comment about RFR (Resumable Full Refresh) and synthetic cursors (lines 81-84) provides valuable context for the disable_resumable_full_refresh=True setting.

One question: Should we consider adding validation to ensure the provided component_factory has the expected configuration for concurrent sources, wdyt?
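
The optional-factory pattern being described looks roughly like the sketch below; the class name is a placeholder and the factory keyword argument is an assumption, not the exact ConcurrentDeclarativeSource constructor.

# Placeholder class showing an optional component_factory with a default;
# the factory keyword argument is assumed for illustration.
from typing import Any, Mapping, Optional

from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)


class DeclarativeSourceSketch:
    def __init__(
        self,
        source_config: Mapping[str, Any],
        component_factory: Optional[ModelToComponentFactory] = None,
    ) -> None:
        # Fall back to a concurrency-friendly factory when the caller (e.g. the
        # connector builder) does not supply its own.
        self._constructor = component_factory or ModelToComponentFactory(
            disable_resumable_full_refresh=True,  # assumed kwarg, per the RFR note above
        )
        self._source_config = source_config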


121-127: Simplified ConcurrentSource instantiation.

The removal of the queue parameter makes the instantiation cleaner. Using self.message_repository directly is more straightforward than the previous queue-based approach.


280-285: Consistent factory instantiation pattern throughout!

All DeclarativePartitionFactory instantiations consistently use the new 4-argument positional form. This uniformity makes the codebase more maintainable. The removal of max_records_limit and slice_limit parameters aligns with moving these concerns to the component factory level. Great refactoring work!

Also applies to: 311-316, 341-346, 401-406


PyTest Results (Full)

3 699 tests  +2   3 688 ✅ +3   11m 47s ⏱️ +8s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌  - 1 

Results for commit a82f584. ± Comparison against base commit addd443.

This pull request removes 4 and adds 6 tests. Note that renamed tests count towards both.
unit_tests.sources.declarative.stream_slicers.test_declarative_partition_generator.StreamSlicerPartitionGeneratorTest ‑ test_max_records_reached_on_previous_partition
unit_tests.sources.declarative.stream_slicers.test_declarative_partition_generator.StreamSlicerPartitionGeneratorTest ‑ test_max_records_reached_stops_reading
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_from_close_slice_when_process_partition_then_queue_records_and_exception_and_sentinel
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_from_read_when_process_partition_then_queue_records_and_exception_and_sentinel
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_default_record_limit[test_create_request_no_record_limit]
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_default_record_limit[test_create_request_no_record_limit_n_records_exceed_max]
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_record_limit[test_create_request_record_limit_exceeds_max]
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_record_limit[test_create_request_with_record_limit]
unit_tests.sources.streams.concurrent.test_concurrent_read_processor.TestConcurrentReadProcessor ‑ test_given_exception_on_partition_complete_sentinel_then_yield_error_trace_message_and_stream_is_incomplete
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_when_process_partition_then_queue_records_and_exception_and_sentinel

@maxi297 changed the title from Revert "fix(connector-builder): remerge concurrent cdk builder change… to fix: revert remerge concurrent cdk builder change because of flaky test on Aug 11, 2025
@lmossman lmossman merged commit 1c9049a into main Aug 11, 2025
26 of 28 checks passed
@lmossman lmossman deleted the revert_concurrent_changes branch August 11, 2025 19:32