Merged
35 commits
c15db69
chore: update poetry.lock
devin-ai-integration[bot] Sep 10, 2025
a544cea
feat(cloud): Add basic SyncAttempt class and get_attempts() method
devin-ai-integration[bot] Sep 11, 2025
7dda7c9
fix(cloud): Update get_attempts() to return all job attempts
devin-ai-integration[bot] Sep 11, 2025
de6df3b
fix(cloud): Add timezone import to resolve MyPy error
devin-ai-integration[bot] Sep 11, 2025
8a4d5b1
fix(cloud): Suppress SLF001 lint warnings for Config API helper
devin-ai-integration[bot] Sep 11, 2025
ddd22e6
add todo about caching
aaronsteers Sep 11, 2025
f45834b
Merge branch 'main' into devin/1757543890-sync-attempt-abstraction
aaronsteers Sep 16, 2025
9cf622f
feat(cloud): Add log reading capabilities to SyncAttempt (#786)
aaronsteers Sep 16, 2025
aaf469e
feat(mcp): Add get_cloud_sync_logs tool and enhance get_cloud_sync_st…
devin-ai-integration[bot] Sep 16, 2025
425acec
fix(mcp): Address CodeRabbit feedback and resolve MyPy missing return…
devin-ai-integration[bot] Sep 16, 2025
9db2b97
fix(mcp): Remove TODO comment and fix whitespace issues
devin-ai-integration[bot] Sep 16, 2025
952c745
fix(mcp): Suppress TRY300 lint warnings with noqa comments
devin-ai-integration[bot] Sep 16, 2025
3ddc25a
Merge branch 'main' into devin/1757543890-sync-attempt-abstraction
devin-ai-integration[bot] Sep 16, 2025
284f09a
fix(mcp): Auto-register .envrc files for secret management
devin-ai-integration[bot] Sep 16, 2025
747bb1e
feat(mcp): Add MCP mode detection to prevent interactive prompts and …
devin-ai-integration[bot] Sep 16, 2025
26eb09c
refactor(meta): Address GitHub comments - rename MCP functions and si…
devin-ai-integration[bot] Sep 16, 2025
c892235
feat(mcp): Add MCP mode detection to prevent interactive prompts and …
devin-ai-integration[bot] Sep 16, 2025
18dfaf8
refactor(meta): Address GitHub comments - rename MCP functions and si…
devin-ai-integration[bot] Sep 16, 2025
c2501a6
fixes, cleanup
aaronsteers Sep 16, 2025
c30ce24
explicit prompt disablement
aaronsteers Sep 16, 2025
941ccea
Apply suggestion from @aaronsteers
aaronsteers Sep 16, 2025
7715581
Merge remote-tracking branch 'origin/devin/1758055597-fix-envrc-loadi…
aaronsteers Sep 16, 2025
c461afc
fix(cloud): Fix datetime parsing in SyncAttempt and SyncResult using …
devin-ai-integration[bot] Sep 16, 2025
e966433
fix(cloud): Fix datetime parsing in SyncAttempt and SyncResult
devin-ai-integration[bot] Sep 16, 2025
225d69a
fix(cloud): Fix duplicate /v1 in API endpoint URLs
devin-ai-integration[bot] Sep 16, 2025
0bf1bdd
feat(api): Enhance error handling in _make_config_api_request to incl…
devin-ai-integration[bot] Sep 16, 2025
52ca58f
fix(cloud): Use attempts array from jobs/get response instead of inte…
devin-ai-integration[bot] Sep 17, 2025
174845c
fix(cloud): Use explicit start=0 in enumerate for improved readability
devin-ai-integration[bot] Sep 17, 2025
083c9c4
pr cleanup
aaronsteers Sep 17, 2025
b02d09c
Merge branch 'main' into devin/1757543890-sync-attempt-abstraction
aaronsteers Sep 17, 2025
e3adb61
fix imports
aaronsteers Sep 17, 2025
4d915ad
Merge branch 'main' into devin/1757543890-sync-attempt-abstraction
aaronsteers Sep 19, 2025
f8db6c6
Merge branch 'main' into devin/1757543890-sync-attempt-abstraction
aaronsteers Sep 20, 2025
a3c8b28
fix: resolve lint and format issues for CI compliance
devin-ai-integration[bot] Sep 20, 2025
d241ea5
chore: update poetry.lock with dependency markers
devin-ai-integration[bot] Sep 20, 2025
12 changes: 11 additions & 1 deletion airbyte/_util/api_util.py
@@ -14,6 +14,7 @@
from __future__ import annotations

import json
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Literal

import airbyte_api
@@ -860,18 +861,27 @@ def _make_config_api_request(
"Authorization": f"Bearer {bearer_token}",
"User-Agent": "PyAirbyte Client",
}
full_url = config_api_root + path
response = requests.request(
method="POST",
url=config_api_root + path,
url=full_url,
headers=headers,
json=json,
)
if not status_ok(response.status_code):
try:
response.raise_for_status()
except requests.HTTPError as ex:
error_message = f"API request failed with status {response.status_code}"
if response.status_code == HTTPStatus.FORBIDDEN: # 403 error
error_message += f" (Forbidden) when accessing: {full_url}"
raise AirbyteError(
message=error_message,
context={
"full_url": full_url,
"config_api_root": config_api_root,
"path": path,
"status_code": response.status_code,
"url": response.request.url,
"body": response.request.body,
"response": response.__dict__,
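
A quick sketch of how the enriched error surfaces to callers. The call arguments mirror the /jobs/get usage later in this PR; the API root, the credential placeholders, and the airbyte.exceptions import path for AirbyteError are illustrative assumptions, not part of the change.

    from airbyte import exceptions as exc  # assumed location of AirbyteError
    from airbyte._util import api_util

    try:
        api_util._make_config_api_request(
            api_root="https://cloud.airbyte.com/api",  # illustrative; normally workspace.api_root
            path="/jobs/get",
            json={"id": 12345},
            client_id="<client-id>",
            client_secret="<client-secret>",
        )
    except exc.AirbyteError as ex:
        # A 403 now reads roughly:
        #   API request failed with status 403 (Forbidden) when accessing: <full_url>
        # and the error context carries full_url, config_api_root, path, and status_code.
        print(ex)
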
142 changes: 138 additions & 4 deletions airbyte/cloud/sync_results.py
@@ -103,8 +103,11 @@
import time
from collections.abc import Iterator, Mapping
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import TYPE_CHECKING, Any, final
from typing import TYPE_CHECKING, Any

from typing_extensions import final

from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse

from airbyte._util import api_util
from airbyte.cloud.constants import FAILED_STATUSES, FINAL_STATUSES
@@ -117,6 +120,8 @@
"""The default timeout for waiting for a sync job to complete, in seconds."""

if TYPE_CHECKING:
from datetime import datetime

import sqlalchemy

from airbyte._util.api_imports import ConnectionResponse, JobResponse, JobStatusEnum
@@ -125,6 +130,88 @@
from airbyte.cloud.workspaces import CloudWorkspace


@dataclass
class SyncAttempt:
"""Represents a single attempt of a sync job.

**This class is not meant to be instantiated directly.** Instead, obtain a `SyncAttempt` by
calling `.SyncResult.get_attempts()`.
"""

workspace: CloudWorkspace
connection: CloudConnection
job_id: int
attempt_number: int
_attempt_data: dict[str, Any] | None = None

@property
def attempt_id(self) -> int:
"""Return the attempt ID."""
return self._get_attempt_data()["id"]

@property
def status(self) -> str:
"""Return the attempt status."""
return self._get_attempt_data()["status"]

@property
def bytes_synced(self) -> int:
"""Return the number of bytes synced in this attempt."""
return self._get_attempt_data().get("bytesSynced", 0)

@property
def records_synced(self) -> int:
"""Return the number of records synced in this attempt."""
return self._get_attempt_data().get("recordsSynced", 0)

@property
def created_at(self) -> datetime:
"""Return the creation time of the attempt."""
timestamp = self._get_attempt_data()["createdAt"]
return ab_datetime_parse(timestamp)

def _get_attempt_data(self) -> dict[str, Any]:
"""Get attempt data from the provided attempt data."""
if self._attempt_data is None:
raise ValueError(
"Attempt data not provided. SyncAttempt should be created via "
"SyncResult.get_attempts()."
)
return self._attempt_data["attempt"]

def get_full_log_text(self) -> str:
"""Return the complete log text for this attempt.

Returns:
String containing all log text for this attempt, with lines separated by newlines.
"""
if self._attempt_data is None:
return ""

logs_data = self._attempt_data.get("logs")
if not logs_data:
return ""

result = ""

if "events" in logs_data:
log_events = logs_data["events"]
if log_events:
log_lines = []
for event in log_events:
timestamp = event.get("timestamp", "")
level = event.get("level", "INFO")
message = event.get("message", "")
log_lines.append(f"[{timestamp}] {level}: {message}")
result = "\n".join(log_lines)
elif "logLines" in logs_data:
log_lines = logs_data["logLines"]
if log_lines:
result = "\n".join(log_lines)

return result


@dataclass
class SyncResult:
"""The result of a sync operation.
@@ -141,6 +228,7 @@ class SyncResult:
_latest_job_info: JobResponse | None = None
_connection_response: ConnectionResponse | None = None
_cache: CacheBase | None = None
_job_with_attempts_info: dict[str, Any] | None = None

@property
def job_url(self) -> str:
@@ -213,8 +301,53 @@ def records_synced(self) -> int:
@property
def start_time(self) -> datetime:
"""Return the start time of the sync job in UTC."""
# Parse from ISO 8601 format:
return datetime.fromisoformat(self._fetch_latest_job_info().start_time)
try:
return ab_datetime_parse(self._fetch_latest_job_info().start_time)
except (ValueError, TypeError) as e:
if "Invalid isoformat string" in str(e):
job_info_raw = api_util._make_config_api_request( # noqa: SLF001
api_root=self.workspace.api_root,
path="/jobs/get",
json={"id": self.job_id},
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
)
raw_start_time = job_info_raw.get("startTime")
if raw_start_time:
return ab_datetime_parse(raw_start_time)
raise

def _fetch_job_with_attempts(self) -> dict[str, Any]:
"""Fetch job info with attempts from Config API using lazy loading pattern."""
if self._job_with_attempts_info is not None:
return self._job_with_attempts_info

self._job_with_attempts_info = api_util._make_config_api_request( # noqa: SLF001 # Config API helper
api_root=self.workspace.api_root,
path="/jobs/get",
json={
"id": self.job_id,
},
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
)
return self._job_with_attempts_info

def get_attempts(self) -> list[SyncAttempt]:
"""Return a list of attempts for this sync job."""
job_with_attempts = self._fetch_job_with_attempts()
attempts_data = job_with_attempts.get("attempts", [])

return [
SyncAttempt(
workspace=self.workspace,
connection=self.connection,
job_id=self.job_id,
attempt_number=i,
_attempt_data=attempt_data,
)
for i, attempt_data in enumerate(attempts_data, start=0)
]

def raise_failure_status(
self,
@@ -362,4 +495,5 @@ def __len__(self) -> int:

__all__ = [
"SyncResult",
"SyncAttempt",
]
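
For reviewers, a minimal usage sketch of the surface this file adds. The CloudWorkspace constructor arguments and credential placeholders are assumptions; get_sync_result(), get_attempts(), and the SyncAttempt properties are the APIs introduced or exercised by this PR.

    from airbyte import cloud

    workspace = cloud.CloudWorkspace(  # assumed constructor; values are placeholders
        workspace_id="<workspace-id>",
        client_id="<client-id>",
        client_secret="<client-secret>",
    )
    connection = workspace.get_connection(connection_id="<connection-id>")

    sync_result = connection.get_sync_result(job_id=None)  # None -> latest job
    if sync_result is not None:
        attempts = sync_result.get_attempts()
        for attempt in attempts:
            print(
                f"attempt {attempt.attempt_number}: {attempt.status}, "
                f"{attempt.records_synced} records / {attempt.bytes_synced} bytes"
            )
        if attempts:
            # Full log text of the most recent attempt.
            print(attempts[-1].get_full_log_text())
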
124 changes: 116 additions & 8 deletions airbyte/mcp/_cloud_ops.py
@@ -1,13 +1,12 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Airbyte Cloud MCP operations."""

from typing import Annotated
from typing import Annotated, Any

from fastmcp import FastMCP
from pydantic import Field

from airbyte import cloud, get_destination, get_source
from airbyte._util.api_imports import JobStatusEnum
from airbyte.cloud.auth import (
resolve_cloud_api_url,
resolve_cloud_client_id,
@@ -343,19 +342,64 @@ def get_cloud_sync_status(
default=None,
),
],
) -> JobStatusEnum | None:
*,
include_attempts: Annotated[
bool,
Field(
description="Whether to include detailed attempts information.",
default=False,
),
],
) -> dict[str, Any]:
"""Get the status of a sync job from the Airbyte Cloud.

By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
Airbyte Cloud API.
"""
workspace: CloudWorkspace = _get_cloud_workspace()
connection = workspace.get_connection(connection_id=connection_id)
try:
workspace: CloudWorkspace = _get_cloud_workspace()
connection = workspace.get_connection(connection_id=connection_id)

# If a job ID is provided, get the job by ID.
sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
return sync_result.get_job_status() if sync_result else None
# If a job ID is provided, get the job by ID.
sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

if not sync_result:
return {"status": None, "job_id": None, "attempts": []}

result = {
"status": sync_result.get_job_status(),
"job_id": sync_result.job_id,
"bytes_synced": sync_result.bytes_synced,
"records_synced": sync_result.records_synced,
"start_time": sync_result.start_time.isoformat(),
"job_url": sync_result.job_url,
"attempts": [],
}

if include_attempts:
attempts = sync_result.get_attempts()
result["attempts"] = [
{
"attempt_number": attempt.attempt_number,
"attempt_id": attempt.attempt_id,
"status": attempt.status,
"bytes_synced": attempt.bytes_synced,
"records_synced": attempt.records_synced,
"created_at": attempt.created_at.isoformat(),
}
for attempt in attempts
]

return result # noqa: TRY300

except Exception as ex:
return {
"status": None,
"job_id": job_id,
"error": f"Failed to get sync status for connection '{connection_id}': {ex}",
"attempts": [],
}


# @app.tool() # << deferred
@@ -382,6 +426,69 @@ def list_deployed_cloud_destination_connectors() -> list[CloudDestination]:
return workspace.list_destinations()


# @app.tool() # << deferred
def get_cloud_sync_logs(
connection_id: Annotated[
str,
Field(description="The ID of the Airbyte Cloud connection."),
],
job_id: Annotated[
int | None,
Field(description="Optional job ID. If not provided, the latest job will be used."),
] = None,
attempt_number: Annotated[
int | None,
Field(
description="Optional attempt number. If not provided, the latest attempt will be used."
),
] = None,
) -> str:
"""Get the logs from a sync job attempt on Airbyte Cloud.

By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
Airbyte Cloud API.
"""
try:
workspace: CloudWorkspace = _get_cloud_workspace()
connection = workspace.get_connection(connection_id=connection_id)

sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

if not sync_result:
return f"No sync job found for connection '{connection_id}'"

attempts = sync_result.get_attempts()

if not attempts:
return f"No attempts found for job '{sync_result.job_id}'"

if attempt_number is not None:
target_attempt = None
for attempt in attempts:
if attempt.attempt_number == attempt_number:
target_attempt = attempt
break

if target_attempt is None:
return f"Attempt number {attempt_number} not found for job '{sync_result.job_id}'"
else:
target_attempt = max(attempts, key=lambda a: a.attempt_number)

logs = target_attempt.get_full_log_text()

if not logs:
return (
f"No logs available for job '{sync_result.job_id}', "
f"attempt {target_attempt.attempt_number}"
)

return logs # noqa: TRY300

except Exception as ex:
return f"Failed to get logs for connection '{connection_id}': {ex}"


# @app.tool() # << deferred
def list_deployed_cloud_connections() -> list[CloudConnection]:
"""List all deployed connections in the Airbyte Cloud workspace.
@@ -403,6 +510,7 @@ def register_cloud_ops_tools(app: FastMCP) -> None:
app.tool(create_connection_on_cloud)
app.tool(run_cloud_sync)
app.tool(get_cloud_sync_status)
app.tool(get_cloud_sync_logs)
app.tool(list_deployed_cloud_source_connectors)
app.tool(list_deployed_cloud_destination_connectors)
app.tool(list_deployed_cloud_connections)
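
A hedged sketch of calling the two tools directly as Python functions, outside of FastMCP. It assumes the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and optionally AIRBYTE_API_ROOT environment variables are set; the connection ID is a placeholder.

    from airbyte.mcp import _cloud_ops

    status = _cloud_ops.get_cloud_sync_status(
        connection_id="<connection-id>",
        job_id=None,            # None -> latest job
        include_attempts=True,  # keyword-only flag added in this PR
    )
    print(status["status"], status["job_id"], len(status["attempts"]))

    logs = _cloud_ops.get_cloud_sync_logs(
        connection_id="<connection-id>",  # latest job and latest attempt by default
    )
    print("\n".join(logs.splitlines()[:20]))
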
4 changes: 4 additions & 0 deletions bin/test_mcp_tool.py
@@ -15,6 +15,10 @@

poe mcp-tool-test check_airbyte_cloud_workspace '{}'
poe mcp-tool-test list_deployed_cloud_connections '{}'
poe mcp-tool-test get_cloud_sync_status \
'{"connection_id": "0791e193-811b-4fcf-91c3-f8c5963e74a0", "include_attempts": true}'
poe mcp-tool-test get_cloud_sync_logs \
'{"connection_id": "0791e193-811b-4fcf-91c3-f8c5963e74a0"}'
"""

import asyncio