Skip to content
Draft
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4e24489
spike: Replace null namespaces with empty strings
devin-ai-integration[bot] Aug 13, 2025
054093a
fix: Shorten comment to resolve Ruff lint error (line length)
devin-ai-integration[bot] Aug 13, 2025
0c2f196
Update airbyte/_message_iterators.py
aaronsteers Aug 13, 2025
5c05e1d
Update airbyte/_message_iterators.py
aaronsteers Aug 13, 2025
761554e
fix: Replace null namespaces with empty strings in catalog validation
devin-ai-integration[bot] Aug 13, 2025
3a2cb45
fix: Also handle null source_defined_cursor and is_resumable fields
devin-ai-integration[bot] Aug 13, 2025
d098050
fix: Clean up null namespaces in record messages from external sources
devin-ai-integration[bot] Aug 13, 2025
201187a
fix: Clean up null namespaces in record messages from external source…
devin-ai-integration[bot] Aug 13, 2025
808ec52
fix: Clean up null namespaces in record messages at source execution …
devin-ai-integration[bot] Aug 13, 2025
9aad1fb
fix: Comment out performance-problematic namespace checks in high-vol…
devin-ai-integration[bot] Aug 13, 2025
85a3b4e
fix: Ensure catalog cleaning changes are included
devin-ai-integration[bot] Aug 13, 2025
4c5a6d2
fix: Use Pydantic exclude_none=True to omit null fields from catalog …
devin-ai-integration[bot] Aug 13, 2025
50b07ad
fix: Add exclude_none=True to remaining model_dump_json() calls
devin-ai-integration[bot] Aug 13, 2025
89ff68a
fix: Complete exclude_none=True implementation for all model_dump_jso…
devin-ai-integration[bot] Aug 13, 2025
9c4f594
fix: Clean up null namespaces in trace stream_status messages
devin-ai-integration[bot] Aug 13, 2025
82e7730
Apply suggestion from @aaronsteers
aaronsteers Aug 13, 2025
405b947
Apply suggestion from @aaronsteers
aaronsteers Aug 13, 2025
35d9d60
Apply suggestion from @aaronsteers
aaronsteers Aug 13, 2025
d0a6f24
Apply suggestion from @aaronsteers
aaronsteers Aug 13, 2025
a8205c4
Apply suggestion from @aaronsteers
aaronsteers Aug 13, 2025
c8d1d41
feat: Switch benchmark default from source-e2e-test to source-faker
devin-ai-integration[bot] Aug 13, 2025
c3dce14
fix: Simplify source-faker config to use only count and seed parameters
devin-ai-integration[bot] Aug 13, 2025
88756af
feat: Test catalog-only exclude_none theory, update get_benchmark_sou…
devin-ai-integration[bot] Aug 13, 2025
57c9863
fix: Address lint and type checking issues
devin-ai-integration[bot] Aug 13, 2025
dbb96d0
security: Replace insecure tempfile.mktemp() with secure tempfile.mks…
devin-ai-integration[bot] Aug 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions airbyte/_connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,16 @@ def _execute(
for line in self.executor.execute(args, stdin=stdin, suppress_stderr=suppress_stderr):
try:
message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line)

if (
message.type == Type.TRACE
and message.trace
and message.trace.stream_status
and message.trace.stream_status.stream_descriptor
and message.trace.stream_status.stream_descriptor.namespace is None
):
message.trace.stream_status.stream_descriptor.namespace = ""

if progress_tracker and message.record:
stream_name = message.record.stream
progress_tracker.tally_bytes_read(
Expand Down
2 changes: 1 addition & 1 deletion airbyte/_executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def _pump_input(
"""Pump lines into a pipe."""
with pipe:
try:
pipe.writelines(message.model_dump_json() + "\n" for message in messages)
pipe.writelines(message.model_dump_json(exclude_none=True) + "\n" for message in messages)
pipe.flush() # Ensure data is sent immediately
except (BrokenPipeError, OSError) as ex:
if isinstance(ex, BrokenPipeError):
Expand Down
17 changes: 12 additions & 5 deletions airbyte/_message_iterators.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def __next__(self) -> AirbyteMessage:
@final
def read(self) -> str:
"""Read the next message from the iterator."""
return next(self).model_dump_json()
return next(self).model_dump_json(exclude_none=True)

@classmethod
def from_read_result(cls, read_result: ReadResult) -> AirbyteMessageIterator:
Expand All @@ -108,9 +108,8 @@ def generator() -> Generator[AirbyteMessage, None, None]:
"datetime.datetime", record.get(AB_EXTRACTED_AT_COLUMN)
).timestamp()
),
# `meta` and `namespace` are not handled:
meta=None,
namespace=None,
namespace="", # Some destinations fail if null/missing
),
)

Expand Down Expand Up @@ -138,7 +137,11 @@ def generator() -> Generator[AirbyteMessage, None, None]:
break
try:
# Let Pydantic handle the JSON decoding from the raw string
yield AirbyteMessage.model_validate_json(next_line)
message = AirbyteMessage.model_validate_json(next_line)
# if message.type == Type.RECORD and message.record:
# if message.record.namespace is None:
# message.record.namespace = ""
yield message
except pydantic.ValidationError:
# Handle JSON decoding errors (optional)
raise ValueError("Invalid JSON format") # noqa: B904
Expand All @@ -153,7 +156,11 @@ def generator() -> Generator[AirbyteMessage, None, None]:
for line in buffer:
try:
# Let Pydantic handle the JSON decoding from the raw string
yield AirbyteMessage.model_validate_json(line)
message = AirbyteMessage.model_validate_json(line)
# if message.type == Type.RECORD and message.record:
# if message.record.namespace is None:
# message.record.namespace = ""
yield message
except pydantic.ValidationError:
# Handle JSON decoding errors (optional)
raise ValueError(f"Invalid JSON format in input string: {line}") # noqa: B904
Expand Down
4 changes: 2 additions & 2 deletions airbyte/caches/_state_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,14 @@ def _write_state(
destination_name=self.destination_name,
source_name=self.source_name,
stream_name=stream_name,
state_json=state_message.model_dump_json(),
state_json=state_message.model_dump_json(exclude_none=True),
)
if self.destination_name
else CacheStreamStateModel(
source_name=self.source_name,
stream_name=stream_name,
table_name=table_prefix + stream_name,
state_json=state_message.model_dump_json(),
state_json=state_message.model_dump_json(exclude_none=True),
)
)

Expand Down
2 changes: 1 addition & 1 deletion airbyte/destinations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def _write_airbyte_message_stream(
with as_temp_files(
files_contents=[
self._hydrated_config,
catalog_provider.configured_catalog.model_dump_json(),
catalog_provider.configured_catalog.model_dump_json(exclude_none=True),
]
Comment on lines 265 to 269
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Switching to exclude_none=True for catalog JSON avoids strict validator null errors — please confirm downstream tolerance and add a rationale comment.

This aligns with the PR’s intent to stop surfacing explicit nulls in ConfiguredAirbyteStream fields (e.g., namespace, source_defined_cursor, is_resumable). The change reduces spurious OpenAPI4J validation failures. The one behavioral change to watch is that optional fields previously present with null will now be omitted entirely; if any destination implementations (custom or legacy) were relying on the presence of keys even when null, they might need an adjustment. Would you mind confirming there are no destinations that require those keys to be present when the values are None, and adding a brief comment here to document the rationale, wdyt?

Suggested inline comment for future readers:

 with as_temp_files(
   files_contents=[
     self._hydrated_config,
-    catalog_provider.configured_catalog.model_dump_json(exclude_none=True),
+    # Omit None-valued optional fields (e.g., namespace, source_defined_cursor, is_resumable)
+    # to avoid strict validators rejecting explicit nulls in some CDKs.
+    catalog_provider.configured_catalog.model_dump_json(exclude_none=True),
   ]
 )

To help double-check consistency across the repo, you could run this quick search to ensure all configured catalog dumps now use exclude_none=True (and spot any stragglers):


🏁 Script executed:

#!/bin/bash
# Find all places where configured_catalog JSON is dumped
rg -n -A2 -B2 'configured_catalog\.model_dump_json\('

# Inspect all model_dump_json calls and see which ones use exclude_none=True
rg -n -A1 -B1 'model_dump_json\('

# Optional: scan for hardcoded "namespace": null in code/tests
rg -n -A2 -B2 '"namespace"\s*:\s*null'

Length of output: 2474


Add a short rationale comment here and confirm downstream tolerance for omitted None fields — wdyt?

Quick check: rg shows this repo already uses model_dump_json(exclude_none=True) in multiple places and I found no occurrences of a hardcoded "namespace": null. It looks consistent, but could we add a short inline rationale and confirm no destinations rely on keys being present with nulls?

Files to update / reference:

  • Please add the comment at: airbyte/destinations/base.py (around lines 265–269)
  • Other places already using exclude_none=True (for reference):
    • airbyte/_message_iterators.py:88
    • airbyte/shared/state_writers.py:79
    • airbyte/sources/base.py:755
    • airbyte/shared/state_providers.py:88
    • airbyte/_executors/base.py:44
    • airbyte/caches/_state_backend.py:146,153

Suggested inline diff to add at the destinations file:

 with as_temp_files(
   files_contents=[
     self._hydrated_config,
+    # Omit None-valued optional fields (e.g., namespace, source_defined_cursor, is_resumable)
+    # to avoid strict validators rejecting explicit nulls in some CDKs.
     catalog_provider.configured_catalog.model_dump_json(exclude_none=True),
   ]
 )

Would you mind adding that comment and confirming there are no destination implementations that expect those keys to exist with null values? wdyt?

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
with as_temp_files(
files_contents=[
self._hydrated_config,
catalog_provider.configured_catalog.model_dump_json(),
catalog_provider.configured_catalog.model_dump_json(exclude_none=True),
]
with as_temp_files(
files_contents=[
self._hydrated_config,
# Omit None-valued optional fields (e.g., namespace, source_defined_cursor, is_resumable)
# to avoid strict validators rejecting explicit nulls in some CDKs.
catalog_provider.configured_catalog.model_dump_json(exclude_none=True),
]
🤖 Prompt for AI Agents
In airbyte/destinations/base.py around lines 265-269, add a short inline comment
above the call to model_dump_json(exclude_none=True) noting the deliberate
choice to omit keys with None values (e.g., "exclude_none=True to avoid emitting
keys with null values; verified repository-wide usage and no destinations expect
null-valued keys") and then run a quick scan of destination implementations
(grep for "namespace\": null" and check destination codepaths) to confirm no
destination relies on presence of keys with null values; if any destination does
rely on nulls, revert to including None or handle that destination specially.

) as [
config_file,
Expand Down
2 changes: 1 addition & 1 deletion airbyte/shared/state_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def to_state_input_file_text(self) -> str:
"["
+ "\n, ".join(
[
state_artifact.model_dump_json()
state_artifact.model_dump_json(exclude_none=True)
for state_artifact in (self._state_message_artifacts or [])
]
)
Expand Down
2 changes: 1 addition & 1 deletion airbyte/shared/state_writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def _write_state(
state_message: AirbyteStateMessage,
) -> None:
"""Save or 'write' a state artifact."""
print(state_message.model_dump_json())
print(state_message.model_dump_json(exclude_none=True))


class NoOpStateWriter(StateWriterBase):
Expand Down
2 changes: 1 addition & 1 deletion airbyte/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ def _read_with_catalog(
with as_temp_files(
[
self._hydrated_config,
catalog.model_dump_json(),
catalog.model_dump_json(exclude_none=True),
state.to_state_input_file_text() if state else "[]",
]
) as [
Expand Down
Loading