Skip to content

Conversation

lmossman
Copy link
Contributor

@lmossman lmossman commented Aug 5, 2025

Reverts the following 3 PRs, since they seem to be causing issues with builder connectors (see slack thread)

Summary by CodeRabbit

  • Refactor

    • Simplified concurrency and partition management by removing internal logging, cursor handling, and queue injection options.
    • Streamlined component factory and limit configuration for source creation.
    • Removed internal message ordering repository and related logic.
  • Bug Fixes

    • Improved error handling during partition completion to yield trace messages if exceptions occur.
  • Tests

    • Updated and removed tests to reflect the simplified concurrency and partition logic.
    • Added test coverage for error handling during partition completion.
  • Chores

    • Updated imports and removed unused code and comments for clarity and maintainability.

…equests through the concurrent CDK by reworking message routing and order through the queue and message repositories (#688)"

This reverts commit 209cb22.
@Copilot Copilot AI review requested due to automatic review settings August 5, 2025 00:31
Copy link

github-actions bot commented Aug 5, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@lmossman/revert-cdk-changes#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch lmossman/revert-cdk-changes

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /poe <command> - Runs any poe command in the CDK environment

📝 Edit this welcome message.

Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR reverts builder concurrency and pagination URL changes by removing the concurrent framework components from test files and connector builder handling, reverting to the previous synchronous implementation for testing purposes.

  • Removes concurrent processing components including ConcurrentMessageRepository, PartitionLogger, and related concurrency infrastructure
  • Simplifies PartitionReader to no longer handle cursor operations during partition processing
  • Reverts connector builder to use ManifestDeclarativeSource instead of ConcurrentDeclarativeSource for test operations

Reviewed Changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
test_partition_reader.py Removes cursor-related test cases and simplifies partition processing tests
test_concurrent_read_processor.py Adds cursor close_partition calls and error handling tests
stream_facade_builder.py Simplifies ConcurrentSource constructor call to use positional arguments
test_dynamic_schema_loader.py Replaces TestLimits with ModelToComponentFactory for cache disabling
test_http_requester.py Removes test cases for URL path handling with different URL patterns
test_connector_builder_handler.py Reverts from ConcurrentDeclarativeSource to ManifestDeclarativeSource usage
slice_logger.py Removes comment about concurrent CDK cleanup
partition_reader.py Removes PartitionLogger class and cursor operations from partition processing
concurrent_repository.py Completely removes ConcurrentMessageRepository implementation
stream_slicer_test_read_decorator.py Adds missing imports for proper type handling
declarative_partition_generator.py Removes slice_limit parameter and StreamSlicerTestReadDecorator usage
http_requester.py Simplifies URL joining logic
model_to_component_factory.py Removes MAX_SLICES constant
concurrent_declarative_source.py Removes TestLimits class and queue-based messaging infrastructure
concurrent_source.py Moves queue creation to read method and removes partition logger
concurrent_read_processor.py Adds cursor close_partition handling in partition completion
helpers.py Removes state blob conversion utility functions
main.py Simplifies source creation call
connector_builder_handler.py Moves TestLimits class here and reverts to ManifestDeclarativeSource
Comments suppressed due to low confidence (2)

unit_tests/connector_builder/test_connector_builder_handler.py:1269

  • The expected exception type has been changed from 'StreamThreadException' to 'AirbyteTracedException'. This change should be verified to ensure it aligns with the actual exception behavior after removing concurrent processing.
            "AirbyteTracedException",

unit_tests/connector_builder/test_connector_builder_handler.py:1275

  • The expected exception type has been changed from 'StreamThreadException' to 'InvalidSchema'. This change should be verified to ensure it aligns with the actual exception behavior after removing concurrent processing.
            "InvalidSchema",

Copy link
Contributor

coderabbitai bot commented Aug 5, 2025

📝 Walkthrough

Walkthrough

This change removes concurrency and queue management from the connector builder and related sources. It replaces ConcurrentDeclarativeSource with ManifestDeclarativeSource, eliminates test limit enforcement, and simplifies partition, cursor, and message repository logic. Test code and helper functions are updated to match these new, streamlined interfaces and behaviors.

Changes

Cohort / File(s) Change Summary
Connector Builder Handler Refactor
airbyte_cdk/connector_builder/connector_builder_handler.py, airbyte_cdk/connector_builder/main.py
Refactored to use ManifestDeclarativeSource instead of ConcurrentDeclarativeSource, removed concurrency/limit logic, updated imports, and simplified create_source signature and usage.
Test Limits and Concurrency Removal
airbyte_cdk/sources/declarative/concurrent_declarative_source.py, airbyte_cdk/sources/message/concurrent_repository.py
Removed TestLimits class, all concurrency and queue/message repository management, and related constructor parameters. Deleted ConcurrentMessageRepository class and its methods.
Partition and Cursor Simplification
airbyte_cdk/sources/streams/concurrent/partition_reader.py, airbyte_cdk/sources/streams/concurrent/partitions/types.py, airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py, airbyte_cdk/sources/concurrent_source/concurrent_source.py
Removed partition logger and cursor handling from partition reading and processing. Simplified queue management to be internal. Updated type aliases and removed direct queue injection. Enhanced error handling for partition completion.
Declarative Stream Slicer and Factory
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py, airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py, airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py
Removed slice limit logic and related decorators. Deleted unused constants. Adjusted imports for clarity and type hinting.
Declarative HTTP Requester
airbyte_cdk/sources/declarative/requesters/http_requester.py
Refactored URL joining logic to a more concise ternary expression.
Connector Builder Test Updates
unit_tests/connector_builder/test_connector_builder_handler.py, airbyte_cdk/connector_builder/test_reader/helpers.py
Updated tests to use ManifestDeclarativeSource, removed state blob conversion helpers, adjusted imports, and updated test parameters and assertions for new source and limit handling.
Concurrent Source Test Updates
unit_tests/sources/streams/concurrent/test_partition_reader.py, unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py, unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py
Removed all cursor-related test logic, updated partition reader tests, added/adjusted tests for partition completion and error handling, and simplified constructor calls.
Declarative Source and Schema Loader Tests
unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
Updated to use ModelToComponentFactory for disabling cache instead of TestLimits.
Declarative HTTP Requester Tests
unit_tests/sources/declarative/requesters/test_http_requester.py
Removed parameterized test for URL joining.
Slice Logger
airbyte_cdk/sources/utils/slice_logger.py
Removed a comment about future refactoring; no code changes.

Sequence Diagram(s)

sequenceDiagram
    participant Main as main.py
    participant Handler as connector_builder_handler.py
    participant Source as ManifestDeclarativeSource
    participant Factory as ModelToComponentFactory

    Main->>Handler: handle_request(config, limits)
    Handler->>Factory: Create ModelToComponentFactory(limits)
    Handler->>Source: Instantiate ManifestDeclarativeSource(factory)
    Source-->>Handler: Source instance
    Handler-->>Main: Serialized result
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~40 minutes

Possibly related PRs

  • airbytehq/airbyte-python-cdk#688: Introduced concurrency and queue management in connector builder, which this PR now removes—these are directly related but opposite in direction, wdyt?
  • airbytehq/airbyte-python-cdk#89: Modified concurrency logic in ConcurrentDeclarativeSource, while this PR removes concurrency limit and queue logic—both affect the same class, wdyt?
  • airbytehq/airbyte-python-cdk#455: Introduced and extended TestLimits, which this PR now removes and refactors—these changes are directly connected, wdyt?

Suggested labels

chore

Suggested reviewers

  • maxi297
  • brianjlai

Would you like to review any particular area in more detail, or should I focus on the concurrency removal and test updates, wdyt?

Note

⚡️ Unit Test Generation is now available in beta!

Learn more here, or try it out under "Finishing Touches" below.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch lmossman/revert-cdk-changes

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai generate unit tests to generate unit tests for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

68-93: Excellent constructor refactoring!

The optional component_factory parameter provides great flexibility while maintaining sensible defaults. The comment about disabling RFR with synthetic cursors helps explain the design decision.

One observation - the default factory disables resumable full refresh. Should we document this behavior change more prominently for users who might be expecting RFR to work with concurrent sources, wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between db1e4c0 and 62c5eab.

📒 Files selected for processing (20)
  • airbyte_cdk/connector_builder/connector_builder_handler.py (3 hunks)
  • airbyte_cdk/connector_builder/main.py (1 hunks)
  • airbyte_cdk/connector_builder/test_reader/helpers.py (2 hunks)
  • airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2 hunks)
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py (5 hunks)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (0 hunks)
  • airbyte_cdk/sources/declarative/requesters/http_requester.py (1 hunks)
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (2 hunks)
  • airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (1 hunks)
  • airbyte_cdk/sources/message/concurrent_repository.py (0 hunks)
  • airbyte_cdk/sources/streams/concurrent/partition_reader.py (2 hunks)
  • airbyte_cdk/sources/streams/concurrent/partitions/types.py (1 hunks)
  • airbyte_cdk/sources/utils/slice_logger.py (0 hunks)
  • unit_tests/connector_builder/test_connector_builder_handler.py (17 hunks)
  • unit_tests/sources/declarative/requesters/test_http_requester.py (0 hunks)
  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (2 hunks)
  • unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py (1 hunks)
  • unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (5 hunks)
  • unit_tests/sources/streams/concurrent/test_partition_reader.py (3 hunks)
💤 Files with no reviewable changes (4)
  • airbyte_cdk/sources/utils/slice_logger.py
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • airbyte_cdk/sources/message/concurrent_repository.py
  • unit_tests/sources/declarative/requesters/test_http_requester.py
🧰 Additional context used
🧠 Learnings (7)
📚 Learning: the files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from ...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.

Applied to files:

  • airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py
  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
  • airbyte_cdk/sources/streams/concurrent/partitions/types.py
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
  • airbyte_cdk/connector_builder/test_reader/helpers.py
  • unit_tests/sources/streams/concurrent/test_partition_reader.py
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
  • unit_tests/connector_builder/test_connector_builder_handler.py
  • airbyte_cdk/connector_builder/connector_builder_handler.py
📚 Learning: when code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repositor...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15
Timestamp: 2024-11-15T00:59:08.154Z
Learning: When code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repository, avoid suggesting modifications to it during the import process.

Applied to files:

  • airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py
  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
  • airbyte_cdk/sources/streams/concurrent/partitions/types.py
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
  • airbyte_cdk/connector_builder/test_reader/helpers.py
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
  • unit_tests/connector_builder/test_connector_builder_handler.py
  • airbyte_cdk/connector_builder/connector_builder_handler.py
📚 Learning: when modifying the `yamldeclarativesource` class in `airbyte_cdk/sources/declarative/yaml_declarativ...
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.

Applied to files:

  • airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py
  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
  • unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py
  • airbyte_cdk/connector_builder/main.py
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
  • airbyte_cdk/connector_builder/test_reader/helpers.py
  • airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
  • unit_tests/connector_builder/test_connector_builder_handler.py
  • airbyte_cdk/sources/streams/concurrent/partition_reader.py
  • airbyte_cdk/connector_builder/connector_builder_handler.py
📚 Learning: the custompageincrement class in unit_tests/source_declarative_manifest/resources/source_the_guardia...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.

Applied to files:

  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
  • unit_tests/connector_builder/test_connector_builder_handler.py
  • airbyte_cdk/connector_builder/connector_builder_handler.py
📚 Learning: in the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict modu...
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.

Applied to files:

  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
  • unit_tests/connector_builder/test_connector_builder_handler.py
  • airbyte_cdk/connector_builder/connector_builder_handler.py
📚 Learning: in the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-g...
Learnt from: pnilan
PR: airbytehq/airbyte-python-cdk#0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.

Applied to files:

  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
📚 Learning: in the typetransformer class, the data being transformed comes from api responses or source systems,...
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#221
File: airbyte_cdk/sources/utils/transform.py:0-0
Timestamp: 2025-01-16T00:50:39.069Z
Learning: In the TypeTransformer class, the data being transformed comes from API responses or source systems, so only standard JSON-serializable types are expected. The python_to_json mapping covers all expected types, and it's designed to fail fast (KeyError) on unexpected custom types rather than providing fallbacks.

Applied to files:

  • airbyte_cdk/connector_builder/test_reader/helpers.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Analyze (python)
🔇 Additional comments (36)
airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)

37-37: LGTM! Simplification aligns with the revert.

Removing AirbyteMessage from the QueueItem union makes sense for reverting the concurrency changes. This simplifies the queue to handle only partition-related items and exceptions, which seems cleaner than mixing in raw Airbyte messages.

unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py (1)

53-53: Constructor calling convention updated - looks good!

The change from keyword to positional arguments for the ConcurrentSource constructor maintains the same argument order and values. This aligns well with simplifying the constructor interface as part of the revert. Nice and clean!

unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (2)

14-16: Import change looks good!

Switching from TestLimits to ModelToComponentFactory aligns well with the broader refactor to move away from internal limit management in ConcurrentDeclarativeSource.


361-363: Good test isolation improvement!

Using ModelToComponentFactory(disable_cache=True) instead of TestLimits() not only aligns with the refactor but also explicitly addresses the caching concern mentioned in your comment. This should help prevent test interference - nice attention to detail! wdyt about this approach vs other test isolation strategies?

airbyte_cdk/connector_builder/main.py (2)

94-94: Function call simplification looks clean!

The create_source call now only takes config and limits as positional arguments, which suggests a nice simplification in the function signature. This aligns well with the broader refactor to ManifestDeclarativeSource.


95-99: Minor formatting improvement!

Moving the type ignore comment to the specific .decode() call is more precise than applying it to the entire expression. Good attention to detail!

airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)

85-89: The removal of slice_limit is safe – no callers pass that parameter

I checked all instantiations of StreamSlicerPartitionGenerator in concurrent_declarative_source.py and found none include a slice_limit argument (and no other references to slice_limit exist). This means we won’t break any callers by dropping it. LGTM – thanks for cleaning this up! wdyt?

airbyte_cdk/connector_builder/test_reader/helpers.py (2)

8-8: Good cleanup of unused import.

Removing AirbyteStateBlob from the imports makes sense since the conversion logic was removed. Clean!


469-469: Redundant state conversion safely removed

We ran a full search and confirmed there are no remaining calls to convert_state_blob_to_mapping. Since this helper already receives an AirbyteStateMessage, using latest_state_message directly doesn’t change how state is passed in—it simply drops an unnecessary blob-to-mapping step.

• No convert_state_blob_to_mapping references found
• All AirbyteStateBlob usages remain valid and untouched elsewhere

Looks good to merge as-is—wdyt?

unit_tests/sources/streams/concurrent/test_partition_reader.py (2)

29-29: Nice simplification of the PartitionReader constructor.

Removing the cursor and logger parameters makes the API much cleaner. The test setup is more straightforward now.


32-32: Test method calls properly updated.

All the process_partition calls have been correctly updated to remove the cursor parameter. The test coverage still validates the essential behaviors - empty partitions, successful processing, and exception handling.

Also applies to: 43-43, 55-55

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2)

102-102: Process partition call correctly simplified.

The removal of the cursor parameter from process_partition aligns perfectly with the PartitionReader simplification. Clean change!


115-134: Enhanced error handling around cursor closing looks solid.

The addition of proper exception handling around cursor.close_partition is a great improvement. The try/catch/finally structure ensures:

  • Exceptions during cursor closing are properly flagged and reported
  • Partition cleanup happens regardless of cursor errors
  • Stream completion logic still executes properly

The use of AirbyteTracedException.from_exception(...).as_sanitized_airbyte_message() for error reporting is the right approach here. Nice defensive coding! wdyt?

unit_tests/connector_builder/test_connector_builder_handler.py (7)

20-22: LGTM on importing the default constants!

Nice improvement to centralize these default values from the handler module instead of relying on TestLimits defaults. This makes the tests more consistent with the actual implementation, wdyt?


60-60: Source import updated correctly.

The switch from ConcurrentDeclarativeSource to ManifestDeclarativeSource aligns perfectly with the PR's goal of reverting concurrency changes.


533-533: All source instantiations updated consistently.

Great job consistently replacing ConcurrentDeclarativeSource with ManifestDeclarativeSource throughout the test file. The removal of catalog and state parameters aligns with the simplified function signatures mentioned in the summary.

Also applies to: 682-687, 694-694, 783-783, 820-842, 884-884, 948-948, 1010-1012, 1104-1104, 1152-1152, 1239-1239, 1335-1335, 1392-1392


973-975: Test parameters updated to use actual defaults.

Excellent improvement! Using the actual default constants instead of TestLimits defaults makes these parameterized tests more accurate and aligned with the real implementation behavior.

Also applies to: 980-982


891-891: Simplified assertion looks good.

The switch to assert_called_once() is cleaner than checking call_count == 1. Minor but nice improvement in test readability!

Also applies to: 896-896


1269-1270: Error expectations updated correctly.

Good catch updating the expected error strings to match the new error handling behavior. This ensures the tests remain accurate after the error handling changes.

Also applies to: 1275-1276


307-354: Great addition of error handling test coverage!

This new test method effectively covers the edge case where close_partition raises an exception. The test structure is solid with proper assertions for both the error trace and incomplete stream status. Nice improvement to the test coverage, wdyt?

airbyte_cdk/sources/concurrent_source/concurrent_source.py (5)

7-7: Import cleanup looks good.

Removing Optional from the typing imports and simplifying the PartitionReader import makes sense given the parameter removals. Clean and consistent!

Also applies to: 19-19


62-68: Queue parameter removal centralizes queue management.

The removal of the optional queue parameter simplifies the factory method and centralizes queue creation to the read method scope. This trades some injection flexibility for simpler queue management - seems like a reasonable design choice for this refactor, wdyt?


100-104: Well-reasoned local queue creation.

Creating the queue locally with a fixed maxsize of 10,000 is a good approach to prevent unbounded growth. The detailed comment explaining the rationale and assumptions is particularly helpful. Nice documentation of the design decision!


107-107: PartitionReader instantiation simplified correctly.

The removal of PartitionLogger from PartitionReader and using the local queue for PartitionEnqueuer aligns well with the broader refactor to simplify partition processing. Clean simplification!

Also applies to: 112-112


120-120: Queue consumption logic improved.

Passing the queue explicitly is necessary now that it's local, and reordering the condition to check is_done() first before queue.empty() is a nice logical optimization. Good attention to efficiency!

Also applies to: 144-144

unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (3)

182-182: Process partition calls updated correctly.

Good job updating the process_partition calls to remove the expected_cursor argument. This keeps the tests aligned with the simplified PartitionReader interface after cursor handling was removed.

Also applies to: 207-207


256-256: Cursor closure assertions add valuable coverage.

Adding assert_called_once() for close_partition in these partition completion tests provides good coverage of the cursor closure behavior. Nice improvement to verify this critical step happens correctly!

Also applies to: 305-305, 382-382


307-354: Excellent error handling test addition!

This new test method provides comprehensive coverage of the error scenario when close_partition raises an exception. The test structure is well-designed with proper mocking, execution, and verification of both error trace and stream status messages. Really strengthens the error handling coverage, wdyt?

airbyte_cdk/sources/streams/concurrent/partition_reader.py (3)

1-11: LGTM! Clean import removal.

The removal of the PartitionLogger import aligns well with the simplified partition processing approach.


21-25: Constructor simplification looks good!

The removal of the partition_logger parameter streamlines the class interface nicely.


27-45: Excellent simplification of partition processing!

The removal of cursor handling from this method creates a cleaner separation of concerns. The preserved exception handling ensures thread safety is maintained. This aligns perfectly with the PR's goal of removing concurrency management complexity.

airbyte_cdk/connector_builder/connector_builder_handler.py (4)

27-31: Nice use of constants for default values!

Moving these to module-level constants improves maintainability. Have you considered if these values might need to be configurable via environment variables in the future, wdyt?


38-46: Great dataclass conversion!

The transition to dataclass is clean and the __test__ ClassVar is a thoughtful addition to prevent pytest confusion. The field defaults align perfectly with the module constants.


48-56: Clean refactoring of get_limits!

The use of or for default fallback values is more pythonic and concise.


78-93: All create_source callers now aligned with new signature

I’ve verified that every invocation of create_source has been updated to use create_source(config, limits) only:

  • airbyte_cdk/connector_builder/main.py
  • unit_tests/connector_builder/test_connector_builder_handler.py

No leftover calls to the old signature remain. Looks good to me—shall we merge? wdyt?

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

6-6: Clean import simplification!

Removing Queue from imports aligns with the removal of internal queue management.

@lmossman lmossman changed the title revert: builder concurrency and pagination url changes fix: revert builder concurrency and pagination url changes Aug 5, 2025
Copy link

github-actions bot commented Aug 5, 2025

PyTest Results (Fast)

3 695 tests   - 4   3 684 ✅  - 4   6m 29s ⏱️ ±0s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 62c5eab. ± Comparison against base commit db1e4c0.

This pull request removes 6 and adds 2 tests. Note that renamed tests count towards both.
unit_tests.sources.declarative.requesters.test_http_requester ‑ test_join_url_with_url_and_path[test_no_path-https://airbyte.io/my_endpoint-None-https://airbyte.io/my_endpoint]
unit_tests.sources.declarative.requesters.test_http_requester ‑ test_join_url_with_url_and_path[test_path_does_include_url-https://airbyte.io/my_endpoint-https://airbyte.io/my_endpoint/with_path-https://airbyte.io/my_endpoint/with_path]
unit_tests.sources.declarative.requesters.test_http_requester ‑ test_join_url_with_url_and_path[test_path_does_not_include_url-https://airbyte.io/my_endpoint-with_path-https://airbyte.io/my_endpoint/with_path]
unit_tests.sources.declarative.requesters.test_http_requester ‑ test_join_url_with_url_and_path[test_path_is_different_full_url-https://airbyte.io/my_endpoint-https://airbyte-paginated.io/my_paginated_endpoint-https://airbyte-paginated.io/my_paginated_endpoint]
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_from_close_slice_when_process_partition_then_queue_records_and_exception_and_sentinel
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_from_read_when_process_partition_then_queue_records_and_exception_and_sentinel
unit_tests.sources.streams.concurrent.test_concurrent_read_processor.TestConcurrentReadProcessor ‑ test_given_exception_on_partition_complete_sentinel_then_yield_error_trace_message_and_stream_is_incomplete
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_when_process_partition_then_queue_records_and_exception_and_sentinel

Copy link

github-actions bot commented Aug 5, 2025

PyTest Results (Full)

3 698 tests   - 4   3 687 ✅  - 4   11m 35s ⏱️ -5s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 62c5eab. ± Comparison against base commit db1e4c0.

This pull request removes 6 and adds 2 tests. Note that renamed tests count towards both.
unit_tests.sources.declarative.requesters.test_http_requester ‑ test_join_url_with_url_and_path[test_no_path-https://airbyte.io/my_endpoint-None-https://airbyte.io/my_endpoint]
unit_tests.sources.declarative.requesters.test_http_requester ‑ test_join_url_with_url_and_path[test_path_does_include_url-https://airbyte.io/my_endpoint-https://airbyte.io/my_endpoint/with_path-https://airbyte.io/my_endpoint/with_path]
unit_tests.sources.declarative.requesters.test_http_requester ‑ test_join_url_with_url_and_path[test_path_does_not_include_url-https://airbyte.io/my_endpoint-with_path-https://airbyte.io/my_endpoint/with_path]
unit_tests.sources.declarative.requesters.test_http_requester ‑ test_join_url_with_url_and_path[test_path_is_different_full_url-https://airbyte.io/my_endpoint-https://airbyte-paginated.io/my_paginated_endpoint-https://airbyte-paginated.io/my_paginated_endpoint]
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_from_close_slice_when_process_partition_then_queue_records_and_exception_and_sentinel
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_from_read_when_process_partition_then_queue_records_and_exception_and_sentinel
unit_tests.sources.streams.concurrent.test_concurrent_read_processor.TestConcurrentReadProcessor ‑ test_given_exception_on_partition_complete_sentinel_then_yield_error_trace_message_and_stream_is_incomplete
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_when_process_partition_then_queue_records_and_exception_and_sentinel

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants