|
4 | 4 | import logging |
5 | 5 | from contextlib import asynccontextmanager |
6 | 6 | from typing import Union, Callable, get_type_hints, Generator, AsyncGenerator, Iterator, AsyncIterator |
7 | | -from fastapi import APIRouter, FastAPI, Response |
8 | | -from fastapi.responses import JSONResponse |
| 7 | +from fastapi import APIRouter, FastAPI, Request, Response, status |
| 8 | +from fastapi.exceptions import HTTPException |
| 9 | +from fastapi.responses import JSONResponse, StreamingResponse |
9 | 10 |
|
10 | | -from apipod.common.settings import APIPOD_PORT, APIPOD_HOST, SERVER_DOMAIN |
| 11 | +from apipod.common.settings import APIPOD_PORT, APIPOD_HOST |
11 | 12 | from apipod.common.constants import SERVER_HEALTH |
12 | 13 | from apipod.engine.jobs.job_result import JobResultFactory, JobResult |
13 | 14 | from apipod.engine.endpoint_config import FastApiEndpointConfigurator, EndpointExecutionPlan |
@@ -50,10 +51,14 @@ def __init__( |
50 | 51 | job_queue: Optional custom JobQueue implementation |
51 | 52 | lifespan: Optional async context manager for custom startup/shutdown logic |
52 | 53 | args: Additional arguments |
53 | | - kwargs: Additional keyword arguments |
| 54 | + kwargs: May include ``stream_store`` (SSE backend for GET /stream/{job_id}) and |
| 55 | + ``gateway_stream_url_prefix`` for absolute stream URLs in JobResult, plus |
| 56 | + additional keyword arguments for parent classes. |
54 | 57 | """ |
55 | 58 | # Extract user-provided lifespan (explicit param or kwarg) before parent init |
56 | 59 | user_lifespan = lifespan or kwargs.pop('lifespan', None) |
| 60 | + stream_store = kwargs.pop("stream_store", None) |
| 61 | + gateway_stream_url_prefix = kwargs.pop("gateway_stream_url_prefix", "") |
57 | 62 |
|
58 | 63 | # Initialize parent classes |
59 | 64 | api_router_params = inspect.signature(APIRouter.__init__).parameters |
@@ -92,6 +97,8 @@ def __init__( |
92 | 97 |
|
93 | 98 | self.app: FastAPI = app |
94 | 99 | self.prefix = prefix |
| 100 | + self.stream_store = stream_store |
| 101 | + self.gateway_stream_url_prefix = gateway_stream_url_prefix |
95 | 102 | self.add_standard_routes() |
96 | 103 |
|
97 | 104 | self._endpoint_configurator = FastApiEndpointConfigurator(self) |
@@ -173,7 +180,11 @@ def _stop_background_worker(self): |
173 | 180 | def add_standard_routes(self): |
174 | 181 | """Add standard API routes for status and health checks.""" |
175 | 182 | if self.job_queue is not None: |
176 | | - self.api_route(path="/status", methods=["POST"])(self.get_job) |
| 183 | + self.api_route(path="/status/{job_id}", methods=["GET"], response_model_exclude_none=True)(self.get_job) |
| 184 | + self.api_route(path="/status", methods=["POST"], response_model_exclude_none=True)(self.get_job) |
| 185 | + self.api_route(path="/cancel/{job_id}", methods=["POST"])(self.post_cancel_job) |
| 186 | + if self.stream_store is not None: |
| 187 | + self.api_route(path="/stream/{job_id}", methods=["GET"])(self.stream_job_sse) |
177 | 188 | self.api_route(path="/health", methods=["GET"])(self.get_health) |
178 | 189 |
|
179 | 190 | def get_health(self) -> Response: |
@@ -216,19 +227,76 @@ def get_job(self, job_id: str, return_format: str = 'json') -> JobResult: |
216 | 227 | if self.job_queue is None: |
217 | 228 | return JobResultFactory.job_not_found(job_id) |
218 | 229 |
|
219 | | - base_job = self.job_queue.get_job(job_id) |
220 | | - if base_job is None: |
| 230 | + ret_job = self.job_queue.get_job_result(job_id) |
| 231 | + if ret_job is None: |
221 | 232 | return JobResultFactory.job_not_found(job_id) |
222 | 233 |
|
223 | | - ret_job = JobResultFactory.from_base_job(base_job) |
224 | | - ret_job.refresh_job_url = f"{SERVER_DOMAIN}/status?job_id={ret_job.id}" |
225 | | - ret_job.cancel_job_url = f"{SERVER_DOMAIN}/cancel?job_id={ret_job.id}" |
226 | | - |
227 | 234 | if return_format != 'json': |
228 | 235 | ret_job = JobResultFactory.gzip_job_result(ret_job) |
229 | 236 |
|
230 | 237 | return ret_job |
231 | 238 |
|
| 239 | + def post_cancel_job(self, job_id: str) -> dict: |
| 240 | + """Cancel a background job (gateway / orchestrator integration).""" |
| 241 | + job_id = job_id.strip().strip('"').strip("'").strip("?").strip("#") |
| 242 | + if self.job_queue is None: |
| 243 | + raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Job queue not configured.") |
| 244 | + |
| 245 | + cancel_fn = getattr(self.job_queue, "cancel_gateway_job", None) |
| 246 | + if callable(cancel_fn): |
| 247 | + return cancel_fn(job_id) |
| 248 | + |
| 249 | + try: |
| 250 | + self.job_queue.cancel_job(job_id) |
| 251 | + except NotImplementedError: |
| 252 | + raise HTTPException( |
| 253 | + status_code=status.HTTP_501_NOT_IMPLEMENTED, |
| 254 | + detail="Cancellation is not supported for this job queue.", |
| 255 | + ) from None |
| 256 | + return {"id": job_id, "status": "cancelled", "message": "Job cancelled."} |
| 257 | + |
| 258 | + async def stream_job_sse(self, job_id: str, request: Request): |
| 259 | + """Server-Sent Events for streaming job output (requires stream_store).""" |
| 260 | + job_id = job_id.strip().strip('"').strip("'").strip("?").strip("#") |
| 261 | + if self.stream_store is None: |
| 262 | + raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Streaming not configured.") |
| 263 | + if self.job_queue is None: |
| 264 | + raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Job queue not configured.") |
| 265 | + |
| 266 | + jq = self.job_queue |
| 267 | + job_data = jq.get_job_status(job_id) if hasattr(jq, "get_job_status") else None |
| 268 | + |
| 269 | + if job_data is None: |
| 270 | + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Job '{job_id}' not found.") |
| 271 | + |
| 272 | + st = (job_data.get("status") or "").lower() |
| 273 | + if st != "streaming" and not self.stream_store.stream_exists(job_id): |
| 274 | + raise HTTPException( |
| 275 | + status_code=status.HTTP_409_CONFLICT, |
| 276 | + detail=f"Job '{job_id}' is not streaming (status: {job_data.get('status')}).", |
| 277 | + ) |
| 278 | + |
| 279 | + async def _event_generator(): |
| 280 | + try: |
| 281 | + async for chunk in self.stream_store.read_chunks(job_id): |
| 282 | + if await request.is_disconnected(): |
| 283 | + break |
| 284 | + yield chunk |
| 285 | + except Exception: |
| 286 | + self._logger.exception("Error during stream delivery | job_id=%s", job_id) |
| 287 | + yield 'data: {"error": "Internal stream error"}\n\n' |
| 288 | + |
| 289 | + return StreamingResponse( |
| 290 | + _event_generator(), |
| 291 | + media_type="text/event-stream", |
| 292 | + headers={ |
| 293 | + "Cache-Control": "no-cache", |
| 294 | + "Connection": "keep-alive", |
| 295 | + "X-Accel-Buffering": "no", |
| 296 | + }, |
| 297 | + ) |
| 298 | + |
| 299 | + |
232 | 300 | def endpoint(self, path: str, methods: list[str] | None = None, max_upload_file_size_mb: int = None, queue_size: int = 500, use_queue: bool = None, *args, **kwargs): |
233 | 301 | """ |
234 | 302 | Unified endpoint decorator. |
@@ -356,12 +424,16 @@ async def _unified_worker(*w_args, **w_kwargs): |
356 | 424 | plan.max_upload_file_size_mb |
357 | 425 | ) |
358 | 426 |
|
| 427 | + route_kwargs = dict(plan.route_kwargs) |
| 428 | + if plan.should_use_queue: |
| 429 | + route_kwargs["response_model_exclude_none"] = True |
| 430 | + |
359 | 431 | self.api_route( |
360 | 432 | path=plan.path, |
361 | 433 | methods=plan.active_methods, |
362 | 434 | response_model=JobResult if plan.should_use_queue else res_model, |
363 | 435 | *plan.route_args, |
364 | | - **plan.route_kwargs |
| 436 | + **route_kwargs |
365 | 437 | )(final_handler) |
366 | 438 |
|
367 | 439 | return final_handler |
@@ -472,12 +544,14 @@ def _determine_generator_fun(self, func: Callable) -> bool: |
472 | 544 | def _create_task_endpoint_decorator(self, path: str, methods: list[str] | None, max_upload_file_size_mb: int, queue_size: int, args, kwargs): |
473 | 545 | """Create a decorator for task endpoints (background job execution).""" |
474 | 546 | # FastAPI route decorator (returning JobResult) |
| 547 | + task_kwargs = dict(kwargs) |
| 548 | + task_kwargs["response_model_exclude_none"] = True |
475 | 549 | fastapi_route_decorator = self.api_route( |
476 | 550 | path=path, |
477 | 551 | methods=["POST"] if methods is None else methods, |
478 | 552 | response_model=JobResult, |
479 | 553 | *args, |
480 | | - **kwargs |
| 554 | + **task_kwargs |
481 | 555 | ) |
482 | 556 |
|
483 | 557 | # Queue decorator |
|
0 commit comments