
Conversation

@aaronsteers (Contributor) commented Aug 13, 2025

spike, do not merge: Fix bulk CDK null field validation for ConfiguredAirbyteStream

Summary

This PR addresses a regression in the Airbyte CDK that causes "Null value is not allowed (code: 1021)" validation errors when syncing data from source-faker to destination-dev-null. The issue stems from stricter OpenAPI4J schema validation in the bulk CDK that doesn't allow null values for certain AirbyteStream fields that should be nullable according to the Airbyte protocol.

Key Changes:

  • Modified ValidatedJsonUtils.kt to add special handling for ConfiguredAirbyteStream validation
  • Created allowNullFieldsInAirbyteStreamSchema() method that sets nullable: true for three problematic fields:
    • namespace - streams can have null namespaces
    • source_defined_cursor - not all sources define cursors
    • is_resumable - not all streams are resumable
  • The fix modifies the generated JSON schema at runtime, before OpenAPI4J validation occurs (see the sketch below)
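
For illustration only, the following is a minimal Python sketch of the runtime schema patch described above. The actual change is Kotlin code in ValidatedJsonUtils.kt; the function and field names below mirror this PR's description rather than the real implementation, and the snake_case name is an assumption for the sketch.

NULLABLE_STREAM_FIELDS = ("namespace", "source_defined_cursor", "is_resumable")

def allow_null_fields_in_airbyte_stream_schema(schema: dict) -> dict:
    """Mark selected AirbyteStream properties as nullable in a generated JSON schema."""
    stream_def = schema.get("definitions", {}).get("AirbyteStream", {})
    for field in NULLABLE_STREAM_FIELDS:
        prop = stream_def.get("properties", {}).get(field)
        if prop is not None:
            # OpenAPI-style schemas use `nullable: true`; plain JSON Schema would
            # instead extend `type` to include "null".
            prop["nullable"] = True
    return schema

The patched schema is then handed to the OpenAPI4J validator, so null values in those three fields no longer trigger the 1021 error described in the summary.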

Review & Testing Checklist for Human

  • Test end-to-end with rebuilt destination-dev-null: Build a pre-release version of destination-dev-null with this CDK change and verify that uvx --from=airbyte pyab benchmark runs successfully without validation errors
  • Verify schema modification approach: Review the runtime schema modification logic in allowNullFieldsInAirbyteStreamSchema() to ensure it doesn't have unexpected side effects or break other validation scenarios
  • Check field coverage: Confirm that the set of three fields made nullable (namespace, source_defined_cursor, is_resumable) is complete; verify that no other AirbyteStream fields should also be nullable
  • Validate against Airbyte protocol: Cross-check that making these fields nullable aligns with the official Airbyte protocol specification

Diagram

%%{ init : { "theme" : "default" }}%%
graph TD
    PyAirbyte["PyAirbyte<br/>benchmark command"]
    SrcFaker["source-faker<br/>generates catalog"]
    DestDevNull["destination-dev-null<br/>validates catalog"]
    
    ValidatedJsonUtils["ValidatedJsonUtils.kt<br/>parseList() method"]:::major-edit
    AllowNullMethod["allowNullFieldsInAirbyteStreamSchema()<br/>runtime schema modification"]:::major-edit
    OpenAPI4J["OpenAPI4J<br/>SchemaValidator"]:::context
    
    
    ConfiguredCatalog["ConfiguredAirbyteCatalog<br/>with null fields"]:::context
    
    PyAirbyte --> SrcFaker
    SrcFaker --> ConfiguredCatalog
    ConfiguredCatalog --> DestDevNull
    DestDevNull --> ValidatedJsonUtils
    ValidatedJsonUtils --> AllowNullMethod
    AllowNullMethod --> OpenAPI4J
    
    subgraph Legend
        L1[Major Edit]:::major-edit
        L2[Minor Edit]:::minor-edit  
        L3[Context/No Edit]:::context
    end
    
    classDef major-edit fill:#90EE90
    classDef minor-edit fill:#87CEEB
    classDef context fill:#FFFFFF

Notes

This is a spike PR to demonstrate one approach to fixing the null validation regression. The fix works by intercepting the JSON schema generation process and explicitly marking problematic fields as nullable before OpenAPI4J validation occurs.

Alternative approaches considered:

  • Updating the source schema definitions (more invasive)
  • Modifying OpenAPI4J configuration (less targeted)
  • Pre-processing the catalog JSON (more complex)

Session Info:

Related PR: Companion PyAirbyte fix at airbytehq/PyAirbyte (replaces null namespaces with empty strings as a defensive measure)

Summary by CodeRabbit

  • New Features

    • Benchmark helper now supports selecting the connector (source-faker or source-e2e-test) via an optional parameter (a usage sketch follows this summary).
  • Bug Fixes

    • Record messages no longer include explicit null fields (e.g., namespace), improving compatibility with some destinations.
    • Generated catalog JSON omits null-valued fields, reducing noise and potential downstream issues.
  • Documentation

    • Added an example script to manually run and preview source connector output, including basic configuration and summary reporting.
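
A hedged usage sketch of the extended benchmark helper follows. The module path and keyword-only connector_name parameter are taken from the review notes later in this thread (airbyte/sources/util.py); the record count and exact call shape are illustrative assumptions, not the published API.

from airbyte.sources.util import get_benchmark_source

# With no connector_name, this PR defaults the helper to source-faker.
faker_source = get_benchmark_source(num_records=100_000)

# Explicitly selecting the alternative connector by name:
e2e_source = get_benchmark_source(
    num_records=100_000,
    connector_name="source-e2e-test",  # assumed literal, per the validation discussed later in this thread
)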

- Update StreamDescriptor to use empty string for namespace instead of None
- Update AirbyteRecordMessage to use empty string for namespace instead of None
- Prevents 'Null value is not allowed (code: 1021)' validation errors in destinations

Co-Authored-By: AJ Steers <[email protected]>

Original prompt from AJ Steers
SYSTEM:
=== BEGIN THREAD HISTORY (in #dev-move) ===
AJ Steers (U05AKF1BCC9): I'm getting this error when syncing data from source-faker to destination-dev-null on the latest CDK.
```streams.2.stream.namespace: Null value is not allowed. (code: 1021)
From: streams.2.<items>.<#/definitions/ConfiguredAirbyteStream>.stream.<#/definitions/AirbyteStream>.namespace.<nullable>```
My understanding is that 'namespace' should be allowed to be null. Is it possible for the CDK to continue handling nulls gracefully, as was the prior behavior?

AJ Steers (U05AKF1BCC9): Repro steps, assuming uv is installed (`brew install uv`):

`uvx --from=airbyte pyab benchmark`

If you have already pulled `airbyte/destination-dev-null`, you need to additionally run `docker pull airbyte/destination-dev-null:latest` to see the error. (Older versions worked fine.)

Davin Chia (U02TNCJ5ERH): <@U02TKE0UXEH> is this something new we added with the new dev-null version?

Subodh Chaturvedi (U02TKE0UXEH): Could be! Let me take a look. 

AJ Steers (U05AKF1BCC9): Raising again for visibility. This also broke a test script that <@U05682XPZ60> and I were using to validate the Java-connectors-on-Python use case. The destination-bigquery connector is likewise failing when receiving a null namespace.

ATTACHMENT:"https://app.devin.ai/attachments/89cb8d82-c62d-458b-87e0-52aa6866e2b9/image.png"

AJ Steers (U05AKF1BCC9): Specifically in this case, we are testing with a built distTar. Maybe not affecting the published version of the connector.

AJ Steers (U05AKF1BCC9): I think I'm going to ask @Devin to spike on this in a new thread.
=== END THREAD HISTORY ===

The latest message is the one right above that tagged you.


🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring


👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1755114459-fix-pyairbyte-null-namespace' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1755114459-fix-pyairbyte-null-namespace'

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /fix-pr - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test-pr - Runs tests with the updated PyAirbyte

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.



coderabbitai bot commented Aug 13, 2025

Warning

Rate limit exceeded

@devin-ai-integration[bot] has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 7 minutes and 2 seconds before requesting another review.


📥 Commits

Reviewing files that changed from the base of the PR and between 88756af and dbb96d0.

📒 Files selected for processing (2)
  • airbyte/sources/util.py (3 hunks)
  • examples/manual_connector_test.py (1 hunks)
📝 Walkthrough

Updates record message construction to omit explicit None fields; writes destination catalog JSON excluding None values; extends get_benchmark_source with a connector_name switch supporting source-faker and source-e2e-test; and adds a manual Docker-based source connector test script.

Changes

Cohort / File(s) Summary
Record message construction
airbyte/_message_iterators.py
Stop passing meta=None and namespace=None when building AirbyteRecordMessage; add a comment noting that some destinations expect a null namespace to be omitted rather than declared explicitly.
Catalog JSON serialization
airbyte/destinations/base.py
Dump configured_catalog with exclude_none=True when writing the temp catalog, omitting null fields (see the serialization sketch after this change list).
Benchmark source selection
airbyte/sources/util.py
Update get_benchmark_source signature to accept connector_name (None, "source-faker", or "source-e2e-test"); validate input and construct Source accordingly.
Manual connector test script
examples/manual_connector_test.py
New script to run a source connector via Docker, write temp config/catalog, capture output to file, and print a brief summary.
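
To make the serialization change concrete, here is a small self-contained pydantic (v2) example; StreamStub is a stand-in model invented for this sketch, not the real ConfiguredAirbyteStream. It shows why exclude_none=True satisfies validators that reject explicit nulls.

from pydantic import BaseModel


class StreamStub(BaseModel):
    name: str
    namespace: str | None = None
    source_defined_cursor: bool | None = None
    is_resumable: bool | None = None


stream = StreamStub(name="users")
print(stream.model_dump_json())                   # includes "namespace": null and the other null fields
print(stream.model_dump_json(exclude_none=True))  # '{"name":"users"}' (null fields omitted)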

Sequence Diagram(s)

sequenceDiagram
  participant User
  participant Script as manual_connector_test.py
  participant Docker
  participant Connector as Source Container

  User->>Script: run_connector()
  Script->>Script: Write temp config.json & catalog.json
  Script->>Docker: docker run ... read /tmp/config.json /tmp/catalog.json
  Docker->>Connector: Start in READ mode
  Connector-->>Docker: Emit messages (stdout)
  Docker-->>Script: Stream stdout to /tmp/manual_source_output.jsonl
  Script->>Script: Cleanup temp files
  Script-->>User: Print line count and first 3 lines
sequenceDiagram
  participant Caller
  participant Util as get_benchmark_source
  alt connector_name == "source-faker" or None
    Util->>Util: Build Source(name=source-faker, count/seed)
  else connector_name == "source-e2e-test"
    Util->>Util: Build Source(name=source-e2e-test, max_records/seed/message_interval, docker_image)
  else invalid
    Util-->>Caller: Raise PyAirbyteInputError
  end
  Util-->>Caller: Return Source

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~15 minutes


@coderabbitai bot left a comment

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 07e843e and 4e24489.

📒 Files selected for processing (1)
  • airbyte/_message_iterators.py (2 hunks)
🧰 Additional context used
🪛 GitHub Actions: Run Linters
airbyte/_message_iterators.py

[error] 113-113: poetry run ruff check: E501 Line too long (104 > 100) in airbyte/_message_iterators.py:113.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (No Creds)
  • GitHub Check: Pytest (All, Python 3.10, Windows)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Windows)
  • GitHub Check: Pytest (Fast)

@aaronsteers aaronsteers changed the title spike, do not merge: Replace null namespaces with empty strings fix: resolve issue where some destinations will fail due to null namespace in catalog Aug 13, 2025
@aaronsteers aaronsteers changed the title fix: resolve issue where some destinations will fail due to null namespace in catalog fix: resolve issue where some destinations will fail due to null/missing stream namespace keys in catalog Aug 13, 2025

github-actions bot commented Aug 13, 2025

PyTest Results (Fast Tests Only, No Creds)

301 tests  ±0   301 ✅ ±0   4m 27s ⏱️ +9s
  1 suites ±0     0 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit dbb96d0. ± Comparison against base commit 07e843e.

♻️ This comment has been updated with latest results.

devin-ai-integration bot and others added 5 commits August 13, 2025 20:07
This extends the PyAirbyte fix to also clean up null namespaces in the
catalog itself, not just record messages. The destination connector
validates the catalog before processing records, so null namespaces
in the catalog cause validation failures even if record messages are
cleaned up.

Co-Authored-By: AJ Steers <[email protected]>
Extends the catalog validation to replace null values for all three
problematic fields: namespace, source_defined_cursor, and is_resumable.
This should resolve all remaining validation errors.

Co-Authored-By: AJ Steers <[email protected]>

github-actions bot commented Aug 13, 2025

PyTest Results (Full)

364 tests  ±0   350 ✅ ±0   22m 15s ⏱️ + 3m 43s
  1 suites ±0    14 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit dbb96d0. ± Comparison against base commit 07e843e.

♻️ This comment has been updated with latest results.

devin-ai-integration bot and others added 2 commits August 13, 2025 20:32
…ume loops

Per GitHub feedback from AJ Steers, removed namespace checking from:
- _message_iterators.py: from_str_buffer and from_str_iterable methods
- _connector_base.py: _execute method

These are high-volume loops that run once per record. The catalog cleaning
approach in catalog_providers.py is sufficient to fix the validation issue
without the performance overhead.

Co-Authored-By: AJ Steers <[email protected]>
This completes the performance optimization by ensuring all catalog-level
null field cleaning is committed while the high-volume loop changes
have been commented out per GitHub feedback.

Co-Authored-By: AJ Steers <[email protected]>
@coderabbitai bot left a comment

Actionable comments posted: 2

🔭 Outside diff range comments (1)
airbyte/shared/catalog_providers.py (1)

52-54: Return type annotation is incorrect (returns catalog but annotated as None)

validate_catalog returns the catalog object but is annotated as -> None. This breaks type expectations and IDE help. Update the return type, wdyt?

Apply this diff:

-    def validate_catalog(catalog: ConfiguredAirbyteCatalog) -> None:
+    def validate_catalog(catalog: ConfiguredAirbyteCatalog) -> ConfiguredAirbyteCatalog:
♻️ Duplicate comments (2)
airbyte/_message_iterators.py (2)

46-47: Centralize empty-namespace literal into a module-level constant

Using a shared constant makes the rationale discoverable and avoids duplicated literals. Replace with a constant, wdyt?

Apply this local change:

-                    name=stream_name,
-                    namespace="",  # Some destinations fail if null/missing
+                    name=stream_name,
+                    namespace=EMPTY_NAMESPACE,

And add near the top of the module (outside this range):

# Rationale: Some validators (e.g., OpenAPI4J with stricter settings) reject null/None for namespace.
# Using empty string is a pragmatic compatibility choice in this spike.
EMPTY_NAMESPACE: str = ""

113-114: Reuse the EMPTY_NAMESPACE constant and drop the inline comment

This keeps line length under limits and avoids comment duplication. Replace the literal, wdyt?

Apply this diff:

-                            namespace="",  # Some destinations fail if null/missing
+                            namespace=EMPTY_NAMESPACE,
🧹 Nitpick comments (5)
airbyte/_connector_base.py (1)

472-475: De-duplicate record.namespace normalization across modules — wdyt?

Search found ".namespace is None" in:

  • airbyte/_connector_base.py:473
  • airbyte/_message_iterators.py:143, 162
  • airbyte/shared/catalog_providers.py:70

Proposed small helper and usage:

def _normalize_record_namespace(message: AirbyteMessage) -> None:
    if message.type == Type.RECORD and message.record and message.record.namespace is None:
        message.record.namespace = ""

Replace occurrences like this:

-                    if message.type == Type.RECORD and message.record and message.record.namespace is None:
-                        message.record.namespace = ""
+                    _normalize_record_namespace(message)

Note: shared/catalog_providers.py checks stream.stream.namespace (not an AirbyteMessage). Should we normalize stream.namespace the same way, or leave that check as-is? wdyt?

airbyte/shared/catalog_providers.py (2)

58-61: Trim trailing whitespace to fix Ruff W293

There’s trailing whitespace on an otherwise blank line that Ruff flags. Remove it to keep CI happy, wdyt?

Apply this diff:

-        
+
         Also replaces null namespaces with empty strings for compatibility with strict
         destination validation.

69-69: Trim trailing whitespace to fix Ruff W293 (second instance)

There’s another whitespace-only line here flagged by Ruff. Remove it, wdyt?

Apply this diff:

-            
+
             if stream.stream.namespace is None:
                 stream.stream.namespace = ""
airbyte/_message_iterators.py (2)

142-145: Combine nested ifs to satisfy Ruff SIM102

Flatten the condition to avoid SIM102 warnings, wdyt?

Apply this diff:

-                    if message.type == Type.RECORD and message.record:
-                        if message.record.namespace is None:
-                            message.record.namespace = ""
+                    if (
+                        message.type == Type.RECORD
+                        and message.record
+                        and message.record.namespace is None
+                    ):
+                        message.record.namespace = ""

161-164: Combine nested ifs here as well (Ruff SIM102)

Same simplification as above to keep linters happy, wdyt?

Apply this diff:

-                    if message.type == Type.RECORD and message.record:
-                        if message.record.namespace is None:
-                            message.record.namespace = ""
+                    if (
+                        message.type == Type.RECORD
+                        and message.record
+                        and message.record.namespace is None
+                    ):
+                        message.record.namespace = ""
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5c05e1d and 808ec52.

📒 Files selected for processing (3)
  • airbyte/_connector_base.py (1 hunks)
  • airbyte/_message_iterators.py (4 hunks)
  • airbyte/shared/catalog_providers.py (2 hunks)
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#369
File: airbyte/_connector_base.py:0-0
Timestamp: 2024-10-08T15:34:31.026Z
Learning: In this codebase, `message.record.stream` is a required property enforced by schema, so it will not be `None`.
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#369
File: airbyte/_connector_base.py:0-0
Timestamp: 2024-09-17T21:18:12.530Z
Learning: In this codebase, `message.record.stream` is a required property enforced by schema, so it will not be `None`.
🪛 GitHub Actions: Run Linters
airbyte/shared/catalog_providers.py

[warning] 58-58: Ruff: W293 - Blank line contains whitespace on line 58. Command: 'poetry run ruff check .'


[warning] 69-69: Ruff: W293 - Blank line contains whitespace on line 69. Command: 'poetry run ruff check .'

airbyte/_connector_base.py

[warning] 472-472: Ruff: W293 - Blank line contains whitespace on line 472. Command: 'poetry run ruff check .'


[error] 473-473: Ruff: E501 - Line too long (107 > 100) on line 473. Command: 'poetry run ruff check .'


[warning] 475-475: Ruff: W293 - Blank line contains whitespace on line 475. Command: 'poetry run ruff check .'

airbyte/_message_iterators.py

[warning] 142-142: Ruff: SIM102 - Use a single if statement instead of nested if statements on line 142. Command: 'poetry run ruff check .'


[warning] 161-161: Ruff: SIM102 - Use a single if statement instead of nested if statements on line 161. Command: 'poetry run ruff check .'

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Pytest (All, Python 3.11, Windows)
  • GitHub Check: Pytest (All, Python 3.10, Windows)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (No Creds)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (1)
airbyte/shared/catalog_providers.py (1)

70-76: No code treats None specially — defaulting these booleans to False looks safe

I searched the repo for comparisons against None for these fields and found only the defaults being set in the file below; other occurrences are tests/fixtures that already use True/False. I didn't find any logic that relies on None to mean “unspecified.”

  • airbyte/shared/catalog_providers.py — lines ~70–75: sets namespace = "", source_defined_cursor = False, is_resumable = False
  • tests/unit_tests/sources/test_source_key_overrides.py — source_defined_cursor = True (line 20)
  • tests/integration_tests/fixtures/source-test/source_test/run.py — source_defined_cursor set to True/False in multiple fixture streams

Shall I mark this resolved, or would you prefer keeping None as an explicit “unspecified” sentinel, wdyt?

devin-ai-integration bot and others added 4 commits August 13, 2025 20:47
…serialization

- Modified destinations/base.py to use exclude_none=True in catalog serialization
- Modified sources/base.py to use exclude_none=True in catalog serialization
- Removed previous null-to-empty-string replacement logic from catalog_providers.py
- This approach omits null namespace fields entirely instead of sending explicit nulls

Co-Authored-By: AJ Steers <[email protected]>
- Updated _message_iterators.py read() method to use exclude_none=True
- Updated _executors/base.py _pump_input() to use exclude_none=True
- This ensures all message serialization omits null fields consistently

Co-Authored-By: AJ Steers <[email protected]>
…n() calls

- Updated state_providers.py to use exclude_none=True for state artifact serialization
- Updated state_writers.py to use exclude_none=True for state message printing
- Updated _state_backend.py to use exclude_none=True for both state JSON storage calls
- This ensures all message serialization consistently omits null fields

Co-Authored-By: AJ Steers <[email protected]>
- Add namespace cleaning for trace messages with stream_status to prevent mismatch
- Source connectors emit stream_status with null namespaces causing destination tracking issues
- This ensures consistent namespace handling between stream tracking and completion messages

Co-Authored-By: AJ Steers <[email protected]>
aaronsteers and others added 3 commits August 13, 2025 14:02
- Add helper script for manual connector testing in examples/
- Change get_benchmark_source() to use source-faker instead of source-e2e-test
- Add explanatory comment about source-e2e-test limitations with Java destinations
- Update source-faker config with correct parameters

Co-Authored-By: AJ Steers <[email protected]>
@coderabbitai bot left a comment

Actionable comments posted: 1

🔭 Outside diff range comments (1)
airbyte/caches/_state_backend.py (1)

124-134: Critical: control flow bug may raise on valid GLOBAL/LEGACY states.

The if/if/elif/else chain means GLOBAL (or LEGACY) types fall through to the final else and raise PyAirbyteInternalError, because the second if starts a new chain with an elif/else. This likely breaks writing GLOBAL/LEGACY state.

Would you switch to a single if/elif/elif/else chain? wdyt?

Apply this diff:

-        if state_message.type == AirbyteStateType.GLOBAL:
-            stream_name = GLOBAL_STATE_STREAM_NAME
-        if state_message.type == AirbyteStateType.LEGACY:
-            stream_name = LEGACY_STATE_STREAM_NAME
-        elif state_message.type == AirbyteStateType.STREAM and state_message.stream:
-            stream_name = state_message.stream.stream_descriptor.name
-        else:
-            raise PyAirbyteInternalError(
-                message="Invalid state message type.",
-                context={"state_message": state_message},
-            )
+        if state_message.type == AirbyteStateType.GLOBAL:
+            stream_name = GLOBAL_STATE_STREAM_NAME
+        elif state_message.type == AirbyteStateType.LEGACY:
+            stream_name = LEGACY_STATE_STREAM_NAME
+        elif state_message.type == AirbyteStateType.STREAM and state_message.stream:
+            stream_name = state_message.stream.stream_descriptor.name
+        else:
+            raise PyAirbyteInternalError(
+                message="Invalid state message type.",
+                context={"state_message": state_message},
+            )
🧹 Nitpick comments (1)
airbyte/shared/state_writers.py (1)

79-79: Omitting None in stdout state messages is consistent and desirable.

This matches the broader change and avoids emitting nulls to destinations. Would you also consider flushing to ensure prompt state delivery in long-running jobs? wdyt?

You could do:

-        print(state_message.model_dump_json(exclude_none=True))
+        print(state_message.model_dump_json(exclude_none=True), flush=True)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 85a3b4e and a8205c4.

📒 Files selected for processing (7)
  • airbyte/_executors/base.py (1 hunks)
  • airbyte/_message_iterators.py (2 hunks)
  • airbyte/caches/_state_backend.py (1 hunks)
  • airbyte/destinations/base.py (1 hunks)
  • airbyte/shared/state_providers.py (1 hunks)
  • airbyte/shared/state_writers.py (1 hunks)
  • airbyte/sources/base.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte/_message_iterators.py
🧰 Additional context used
🧬 Code Graph Analysis (2)
airbyte/caches/_state_backend.py (1)
airbyte/cloud/connections.py (1)
  • table_prefix (124-129)
airbyte/destinations/base.py (3)
airbyte/shared/sql_processor.py (1)
  • catalog_provider (215-231)
airbyte/shared/catalog_providers.py (1)
  • configured_catalog (70-72)
airbyte/sources/base.py (1)
  • configured_catalog (402-418)
🪛 GitHub Actions: Run Linters
airbyte/_executors/base.py

[error] 1-1: Ruff format --check would reformat: airbyte/_executors/base.py. Run 'poetry run ruff format .' to fix.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Windows)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Windows)
🔇 Additional comments (6)
airbyte/_executors/base.py (3)

44-44: Omitting None fields in message JSON looks correct and aligns with the protocol.

Switching to exclude_none=True should prevent nulls from being emitted and unblock stricter validators. Nice improvement. wdyt?


1-1: Fix Ruff formatting to unblock the pipeline.

GitHub Actions indicates Ruff would reformat this file. Could you run: poetry run ruff format . (or apply the repo’s standard formatting command) and commit the changes? wdyt?


44-44: Confirm downstream tolerance for omitted keys (namespace / source_defined_cursor / is_resumable)

Quick summary: I scanned the repo for risky direct dict-index access and didn't find any places that would break if those keys are omitted; only a few comments/tests reference them. Internally looks safe, but I can't verify external downstream consumers (destinations/shims) from here — could you confirm?

Relevant spots to note:

  • airbyte/_executors/base.py — the changed line:
    pipe.writelines(message.model_dump_json(exclude_none=True) + "\n" for message in messages)
  • airbyte/_connector_base.py: ~line 461 — comment: "# "namespace": "", # We're knowingly omitting this to keep perf impact low."
  • airbyte/_message_iterators.py: ~line 112 — comment: "# namespace=None, # Some destinations need this undeclared if null"
  • tests/unit_tests/sources/test_source_key_overrides.py: line 20 — stream.source_defined_cursor = True (attribute access)
  • tests/integration_tests/fixtures/source-test/source_test/run.py: lines ~15/40/56/72 — fixture dicts include "source_defined_cursor"

Questions / suggestions — wdyt?

  • Could we run or add an integration test that verifies common destinations accept messages missing these keys (or add a small consumer test in CI)?
  • Or should we avoid exclude_none=True for these specific fields and emit explicit nulls for backward compatibility?
  • Would adding a short comment in the code explaining the intentional omission and the expected downstream compatibility help reviewers/maintainers?
airbyte/caches/_state_backend.py (1)

146-154: Consistent omit-none serialization for state_json looks good.

Persisting state with exclude_none=True keeps DB payloads clean and aligns with the rest of the PR changes. No compatibility issues expected since model_validate_json will handle missing optional fields. wdyt?

airbyte/shared/state_providers.py (1)

88-90: Skipping None fields in state input file text aligns with validator expectations.

This should help avoid nulls in the state payload sent to destinations. Looks good to me. wdyt?

airbyte/sources/base.py (1)

753-757: Emitting catalog JSON with exclude_none=True is the right call for protocol compatibility.

Omitting namespace/source_defined_cursor/is_resumable when None should pass stricter OpenAPI4J validation. Could we add an e2e smoke test (e.g., source-faker → destination-dev-null) proving the read path works with omitted fields and no 1021 validation error, so we lock this behavior in? wdyt?

Comment on lines 265 to 269
with as_temp_files(
files_contents=[
self._hydrated_config,
catalog_provider.configured_catalog.model_dump_json(),
catalog_provider.configured_catalog.model_dump_json(exclude_none=True),
]

💡 Verification agent

🧩 Analysis chain

Switching to exclude_none=True for catalog JSON avoids strict validator null errors — please confirm downstream tolerance and add a rationale comment.

This aligns with the PR’s intent to stop surfacing explicit nulls in ConfiguredAirbyteStream fields (e.g., namespace, source_defined_cursor, is_resumable). The change reduces spurious OpenAPI4J validation failures. The one behavioral change to watch is that optional fields previously present with null will now be omitted entirely; if any destination implementations (custom or legacy) were relying on the presence of keys even when null, they might need an adjustment. Would you mind confirming there are no destinations that require those keys to be present when the values are None, and adding a brief comment here to document the rationale, wdyt?

Suggested inline comment for future readers:

 with as_temp_files(
   files_contents=[
     self._hydrated_config,
-    catalog_provider.configured_catalog.model_dump_json(exclude_none=True),
+    # Omit None-valued optional fields (e.g., namespace, source_defined_cursor, is_resumable)
+    # to avoid strict validators rejecting explicit nulls in some CDKs.
+    catalog_provider.configured_catalog.model_dump_json(exclude_none=True),
   ]
 )

To help double-check consistency across the repo, you could run this quick search to ensure all configured catalog dumps now use exclude_none=True (and spot any stragglers):


🏁 Script executed:

#!/bin/bash
# Find all places where configured_catalog JSON is dumped
rg -n -A2 -B2 'configured_catalog\.model_dump_json\('

# Inspect all model_dump_json calls and see which ones use exclude_none=True
rg -n -A1 -B1 'model_dump_json\('

# Optional: scan for hardcoded "namespace": null in code/tests
rg -n -A2 -B2 '"namespace"\s*:\s*null'

Length of output: 2474


Add a short rationale comment here and confirm downstream tolerance for omitted None fields — wdyt?

Quick check: rg shows this repo already uses model_dump_json(exclude_none=True) in multiple places and I found no occurrences of a hardcoded "namespace": null. It looks consistent, but could we add a short inline rationale and confirm no destinations rely on keys being present with nulls?

Files to update / reference:

  • Please add the comment at: airbyte/destinations/base.py (around lines 265–269)
  • Other places already using exclude_none=True (for reference):
    • airbyte/_message_iterators.py:88
    • airbyte/shared/state_writers.py:79
    • airbyte/sources/base.py:755
    • airbyte/shared/state_providers.py:88
    • airbyte/_executors/base.py:44
    • airbyte/caches/_state_backend.py:146,153

Suggested inline diff to add at the destinations file:

 with as_temp_files(
   files_contents=[
     self._hydrated_config,
+    # Omit None-valued optional fields (e.g., namespace, source_defined_cursor, is_resumable)
+    # to avoid strict validators rejecting explicit nulls in some CDKs.
     catalog_provider.configured_catalog.model_dump_json(exclude_none=True),
   ]
 )

Would you mind adding that comment and confirming there are no destination implementations that expect those keys to exist with null values? wdyt?

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
with as_temp_files(
files_contents=[
self._hydrated_config,
catalog_provider.configured_catalog.model_dump_json(),
catalog_provider.configured_catalog.model_dump_json(exclude_none=True),
]
with as_temp_files(
files_contents=[
self._hydrated_config,
# Omit None-valued optional fields (e.g., namespace, source_defined_cursor, is_resumable)
# to avoid strict validators rejecting explicit nulls in some CDKs.
catalog_provider.configured_catalog.model_dump_json(exclude_none=True),
]
🤖 Prompt for AI Agents
In airbyte/destinations/base.py around lines 265-269, add a short inline comment
above the call to model_dump_json(exclude_none=True) noting the deliberate
choice to omit keys with None values (e.g., "exclude_none=True to avoid emitting
keys with null values; verified repository-wide usage and no destinations expect
null-valued keys") and then run a quick scan of destination implementations
(grep for "namespace\": null" and check destination codepaths) to confirm no
destination relies on presence of keys with null values; if any destination does
rely on nulls, revert to including None or handle that destination specially.

…rce with connector_name parameter, and simplify example script

- Remove all exclude_none=True additions except catalog serialization in destinations/base.py
- Add optional connector_name parameter to get_benchmark_source() with validation for 'source-faker' and 'source-e2e-test'
- Update docstring to document compatibility differences between connectors
- Simplify manual_connector_test.py to use constants at top with minimal code and no error handling
- Benchmark test still fails with NullPointerException due to PyAirbyte creating malformed trace messages with null stream_status fields

Co-Authored-By: AJ Steers <[email protected]>
@coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
examples/manual_connector_test.py (1)

31-36: Security: replace deprecated and insecure mktemp() with NamedTemporaryFile(delete=False)

Using tempfile.mktemp is vulnerable to TOCTOU and flagged by CodeQL. Could we switch to NamedTemporaryFile with delete=False and write via the opened handle, wdyt?

Apply:

-    config_file = Path(tempfile.mktemp(suffix=".json"))
-    catalog_file = Path(tempfile.mktemp(suffix=".json"))
-    
-    config_file.write_text(json.dumps(CONFIG, indent=2))
-    catalog_file.write_text(json.dumps(CATALOG, indent=2))
+    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as cf:
+        json.dump(CONFIG, cf, indent=2)
+        config_file = Path(cf.name)
+    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as catf:
+        json.dump(CATALOG, catf, indent=2)
+        catalog_file = Path(catf.name)

References:

🧹 Nitpick comments (6)
airbyte/sources/util.py (3)

154-158: Docstring note reads well; minor wording improvement for clarity

Would you consider tightening the note to active voice for readability, e.g., “because it does not generate final STATE or SUCCESS trace messages”, wdyt?

Apply:

-            Note: source-e2e-test is slightly faster but has compatibility issues with
-            some destinations due to not generating final STATE or SUCCESS trace messages.
+            Note: source-e2e-test is slightly faster but can be incompatible with some
+            destinations because it does not generate final STATE or SUCCESS trace messages.

Also, small nit above (Lines 146–147, not in this hunk): “If underscores are providing within a numeric a string” → “If underscores are provided within a numeric string.”


173-182: Input validation/mapping: guard against non-positive num_records and accept common aliases

Two small hardening tweaks could make this more user-friendly and robust:

  • Validate num_records > 0 to fail fast on invalid inputs.
  • Accept short aliases (“faker”, “e2e”, “e2e-test”) and normalize to canonical names. This mirrors patterns in examples (e.g., run_perf_test_reads.py) and reduces friction. Wdyt?

Here’s a minimal diff within this block:

     if connector_name is None:
         connector_name = "source-faker"
-    
-    if connector_name not in ["source-faker", "source-e2e-test"]:
+
+    # Validate record count
+    if isinstance(num_records, int) and num_records <= 0:
+        raise PyAirbyteInputError(
+            message="num_records must be a positive integer.",
+            input_value=str(num_records),
+            guidance="Provide a positive integer or a string like '5e6'.",
+        )
+
+    # Normalize and validate connector name
+    alias_map = {
+        "faker": "source-faker",
+        "e2e": "source-e2e-test",
+        "e2e-test": "source-e2e-test",
+    }
+    connector_name = alias_map.get(connector_name, connector_name)
+
+    if connector_name not in ["source-faker", "source-e2e-test"]:
         raise PyAirbyteInputError(
             message="Invalid connector name for benchmarking.",
             input_value=connector_name,
-            guidance="Valid options are 'source-faker' and 'source-e2e-test'.",
+            guidance="Valid options are 'source-faker' and 'source-e2e-test' (aliases: 'faker', 'e2e', 'e2e-test').",
         )

193-205: Surface seed/message_interval as optional knobs for repeatability and throughput control

Would it be helpful to allow callers to override seed and message_interval (with sensible defaults) so benchmarks can control determinism and throughput without editing code, wdyt? This could be done with optional kwargs (or a small dataclass) while keeping the current defaults.

Example signature extension (outside this hunk for reference):

def get_benchmark_source(..., *, connector_name: str | None = None, install_if_missing: bool = True, seed: int = 0, message_interval: int = 1000) -> Source:
    ...

And then use those values in both branches.

examples/manual_connector_test.py (3)

47-49: Surface connector errors to the user

Right now stderr is captured but discarded, making failures opaque. Would you print stderr and return code on failure (still keeping the script simple), wdyt?

-    with open(OUTPUT_FILE, "w") as f:
-        subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True, timeout=60)
+    with open(OUTPUT_FILE, "w") as f:
+        proc = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True, timeout=60)
+    if proc.returncode != 0:
+        print(f"Connector exited with status {proc.returncode}")
+        if proc.stderr:
+            print(proc.stderr.strip())

9-15: Make CONNECTOR_IMAGE overridable via env for local testing flexibility

For a manual test helper, allowing users to override the image without editing the file can be handy. Shall we read from an env var (with the current default), wdyt?

+import os
@@
-CONNECTOR_IMAGE = "airbyte/source-e2e-test:dev"
+CONNECTOR_IMAGE = os.getenv("CONNECTOR_IMAGE", "airbyte/source-e2e-test:dev")

53-56: Handle empty output gracefully (nit)

If the run produces no output, split/print will show a single empty line. Would you guard against empty content to avoid confusing prints, wdyt?

-    lines = Path(OUTPUT_FILE).read_text().strip().split('\n')
-    print(f"Generated {len(lines)} lines in {OUTPUT_FILE}")
-    for i, line in enumerate(lines[:3]):
+    content = Path(OUTPUT_FILE).read_text()
+    lines = [ln for ln in content.splitlines() if ln]
+    print(f"Generated {len(lines)} non-empty lines in {OUTPUT_FILE}")
+    for i, line in enumerate(lines[:3]):
         print(f"  {i+1}: {line}")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a8205c4 and 88756af.

📒 Files selected for processing (3)
  • airbyte/_message_iterators.py (1 hunks)
  • airbyte/sources/util.py (3 hunks)
  • examples/manual_connector_test.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte/_message_iterators.py
🧰 Additional context used
📓 Path-based instructions (1)
examples/*

⚙️ CodeRabbit Configuration File

These scripts are intended for demonstration purposes only. They are not meant to represent production code. For these scripts, you should prefer brevity over robust error handling and handling of edge cases. These are demos - which should be as simple as possible to handle the 'blessed' use cases.

Files:

  • examples/manual_connector_test.py
🧠 Learnings (1)
📚 Learning: 2024-10-08T15:34:31.026Z
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-08T15:34:31.026Z
Learning: In `airbyte/sources/util.py`, the function `get_benchmark_source` is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like `get_dummy_source` or `get_mock_source` with broader functionality.

Applied to files:

  • airbyte/sources/util.py
🧬 Code Graph Analysis (1)
airbyte/sources/util.py (2)
airbyte/exceptions.py (1)
  • PyAirbyteInputError (201-210)
examples/run_perf_test_reads.py (1)
  • get_source (151-178)
🪛 GitHub Actions: Run Linters
airbyte/sources/util.py

[error] 1-1: Ruff format check would reformat this file. Run 'poetry run ruff format airbyte/sources/util.py' to fix.

examples/manual_connector_test.py

[error] 1-1: Ruff format check would reformat this file. Run 'poetry run ruff format examples/manual_connector_test.py' to fix.

🪛 ast-grep (0.38.6)
examples/manual_connector_test.py

[warning] 30-30: The function mktemp is deprecated. When using this function, it is possible for an attacker to modify the created file before the filename is returned. Use NamedTemporaryFile() instead and pass it the delete=False parameter.
Context: tempfile.mktemp(suffix=".json")
Note: [CWE-377]: Insecure Temporary File [OWASP A01:2021]: Broken Access Control [REFERENCES]
https://docs.python.org/3/library/tempfile.html#tempfile.mktemp
https://owasp.org/Top10/A01_2021-Broken_Access_Control

(avoid-mktemp-python)


[warning] 31-31: The function mktemp is deprecated. When using this function, it is possible for an attacker to modify the created file before the filename is returned. Use NamedTemporaryFile() instead and pass it the delete=False parameter.
Context: tempfile.mktemp(suffix=".json")
Note: [CWE-377]: Insecure Temporary File [OWASP A01:2021]: Broken Access Control [REFERENCES]
https://docs.python.org/3/library/tempfile.html#tempfile.mktemp
https://owasp.org/Top10/A01_2021-Broken_Access_Control

(avoid-mktemp-python)

🪛 GitHub Check: CodeQL
examples/manual_connector_test.py

[failure] 31-31: Insecure temporary file
Call to deprecated function tempfile.mktemp may be insecure.


[failure] 32-32: Insecure temporary file
Call to deprecated function tempfile.mktemp may be insecure.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Windows)
  • GitHub Check: Pytest (All, Python 3.10, Windows)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (No Creds)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (4)
airbyte/sources/util.py (3)

183-192: Faker path looks good

The faker branch is straightforward and aligns with get_source expectations. Passing streams="*" is consistent with benchmark usage. LGTM.


1-212: Ruff formatting failure in CI — formatter couldn't be executed in the verification environment

I attempted to run the formatter but the sandbox is missing poetry (poetry: command not found), so I couldn't run ruff to verify formatting.

  • File needing formatting: airbyte/sources/util.py (lines 1–212)
  • Suggested commands to run locally or in CI:
    • If you use poetry: poetry run ruff format airbyte/sources/util.py && poetry run ruff check airbyte/sources/util.py
    • Otherwise: python -m ruff format airbyte/sources/util.py && python -m ruff check airbyte/sources/util.py

Shall I re-run verification after you run the formatter and push the change, wdyt?


138-141: No breaking change — kw-only connector_name is safe (call sites & docs checked)

I searched the repo: callers do not rely on positional args and there are no docs/examples referencing the new kw param, so this looks backwards-compatible.

  • Definition: airbyte/sources/util.py — get_benchmark_source(...) (def at ~line 135)
  • Call sites found:
    • examples/run_perf_test_reads.py — line ~167: return get_benchmark_source(num_records=num_records)
    • airbyte/cli.py — line ~514: get_benchmark_source(num_records=num_records)
  • No occurrences found that pass connector_name or reference it in examples/docs

Want me to optionally add a brief note/example in the docs or update the CLI help to show the new connector_name selector, or shall we leave it as-is? wdyt?

examples/manual_connector_test.py (1)

1-62: Ruff formatting failure in CI — can you run the formatter to unblock CI, wdyt?

I tried to run poetry run ruff format in the sandbox but poetry isn't installed here, so I couldn't verify the formatting locally.

Files to check:

  • examples/manual_connector_test.py (lines 1–62)

Suggested commands to run locally (or CI):

  • poetry run ruff format examples/manual_connector_test.py
  • poetry run ruff check examples/manual_connector_test.py

Or, if you don't use Poetry:

  • ruff format examples/manual_connector_test.py
  • ruff check examples/manual_connector_test.py

Shall I re-run the check after you push the formatted changes or share the command output, wdyt?

devin-ai-integration bot and others added 2 commits August 13, 2025 22:16
- Fix ruff lint issues in get_benchmark_source(): use set literal for membership testing and remove unnecessary else
- Add return type annotation to run_connector() function in example script
- All local checks now pass: ruff format, ruff check, mypy

Co-Authored-By: AJ Steers <[email protected]>
…temp()

- Fix CodeQL security alerts (2 high severity) by using secure temporary file creation
- tempfile.mktemp() is deprecated and insecure due to race conditions
- tempfile.mkstemp() creates files securely and returns (fd, path) tuple
- All local checks pass: ruff format, ruff check, mypy
- Example script tested and works correctly

Co-Authored-By: AJ Steers <[email protected]>