From 4e24489184c5f1ceb2c9afc5bf9617bd22228fd9 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 19:49:17 +0000 Subject: [PATCH 01/25] spike: Replace null namespaces with empty strings - Update StreamDescriptor to use empty string for namespace instead of None - Update AirbyteRecordMessage to use empty string for namespace instead of None - Prevents 'Null value is not allowed (code: 1021)' validation errors in destinations Co-Authored-By: AJ Steers --- airbyte/_message_iterators.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte/_message_iterators.py b/airbyte/_message_iterators.py index ea7040eb..6d2d4343 100644 --- a/airbyte/_message_iterators.py +++ b/airbyte/_message_iterators.py @@ -43,6 +43,7 @@ def _new_stream_success_message(stream_name: str) -> AirbyteMessage: stream_status=AirbyteStreamStatusTraceMessage( stream_descriptor=StreamDescriptor( name=stream_name, + namespace="", # Use empty string instead of None for compatibility ), status=AirbyteStreamStatus.COMPLETE, reasons=None, @@ -108,9 +109,8 @@ def generator() -> Generator[AirbyteMessage, None, None]: "datetime.datetime", record.get(AB_EXTRACTED_AT_COLUMN) ).timestamp() ), - # `meta` and `namespace` are not handled: meta=None, - namespace=None, + namespace="", # Use empty string instead of None to avoid validation errors ), ) From 054093a803308310ae7988046c9d74fb010717ba Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 19:56:39 +0000 Subject: [PATCH 02/25] fix: Shorten comment to resolve Ruff lint error (line length) Co-Authored-By: AJ Steers --- airbyte/_message_iterators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte/_message_iterators.py b/airbyte/_message_iterators.py index 6d2d4343..a4c61da0 100644 --- a/airbyte/_message_iterators.py +++ b/airbyte/_message_iterators.py @@ -110,7 +110,7 @@ def generator() -> Generator[AirbyteMessage, None, None]: ).timestamp() ), meta=None, - namespace="", # Use empty string instead of None to avoid validation errors + namespace="", # Use empty string instead of None ), ) From 0c2f1966cc2378cb6022462f9c2f1d14756e2133 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Wed, 13 Aug 2025 12:58:45 -0700 Subject: [PATCH 03/25] Update airbyte/_message_iterators.py --- airbyte/_message_iterators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte/_message_iterators.py b/airbyte/_message_iterators.py index a4c61da0..23d15d2f 100644 --- a/airbyte/_message_iterators.py +++ b/airbyte/_message_iterators.py @@ -43,7 +43,7 @@ def _new_stream_success_message(stream_name: str) -> AirbyteMessage: stream_status=AirbyteStreamStatusTraceMessage( stream_descriptor=StreamDescriptor( name=stream_name, - namespace="", # Use empty string instead of None for compatibility + namespace="", # Some destinations fail if null/missing ), status=AirbyteStreamStatus.COMPLETE, reasons=None, From 5c05e1d34b496782183b44802ec5d220fdf3036f Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Wed, 13 Aug 2025 12:58:54 -0700 Subject: [PATCH 04/25] Update airbyte/_message_iterators.py --- airbyte/_message_iterators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte/_message_iterators.py b/airbyte/_message_iterators.py index 23d15d2f..08189419 100644 --- a/airbyte/_message_iterators.py +++ b/airbyte/_message_iterators.py @@ -110,7 +110,7 @@ def generator() -> Generator[AirbyteMessage, None, None]: ).timestamp() ), meta=None, - namespace="", # Use empty string instead of None + namespace="", # Some destinations fail if null/missing ), ) From 761554eef8c732166a05be7e660c6293a4b5fba3 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 20:07:33 +0000 Subject: [PATCH 05/25] fix: Replace null namespaces with empty strings in catalog validation This extends the PyAirbyte fix to also clean up null namespaces in the catalog itself, not just record messages. The destination connector validates the catalog before processing records, so null namespaces in the catalog cause validation failures even if record messages are cleaned up. Co-Authored-By: AJ Steers --- airbyte/shared/catalog_providers.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/airbyte/shared/catalog_providers.py b/airbyte/shared/catalog_providers.py index bf22a76b..27794122 100644 --- a/airbyte/shared/catalog_providers.py +++ b/airbyte/shared/catalog_providers.py @@ -55,6 +55,9 @@ def validate_catalog(catalog: ConfiguredAirbyteCatalog) -> None: This requires ensuring that `generationId` and `minGenerationId` are both set. If not, both values will be set to `1`. + + Also replaces null namespaces with empty strings for compatibility with strict + destination validation. """ for stream in catalog.streams: if stream.generation_id is None: @@ -63,6 +66,9 @@ def validate_catalog(catalog: ConfiguredAirbyteCatalog) -> None: stream.minimum_generation_id = 1 if stream.sync_id is None: stream.sync_id = 1 # This should ideally increment monotonically with each sync. + + if stream.stream.namespace is None: + stream.stream.namespace = "" return catalog From 3a2cb45fca1d36a8d2faab6f3448d4b74307e14e Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 20:09:00 +0000 Subject: [PATCH 06/25] fix: Also handle null source_defined_cursor and is_resumable fields Extends the catalog validation to replace null values for all three problematic fields: namespace, source_defined_cursor, and is_resumable. This should resolve all remaining validation errors. Co-Authored-By: AJ Steers --- airbyte/shared/catalog_providers.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/airbyte/shared/catalog_providers.py b/airbyte/shared/catalog_providers.py index 27794122..eb4f5e1c 100644 --- a/airbyte/shared/catalog_providers.py +++ b/airbyte/shared/catalog_providers.py @@ -69,6 +69,10 @@ def validate_catalog(catalog: ConfiguredAirbyteCatalog) -> None: if stream.stream.namespace is None: stream.stream.namespace = "" + if stream.stream.source_defined_cursor is None: + stream.stream.source_defined_cursor = False + if stream.stream.is_resumable is None: + stream.stream.is_resumable = False return catalog From d098050dc555e435ad43f434e76a0e0f8ef2fd8b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 20:11:07 +0000 Subject: [PATCH 07/25] fix: Clean up null namespaces in record messages from external sources Co-Authored-By: AJ Steers --- airbyte/_message_iterators.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/airbyte/_message_iterators.py b/airbyte/_message_iterators.py index 08189419..0e9bf7f9 100644 --- a/airbyte/_message_iterators.py +++ b/airbyte/_message_iterators.py @@ -138,7 +138,10 @@ def generator() -> Generator[AirbyteMessage, None, None]: break try: # Let Pydantic handle the JSON decoding from the raw string - yield AirbyteMessage.model_validate_json(next_line) + message = AirbyteMessage.model_validate_json(next_line) + if message.type == Type.RECORD and message.record and message.record.namespace is None: + message.record.namespace = "" + yield message except pydantic.ValidationError: # Handle JSON decoding errors (optional) raise ValueError("Invalid JSON format") # noqa: B904 @@ -153,7 +156,10 @@ def generator() -> Generator[AirbyteMessage, None, None]: for line in buffer: try: # Let Pydantic handle the JSON decoding from the raw string - yield AirbyteMessage.model_validate_json(line) + message = AirbyteMessage.model_validate_json(line) + if message.type == Type.RECORD and message.record and message.record.namespace is None: + message.record.namespace = "" + yield message except pydantic.ValidationError: # Handle JSON decoding errors (optional) raise ValueError(f"Invalid JSON format in input string: {line}") # noqa: B904 From 201187a33c9c8af788aa1b8b7aab2c89655a7edf Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 20:14:03 +0000 Subject: [PATCH 08/25] fix: Clean up null namespaces in record messages from external sources (updated logic) Co-Authored-By: AJ Steers --- airbyte/_message_iterators.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/airbyte/_message_iterators.py b/airbyte/_message_iterators.py index 0e9bf7f9..4c1b14d9 100644 --- a/airbyte/_message_iterators.py +++ b/airbyte/_message_iterators.py @@ -139,8 +139,9 @@ def generator() -> Generator[AirbyteMessage, None, None]: try: # Let Pydantic handle the JSON decoding from the raw string message = AirbyteMessage.model_validate_json(next_line) - if message.type == Type.RECORD and message.record and message.record.namespace is None: - message.record.namespace = "" + if message.type == Type.RECORD and message.record: + if message.record.namespace is None: + message.record.namespace = "" yield message except pydantic.ValidationError: # Handle JSON decoding errors (optional) @@ -157,8 +158,9 @@ def generator() -> Generator[AirbyteMessage, None, None]: try: # Let Pydantic handle the JSON decoding from the raw string message = AirbyteMessage.model_validate_json(line) - if message.type == Type.RECORD and message.record and message.record.namespace is None: - message.record.namespace = "" + if message.type == Type.RECORD and message.record: + if message.record.namespace is None: + message.record.namespace = "" yield message except pydantic.ValidationError: # Handle JSON decoding errors (optional) From 808ec523e66fb7cffc42000229dc1dd7034cf403 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 20:15:07 +0000 Subject: [PATCH 09/25] fix: Clean up null namespaces in record messages at source execution level Co-Authored-By: AJ Steers --- airbyte/_connector_base.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index 66adde6c..aafa22a0 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -469,6 +469,10 @@ def _execute( for line in self.executor.execute(args, stdin=stdin, suppress_stderr=suppress_stderr): try: message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line) + + if message.type == Type.RECORD and message.record and message.record.namespace is None: + message.record.namespace = "" + if progress_tracker and message.record: stream_name = message.record.stream progress_tracker.tally_bytes_read( From 9aad1fb4a0a968acd67941360d640e5372ac495f Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 20:32:45 +0000 Subject: [PATCH 10/25] fix: Comment out performance-problematic namespace checks in high-volume loops Per GitHub feedback from AJ Steers, removed namespace checking from: - _message_iterators.py: from_str_buffer and from_str_iterable methods - _connector_base.py: _execute method These are high-volume loops that run once per record. The catalog cleaning approach in catalog_providers.py is sufficient to fix the validation issue without the performance overhead. Co-Authored-By: AJ Steers --- airbyte/_connector_base.py | 12 ++++++++---- airbyte/_message_iterators.py | 12 ++++++------ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index aafa22a0..a1349ed0 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -469,10 +469,14 @@ def _execute( for line in self.executor.execute(args, stdin=stdin, suppress_stderr=suppress_stderr): try: message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line) - - if message.type == Type.RECORD and message.record and message.record.namespace is None: - message.record.namespace = "" - + + # if ( + # message.type == Type.RECORD + # and message.record + # and message.record.namespace is None + # ): + # message.record.namespace = "" + if progress_tracker and message.record: stream_name = message.record.stream progress_tracker.tally_bytes_read( diff --git a/airbyte/_message_iterators.py b/airbyte/_message_iterators.py index 4c1b14d9..bbb35111 100644 --- a/airbyte/_message_iterators.py +++ b/airbyte/_message_iterators.py @@ -139,9 +139,9 @@ def generator() -> Generator[AirbyteMessage, None, None]: try: # Let Pydantic handle the JSON decoding from the raw string message = AirbyteMessage.model_validate_json(next_line) - if message.type == Type.RECORD and message.record: - if message.record.namespace is None: - message.record.namespace = "" + # if message.type == Type.RECORD and message.record: + # if message.record.namespace is None: + # message.record.namespace = "" yield message except pydantic.ValidationError: # Handle JSON decoding errors (optional) @@ -158,9 +158,9 @@ def generator() -> Generator[AirbyteMessage, None, None]: try: # Let Pydantic handle the JSON decoding from the raw string message = AirbyteMessage.model_validate_json(line) - if message.type == Type.RECORD and message.record: - if message.record.namespace is None: - message.record.namespace = "" + # if message.type == Type.RECORD and message.record: + # if message.record.namespace is None: + # message.record.namespace = "" yield message except pydantic.ValidationError: # Handle JSON decoding errors (optional) From 85a3b4e043ff3eca1585e4f55e1d4aa433876011 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 20:33:01 +0000 Subject: [PATCH 11/25] fix: Ensure catalog cleaning changes are included This completes the performance optimization by ensuring all catalog-level null field cleaning is committed while the high-volume loop changes have been commented out per GitHub feedback. Co-Authored-By: AJ Steers --- airbyte/shared/catalog_providers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte/shared/catalog_providers.py b/airbyte/shared/catalog_providers.py index eb4f5e1c..9727d3e0 100644 --- a/airbyte/shared/catalog_providers.py +++ b/airbyte/shared/catalog_providers.py @@ -55,7 +55,7 @@ def validate_catalog(catalog: ConfiguredAirbyteCatalog) -> None: This requires ensuring that `generationId` and `minGenerationId` are both set. If not, both values will be set to `1`. - + Also replaces null namespaces with empty strings for compatibility with strict destination validation. """ @@ -66,7 +66,7 @@ def validate_catalog(catalog: ConfiguredAirbyteCatalog) -> None: stream.minimum_generation_id = 1 if stream.sync_id is None: stream.sync_id = 1 # This should ideally increment monotonically with each sync. - + if stream.stream.namespace is None: stream.stream.namespace = "" if stream.stream.source_defined_cursor is None: From 4c5a6d25c2b28f03b92768681827d2f48e27bf17 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 20:47:15 +0000 Subject: [PATCH 12/25] fix: Use Pydantic exclude_none=True to omit null fields from catalog serialization - Modified destinations/base.py to use exclude_none=True in catalog serialization - Modified sources/base.py to use exclude_none=True in catalog serialization - Removed previous null-to-empty-string replacement logic from catalog_providers.py - This approach omits null namespace fields entirely instead of sending explicit nulls Co-Authored-By: AJ Steers --- airbyte/destinations/base.py | 2 +- airbyte/shared/catalog_providers.py | 10 ---------- airbyte/sources/base.py | 2 +- 3 files changed, 2 insertions(+), 12 deletions(-) diff --git a/airbyte/destinations/base.py b/airbyte/destinations/base.py index b1482bc0..8593dac2 100644 --- a/airbyte/destinations/base.py +++ b/airbyte/destinations/base.py @@ -265,7 +265,7 @@ def _write_airbyte_message_stream( with as_temp_files( files_contents=[ self._hydrated_config, - catalog_provider.configured_catalog.model_dump_json(), + catalog_provider.configured_catalog.model_dump_json(exclude_none=True), ] ) as [ config_file, diff --git a/airbyte/shared/catalog_providers.py b/airbyte/shared/catalog_providers.py index 9727d3e0..bf22a76b 100644 --- a/airbyte/shared/catalog_providers.py +++ b/airbyte/shared/catalog_providers.py @@ -55,9 +55,6 @@ def validate_catalog(catalog: ConfiguredAirbyteCatalog) -> None: This requires ensuring that `generationId` and `minGenerationId` are both set. If not, both values will be set to `1`. - - Also replaces null namespaces with empty strings for compatibility with strict - destination validation. """ for stream in catalog.streams: if stream.generation_id is None: @@ -67,13 +64,6 @@ def validate_catalog(catalog: ConfiguredAirbyteCatalog) -> None: if stream.sync_id is None: stream.sync_id = 1 # This should ideally increment monotonically with each sync. - if stream.stream.namespace is None: - stream.stream.namespace = "" - if stream.stream.source_defined_cursor is None: - stream.stream.source_defined_cursor = False - if stream.stream.is_resumable is None: - stream.stream.is_resumable = False - return catalog @property diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index 698a50d9..20f05de8 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -752,7 +752,7 @@ def _read_with_catalog( with as_temp_files( [ self._hydrated_config, - catalog.model_dump_json(), + catalog.model_dump_json(exclude_none=True), state.to_state_input_file_text() if state else "[]", ] ) as [ From 50b07adeb0253a30faa0825c5f13c898bc03b7b9 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 20:50:16 +0000 Subject: [PATCH 13/25] fix: Add exclude_none=True to remaining model_dump_json() calls - Updated _message_iterators.py read() method to use exclude_none=True - Updated _executors/base.py _pump_input() to use exclude_none=True - This ensures all message serialization omits null fields consistently Co-Authored-By: AJ Steers --- airbyte/_executors/base.py | 2 +- airbyte/_message_iterators.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte/_executors/base.py b/airbyte/_executors/base.py index 99dc13b0..2fc7d036 100644 --- a/airbyte/_executors/base.py +++ b/airbyte/_executors/base.py @@ -41,7 +41,7 @@ def _pump_input( """Pump lines into a pipe.""" with pipe: try: - pipe.writelines(message.model_dump_json() + "\n" for message in messages) + pipe.writelines(message.model_dump_json(exclude_none=True) + "\n" for message in messages) pipe.flush() # Ensure data is sent immediately except (BrokenPipeError, OSError) as ex: if isinstance(ex, BrokenPipeError): diff --git a/airbyte/_message_iterators.py b/airbyte/_message_iterators.py index bbb35111..c107f737 100644 --- a/airbyte/_message_iterators.py +++ b/airbyte/_message_iterators.py @@ -86,7 +86,7 @@ def __next__(self) -> AirbyteMessage: @final def read(self) -> str: """Read the next message from the iterator.""" - return next(self).model_dump_json() + return next(self).model_dump_json(exclude_none=True) @classmethod def from_read_result(cls, read_result: ReadResult) -> AirbyteMessageIterator: From 89ff68ad78487e0aad21133655579cd7f265b8b6 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 20:53:34 +0000 Subject: [PATCH 14/25] fix: Complete exclude_none=True implementation for all model_dump_json() calls - Updated state_providers.py to use exclude_none=True for state artifact serialization - Updated state_writers.py to use exclude_none=True for state message printing - Updated _state_backend.py to use exclude_none=True for both state JSON storage calls - This ensures all message serialization consistently omits null fields Co-Authored-By: AJ Steers --- airbyte/caches/_state_backend.py | 4 ++-- airbyte/shared/state_providers.py | 2 +- airbyte/shared/state_writers.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte/caches/_state_backend.py b/airbyte/caches/_state_backend.py index 7fef5bbf..60d4dcce 100644 --- a/airbyte/caches/_state_backend.py +++ b/airbyte/caches/_state_backend.py @@ -143,14 +143,14 @@ def _write_state( destination_name=self.destination_name, source_name=self.source_name, stream_name=stream_name, - state_json=state_message.model_dump_json(), + state_json=state_message.model_dump_json(exclude_none=True), ) if self.destination_name else CacheStreamStateModel( source_name=self.source_name, stream_name=stream_name, table_name=table_prefix + stream_name, - state_json=state_message.model_dump_json(), + state_json=state_message.model_dump_json(exclude_none=True), ) ) diff --git a/airbyte/shared/state_providers.py b/airbyte/shared/state_providers.py index 359e8ee8..95880dc9 100644 --- a/airbyte/shared/state_providers.py +++ b/airbyte/shared/state_providers.py @@ -85,7 +85,7 @@ def to_state_input_file_text(self) -> str: "[" + "\n, ".join( [ - state_artifact.model_dump_json() + state_artifact.model_dump_json(exclude_none=True) for state_artifact in (self._state_message_artifacts or []) ] ) diff --git a/airbyte/shared/state_writers.py b/airbyte/shared/state_writers.py index 738325e4..272b4aca 100644 --- a/airbyte/shared/state_writers.py +++ b/airbyte/shared/state_writers.py @@ -76,7 +76,7 @@ def _write_state( state_message: AirbyteStateMessage, ) -> None: """Save or 'write' a state artifact.""" - print(state_message.model_dump_json()) + print(state_message.model_dump_json(exclude_none=True)) class NoOpStateWriter(StateWriterBase): From 9c4f5943e16428dd5832f234c6e0da3bc171ac01 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 20:55:41 +0000 Subject: [PATCH 15/25] fix: Clean up null namespaces in trace stream_status messages - Add namespace cleaning for trace messages with stream_status to prevent mismatch - Source connectors emit stream_status with null namespaces causing destination tracking issues - This ensures consistent namespace handling between stream tracking and completion messages Co-Authored-By: AJ Steers --- airbyte/_connector_base.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index a1349ed0..2c4d84ec 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -470,12 +470,14 @@ def _execute( try: message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line) - # if ( - # message.type == Type.RECORD - # and message.record - # and message.record.namespace is None - # ): - # message.record.namespace = "" + if ( + message.type == Type.TRACE + and message.trace + and message.trace.stream_status + and message.trace.stream_status.stream_descriptor + and message.trace.stream_status.stream_descriptor.namespace is None + ): + message.trace.stream_status.stream_descriptor.namespace = "" if progress_tracker and message.record: stream_name = message.record.stream From 82e773060a10389ba47b883eff03cd94d5fa8b5f Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Wed, 13 Aug 2025 14:00:09 -0700 Subject: [PATCH 16/25] Apply suggestion from @aaronsteers --- airbyte/_message_iterators.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte/_message_iterators.py b/airbyte/_message_iterators.py index c107f737..23d99b73 100644 --- a/airbyte/_message_iterators.py +++ b/airbyte/_message_iterators.py @@ -43,7 +43,6 @@ def _new_stream_success_message(stream_name: str) -> AirbyteMessage: stream_status=AirbyteStreamStatusTraceMessage( stream_descriptor=StreamDescriptor( name=stream_name, - namespace="", # Some destinations fail if null/missing ), status=AirbyteStreamStatus.COMPLETE, reasons=None, From 405b947734bdedaff8ef3f48bb31549fb2774a28 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Wed, 13 Aug 2025 14:00:38 -0700 Subject: [PATCH 17/25] Apply suggestion from @aaronsteers --- airbyte/_connector_base.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index 2c4d84ec..66adde6c 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -469,16 +469,6 @@ def _execute( for line in self.executor.execute(args, stdin=stdin, suppress_stderr=suppress_stderr): try: message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line) - - if ( - message.type == Type.TRACE - and message.trace - and message.trace.stream_status - and message.trace.stream_status.stream_descriptor - and message.trace.stream_status.stream_descriptor.namespace is None - ): - message.trace.stream_status.stream_descriptor.namespace = "" - if progress_tracker and message.record: stream_name = message.record.stream progress_tracker.tally_bytes_read( From 35d9d60aa0c3587b5312472d00c36f5e687d649c Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Wed, 13 Aug 2025 14:02:08 -0700 Subject: [PATCH 18/25] Apply suggestion from @aaronsteers --- airbyte/_message_iterators.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/airbyte/_message_iterators.py b/airbyte/_message_iterators.py index 23d99b73..4e5e761e 100644 --- a/airbyte/_message_iterators.py +++ b/airbyte/_message_iterators.py @@ -156,11 +156,7 @@ def generator() -> Generator[AirbyteMessage, None, None]: for line in buffer: try: # Let Pydantic handle the JSON decoding from the raw string - message = AirbyteMessage.model_validate_json(line) - # if message.type == Type.RECORD and message.record: - # if message.record.namespace is None: - # message.record.namespace = "" - yield message + yield AirbyteMessage.model_validate_json(line) except pydantic.ValidationError: # Handle JSON decoding errors (optional) raise ValueError(f"Invalid JSON format in input string: {line}") # noqa: B904 From d0a6f24c82476daa3bae45ba5ed64fa74e536bcc Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Wed, 13 Aug 2025 14:02:13 -0700 Subject: [PATCH 19/25] Apply suggestion from @aaronsteers --- airbyte/_message_iterators.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/airbyte/_message_iterators.py b/airbyte/_message_iterators.py index 4e5e761e..d41805d9 100644 --- a/airbyte/_message_iterators.py +++ b/airbyte/_message_iterators.py @@ -137,11 +137,7 @@ def generator() -> Generator[AirbyteMessage, None, None]: break try: # Let Pydantic handle the JSON decoding from the raw string - message = AirbyteMessage.model_validate_json(next_line) - # if message.type == Type.RECORD and message.record: - # if message.record.namespace is None: - # message.record.namespace = "" - yield message + yield AirbyteMessage.model_validate_json(next_line) except pydantic.ValidationError: # Handle JSON decoding errors (optional) raise ValueError("Invalid JSON format") # noqa: B904 From a8205c414d31d076833396d0626383059c8fdf44 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Wed, 13 Aug 2025 14:02:58 -0700 Subject: [PATCH 20/25] Apply suggestion from @aaronsteers --- airbyte/_message_iterators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte/_message_iterators.py b/airbyte/_message_iterators.py index d41805d9..7b8fd24a 100644 --- a/airbyte/_message_iterators.py +++ b/airbyte/_message_iterators.py @@ -109,7 +109,7 @@ def generator() -> Generator[AirbyteMessage, None, None]: ).timestamp() ), meta=None, - namespace="", # Some destinations fail if null/missing + # namespace=None, # Some destinations need this undeclared if null ), ) From c8d1d41c004a9f1b54ac68d9327c5ca7bfc365c7 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 21:27:50 +0000 Subject: [PATCH 21/25] feat: Switch benchmark default from source-e2e-test to source-faker - Add helper script for manual connector testing in examples/ - Change get_benchmark_source() to use source-faker instead of source-e2e-test - Add explanatory comment about source-e2e-test limitations with Java destinations - Update source-faker config with correct parameters Co-Authored-By: AJ Steers --- airbyte/sources/util.py | 16 ++-- examples/manual_connector_test.py | 142 ++++++++++++++++++++++++++++++ 2 files changed, 149 insertions(+), 9 deletions(-) create mode 100644 examples/manual_connector_test.py diff --git a/airbyte/sources/util.py b/airbyte/sources/util.py index f2c52c11..11688ee0 100644 --- a/airbyte/sources/util.py +++ b/airbyte/sources/util.py @@ -165,17 +165,15 @@ def get_benchmark_source( input_value=str(num_records), ) from None + # Note: source-e2e-test does not generate final STATE or SUCCESS trace messages, + # which can cause failures when combined with certain Java destinations like destination-dev-null return get_source( - name="source-e2e-test", - docker_image=True, - # docker_image="airbyte/source-e2e-test:latest", + name="source-faker", config={ - "type": "BENCHMARK", - "schema": "FIVE_STRING_COLUMNS", - "terminationCondition": { - "type": "MAX_RECORDS", - "max": num_records, - }, + "count": num_records, + "seed": 0, + "records_per_sync": num_records, + "records_per_slice": 1000, }, streams="*", install_if_missing=install_if_missing, diff --git a/examples/manual_connector_test.py b/examples/manual_connector_test.py new file mode 100644 index 00000000..7d9fb822 --- /dev/null +++ b/examples/manual_connector_test.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +"""Helper script for manually testing source connectors with small record counts. + +This script demonstrates how to invoke source connectors directly to examine their +message output, which is useful for debugging connector behavior and message formats. + +Usage: + python examples/manual_connector_test.py + +The script will create temporary config and catalog files, invoke the source connector, +and save the output to a file for examination. +""" + +import json +import subprocess +import tempfile +from pathlib import Path + + +def create_config_file(config_type: str = "INFINITE_FEED", max_records: int = 5) -> Path: + """Create a temporary config file for the source connector.""" + if config_type == "INFINITE_FEED": + config = { + "type": "INFINITE_FEED", + "max_records": max_records, + "seed": 0, + "message_interval": 1000 + } + elif config_type == "BENCHMARK": + config = { + "type": "BENCHMARK", + "schema": "FIVE_STRING_COLUMNS", + "terminationCondition": { + "type": "MAX_RECORDS", + "max": max_records + } + } + else: + raise ValueError(f"Unknown config type: {config_type}") + + config_file = Path(tempfile.mktemp(suffix=".json")) + config_file.write_text(json.dumps(config, indent=2)) + return config_file + + +def create_catalog_file() -> Path: + """Create a temporary catalog file for the source connector.""" + catalog = { + "streams": [ + { + "stream": { + "name": "data", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + } + ] + } + + catalog_file = Path(tempfile.mktemp(suffix=".json")) + catalog_file.write_text(json.dumps(catalog, indent=2)) + return catalog_file + + +def run_source_connector( + source_image: str = "airbyte/source-e2e-test:dev", + config_type: str = "INFINITE_FEED", + max_records: int = 5, + output_file: str = "/tmp/manual_source_output.jsonl" +) -> None: + """Run the source connector and capture its output.""" + + config_file = create_config_file(config_type, max_records) + catalog_file = create_catalog_file() + + try: + cmd = [ + "docker", "run", "--rm", + "-v", f"{config_file}:/tmp/config.json", + "-v", f"{catalog_file}:/tmp/catalog.json", + source_image, + "read", + "--config", "/tmp/config.json", + "--catalog", "/tmp/catalog.json" + ] + + print(f"Running command: {' '.join(cmd)}") + print(f"Config type: {config_type}") + print(f"Max records: {max_records}") + print(f"Output file: {output_file}") + + with open(output_file, "w") as f: + result = subprocess.run( + cmd, + stdout=f, + stderr=subprocess.PIPE, + text=True, + timeout=60 + ) + + if result.returncode == 0: + print(f"โœ… Source connector completed successfully") + print(f"๐Ÿ“„ Output saved to: {output_file}") + + output_path = Path(output_file) + if output_path.exists(): + lines = output_path.read_text().strip().split('\n') + print(f"๐Ÿ“Š Generated {len(lines)} lines of output") + print("๐Ÿ” First 3 lines:") + for i, line in enumerate(lines[:3]): + print(f" {i+1}: {line}") + else: + print(f"โŒ Source connector failed with exit code: {result.returncode}") + print(f"stderr: {result.stderr}") + + finally: + config_file.unlink(missing_ok=True) + catalog_file.unlink(missing_ok=True) + + +if __name__ == "__main__": + print("๐Ÿงช Manual Source Connector Test") + print("=" * 40) + + print("\n๐Ÿ“‹ Testing with INFINITE_FEED configuration...") + run_source_connector( + config_type="INFINITE_FEED", + max_records=5, + output_file="/tmp/manual_source_infinite_feed.jsonl" + ) + + print("\n๐Ÿ“‹ Testing with BENCHMARK configuration...") + run_source_connector( + config_type="BENCHMARK", + max_records=5, + output_file="/tmp/manual_source_benchmark.jsonl" + ) + + print("\nโœจ Manual testing complete!") + print("๐Ÿ’ก Examine the output files to understand connector message formats") From c3dce14ae451cb1c452f4b63556f5bab16b77527 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 21:28:40 +0000 Subject: [PATCH 22/25] fix: Simplify source-faker config to use only count and seed parameters Co-Authored-By: AJ Steers --- airbyte/sources/util.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/airbyte/sources/util.py b/airbyte/sources/util.py index 11688ee0..a2a1989e 100644 --- a/airbyte/sources/util.py +++ b/airbyte/sources/util.py @@ -172,8 +172,6 @@ def get_benchmark_source( config={ "count": num_records, "seed": 0, - "records_per_sync": num_records, - "records_per_slice": 1000, }, streams="*", install_if_missing=install_if_missing, From 88756af908ae10de9476a9de9ff81a735cb1f68a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 22:01:55 +0000 Subject: [PATCH 23/25] feat: Test catalog-only exclude_none theory, update get_benchmark_source with connector_name parameter, and simplify example script - Remove all exclude_none=True additions except catalog serialization in destinations/base.py - Add optional connector_name parameter to get_benchmark_source() with validation for 'source-faker' and 'source-e2e-test' - Update docstring to document compatibility differences between connectors - Simplify manual_connector_test.py to use constants at top with minimal code and no error handling - Benchmark test still fails with NullPointerException due to PyAirbyte creating malformed trace messages with null stream_status fields Co-Authored-By: AJ Steers --- airbyte/_executors/base.py | 2 +- airbyte/_message_iterators.py | 2 +- airbyte/caches/_state_backend.py | 4 +- airbyte/shared/state_providers.py | 2 +- airbyte/shared/state_writers.py | 2 +- airbyte/sources/base.py | 2 +- airbyte/sources/util.py | 49 +++++++-- examples/manual_connector_test.py | 175 ++++++++---------------------- 8 files changed, 92 insertions(+), 146 deletions(-) diff --git a/airbyte/_executors/base.py b/airbyte/_executors/base.py index 2fc7d036..99dc13b0 100644 --- a/airbyte/_executors/base.py +++ b/airbyte/_executors/base.py @@ -41,7 +41,7 @@ def _pump_input( """Pump lines into a pipe.""" with pipe: try: - pipe.writelines(message.model_dump_json(exclude_none=True) + "\n" for message in messages) + pipe.writelines(message.model_dump_json() + "\n" for message in messages) pipe.flush() # Ensure data is sent immediately except (BrokenPipeError, OSError) as ex: if isinstance(ex, BrokenPipeError): diff --git a/airbyte/_message_iterators.py b/airbyte/_message_iterators.py index 7b8fd24a..3f6a4239 100644 --- a/airbyte/_message_iterators.py +++ b/airbyte/_message_iterators.py @@ -85,7 +85,7 @@ def __next__(self) -> AirbyteMessage: @final def read(self) -> str: """Read the next message from the iterator.""" - return next(self).model_dump_json(exclude_none=True) + return next(self).model_dump_json() @classmethod def from_read_result(cls, read_result: ReadResult) -> AirbyteMessageIterator: diff --git a/airbyte/caches/_state_backend.py b/airbyte/caches/_state_backend.py index 60d4dcce..7fef5bbf 100644 --- a/airbyte/caches/_state_backend.py +++ b/airbyte/caches/_state_backend.py @@ -143,14 +143,14 @@ def _write_state( destination_name=self.destination_name, source_name=self.source_name, stream_name=stream_name, - state_json=state_message.model_dump_json(exclude_none=True), + state_json=state_message.model_dump_json(), ) if self.destination_name else CacheStreamStateModel( source_name=self.source_name, stream_name=stream_name, table_name=table_prefix + stream_name, - state_json=state_message.model_dump_json(exclude_none=True), + state_json=state_message.model_dump_json(), ) ) diff --git a/airbyte/shared/state_providers.py b/airbyte/shared/state_providers.py index 95880dc9..359e8ee8 100644 --- a/airbyte/shared/state_providers.py +++ b/airbyte/shared/state_providers.py @@ -85,7 +85,7 @@ def to_state_input_file_text(self) -> str: "[" + "\n, ".join( [ - state_artifact.model_dump_json(exclude_none=True) + state_artifact.model_dump_json() for state_artifact in (self._state_message_artifacts or []) ] ) diff --git a/airbyte/shared/state_writers.py b/airbyte/shared/state_writers.py index 272b4aca..738325e4 100644 --- a/airbyte/shared/state_writers.py +++ b/airbyte/shared/state_writers.py @@ -76,7 +76,7 @@ def _write_state( state_message: AirbyteStateMessage, ) -> None: """Save or 'write' a state artifact.""" - print(state_message.model_dump_json(exclude_none=True)) + print(state_message.model_dump_json()) class NoOpStateWriter(StateWriterBase): diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index 20f05de8..698a50d9 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -752,7 +752,7 @@ def _read_with_catalog( with as_temp_files( [ self._hydrated_config, - catalog.model_dump_json(exclude_none=True), + catalog.model_dump_json(), state.to_state_input_file_text() if state else "[]", ] ) as [ diff --git a/airbyte/sources/util.py b/airbyte/sources/util.py index a2a1989e..3b9d0ccf 100644 --- a/airbyte/sources/util.py +++ b/airbyte/sources/util.py @@ -135,6 +135,7 @@ def get_source( # noqa: PLR0913 # Too many arguments def get_benchmark_source( num_records: int | str = "5e5", *, + connector_name: str | None = None, install_if_missing: bool = True, ) -> Source: """Get a source for benchmarking. @@ -150,6 +151,10 @@ def get_benchmark_source( 500,000 records. Can be an integer (`1000`) or a string in scientific notation. For example, `"5e6"` will generate 5 million records. + connector_name: The connector to use for benchmarking. Defaults to "source-faker". + Valid options are "source-faker" and "source-e2e-test". + Note: source-e2e-test is slightly faster but has compatibility issues with + some destinations due to not generating final STATE or SUCCESS trace messages. install_if_missing: Whether to install the source if it is not available locally. Returns: @@ -165,17 +170,39 @@ def get_benchmark_source( input_value=str(num_records), ) from None - # Note: source-e2e-test does not generate final STATE or SUCCESS trace messages, - # which can cause failures when combined with certain Java destinations like destination-dev-null - return get_source( - name="source-faker", - config={ - "count": num_records, - "seed": 0, - }, - streams="*", - install_if_missing=install_if_missing, - ) + if connector_name is None: + connector_name = "source-faker" + + if connector_name not in ["source-faker", "source-e2e-test"]: + raise PyAirbyteInputError( + message="Invalid connector name for benchmarking.", + input_value=connector_name, + guidance="Valid options are 'source-faker' and 'source-e2e-test'.", + ) + + if connector_name == "source-faker": + return get_source( + name="source-faker", + config={ + "count": num_records, + "seed": 0, + }, + streams="*", + install_if_missing=install_if_missing, + ) + else: # source-e2e-test + return get_source( + name="source-e2e-test", + docker_image=True, + config={ + "type": "INFINITE_FEED", + "max_records": num_records, + "seed": 0, + "message_interval": 1000, + }, + streams="*", + install_if_missing=install_if_missing, + ) __all__ = [ diff --git a/examples/manual_connector_test.py b/examples/manual_connector_test.py index 7d9fb822..6093e536 100644 --- a/examples/manual_connector_test.py +++ b/examples/manual_connector_test.py @@ -1,142 +1,61 @@ #!/usr/bin/env python3 -"""Helper script for manually testing source connectors with small record counts. - -This script demonstrates how to invoke source connectors directly to examine their -message output, which is useful for debugging connector behavior and message formats. - -Usage: - python examples/manual_connector_test.py - -The script will create temporary config and catalog files, invoke the source connector, -and save the output to a file for examination. -""" +"""Helper script for manually testing source connectors with small record counts.""" import json import subprocess import tempfile from pathlib import Path - -def create_config_file(config_type: str = "INFINITE_FEED", max_records: int = 5) -> Path: - """Create a temporary config file for the source connector.""" - if config_type == "INFINITE_FEED": - config = { - "type": "INFINITE_FEED", - "max_records": max_records, - "seed": 0, - "message_interval": 1000 - } - elif config_type == "BENCHMARK": - config = { - "type": "BENCHMARK", - "schema": "FIVE_STRING_COLUMNS", - "terminationCondition": { - "type": "MAX_RECORDS", - "max": max_records - } - } - else: - raise ValueError(f"Unknown config type: {config_type}") - +CONNECTOR_IMAGE = "airbyte/source-e2e-test:dev" +CONFIG = { + "type": "INFINITE_FEED", + "max_records": 5, + "seed": 0, + "message_interval": 1000 +} +CATALOG = { + "streams": [{ + "stream": { + "name": "data", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }] +} +OUTPUT_FILE = "/tmp/manual_source_output.jsonl" + +def run_connector(): + """Run the source connector and capture its output.""" config_file = Path(tempfile.mktemp(suffix=".json")) - config_file.write_text(json.dumps(config, indent=2)) - return config_file - - -def create_catalog_file() -> Path: - """Create a temporary catalog file for the source connector.""" - catalog = { - "streams": [ - { - "stream": { - "name": "data", - "json_schema": {}, - "supported_sync_modes": ["full_refresh"] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - } - ] - } - catalog_file = Path(tempfile.mktemp(suffix=".json")) - catalog_file.write_text(json.dumps(catalog, indent=2)) - return catalog_file - - -def run_source_connector( - source_image: str = "airbyte/source-e2e-test:dev", - config_type: str = "INFINITE_FEED", - max_records: int = 5, - output_file: str = "/tmp/manual_source_output.jsonl" -) -> None: - """Run the source connector and capture its output.""" - config_file = create_config_file(config_type, max_records) - catalog_file = create_catalog_file() + config_file.write_text(json.dumps(CONFIG, indent=2)) + catalog_file.write_text(json.dumps(CATALOG, indent=2)) - try: - cmd = [ - "docker", "run", "--rm", - "-v", f"{config_file}:/tmp/config.json", - "-v", f"{catalog_file}:/tmp/catalog.json", - source_image, - "read", - "--config", "/tmp/config.json", - "--catalog", "/tmp/catalog.json" - ] - - print(f"Running command: {' '.join(cmd)}") - print(f"Config type: {config_type}") - print(f"Max records: {max_records}") - print(f"Output file: {output_file}") - - with open(output_file, "w") as f: - result = subprocess.run( - cmd, - stdout=f, - stderr=subprocess.PIPE, - text=True, - timeout=60 - ) - - if result.returncode == 0: - print(f"โœ… Source connector completed successfully") - print(f"๐Ÿ“„ Output saved to: {output_file}") - - output_path = Path(output_file) - if output_path.exists(): - lines = output_path.read_text().strip().split('\n') - print(f"๐Ÿ“Š Generated {len(lines)} lines of output") - print("๐Ÿ” First 3 lines:") - for i, line in enumerate(lines[:3]): - print(f" {i+1}: {line}") - else: - print(f"โŒ Source connector failed with exit code: {result.returncode}") - print(f"stderr: {result.stderr}") + cmd = [ + "docker", "run", "--rm", + "-v", f"{config_file}:/tmp/config.json", + "-v", f"{catalog_file}:/tmp/catalog.json", + CONNECTOR_IMAGE, + "read", + "--config", "/tmp/config.json", + "--catalog", "/tmp/catalog.json" + ] - finally: - config_file.unlink(missing_ok=True) - catalog_file.unlink(missing_ok=True) - - -if __name__ == "__main__": - print("๐Ÿงช Manual Source Connector Test") - print("=" * 40) - - print("\n๐Ÿ“‹ Testing with INFINITE_FEED configuration...") - run_source_connector( - config_type="INFINITE_FEED", - max_records=5, - output_file="/tmp/manual_source_infinite_feed.jsonl" - ) + with open(OUTPUT_FILE, "w") as f: + subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True, timeout=60) - print("\n๐Ÿ“‹ Testing with BENCHMARK configuration...") - run_source_connector( - config_type="BENCHMARK", - max_records=5, - output_file="/tmp/manual_source_benchmark.jsonl" - ) + config_file.unlink(missing_ok=True) + catalog_file.unlink(missing_ok=True) - print("\nโœจ Manual testing complete!") - print("๐Ÿ’ก Examine the output files to understand connector message formats") + lines = Path(OUTPUT_FILE).read_text().strip().split('\n') + print(f"Generated {len(lines)} lines in {OUTPUT_FILE}") + for i, line in enumerate(lines[:3]): + print(f" {i+1}: {line}") + +if __name__ == "__main__": + print("Manual Source Connector Test") + run_connector() + print("Complete!") From 57c9863fafbc2897b6c7617f314f42966c9d5d10 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 22:16:55 +0000 Subject: [PATCH 24/25] fix: Address lint and type checking issues - Fix ruff lint issues in get_benchmark_source(): use set literal for membership testing and remove unnecessary else - Add return type annotation to run_connector() function in example script - All local checks now pass: ruff format, ruff check, mypy Co-Authored-By: AJ Steers --- airbyte/sources/util.py | 31 ++++++++--------- examples/manual_connector_test.py | 56 ++++++++++++++++++------------- 2 files changed, 49 insertions(+), 38 deletions(-) diff --git a/airbyte/sources/util.py b/airbyte/sources/util.py index 3b9d0ccf..84410367 100644 --- a/airbyte/sources/util.py +++ b/airbyte/sources/util.py @@ -172,8 +172,8 @@ def get_benchmark_source( if connector_name is None: connector_name = "source-faker" - - if connector_name not in ["source-faker", "source-e2e-test"]: + + if connector_name not in {"source-faker", "source-e2e-test"}: raise PyAirbyteInputError( message="Invalid connector name for benchmarking.", input_value=connector_name, @@ -190,19 +190,20 @@ def get_benchmark_source( streams="*", install_if_missing=install_if_missing, ) - else: # source-e2e-test - return get_source( - name="source-e2e-test", - docker_image=True, - config={ - "type": "INFINITE_FEED", - "max_records": num_records, - "seed": 0, - "message_interval": 1000, - }, - streams="*", - install_if_missing=install_if_missing, - ) + + # source-e2e-test + return get_source( + name="source-e2e-test", + docker_image=True, + config={ + "type": "INFINITE_FEED", + "max_records": num_records, + "seed": 0, + "message_interval": 1000, + }, + streams="*", + install_if_missing=install_if_missing, + ) __all__ = [ diff --git a/examples/manual_connector_test.py b/examples/manual_connector_test.py index 6093e536..a20c49bc 100644 --- a/examples/manual_connector_test.py +++ b/examples/manual_connector_test.py @@ -11,49 +11,59 @@ "type": "INFINITE_FEED", "max_records": 5, "seed": 0, - "message_interval": 1000 + "message_interval": 1000, } CATALOG = { - "streams": [{ - "stream": { - "name": "data", - "json_schema": {}, - "supported_sync_modes": ["full_refresh"] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }] + "streams": [ + { + "stream": { + "name": "data", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + } + ] } OUTPUT_FILE = "/tmp/manual_source_output.jsonl" -def run_connector(): + +def run_connector() -> None: """Run the source connector and capture its output.""" config_file = Path(tempfile.mktemp(suffix=".json")) catalog_file = Path(tempfile.mktemp(suffix=".json")) - + config_file.write_text(json.dumps(CONFIG, indent=2)) catalog_file.write_text(json.dumps(CATALOG, indent=2)) - + cmd = [ - "docker", "run", "--rm", - "-v", f"{config_file}:/tmp/config.json", - "-v", f"{catalog_file}:/tmp/catalog.json", + "docker", + "run", + "--rm", + "-v", + f"{config_file}:/tmp/config.json", + "-v", + f"{catalog_file}:/tmp/catalog.json", CONNECTOR_IMAGE, "read", - "--config", "/tmp/config.json", - "--catalog", "/tmp/catalog.json" + "--config", + "/tmp/config.json", + "--catalog", + "/tmp/catalog.json", ] - + with open(OUTPUT_FILE, "w") as f: subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True, timeout=60) - + config_file.unlink(missing_ok=True) catalog_file.unlink(missing_ok=True) - - lines = Path(OUTPUT_FILE).read_text().strip().split('\n') + + lines = Path(OUTPUT_FILE).read_text().strip().split("\n") print(f"Generated {len(lines)} lines in {OUTPUT_FILE}") for i, line in enumerate(lines[:3]): - print(f" {i+1}: {line}") + print(f" {i + 1}: {line}") + if __name__ == "__main__": print("Manual Source Connector Test") From dbb96d02f023533caa4d5d31f5a96815f9eb9116 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 Aug 2025 22:20:11 +0000 Subject: [PATCH 25/25] security: Replace insecure tempfile.mktemp() with secure tempfile.mkstemp() - Fix CodeQL security alerts (2 high severity) by using secure temporary file creation - tempfile.mktemp() is deprecated and insecure due to race conditions - tempfile.mkstemp() creates files securely and returns (fd, path) tuple - All local checks pass: ruff format, ruff check, mypy - Example script tested and works correctly Co-Authored-By: AJ Steers --- examples/manual_connector_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/manual_connector_test.py b/examples/manual_connector_test.py index a20c49bc..2f4c4c87 100644 --- a/examples/manual_connector_test.py +++ b/examples/manual_connector_test.py @@ -31,8 +31,8 @@ def run_connector() -> None: """Run the source connector and capture its output.""" - config_file = Path(tempfile.mktemp(suffix=".json")) - catalog_file = Path(tempfile.mktemp(suffix=".json")) + config_file = Path(tempfile.mkstemp(suffix=".json")[1]) + catalog_file = Path(tempfile.mkstemp(suffix=".json")[1]) config_file.write_text(json.dumps(CONFIG, indent=2)) catalog_file.write_text(json.dumps(CATALOG, indent=2))