Skip to content

Feat: Improved exception logging #351

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions airbyte/_connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,8 @@ def check(self) -> None:
except exc.AirbyteConnectorFailedError as ex:
raise exc.AirbyteConnectorCheckFailedError(
connector_name=self.name,
log_text=ex.log_text,
) from ex
original_exception=ex,
) from None

def install(self) -> None:
"""Install the connector if it is not yet installed."""
Expand Down Expand Up @@ -373,11 +373,22 @@ def _execute(
# This is likely a log message, so log it as INFO.
self._print_info_message(line)

except exc.AirbyteSubprocessFailedError as ex:
# Generic subprocess failure, so raise a connector error.
raise exc.AirbyteConnectorFailedError(
connector_name=self.name,
log_text=ex.log_text,
context={
"exit_code": ex.exit_code,
},
) from None
except Exception as e:
# This is an unexpected error, so wrap the original exception when raising.
raise exc.AirbyteConnectorFailedError(
connector_name=self.name,
log_text=self._last_log_messages,
) from e
original_exception=e,
) from None


__all__ = [
Expand Down
15 changes: 14 additions & 1 deletion airbyte/_executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ def _pump_input(
try:
pipe.writelines(message.model_dump_json() + "\n" for message in messages)
pipe.flush() # Ensure data is sent immediately
except BrokenPipeError:
# If the pipe is broken, ignore the exception
# The subprocess will handle the error
exception_holder.set_exception(exc.AirbyteConnectorBrokenPipeError())
except Exception as ex:
exception_holder.set_exception(ex)

Expand Down Expand Up @@ -84,7 +88,9 @@ def _stream_from_subprocess(
)
input_thread.start()
input_thread.join() # Ensure the input thread has finished
if exception_holder.exception:
if exception_holder.exception and not isinstance(
exception_holder.exception, exc.AirbyteConnectorBrokenPipeError
):
raise exception_holder.exception

else:
Expand Down Expand Up @@ -133,6 +139,13 @@ def _stream_from_subprocess(
raise exc.AirbyteSubprocessFailedError(
run_args=args,
exit_code=exit_code,
original_exception=(
exception_holder.exception
if not isinstance(
exception_holder.exception, exc.AirbyteConnectorBrokenPipeError
)
else None
),
)


Expand Down
3 changes: 2 additions & 1 deletion airbyte/destinations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ def _write_airbyte_message_stream(
raise exc.AirbyteConnectorWriteError(
connector_name=self.name,
log_text=self._last_log_messages,
) from ex
original_exception=ex,
) from None


__all__ = [
Expand Down
60 changes: 46 additions & 14 deletions airbyte/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
DOCS_URL_BASE = "https://airbytehq.github.io/PyAirbyte"
DOCS_URL = f"{DOCS_URL_BASE}/airbyte.html"

VERTICAL_SEPARATOR = "\n" + "-" * 60


# Base error class

Expand All @@ -68,6 +70,7 @@ class PyAirbyteError(Exception):
log_file: Path | None = None
context: dict[str, Any] | None = None
message: str | None = None
original_exception: Exception | None = None

def get_message(self) -> str:
"""Return the best description for the exception.
Expand All @@ -83,7 +86,15 @@ def get_message(self) -> str:

def __str__(self) -> str:
"""Return a string representation of the exception."""
special_properties = ["message", "guidance", "help_url", "log_text", "context", "log_file"]
special_properties = [
"message",
"guidance",
"help_url",
"log_text",
"context",
"log_file",
"original_exception",
]
display_properties = {
k: v
for k, v in self.__dict__.items()
Expand All @@ -93,24 +104,32 @@ def __str__(self) -> str:
context_str = "\n ".join(
f"{str(k).replace('_', ' ').title()}: {v!r}" for k, v in display_properties.items()
)
exception_str = f"{self.__class__.__name__}: {self.get_message()}\n"
exception_str = (
f"{self.get_message()} ({self.__class__.__name__})"
+ VERTICAL_SEPARATOR
+ f"\n{self.__class__.__name__}: {self.get_message()}"
)

if self.guidance:
exception_str += f"\n {self.guidance}"

if self.help_url:
exception_str += f"\n More info: {self.help_url}"

if context_str:
exception_str += " " + context_str
exception_str += "\n " + context_str

if self.log_file:
exception_str += f"\n Log file: {self.log_file.absolute()!s}"

if self.log_text:
if isinstance(self.log_text, list):
self.log_text = "\n".join(self.log_text)

exception_str += f"\n Log output: \n {indent(self.log_text, ' ')}"

if self.log_file:
exception_str += f"\n Log file: {self.log_file.absolute()!s}"

if self.guidance:
exception_str += f"\n Suggestion: {self.guidance}"

if self.help_url:
exception_str += f"\n More info: {self.help_url}"
if self.original_exception:
exception_str += VERTICAL_SEPARATOR + f"\nCaused by: {self.original_exception!s}"

return exception_str

Expand Down Expand Up @@ -275,6 +294,8 @@ class AirbyteConnectorError(PyAirbyteError):
def __post_init__(self) -> None:
"""Set the log file path for the connector."""
self.log_file = self._get_log_file()
if not self.guidance and self.log_file:
self.guidance = "Please review the log file for more information."

def _get_log_file(self) -> Path | None:
"""Return the log file path for the connector."""
Expand All @@ -293,6 +314,17 @@ def _get_log_file(self) -> Path | None:
return None


class AirbyteConnectorBrokenPipeError(AirbyteConnectorError, BrokenPipeError):
"""Broken pipe error occurred.

This indicates that an upstream source failed and closed the pipe.
"""

guidance = (
"Please check the upstream source logs for more information on the cause of the failure."
)


class AirbyteConnectorExecutableNotFoundError(AirbyteConnectorError):
"""Connector executable not found."""

Expand All @@ -306,15 +338,15 @@ class AirbyteConnectorReadError(AirbyteConnectorError):


class AirbyteConnectorWriteError(AirbyteConnectorError):
"""Error when reading from the connector."""
"""Error when writing to the connector."""


class AirbyteConnectorSpecFailedError(AirbyteConnectorError):
"""Error when reading from the connector."""
"""Error when getting spec from the connector."""


class AirbyteConnectorDiscoverFailedError(AirbyteConnectorError):
"""Error when reading from the connector."""
"""Error when running discovery on the connector."""


class AirbyteNoDataFromConnectorError(AirbyteConnectorError):
Expand Down