diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index 33d0852a..9de6faff 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -298,8 +298,8 @@ def check(self) -> None: except exc.AirbyteConnectorFailedError as ex: raise exc.AirbyteConnectorCheckFailedError( connector_name=self.name, - log_text=ex.log_text, - ) from ex + original_exception=ex, + ) from None def install(self) -> None: """Install the connector if it is not yet installed.""" @@ -373,11 +373,22 @@ def _execute( # This is likely a log message, so log it as INFO. self._print_info_message(line) + except exc.AirbyteSubprocessFailedError as ex: + # Generic subprocess failure, so raise a connector error. + raise exc.AirbyteConnectorFailedError( + connector_name=self.name, + log_text=ex.log_text, + context={ + "exit_code": ex.exit_code, + }, + ) from None except Exception as e: + # This is an unexpected error, so wrap the original exception when raising. raise exc.AirbyteConnectorFailedError( connector_name=self.name, log_text=self._last_log_messages, - ) from e + original_exception=e, + ) from None __all__ = [ diff --git a/airbyte/_executors/base.py b/airbyte/_executors/base.py index 7b405755..463de068 100644 --- a/airbyte/_executors/base.py +++ b/airbyte/_executors/base.py @@ -84,7 +84,12 @@ def _stream_from_subprocess( ) input_thread.start() input_thread.join() # Ensure the input thread has finished - if exception_holder.exception: + + # Don't bother raising broken pipe errors, as they only + # indicate that a subprocess has terminated early. + if exception_holder.exception and not isinstance( + exception_holder.exception, BrokenPipeError + ): raise exception_holder.exception else: @@ -133,6 +138,11 @@ def _stream_from_subprocess( raise exc.AirbyteSubprocessFailedError( run_args=args, exit_code=exit_code, + original_exception=( + exception_holder.exception + if not isinstance(exception_holder.exception, BrokenPipeError) + else None + ), ) diff --git a/airbyte/destinations/base.py b/airbyte/destinations/base.py index 55ff9436..22133513 100644 --- a/airbyte/destinations/base.py +++ b/airbyte/destinations/base.py @@ -295,7 +295,8 @@ def _write_airbyte_message_stream( raise exc.AirbyteConnectorWriteError( connector_name=self.name, log_text=self._last_log_messages, - ) from ex + original_exception=ex, + ) from None __all__ = [ diff --git a/airbyte/exceptions.py b/airbyte/exceptions.py index b4d60e73..0713bb64 100644 --- a/airbyte/exceptions.py +++ b/airbyte/exceptions.py @@ -54,6 +54,8 @@ DOCS_URL_BASE = "https://airbytehq.github.io/PyAirbyte" DOCS_URL = f"{DOCS_URL_BASE}/airbyte.html" +VERTICAL_SEPARATOR = "\n" + "-" * 60 + # Base error class @@ -68,6 +70,7 @@ class PyAirbyteError(Exception): log_file: Path | None = None context: dict[str, Any] | None = None message: str | None = None + original_exception: Exception | None = None def get_message(self) -> str: """Return the best description for the exception. @@ -83,7 +86,15 @@ def get_message(self) -> str: def __str__(self) -> str: """Return a string representation of the exception.""" - special_properties = ["message", "guidance", "help_url", "log_text", "context", "log_file"] + special_properties = [ + "message", + "guidance", + "help_url", + "log_text", + "context", + "log_file", + "original_exception", + ] display_properties = { k: v for k, v in self.__dict__.items() @@ -93,9 +104,23 @@ def __str__(self) -> str: context_str = "\n ".join( f"{str(k).replace('_', ' ').title()}: {v!r}" for k, v in display_properties.items() ) - exception_str = f"{self.__class__.__name__}: {self.get_message()}\n" + exception_str = ( + f"{self.get_message()} ({self.__class__.__name__})" + + VERTICAL_SEPARATOR + + f"\n{self.__class__.__name__}: {self.get_message()}" + ) + + if self.guidance: + exception_str += f"\n {self.guidance}" + + if self.help_url: + exception_str += f"\n More info: {self.help_url}" + if context_str: - exception_str += " " + context_str + exception_str += "\n " + context_str + + if self.log_file: + exception_str += f"\n Log file: {self.log_file.absolute()!s}" if self.log_text: if isinstance(self.log_text, list): @@ -103,14 +128,8 @@ def __str__(self) -> str: exception_str += f"\n Log output: \n {indent(self.log_text, ' ')}" - if self.log_file: - exception_str += f"\n Log file: {self.log_file.absolute()!s}" - - if self.guidance: - exception_str += f"\n Suggestion: {self.guidance}" - - if self.help_url: - exception_str += f"\n More info: {self.help_url}" + if self.original_exception: + exception_str += VERTICAL_SEPARATOR + f"\nCaused by: {self.original_exception!s}" return exception_str @@ -275,6 +294,8 @@ class AirbyteConnectorError(PyAirbyteError): def __post_init__(self) -> None: """Set the log file path for the connector.""" self.log_file = self._get_log_file() + if not self.guidance and self.log_file: + self.guidance = "Please review the log file for more information." def _get_log_file(self) -> Path | None: """Return the log file path for the connector.""" @@ -306,15 +327,15 @@ class AirbyteConnectorReadError(AirbyteConnectorError): class AirbyteConnectorWriteError(AirbyteConnectorError): - """Error when reading from the connector.""" + """Error when writing to the connector.""" class AirbyteConnectorSpecFailedError(AirbyteConnectorError): - """Error when reading from the connector.""" + """Error when getting spec from the connector.""" class AirbyteConnectorDiscoverFailedError(AirbyteConnectorError): - """Error when reading from the connector.""" + """Error when running discovery on the connector.""" class AirbyteNoDataFromConnectorError(AirbyteConnectorError):