From 4c5185136266f7356eec2d1d5e37bc462666df6e Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 1 Sep 2024 12:37:33 -0700 Subject: [PATCH 1/5] drive-by-fix: improved docstrings --- airbyte/exceptions.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte/exceptions.py b/airbyte/exceptions.py index b4d60e73..aebd5b2e 100644 --- a/airbyte/exceptions.py +++ b/airbyte/exceptions.py @@ -306,15 +306,15 @@ class AirbyteConnectorReadError(AirbyteConnectorError): class AirbyteConnectorWriteError(AirbyteConnectorError): - """Error when reading from the connector.""" + """Error when writing to the connector.""" class AirbyteConnectorSpecFailedError(AirbyteConnectorError): - """Error when reading from the connector.""" + """Error when getting spec from the connector.""" class AirbyteConnectorDiscoverFailedError(AirbyteConnectorError): - """Error when reading from the connector.""" + """Error when running discovery on the connector.""" class AirbyteNoDataFromConnectorError(AirbyteConnectorError): From be36348767c0807864b1ab3292ed8c3e7f94faca Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 1 Sep 2024 12:38:07 -0700 Subject: [PATCH 2/5] drive-by-improvement: add brokenpipe handling --- airbyte/_executors/base.py | 4 ++++ airbyte/exceptions.py | 11 +++++++++++ 2 files changed, 15 insertions(+) diff --git a/airbyte/_executors/base.py b/airbyte/_executors/base.py index 7b405755..70216a92 100644 --- a/airbyte/_executors/base.py +++ b/airbyte/_executors/base.py @@ -42,6 +42,10 @@ def _pump_input( try: pipe.writelines(message.model_dump_json() + "\n" for message in messages) pipe.flush() # Ensure data is sent immediately + except BrokenPipeError: + # If the pipe is broken, ignore the exception + # The subprocess will handle the error + exception_holder.set_exception(exc.AirbyteConnectorBrokenPipeError()) except Exception as ex: exception_holder.set_exception(ex) diff --git a/airbyte/exceptions.py b/airbyte/exceptions.py index aebd5b2e..67925d9d 100644 --- a/airbyte/exceptions.py +++ b/airbyte/exceptions.py @@ -293,6 +293,17 @@ def _get_log_file(self) -> Path | None: return None +class AirbyteConnectorBrokenPipeError(AirbyteConnectorError, BrokenPipeError): + """Broken pipe error occurred. + + This indicates that an upstream source failed and closed the pipe. + """ + + guidance = ( + "Please check the upstream source logs for more information on the cause of the failure." + ) + + class AirbyteConnectorExecutableNotFoundError(AirbyteConnectorError): """Connector executable not found.""" From 22af3d42dcd53fb7bf83c74e9fa7cc9e8a1749f5 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 1 Sep 2024 13:44:18 -0700 Subject: [PATCH 3/5] drive-by-improvement: streamline process failure exception prints --- airbyte/_connector_base.py | 3 ++- airbyte/_executors/base.py | 9 ++++++++- airbyte/destinations/base.py | 3 ++- airbyte/exceptions.py | 16 +++++++++++++++- 4 files changed, 27 insertions(+), 4 deletions(-) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index 33d0852a..a2429a00 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -377,7 +377,8 @@ def _execute( raise exc.AirbyteConnectorFailedError( connector_name=self.name, log_text=self._last_log_messages, - ) from e + original_exception=e, + ) from None __all__ = [ diff --git a/airbyte/_executors/base.py b/airbyte/_executors/base.py index 70216a92..61a0cf70 100644 --- a/airbyte/_executors/base.py +++ b/airbyte/_executors/base.py @@ -88,7 +88,9 @@ def _stream_from_subprocess( ) input_thread.start() input_thread.join() # Ensure the input thread has finished - if exception_holder.exception: + if exception_holder.exception and not isinstance( + exception_holder.exception, exc.AirbyteConnectorBrokenPipeError + ): raise exception_holder.exception else: @@ -137,6 +139,11 @@ def _stream_from_subprocess( raise exc.AirbyteSubprocessFailedError( run_args=args, exit_code=exit_code, + context={ + "exception": str(exception_holder.exception) + if exception_holder.exception + else None, + }, ) diff --git a/airbyte/destinations/base.py b/airbyte/destinations/base.py index 55ff9436..22133513 100644 --- a/airbyte/destinations/base.py +++ b/airbyte/destinations/base.py @@ -295,7 +295,8 @@ def _write_airbyte_message_stream( raise exc.AirbyteConnectorWriteError( connector_name=self.name, log_text=self._last_log_messages, - ) from ex + original_exception=ex, + ) from None __all__ = [ diff --git a/airbyte/exceptions.py b/airbyte/exceptions.py index 67925d9d..85679230 100644 --- a/airbyte/exceptions.py +++ b/airbyte/exceptions.py @@ -68,6 +68,7 @@ class PyAirbyteError(Exception): log_file: Path | None = None context: dict[str, Any] | None = None message: str | None = None + original_exception: Exception | None = None def get_message(self) -> str: """Return the best description for the exception. @@ -83,7 +84,15 @@ def get_message(self) -> str: def __str__(self) -> str: """Return a string representation of the exception.""" - special_properties = ["message", "guidance", "help_url", "log_text", "context", "log_file"] + special_properties = [ + "message", + "guidance", + "help_url", + "log_text", + "context", + "log_file", + "original_exception", + ] display_properties = { k: v for k, v in self.__dict__.items() @@ -112,6 +121,11 @@ def __str__(self) -> str: if self.help_url: exception_str += f"\n More info: {self.help_url}" + if self.original_exception: + exception_str += ( + f"\n Caused by:\n{indent(str(self.original_exception), prefix=' ')!s}" + ) + return exception_str def __repr__(self) -> str: From a00d7e92d955c476399e33e7215d89072afcc9a6 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 1 Sep 2024 14:20:55 -0700 Subject: [PATCH 4/5] drive-by-improvement: prettify exceptions --- airbyte/_connector_base.py | 14 ++++++++++++-- airbyte/_executors/base.py | 12 +++++++----- airbyte/exceptions.py | 35 +++++++++++++++++++++-------------- 3 files changed, 40 insertions(+), 21 deletions(-) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index a2429a00..9de6faff 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -298,8 +298,8 @@ def check(self) -> None: except exc.AirbyteConnectorFailedError as ex: raise exc.AirbyteConnectorCheckFailedError( connector_name=self.name, - log_text=ex.log_text, - ) from ex + original_exception=ex, + ) from None def install(self) -> None: """Install the connector if it is not yet installed.""" @@ -373,7 +373,17 @@ def _execute( # This is likely a log message, so log it as INFO. self._print_info_message(line) + except exc.AirbyteSubprocessFailedError as ex: + # Generic subprocess failure, so raise a connector error. + raise exc.AirbyteConnectorFailedError( + connector_name=self.name, + log_text=ex.log_text, + context={ + "exit_code": ex.exit_code, + }, + ) from None except Exception as e: + # This is an unexpected error, so wrap the original exception when raising. raise exc.AirbyteConnectorFailedError( connector_name=self.name, log_text=self._last_log_messages, diff --git a/airbyte/_executors/base.py b/airbyte/_executors/base.py index 61a0cf70..6807689f 100644 --- a/airbyte/_executors/base.py +++ b/airbyte/_executors/base.py @@ -139,11 +139,13 @@ def _stream_from_subprocess( raise exc.AirbyteSubprocessFailedError( run_args=args, exit_code=exit_code, - context={ - "exception": str(exception_holder.exception) - if exception_holder.exception - else None, - }, + original_exception=( + exception_holder.exception + if not isinstance( + exception_holder.exception, exc.AirbyteConnectorBrokenPipeError + ) + else None + ), ) diff --git a/airbyte/exceptions.py b/airbyte/exceptions.py index 85679230..94761acc 100644 --- a/airbyte/exceptions.py +++ b/airbyte/exceptions.py @@ -54,6 +54,8 @@ DOCS_URL_BASE = "https://airbytehq.github.io/PyAirbyte" DOCS_URL = f"{DOCS_URL_BASE}/airbyte.html" +VERTICAL_SEPARATOR = "\n" + "-" * 60 + # Base error class @@ -102,29 +104,32 @@ def __str__(self) -> str: context_str = "\n ".join( f"{str(k).replace('_', ' ').title()}: {v!r}" for k, v in display_properties.items() ) - exception_str = f"{self.__class__.__name__}: {self.get_message()}\n" - if context_str: - exception_str += " " + context_str + exception_str = ( + f"{self.get_message()} ({self.__class__.__name__})" + + VERTICAL_SEPARATOR + + f"\n{self.__class__.__name__}: {self.get_message()}" + ) - if self.log_text: - if isinstance(self.log_text, list): - self.log_text = "\n".join(self.log_text) + if self.guidance: + exception_str += f"\n {self.guidance}" - exception_str += f"\n Log output: \n {indent(self.log_text, ' ')}" + if self.help_url: + exception_str += f"\n More info: {self.help_url}" + + if context_str: + exception_str += "\n " + context_str if self.log_file: exception_str += f"\n Log file: {self.log_file.absolute()!s}" - if self.guidance: - exception_str += f"\n Suggestion: {self.guidance}" + if self.log_text: + if isinstance(self.log_text, list): + self.log_text = "\n".join(self.log_text) - if self.help_url: - exception_str += f"\n More info: {self.help_url}" + exception_str += f"\n Log output: \n {indent(self.log_text, ' ')}" if self.original_exception: - exception_str += ( - f"\n Caused by:\n{indent(str(self.original_exception), prefix=' ')!s}" - ) + exception_str += VERTICAL_SEPARATOR + f"\nCaused by: {self.original_exception!s}" return exception_str @@ -289,6 +294,8 @@ class AirbyteConnectorError(PyAirbyteError): def __post_init__(self) -> None: """Set the log file path for the connector.""" self.log_file = self._get_log_file() + if not self.guidance and self.log_file: + self.guidance = "Please review the log file for more information." def _get_log_file(self) -> Path | None: """Return the log file path for the connector.""" From 54e9d9b1a13b8406f6680ea5710ef096ebd0c384 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 1 Sep 2024 16:07:22 -0700 Subject: [PATCH 5/5] remove unnecessary custom class for broken pipe --- airbyte/_executors/base.py | 13 +++++-------- airbyte/exceptions.py | 11 ----------- 2 files changed, 5 insertions(+), 19 deletions(-) diff --git a/airbyte/_executors/base.py b/airbyte/_executors/base.py index 6807689f..463de068 100644 --- a/airbyte/_executors/base.py +++ b/airbyte/_executors/base.py @@ -42,10 +42,6 @@ def _pump_input( try: pipe.writelines(message.model_dump_json() + "\n" for message in messages) pipe.flush() # Ensure data is sent immediately - except BrokenPipeError: - # If the pipe is broken, ignore the exception - # The subprocess will handle the error - exception_holder.set_exception(exc.AirbyteConnectorBrokenPipeError()) except Exception as ex: exception_holder.set_exception(ex) @@ -88,8 +84,11 @@ def _stream_from_subprocess( ) input_thread.start() input_thread.join() # Ensure the input thread has finished + + # Don't bother raising broken pipe errors, as they only + # indicate that a subprocess has terminated early. if exception_holder.exception and not isinstance( - exception_holder.exception, exc.AirbyteConnectorBrokenPipeError + exception_holder.exception, BrokenPipeError ): raise exception_holder.exception @@ -141,9 +140,7 @@ def _stream_from_subprocess( exit_code=exit_code, original_exception=( exception_holder.exception - if not isinstance( - exception_holder.exception, exc.AirbyteConnectorBrokenPipeError - ) + if not isinstance(exception_holder.exception, BrokenPipeError) else None ), ) diff --git a/airbyte/exceptions.py b/airbyte/exceptions.py index 94761acc..0713bb64 100644 --- a/airbyte/exceptions.py +++ b/airbyte/exceptions.py @@ -314,17 +314,6 @@ def _get_log_file(self) -> Path | None: return None -class AirbyteConnectorBrokenPipeError(AirbyteConnectorError, BrokenPipeError): - """Broken pipe error occurred. - - This indicates that an upstream source failed and closed the pipe. - """ - - guidance = ( - "Please check the upstream source logs for more information on the cause of the failure." - ) - - class AirbyteConnectorExecutableNotFoundError(AirbyteConnectorError): """Connector executable not found."""