From a2d6f27426c0c8d55395215bd712db12170b27b3 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 16 Sep 2025 01:07:47 +0000 Subject: [PATCH 1/2] feat(cloud): Add log reading capabilities to SyncAttempt - Add get_log_text_tail() method for retrieving last N lines of logs - Add get_full_log_text() method for complete log retrieval - Use Config API /v1/attempts/get_for_job endpoint for log access - Handle both structured (LogEvents) and unstructured (LogRead) formats - Fix field names to match Config API schema (events vs logEvents) - Follow existing PyAirbyte patterns for lazy loading and caching Co-Authored-By: AJ Steers --- airbyte/cloud/sync_results.py | 77 +++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/airbyte/cloud/sync_results.py b/airbyte/cloud/sync_results.py index feb7d41b..3f170c24 100644 --- a/airbyte/cloud/sync_results.py +++ b/airbyte/cloud/sync_results.py @@ -182,6 +182,83 @@ def _fetch_attempt_info(self) -> dict[str, Any]: ) return self._attempt_info + def get_log_text_tail(self, num_lines: int = 1000) -> str: + """Return the last N lines of log text for this attempt. + + Args: + num_lines: Maximum number of lines to return from the end of the logs. + Defaults to 1000 lines. + + Returns: + String containing the last N lines of log text, with lines separated by newlines. + """ + attempt_info = self._fetch_attempt_info() + logs_data = attempt_info.get("logs") + + if not logs_data: + return "" + + if "events" in logs_data: + log_events = logs_data["events"] + if not log_events: + return "" + + tail_events = log_events[-num_lines:] if len(log_events) > num_lines else log_events + log_lines = [] + + for event in tail_events: + timestamp = event.get("timestamp", "") + level = event.get("level", "INFO") + message = event.get("message", "") + log_lines.append(f"[{timestamp}] {level}: {message}") + + return "\n".join(log_lines) + + if "logLines" in logs_data: + log_lines = logs_data["logLines"] + if not log_lines: + return "" + + tail_lines = log_lines[-num_lines:] if len(log_lines) > num_lines else log_lines + return "\n".join(tail_lines) + + return "" + + def get_full_log_text(self) -> str: + """Return the complete log text for this attempt. + + Returns: + String containing all log text for this attempt, with lines separated by newlines. + """ + attempt_info = self._fetch_attempt_info() + logs_data = attempt_info.get("logs") + + if not logs_data: + return "" + + if "events" in logs_data: + log_events = logs_data["events"] + if not log_events: + return "" + + log_lines = [] + for event in log_events: + timestamp = event.get("timestamp", "") + level = event.get("level", "INFO") + message = event.get("message", "") + log_lines.append(f"[{timestamp}] {level}: {message}") + + return "\n".join(log_lines) + + if "logLines" in logs_data: + log_lines = logs_data["logLines"] + if not log_lines: + return "" + + return "\n".join(log_lines) + + return "" + @dataclass class SyncResult: From 4d07482703bae27a19833f193b4f3d84e09728c5 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 16 Sep 2025 01:13:16 +0000 Subject: [PATCH 2/2] fix(cloud): Remove get_log_text_tail() method as requested - Address GitHub comment from @aaronsteers - Keep only get_full_log_text() method for complete log retrieval - Simplify SyncAttempt log interface to single method Co-Authored-By: AJ Steers --- airbyte/cloud/sync_results.py | 42 ----------------------------------- 1 file changed, 42 deletions(-) diff --git a/airbyte/cloud/sync_results.py b/airbyte/cloud/sync_results.py index 3f170c24..a75a8589 100644 --- a/airbyte/cloud/sync_results.py +++ b/airbyte/cloud/sync_results.py @@ -182,48 +182,6 @@ def _fetch_attempt_info(self) -> dict[str, Any]: ) return self._attempt_info - def get_log_text_tail(self, num_lines: int = 1000) -> str: - """Return the last N lines of log text for this attempt. - - Args: - num_lines: Maximum number of lines to return from the end of the logs. - Defaults to 1000 lines. - - Returns: - String containing the last N lines of log text, with lines separated by newlines. - """ - attempt_info = self._fetch_attempt_info() - logs_data = attempt_info.get("logs") - - if not logs_data: - return "" - - if "events" in logs_data: - log_events = logs_data["events"] - if not log_events: - return "" - - tail_events = log_events[-num_lines:] if len(log_events) > num_lines else log_events - log_lines = [] - - for event in tail_events: - timestamp = event.get("timestamp", "") - level = event.get("level", "INFO") - message = event.get("message", "") - log_lines.append(f"[{timestamp}] {level}: {message}") - - return "\n".join(log_lines) - - if "logLines" in logs_data: - log_lines = logs_data["logLines"] - if not log_lines: - return "" - - tail_lines = log_lines[-num_lines:] if len(log_lines) > num_lines else log_lines - return "\n".join(tail_lines) - - return "" - def get_full_log_text(self) -> str: """Return the complete log text for this attempt.