diff --git a/airbyte_cdk/manifest_server/api_models/__init__.py b/airbyte_cdk/manifest_server/api_models/__init__.py index 3469fd7e3..1f8dcdce5 100644 --- a/airbyte_cdk/manifest_server/api_models/__init__.py +++ b/airbyte_cdk/manifest_server/api_models/__init__.py @@ -10,6 +10,7 @@ CheckResponse, DiscoverRequest, DiscoverResponse, + ErrorResponse, FullResolveRequest, ManifestResponse, RequestContext, @@ -40,6 +41,7 @@ "CheckResponse", "DiscoverRequest", "DiscoverResponse", + "ErrorResponse", # Stream models "AuxiliaryRequest", "HttpRequest", diff --git a/airbyte_cdk/manifest_server/api_models/manifest.py b/airbyte_cdk/manifest_server/api_models/manifest.py index a17ac9c63..fc969385f 100644 --- a/airbyte_cdk/manifest_server/api_models/manifest.py +++ b/airbyte_cdk/manifest_server/api_models/manifest.py @@ -83,3 +83,9 @@ class FullResolveRequest(BaseModel): config: ConnectorConfig stream_limit: int = Field(default=100, ge=1, le=100) context: Optional[RequestContext] = None + + +class ErrorResponse(BaseModel): + """Error response for API requests.""" + + detail: str diff --git a/airbyte_cdk/manifest_server/app.py b/airbyte_cdk/manifest_server/app.py index 89e98045e..11812b0a8 100644 --- a/airbyte_cdk/manifest_server/app.py +++ b/airbyte_cdk/manifest_server/app.py @@ -11,7 +11,7 @@ app = FastAPI( title="Manifest Server", description="A service for running low-code Airbyte connectors", - version="0.1.0", + version="0.2.0", contact={ "name": "Airbyte", "url": "https://airbyte.com", diff --git a/airbyte_cdk/manifest_server/openapi.yaml b/airbyte_cdk/manifest_server/openapi.yaml index b807953e6..66d69af59 100644 --- a/airbyte_cdk/manifest_server/openapi.yaml +++ b/airbyte_cdk/manifest_server/openapi.yaml @@ -8,7 +8,7 @@ info: contact: name: Airbyte url: https://airbyte.com/ - version: 0.1.0 + version: 0.2.0 paths: /health/: get: @@ -62,6 +62,12 @@ paths: application/json: schema: $ref: '#/components/schemas/StreamReadResponse' + '400': + description: Bad Request - Error processing request + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' '422': description: Validation Error content: @@ -90,6 +96,12 @@ paths: application/json: schema: $ref: '#/components/schemas/CheckResponse' + '400': + description: Bad Request - Error processing request + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' '422': description: Validation Error content: @@ -118,6 +130,12 @@ paths: application/json: schema: $ref: '#/components/schemas/DiscoverResponse' + '400': + description: Bad Request - Error processing request + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' '422': description: Validation Error content: @@ -146,6 +164,12 @@ paths: application/json: schema: $ref: '#/components/schemas/ManifestResponse' + '400': + description: Bad Request - Error processing request + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' '422': description: Validation Error content: @@ -180,6 +204,12 @@ paths: application/json: schema: $ref: '#/components/schemas/ManifestResponse' + '400': + description: Bad Request - Error processing request + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' '422': description: Validation Error content: @@ -353,6 +383,16 @@ components: - catalog title: DiscoverResponse description: Response to discover a manifest. + ErrorResponse: + properties: + detail: + type: string + title: Detail + type: object + required: + - detail + title: ErrorResponse + description: Error response for API requests. FullResolveRequest: properties: manifest: diff --git a/airbyte_cdk/manifest_server/routers/manifest.py b/airbyte_cdk/manifest_server/routers/manifest.py index 4fefb2129..5a3106ea3 100644 --- a/airbyte_cdk/manifest_server/routers/manifest.py +++ b/airbyte_cdk/manifest_server/routers/manifest.py @@ -14,12 +14,14 @@ INJECTED_COMPONENTS_PY, INJECTED_COMPONENTS_PY_CHECKSUMS, ) +from airbyte_cdk.utils.traced_exception import AirbyteTracedException from ..api_models import ( CheckRequest, CheckResponse, DiscoverRequest, DiscoverResponse, + ErrorResponse, FullResolveRequest, Manifest, ManifestResponse, @@ -64,7 +66,13 @@ def safe_build_source( ) -@router.post("/test_read", operation_id="testRead") +@router.post( + "/test_read", + operation_id="testRead", + responses={ + 400: {"description": "Bad Request - Error processing request", "model": ErrorResponse} + }, +) def test_read(request: StreamTestReadRequest) -> StreamReadResponse: """ Test reading from a specific stream in the manifest. @@ -98,18 +106,30 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse: ) runner = ManifestCommandProcessor(source) - cdk_result = runner.test_read( - config_dict, - catalog, - converted_state, - request.record_limit, - request.page_limit, - request.slice_limit, - ) - return StreamReadResponse.model_validate(asdict(cdk_result)) + try: + cdk_result = runner.test_read( + config_dict, + catalog, + converted_state, + request.record_limit, + request.page_limit, + request.slice_limit, + ) + return StreamReadResponse.model_validate(asdict(cdk_result)) + except Exception as exc: + error = AirbyteTracedException.from_exception( + exc, message=f"Error reading stream: {str(exc)}" + ) + raise HTTPException(status_code=400, detail=error.message) -@router.post("/check", operation_id="check") +@router.post( + "/check", + operation_id="check", + responses={ + 400: {"description": "Bad Request - Error processing request", "model": ErrorResponse} + }, +) def check(request: CheckRequest) -> CheckResponse: """Check configuration against a manifest""" # Apply trace tags from context if provided @@ -119,13 +139,25 @@ def check(request: CheckRequest) -> CheckResponse: project_id=request.context.project_id, ) - source = safe_build_source(request.manifest.model_dump(), request.config.model_dump()) - runner = ManifestCommandProcessor(source) - success, message = runner.check_connection(request.config.model_dump()) - return CheckResponse(success=success, message=message) + try: + source = safe_build_source(request.manifest.model_dump(), request.config.model_dump()) + runner = ManifestCommandProcessor(source) + success, message = runner.check_connection(request.config.model_dump()) + return CheckResponse(success=success, message=message) + except Exception as exc: + error = AirbyteTracedException.from_exception( + exc, message=f"Error checking connection: {str(exc)}" + ) + raise HTTPException(status_code=400, detail=error.message) -@router.post("/discover", operation_id="discover") +@router.post( + "/discover", + operation_id="discover", + responses={ + 400: {"description": "Bad Request - Error processing request", "model": ErrorResponse} + }, +) def discover(request: DiscoverRequest) -> DiscoverResponse: """Discover streams from a manifest""" # Apply trace tags from context if provided @@ -135,15 +167,32 @@ def discover(request: DiscoverRequest) -> DiscoverResponse: project_id=request.context.project_id, ) - source = safe_build_source(request.manifest.model_dump(), request.config.model_dump()) - runner = ManifestCommandProcessor(source) - catalog = runner.discover(request.config.model_dump()) - if catalog is None: - raise HTTPException(status_code=422, detail="Connector did not return a discovered catalog") - return DiscoverResponse(catalog=catalog) + try: + source = safe_build_source(request.manifest.model_dump(), request.config.model_dump()) + runner = ManifestCommandProcessor(source) + catalog = runner.discover(request.config.model_dump()) + if catalog is None: + raise HTTPException( + status_code=422, detail="Connector did not return a discovered catalog" + ) + return DiscoverResponse(catalog=catalog) + except HTTPException: + # Re-raise HTTPExceptions as-is (like the catalog None check above) + raise + except Exception as exc: + error = AirbyteTracedException.from_exception( + exc, message=f"Error discovering streams: {str(exc)}" + ) + raise HTTPException(status_code=400, detail=error.message) -@router.post("/resolve", operation_id="resolve") +@router.post( + "/resolve", + operation_id="resolve", + responses={ + 400: {"description": "Bad Request - Error processing request", "model": ErrorResponse} + }, +) def resolve(request: ResolveRequest) -> ManifestResponse: """Resolve a manifest to its final configuration.""" # Apply trace tags from context if provided @@ -153,11 +202,23 @@ def resolve(request: ResolveRequest) -> ManifestResponse: project_id=request.context.project_id, ) - source = safe_build_source(request.manifest.model_dump(), {}) - return ManifestResponse(manifest=Manifest(**source.resolved_manifest)) + try: + source = safe_build_source(request.manifest.model_dump(), {}) + return ManifestResponse(manifest=Manifest(**source.resolved_manifest)) + except Exception as exc: + error = AirbyteTracedException.from_exception( + exc, message=f"Error resolving manifest: {str(exc)}" + ) + raise HTTPException(status_code=400, detail=error.message) -@router.post("/full_resolve", operation_id="fullResolve") +@router.post( + "/full_resolve", + operation_id="fullResolve", + responses={ + 400: {"description": "Bad Request - Error processing request", "model": ErrorResponse} + }, +) def full_resolve(request: FullResolveRequest) -> ManifestResponse: """ Fully resolve a manifest, including dynamic streams. @@ -171,21 +232,27 @@ def full_resolve(request: FullResolveRequest) -> ManifestResponse: project_id=request.context.project_id, ) - source = safe_build_source(request.manifest.model_dump(), request.config.model_dump()) - manifest = {**source.resolved_manifest} - streams = manifest.get("streams", []) - for stream in streams: - stream["dynamic_stream_name"] = None + try: + source = safe_build_source(request.manifest.model_dump(), request.config.model_dump()) + manifest = {**source.resolved_manifest} + streams = manifest.get("streams", []) + for stream in streams: + stream["dynamic_stream_name"] = None - mapped_streams: Dict[str, List[Dict[str, Any]]] = {} - for stream in source.dynamic_streams: - generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], []) + mapped_streams: Dict[str, List[Dict[str, Any]]] = {} + for stream in source.dynamic_streams: + generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], []) - if len(generated_streams) < request.stream_limit: - generated_streams += [stream] + if len(generated_streams) < request.stream_limit: + generated_streams += [stream] - for generated_streams_list in mapped_streams.values(): - streams.extend(generated_streams_list) + for generated_streams_list in mapped_streams.values(): + streams.extend(generated_streams_list) - manifest["streams"] = streams - return ManifestResponse(manifest=Manifest(**manifest)) + manifest["streams"] = streams + return ManifestResponse(manifest=Manifest(**manifest)) + except Exception as exc: + error = AirbyteTracedException.from_exception( + exc, message=f"Error full resolving manifest: {str(exc)}" + ) + raise HTTPException(status_code=400, detail=error.message) diff --git a/unit_tests/manifest_server/routers/test_manifest.py b/unit_tests/manifest_server/routers/test_manifest.py index 4f8a90b93..c53bb5aa2 100644 --- a/unit_tests/manifest_server/routers/test_manifest.py +++ b/unit_tests/manifest_server/routers/test_manifest.py @@ -527,3 +527,122 @@ def test_discover_endpoint_missing_catalog( assert response.status_code == 422 data = response.json() assert "Connector did not return a discovered catalog" in data["detail"] + + # Test cases for error handling improvements + @patch("airbyte_cdk.manifest_server.routers.manifest.ManifestCommandProcessor") + @patch("airbyte_cdk.manifest_server.routers.manifest.build_source") + def test_test_read_cdk_error_handling( + self, mock_build_source, mock_runner_class, sample_manifest, sample_config, mock_source + ): + """Test that CDK errors in test_read are properly caught and converted to HTTP 400.""" + request_data = { + "manifest": sample_manifest, + "config": sample_config, + "stream_name": "products", + } + + mock_build_source.return_value = mock_source + + mock_runner = Mock() + # Simulate a CDK error (like datetime parsing error) + mock_runner.test_read.side_effect = ValueError( + "time data '' does not match format '%Y-%m-%dT%H:%M:%SZ'" + ) + mock_runner_class.return_value = mock_runner + + response = client.post("/v1/manifest/test_read", json=request_data) + + assert response.status_code == 400 + data = response.json() + assert "detail" in data + assert "Error reading stream:" in data["detail"] + assert "time data" in data["detail"] + assert "does not match format" in data["detail"] + + @patch("airbyte_cdk.manifest_server.routers.manifest.ManifestCommandProcessor") + @patch("airbyte_cdk.manifest_server.routers.manifest.build_source") + def test_check_cdk_error_handling( + self, mock_build_source, mock_runner_class, sample_manifest, sample_config, mock_source + ): + """Test that CDK errors in check are properly caught and converted to HTTP 400.""" + request_data = { + "manifest": sample_manifest, + "config": sample_config, + } + + mock_build_source.return_value = mock_source + + mock_runner = Mock() + # Simulate a CDK error (like connection error) + mock_runner.check_connection.side_effect = ConnectionError("Failed to connect to API") + mock_runner_class.return_value = mock_runner + + response = client.post("/v1/manifest/check", json=request_data) + + assert response.status_code == 400 + data = response.json() + assert "detail" in data + assert "Error checking connection:" in data["detail"] + assert "Failed to connect to API" in data["detail"] + + @patch("airbyte_cdk.manifest_server.routers.manifest.ManifestCommandProcessor") + @patch("airbyte_cdk.manifest_server.routers.manifest.build_source") + def test_discover_cdk_error_handling( + self, mock_build_source, mock_runner_class, sample_manifest, sample_config, mock_source + ): + """Test that CDK errors in discover are properly caught and converted to HTTP 400.""" + request_data = { + "manifest": sample_manifest, + "config": sample_config, + } + + mock_build_source.return_value = mock_source + + mock_runner = Mock() + # Simulate a CDK error + mock_runner.discover.side_effect = RuntimeError("Schema validation failed") + mock_runner_class.return_value = mock_runner + + response = client.post("/v1/manifest/discover", json=request_data) + + assert response.status_code == 400 + data = response.json() + assert "detail" in data + assert "Error discovering streams:" in data["detail"] + assert "Schema validation failed" in data["detail"] + + @patch("airbyte_cdk.manifest_server.routers.manifest.build_source") + def test_resolve_cdk_error_handling(self, mock_build_source, sample_manifest): + """Test that CDK errors in resolve are properly caught and converted to HTTP 400.""" + request_data = { + "manifest": sample_manifest, + } + + # Simulate a CDK error during source building + mock_build_source.side_effect = AttributeError("'NoneType' object has no attribute 'get'") + + response = client.post("/v1/manifest/resolve", json=request_data) + + assert response.status_code == 400 + data = response.json() + assert "detail" in data + assert "Error resolving manifest:" in data["detail"] + assert "'NoneType' object has no attribute 'get'" in data["detail"] + + @patch("airbyte_cdk.manifest_server.routers.manifest.build_source") + def test_full_resolve_cdk_error_handling( + self, mock_build_source, sample_manifest, sample_config + ): + """Test that CDK errors in full_resolve are properly caught and converted to HTTP 400.""" + request_data = {"manifest": sample_manifest, "config": sample_config, "stream_limit": 10} + + # Simulate a CDK error during source building + mock_build_source.side_effect = KeyError("Missing required field 'streams'") + + response = client.post("/v1/manifest/full_resolve", json=request_data) + + assert response.status_code == 400 + data = response.json() + assert "detail" in data + assert "Error full resolving manifest:" in data["detail"] + assert "Missing required field 'streams'" in data["detail"]