Skip to content
2 changes: 2 additions & 0 deletions airbyte_cdk/manifest_server/api_models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
CheckResponse,
DiscoverRequest,
DiscoverResponse,
ErrorResponse,
FullResolveRequest,
ManifestResponse,
RequestContext,
Expand Down Expand Up @@ -40,6 +41,7 @@
"CheckResponse",
"DiscoverRequest",
"DiscoverResponse",
"ErrorResponse",
# Stream models
"AuxiliaryRequest",
"HttpRequest",
Expand Down
6 changes: 6 additions & 0 deletions airbyte_cdk/manifest_server/api_models/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,9 @@ class FullResolveRequest(BaseModel):
config: ConnectorConfig
stream_limit: int = Field(default=100, ge=1, le=100)
context: Optional[RequestContext] = None


class ErrorResponse(BaseModel):
"""Error response for API requests."""

detail: str
2 changes: 1 addition & 1 deletion airbyte_cdk/manifest_server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
app = FastAPI(
title="Manifest Server",
description="A service for running low-code Airbyte connectors",
version="0.1.0",
version="0.2.0",
contact={
"name": "Airbyte",
"url": "https://airbyte.com",
Expand Down
42 changes: 41 additions & 1 deletion airbyte_cdk/manifest_server/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ info:
contact:
name: Airbyte
url: https://airbyte.com/
version: 0.1.0
version: 0.2.0
paths:
/health/:
get:
Expand Down Expand Up @@ -62,6 +62,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/StreamReadResponse'
'400':
description: Bad Request - Error processing request
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -90,6 +96,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/CheckResponse'
'400':
description: Bad Request - Error processing request
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -118,6 +130,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/DiscoverResponse'
'400':
description: Bad Request - Error processing request
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -146,6 +164,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/ManifestResponse'
'400':
description: Bad Request - Error processing request
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -180,6 +204,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/ManifestResponse'
'400':
description: Bad Request - Error processing request
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -353,6 +383,16 @@ components:
- catalog
title: DiscoverResponse
description: Response to discover a manifest.
ErrorResponse:
properties:
detail:
type: string
title: Detail
type: object
required:
- detail
title: ErrorResponse
description: Error response for API requests.
FullResolveRequest:
properties:
manifest:
Expand Down
147 changes: 107 additions & 40 deletions airbyte_cdk/manifest_server/routers/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
INJECTED_COMPONENTS_PY,
INJECTED_COMPONENTS_PY_CHECKSUMS,
)
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

from ..api_models import (
CheckRequest,
CheckResponse,
DiscoverRequest,
DiscoverResponse,
ErrorResponse,
FullResolveRequest,
Manifest,
ManifestResponse,
Expand Down Expand Up @@ -64,7 +66,13 @@ def safe_build_source(
)


@router.post("/test_read", operation_id="testRead")
@router.post(
"/test_read",
operation_id="testRead",
responses={
400: {"description": "Bad Request - Error processing request", "model": ErrorResponse}
},
)
def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
"""
Test reading from a specific stream in the manifest.
Expand Down Expand Up @@ -98,18 +106,30 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
)

runner = ManifestCommandProcessor(source)
cdk_result = runner.test_read(
config_dict,
catalog,
converted_state,
request.record_limit,
request.page_limit,
request.slice_limit,
)
return StreamReadResponse.model_validate(asdict(cdk_result))
try:
cdk_result = runner.test_read(
config_dict,
catalog,
converted_state,
request.record_limit,
request.page_limit,
request.slice_limit,
)
return StreamReadResponse.model_validate(asdict(cdk_result))
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=f"Error reading stream: {str(exc)}"
)
raise HTTPException(status_code=400, detail=error.message)


@router.post("/check", operation_id="check")
@router.post(
"/check",
operation_id="check",
responses={
400: {"description": "Bad Request - Error processing request", "model": ErrorResponse}
},
)
def check(request: CheckRequest) -> CheckResponse:
"""Check configuration against a manifest"""
# Apply trace tags from context if provided
Expand All @@ -119,13 +139,25 @@ def check(request: CheckRequest) -> CheckResponse:
project_id=request.context.project_id,
)

source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
runner = ManifestCommandProcessor(source)
success, message = runner.check_connection(request.config.model_dump())
return CheckResponse(success=success, message=message)
try:
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
runner = ManifestCommandProcessor(source)
success, message = runner.check_connection(request.config.model_dump())
return CheckResponse(success=success, message=message)
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=f"Error checking connection: {str(exc)}"
)
raise HTTPException(status_code=400, detail=error.message)


@router.post("/discover", operation_id="discover")
@router.post(
"/discover",
operation_id="discover",
responses={
400: {"description": "Bad Request - Error processing request", "model": ErrorResponse}
},
)
def discover(request: DiscoverRequest) -> DiscoverResponse:
"""Discover streams from a manifest"""
# Apply trace tags from context if provided
Expand All @@ -135,15 +167,32 @@ def discover(request: DiscoverRequest) -> DiscoverResponse:
project_id=request.context.project_id,
)

source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
runner = ManifestCommandProcessor(source)
catalog = runner.discover(request.config.model_dump())
if catalog is None:
raise HTTPException(status_code=422, detail="Connector did not return a discovered catalog")
return DiscoverResponse(catalog=catalog)
try:
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
runner = ManifestCommandProcessor(source)
catalog = runner.discover(request.config.model_dump())
if catalog is None:
raise HTTPException(
status_code=422, detail="Connector did not return a discovered catalog"
)
return DiscoverResponse(catalog=catalog)
except HTTPException:
# Re-raise HTTPExceptions as-is (like the catalog None check above)
raise
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=f"Error discovering streams: {str(exc)}"
)
raise HTTPException(status_code=400, detail=error.message)


@router.post("/resolve", operation_id="resolve")
@router.post(
"/resolve",
operation_id="resolve",
responses={
400: {"description": "Bad Request - Error processing request", "model": ErrorResponse}
},
)
def resolve(request: ResolveRequest) -> ManifestResponse:
"""Resolve a manifest to its final configuration."""
# Apply trace tags from context if provided
Expand All @@ -153,11 +202,23 @@ def resolve(request: ResolveRequest) -> ManifestResponse:
project_id=request.context.project_id,
)

source = safe_build_source(request.manifest.model_dump(), {})
return ManifestResponse(manifest=Manifest(**source.resolved_manifest))
try:
source = safe_build_source(request.manifest.model_dump(), {})
return ManifestResponse(manifest=Manifest(**source.resolved_manifest))
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=f"Error resolving manifest: {str(exc)}"
)
raise HTTPException(status_code=400, detail=error.message)


@router.post("/full_resolve", operation_id="fullResolve")
@router.post(
"/full_resolve",
operation_id="fullResolve",
responses={
400: {"description": "Bad Request - Error processing request", "model": ErrorResponse}
},
)
def full_resolve(request: FullResolveRequest) -> ManifestResponse:
"""
Fully resolve a manifest, including dynamic streams.
Expand All @@ -171,21 +232,27 @@ def full_resolve(request: FullResolveRequest) -> ManifestResponse:
project_id=request.context.project_id,
)

source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
manifest = {**source.resolved_manifest}
streams = manifest.get("streams", [])
for stream in streams:
stream["dynamic_stream_name"] = None
try:
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
manifest = {**source.resolved_manifest}
streams = manifest.get("streams", [])
for stream in streams:
stream["dynamic_stream_name"] = None

mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
for stream in source.dynamic_streams:
generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])
mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
for stream in source.dynamic_streams:
generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])

if len(generated_streams) < request.stream_limit:
generated_streams += [stream]
if len(generated_streams) < request.stream_limit:
generated_streams += [stream]

for generated_streams_list in mapped_streams.values():
streams.extend(generated_streams_list)
for generated_streams_list in mapped_streams.values():
streams.extend(generated_streams_list)

manifest["streams"] = streams
return ManifestResponse(manifest=Manifest(**manifest))
manifest["streams"] = streams
return ManifestResponse(manifest=Manifest(**manifest))
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=f"Error full resolving manifest: {str(exc)}"
)
raise HTTPException(status_code=400, detail=error.message)
Loading
Loading