From 399fa092724469b3c2a85819b0feef1ac9711b7e Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 1 Sep 2024 12:37:33 -0700 Subject: [PATCH 01/11] drive-by-fix: improved docstrings --- airbyte/exceptions.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte/exceptions.py b/airbyte/exceptions.py index b4d60e73..aebd5b2e 100644 --- a/airbyte/exceptions.py +++ b/airbyte/exceptions.py @@ -306,15 +306,15 @@ class AirbyteConnectorReadError(AirbyteConnectorError): class AirbyteConnectorWriteError(AirbyteConnectorError): - """Error when reading from the connector.""" + """Error when writing to the connector.""" class AirbyteConnectorSpecFailedError(AirbyteConnectorError): - """Error when reading from the connector.""" + """Error when getting spec from the connector.""" class AirbyteConnectorDiscoverFailedError(AirbyteConnectorError): - """Error when reading from the connector.""" + """Error when running discovery on the connector.""" class AirbyteNoDataFromConnectorError(AirbyteConnectorError): From 42a0dad53dfb0e247f9b618cda0fcec54df0f326 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 1 Sep 2024 12:38:07 -0700 Subject: [PATCH 02/11] drive-by-improvement: add brokenpipe handling --- airbyte/_executors/base.py | 4 ++++ airbyte/exceptions.py | 11 +++++++++++ 2 files changed, 15 insertions(+) diff --git a/airbyte/_executors/base.py b/airbyte/_executors/base.py index 7b405755..70216a92 100644 --- a/airbyte/_executors/base.py +++ b/airbyte/_executors/base.py @@ -42,6 +42,10 @@ def _pump_input( try: pipe.writelines(message.model_dump_json() + "\n" for message in messages) pipe.flush() # Ensure data is sent immediately + except BrokenPipeError: + # If the pipe is broken, ignore the exception + # The subprocess will handle the error + exception_holder.set_exception(exc.AirbyteConnectorBrokenPipeError()) except Exception as ex: exception_holder.set_exception(ex) diff --git a/airbyte/exceptions.py b/airbyte/exceptions.py index aebd5b2e..67925d9d 100644 --- a/airbyte/exceptions.py +++ b/airbyte/exceptions.py @@ -293,6 +293,17 @@ def _get_log_file(self) -> Path | None: return None +class AirbyteConnectorBrokenPipeError(AirbyteConnectorError, BrokenPipeError): + """Broken pipe error occurred. + + This indicates that an upstream source failed and closed the pipe. + """ + + guidance = ( + "Please check the upstream source logs for more information on the cause of the failure." + ) + + class AirbyteConnectorExecutableNotFoundError(AirbyteConnectorError): """Connector executable not found.""" From 7de9ca1ec63e51fcf2901c427939803d3e652a9b Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 1 Sep 2024 13:44:18 -0700 Subject: [PATCH 03/11] drive-by-improvement: streamline process failure exception prints --- airbyte/_connector_base.py | 3 ++- airbyte/_executors/base.py | 9 ++++++++- airbyte/destinations/base.py | 3 ++- airbyte/exceptions.py | 16 +++++++++++++++- 4 files changed, 27 insertions(+), 4 deletions(-) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index 33d0852a..a2429a00 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -377,7 +377,8 @@ def _execute( raise exc.AirbyteConnectorFailedError( connector_name=self.name, log_text=self._last_log_messages, - ) from e + original_exception=e, + ) from None __all__ = [ diff --git a/airbyte/_executors/base.py b/airbyte/_executors/base.py index 70216a92..61a0cf70 100644 --- a/airbyte/_executors/base.py +++ b/airbyte/_executors/base.py @@ -88,7 +88,9 @@ def _stream_from_subprocess( ) input_thread.start() input_thread.join() # Ensure the input thread has finished - if exception_holder.exception: + if exception_holder.exception and not isinstance( + exception_holder.exception, exc.AirbyteConnectorBrokenPipeError + ): raise exception_holder.exception else: @@ -137,6 +139,11 @@ def _stream_from_subprocess( raise exc.AirbyteSubprocessFailedError( run_args=args, exit_code=exit_code, + context={ + "exception": str(exception_holder.exception) + if exception_holder.exception + else None, + }, ) diff --git a/airbyte/destinations/base.py b/airbyte/destinations/base.py index 55ff9436..22133513 100644 --- a/airbyte/destinations/base.py +++ b/airbyte/destinations/base.py @@ -295,7 +295,8 @@ def _write_airbyte_message_stream( raise exc.AirbyteConnectorWriteError( connector_name=self.name, log_text=self._last_log_messages, - ) from ex + original_exception=ex, + ) from None __all__ = [ diff --git a/airbyte/exceptions.py b/airbyte/exceptions.py index 67925d9d..85679230 100644 --- a/airbyte/exceptions.py +++ b/airbyte/exceptions.py @@ -68,6 +68,7 @@ class PyAirbyteError(Exception): log_file: Path | None = None context: dict[str, Any] | None = None message: str | None = None + original_exception: Exception | None = None def get_message(self) -> str: """Return the best description for the exception. @@ -83,7 +84,15 @@ def get_message(self) -> str: def __str__(self) -> str: """Return a string representation of the exception.""" - special_properties = ["message", "guidance", "help_url", "log_text", "context", "log_file"] + special_properties = [ + "message", + "guidance", + "help_url", + "log_text", + "context", + "log_file", + "original_exception", + ] display_properties = { k: v for k, v in self.__dict__.items() @@ -112,6 +121,11 @@ def __str__(self) -> str: if self.help_url: exception_str += f"\n More info: {self.help_url}" + if self.original_exception: + exception_str += ( + f"\n Caused by:\n{indent(str(self.original_exception), prefix=' ')!s}" + ) + return exception_str def __repr__(self) -> str: From e9b70166dc75328fade400b4713e99122ea75c95 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 1 Sep 2024 14:20:55 -0700 Subject: [PATCH 04/11] drive-by-improvement: prettify exceptions --- airbyte/_connector_base.py | 14 ++++++++++++-- airbyte/_executors/base.py | 12 +++++++----- airbyte/exceptions.py | 35 +++++++++++++++++++++-------------- 3 files changed, 40 insertions(+), 21 deletions(-) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index a2429a00..9de6faff 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -298,8 +298,8 @@ def check(self) -> None: except exc.AirbyteConnectorFailedError as ex: raise exc.AirbyteConnectorCheckFailedError( connector_name=self.name, - log_text=ex.log_text, - ) from ex + original_exception=ex, + ) from None def install(self) -> None: """Install the connector if it is not yet installed.""" @@ -373,7 +373,17 @@ def _execute( # This is likely a log message, so log it as INFO. self._print_info_message(line) + except exc.AirbyteSubprocessFailedError as ex: + # Generic subprocess failure, so raise a connector error. + raise exc.AirbyteConnectorFailedError( + connector_name=self.name, + log_text=ex.log_text, + context={ + "exit_code": ex.exit_code, + }, + ) from None except Exception as e: + # This is an unexpected error, so wrap the original exception when raising. raise exc.AirbyteConnectorFailedError( connector_name=self.name, log_text=self._last_log_messages, diff --git a/airbyte/_executors/base.py b/airbyte/_executors/base.py index 61a0cf70..6807689f 100644 --- a/airbyte/_executors/base.py +++ b/airbyte/_executors/base.py @@ -139,11 +139,13 @@ def _stream_from_subprocess( raise exc.AirbyteSubprocessFailedError( run_args=args, exit_code=exit_code, - context={ - "exception": str(exception_holder.exception) - if exception_holder.exception - else None, - }, + original_exception=( + exception_holder.exception + if not isinstance( + exception_holder.exception, exc.AirbyteConnectorBrokenPipeError + ) + else None + ), ) diff --git a/airbyte/exceptions.py b/airbyte/exceptions.py index 85679230..94761acc 100644 --- a/airbyte/exceptions.py +++ b/airbyte/exceptions.py @@ -54,6 +54,8 @@ DOCS_URL_BASE = "https://airbytehq.github.io/PyAirbyte" DOCS_URL = f"{DOCS_URL_BASE}/airbyte.html" +VERTICAL_SEPARATOR = "\n" + "-" * 60 + # Base error class @@ -102,29 +104,32 @@ def __str__(self) -> str: context_str = "\n ".join( f"{str(k).replace('_', ' ').title()}: {v!r}" for k, v in display_properties.items() ) - exception_str = f"{self.__class__.__name__}: {self.get_message()}\n" - if context_str: - exception_str += " " + context_str + exception_str = ( + f"{self.get_message()} ({self.__class__.__name__})" + + VERTICAL_SEPARATOR + + f"\n{self.__class__.__name__}: {self.get_message()}" + ) - if self.log_text: - if isinstance(self.log_text, list): - self.log_text = "\n".join(self.log_text) + if self.guidance: + exception_str += f"\n {self.guidance}" - exception_str += f"\n Log output: \n {indent(self.log_text, ' ')}" + if self.help_url: + exception_str += f"\n More info: {self.help_url}" + + if context_str: + exception_str += "\n " + context_str if self.log_file: exception_str += f"\n Log file: {self.log_file.absolute()!s}" - if self.guidance: - exception_str += f"\n Suggestion: {self.guidance}" + if self.log_text: + if isinstance(self.log_text, list): + self.log_text = "\n".join(self.log_text) - if self.help_url: - exception_str += f"\n More info: {self.help_url}" + exception_str += f"\n Log output: \n {indent(self.log_text, ' ')}" if self.original_exception: - exception_str += ( - f"\n Caused by:\n{indent(str(self.original_exception), prefix=' ')!s}" - ) + exception_str += VERTICAL_SEPARATOR + f"\nCaused by: {self.original_exception!s}" return exception_str @@ -289,6 +294,8 @@ class AirbyteConnectorError(PyAirbyteError): def __post_init__(self) -> None: """Set the log file path for the connector.""" self.log_file = self._get_log_file() + if not self.guidance and self.log_file: + self.guidance = "Please review the log file for more information." def _get_log_file(self) -> Path | None: """Return the log file path for the connector.""" From 860f30a38ebcceb0113d9b30e8e4cb86fa820bb0 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 1 Sep 2024 14:33:30 -0700 Subject: [PATCH 05/11] examples: add snowflake cortex example --- examples/run_snowflake_destination.py | 93 +++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 examples/run_snowflake_destination.py diff --git a/examples/run_snowflake_destination.py b/examples/run_snowflake_destination.py new file mode 100644 index 00000000..e41014c8 --- /dev/null +++ b/examples/run_snowflake_destination.py @@ -0,0 +1,93 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +""" +Usage: + poetry install + poetry run python examples/run_snowflake_destination.py +""" + +from __future__ import annotations + +import airbyte as ab +from airbyte.caches import SnowflakeCache +from airbyte.secrets.google_gsm import GoogleGSMSecretManager + +SCALE = 10_000 + + +AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing" +secret_mgr = GoogleGSMSecretManager( + project=AIRBYTE_INTERNAL_GCP_PROJECT, + credentials_json=ab.get_secret("GCP_GSM_CREDENTIALS"), +) + +secret = secret_mgr.get_secret( + secret_name="AIRBYTE_LIB_SNOWFLAKE_CREDS", +) +assert secret is not None, "Secret not found." +secret_config = secret.parse_json() + +snowflake_destination_secret = secret_mgr.fetch_connector_secret( + connector_name="destination-snowflake", +).parse_json() +cortex_destination_secret = secret_mgr.fetch_connector_secret( + connector_name="destination-snowflake-cortex", +).parse_json() + + +source = ab.get_source( + "source-faker", + config={ + "count": SCALE, + }, + install_if_missing=True, + streams="*", +) +cache = SnowflakeCache( + account=secret_config["account"], + username=secret_config["username"], + password=secret_config["password"], + database=secret_config["database"], + warehouse=secret_config["warehouse"], + role=secret_config["role"], +) +snowflake_destination = ab.get_destination( + "destination-snowflake", + config={ + **snowflake_destination_secret, + "default_schema": "pyairbyte_tests", + }, +) +cortex_destination_secret["processing"]["text_fields"] = [ + "make", + "model", + "name", + "gender", +] +cortex_destination_secret["indexing"]["default_schema"] = "pyairbyte_tests" +cortex_destination = ab.get_destination( + "destination-snowflake-cortex", + config=cortex_destination_secret, +) +cortex_destination.print_config_spec() +# snowflake_destination.print_config_spec() +# cortex_destination.print_config_spec() +# snowflake_destination.check() +# cortex_destination.check() +# source.check() + +# # This works: +# snowflake_write_result = snowflake_destination.write( +# source, +# cache=False, # Toggle comment to test with/without caching +# ) + +# This fails with 'BrokenPipeError', but no other error logged: +cortex_write_result = cortex_destination.write( + source, + cache=False, # Toggle comment to test with/without caching +) + +# result = source.read(cache) + +# for name in ["products"]: +# print(f"Stream {name}: {len(list(result[name]))} records") From 0cde86a6a7e2be417e8e0d94a677ced7c732326a Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 1 Sep 2024 14:43:16 -0700 Subject: [PATCH 06/11] example script: fix stream list --- examples/run_snowflake_destination.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/run_snowflake_destination.py b/examples/run_snowflake_destination.py index e41014c8..95293d37 100644 --- a/examples/run_snowflake_destination.py +++ b/examples/run_snowflake_destination.py @@ -11,7 +11,7 @@ from airbyte.caches import SnowflakeCache from airbyte.secrets.google_gsm import GoogleGSMSecretManager -SCALE = 10_000 +SCALE = 100 AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing" @@ -40,7 +40,7 @@ "count": SCALE, }, install_if_missing=True, - streams="*", + streams=["products", "users"], ) cache = SnowflakeCache( account=secret_config["account"], From 1b5184e0fe71572b220d550926b948f1cca62b6d Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 1 Sep 2024 14:47:15 -0700 Subject: [PATCH 07/11] update example script --- examples/run_snowflake_destination.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/examples/run_snowflake_destination.py b/examples/run_snowflake_destination.py index 95293d37..558f12e5 100644 --- a/examples/run_snowflake_destination.py +++ b/examples/run_snowflake_destination.py @@ -68,12 +68,12 @@ "destination-snowflake-cortex", config=cortex_destination_secret, ) -cortex_destination.print_config_spec() +# cortex_destination.print_config_spec() # snowflake_destination.print_config_spec() # cortex_destination.print_config_spec() -# snowflake_destination.check() -# cortex_destination.check() -# source.check() +snowflake_destination.check() +cortex_destination.check() +source.check() # # This works: # snowflake_write_result = snowflake_destination.write( @@ -81,7 +81,6 @@ # cache=False, # Toggle comment to test with/without caching # ) -# This fails with 'BrokenPipeError', but no other error logged: cortex_write_result = cortex_destination.write( source, cache=False, # Toggle comment to test with/without caching From 85590cb538ca3530d1577ee0a0d27fdfbe9622e6 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Sun, 1 Sep 2024 17:04:50 -0700 Subject: [PATCH 08/11] Update airbyte/exceptions.py --- airbyte/exceptions.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/airbyte/exceptions.py b/airbyte/exceptions.py index 94761acc..0713bb64 100644 --- a/airbyte/exceptions.py +++ b/airbyte/exceptions.py @@ -314,17 +314,6 @@ def _get_log_file(self) -> Path | None: return None -class AirbyteConnectorBrokenPipeError(AirbyteConnectorError, BrokenPipeError): - """Broken pipe error occurred. - - This indicates that an upstream source failed and closed the pipe. - """ - - guidance = ( - "Please check the upstream source logs for more information on the cause of the failure." - ) - - class AirbyteConnectorExecutableNotFoundError(AirbyteConnectorError): """Connector executable not found.""" From 74f5e4cb4c62a8e9c5503ff5a667bd0096256b52 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Sun, 1 Sep 2024 17:05:19 -0700 Subject: [PATCH 09/11] Update airbyte/_executors/base.py --- airbyte/_executors/base.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/airbyte/_executors/base.py b/airbyte/_executors/base.py index 2e17aefc..463de068 100644 --- a/airbyte/_executors/base.py +++ b/airbyte/_executors/base.py @@ -42,10 +42,6 @@ def _pump_input( try: pipe.writelines(message.model_dump_json() + "\n" for message in messages) pipe.flush() # Ensure data is sent immediately - except BrokenPipeError: - # If the pipe is broken, ignore the exception - # The subprocess will handle the error - exception_holder.set_exception(exc.AirbyteConnectorBrokenPipeError()) except Exception as ex: exception_holder.set_exception(ex) From b048edcb46fff8ecf8726c47ba6c1ff114c616aa Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 6 Sep 2024 09:11:13 -0700 Subject: [PATCH 10/11] clean up sample script --- examples/run_snowflake_destination.py | 124 ++++++++++---------------- 1 file changed, 46 insertions(+), 78 deletions(-) diff --git a/examples/run_snowflake_destination.py b/examples/run_snowflake_destination.py index 558f12e5..c3586b0f 100644 --- a/examples/run_snowflake_destination.py +++ b/examples/run_snowflake_destination.py @@ -8,85 +8,53 @@ from __future__ import annotations import airbyte as ab -from airbyte.caches import SnowflakeCache from airbyte.secrets.google_gsm import GoogleGSMSecretManager SCALE = 100 - -AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing" -secret_mgr = GoogleGSMSecretManager( - project=AIRBYTE_INTERNAL_GCP_PROJECT, - credentials_json=ab.get_secret("GCP_GSM_CREDENTIALS"), -) - -secret = secret_mgr.get_secret( - secret_name="AIRBYTE_LIB_SNOWFLAKE_CREDS", -) -assert secret is not None, "Secret not found." -secret_config = secret.parse_json() - -snowflake_destination_secret = secret_mgr.fetch_connector_secret( - connector_name="destination-snowflake", -).parse_json() -cortex_destination_secret = secret_mgr.fetch_connector_secret( - connector_name="destination-snowflake-cortex", -).parse_json() - - -source = ab.get_source( - "source-faker", - config={ - "count": SCALE, - }, - install_if_missing=True, - streams=["products", "users"], -) -cache = SnowflakeCache( - account=secret_config["account"], - username=secret_config["username"], - password=secret_config["password"], - database=secret_config["database"], - warehouse=secret_config["warehouse"], - role=secret_config["role"], -) -snowflake_destination = ab.get_destination( - "destination-snowflake", - config={ - **snowflake_destination_secret, - "default_schema": "pyairbyte_tests", - }, -) -cortex_destination_secret["processing"]["text_fields"] = [ - "make", - "model", - "name", - "gender", -] -cortex_destination_secret["indexing"]["default_schema"] = "pyairbyte_tests" -cortex_destination = ab.get_destination( - "destination-snowflake-cortex", - config=cortex_destination_secret, -) -# cortex_destination.print_config_spec() -# snowflake_destination.print_config_spec() -# cortex_destination.print_config_spec() -snowflake_destination.check() -cortex_destination.check() -source.check() - -# # This works: -# snowflake_write_result = snowflake_destination.write( -# source, -# cache=False, # Toggle comment to test with/without caching -# ) - -cortex_write_result = cortex_destination.write( - source, - cache=False, # Toggle comment to test with/without caching -) - -# result = source.read(cache) - -# for name in ["products"]: -# print(f"Stream {name}: {len(list(result[name]))} records") +def get_secret_config() -> dict: + AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing" + secret_mgr = GoogleGSMSecretManager( + project=AIRBYTE_INTERNAL_GCP_PROJECT, + credentials_json=ab.get_secret("GCP_GSM_CREDENTIALS"), + ) + + secret = secret_mgr.get_secret( + secret_name="AIRBYTE_LIB_SNOWFLAKE_CREDS", + ) + assert secret is not None, "Secret not found." + secret_config = secret.parse_json() + + cortex_destination_secret = secret_mgr.fetch_connector_secret( + connector_name="destination-snowflake-cortex", + ).parse_json() + return cortex_destination_secret + + +def sync_to_cortex() -> None: + source = ab.get_source( + "source-faker", + config={ + "count": SCALE, + }, + install_if_missing=True, + streams=["products", "users"], + ) + source.check() + cortex_config = get_secret_config() + cortex_config["processing"]["text_fields"] = [ + "make", + "model", + "name", + "gender", + ] + cortex_config["indexing"]["default_schema"] = "pyairbyte_demo" + cortex_destination = ab.get_destination( + "destination-snowflake-cortex", + config=cortex_config, + ) + cortex_destination.write(source) + + +if __name__ == "__main__": + sync_to_cortex() From ddd699e7d60ce9e13e892a2f4784d7473d66bbc7 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 6 Sep 2024 18:46:23 +0000 Subject: [PATCH 11/11] Auto-fix lint and format issues --- examples/run_snowflake_destination.py | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/run_snowflake_destination.py b/examples/run_snowflake_destination.py index c3586b0f..0467c8b4 100644 --- a/examples/run_snowflake_destination.py +++ b/examples/run_snowflake_destination.py @@ -12,6 +12,7 @@ SCALE = 100 + def get_secret_config() -> dict: AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing" secret_mgr = GoogleGSMSecretManager(