|
11 | 11 | import tempfile |
12 | 12 | from typing import Any |
13 | 13 |
|
14 | | -from fastapi import APIRouter, File, Form, HTTPException, UploadFile |
| 14 | +from fastapi import APIRouter, File, Form, HTTPException, Request, UploadFile |
15 | 15 | from fastapi.responses import StreamingResponse |
16 | 16 | from pydantic.alias_generators import to_camel |
17 | 17 |
|
|
27 | 27 | ) |
28 | 28 | from ..converter import convert_traces |
29 | 29 | from ..extraction import get_extractor |
30 | | -from ..runner import RunResult, get_loader, load_eval_set, run_evaluation |
| 30 | +from ..loader.otlp import OtlpJsonLoader |
| 31 | +from ..runner import ( |
| 32 | + RunResult, |
| 33 | + get_loader, |
| 34 | + load_eval_set, |
| 35 | + load_eval_set_from_dict, |
| 36 | + run_evaluation, |
| 37 | + run_evaluation_from_traces, |
| 38 | +) |
31 | 39 | from ..trace_metrics import extract_performance_metrics, extract_trace_metadata |
32 | 40 | from .models import ( |
33 | 41 | ApiKeyStatus, |
34 | 42 | ConfigData, |
35 | 43 | ConvertTracesData, |
36 | 44 | EvalSetValidation, |
| 45 | + EvaluateJsonRequest, |
37 | 46 | HealthData, |
38 | 47 | MetricInfo, |
39 | 48 | SSEDoneEvent, |
@@ -61,6 +70,8 @@ def _camel_keys(obj: Any) -> Any: |
61 | 70 |
|
62 | 71 | router = APIRouter() |
63 | 72 |
|
| 73 | +_MAX_JSON_BODY_BYTES = 50 * 1024 * 1024 # 50 MB (multipart endpoints allow 10 MB per file) |
| 74 | + |
64 | 75 | _TYPE_TO_MODEL = { |
65 | 76 | "builtin": BuiltinMetricDef, |
66 | 77 | "code": CodeEvaluatorDef, |
@@ -729,3 +740,148 @@ async def run_with_progress(): |
729 | 740 | "Connection": "keep-alive", |
730 | 741 | }, |
731 | 742 | ) |
| 743 | + |
| 744 | + |
| 745 | +def _parse_json_request(request: EvaluateJsonRequest): |
| 746 | + """Parse traces and eval set from an EvaluateJsonRequest. |
| 747 | +
|
| 748 | + Returns (traces, eval_set). Raises HTTPException on invalid input. |
| 749 | + """ |
| 750 | + try: |
| 751 | + traces = OtlpJsonLoader().load_from_dict(request.traces) |
| 752 | + except ValueError as exc: |
| 753 | + raise HTTPException(status_code=400, detail=str(exc)) from exc |
| 754 | + |
| 755 | + if not traces: |
| 756 | + raise HTTPException(status_code=400, detail="No traces found in OTLP JSON") |
| 757 | + |
| 758 | + eval_set = None |
| 759 | + if request.eval_set: |
| 760 | + try: |
| 761 | + eval_set = load_eval_set_from_dict(request.eval_set) |
| 762 | + except Exception as exc: |
| 763 | + raise HTTPException(status_code=400, detail=f"Invalid eval set: {exc}") from exc |
| 764 | + |
| 765 | + return traces, eval_set |
| 766 | + |
| 767 | + |
| 768 | +def _check_json_body_size(raw_request: Request): |
| 769 | + content_length = int(raw_request.headers.get("content-length", 0)) |
| 770 | + if content_length > _MAX_JSON_BODY_BYTES: |
| 771 | + raise HTTPException( |
| 772 | + status_code=413, |
| 773 | + detail=f"Request body exceeds {_MAX_JSON_BODY_BYTES // (1024 * 1024)}MB limit", |
| 774 | + ) |
| 775 | + |
| 776 | + |
| 777 | +def _sse_error(message: str) -> str: |
| 778 | + return f"data: {SSEErrorEvent(error=message).model_dump_json(by_alias=True)}\n\n" |
| 779 | + |
| 780 | + |
| 781 | +@router.post("/evaluate/json", response_model=StandardResponse[RunResult]) |
| 782 | +async def evaluate_traces_json(request: EvaluateJsonRequest, raw_request: Request): |
| 783 | + """Evaluate OTLP JSON traces passed in the request body.""" |
| 784 | + _check_json_body_size(raw_request) |
| 785 | + traces, eval_set = _parse_json_request(request) |
| 786 | + |
| 787 | + try: |
| 788 | + result = await run_evaluation_from_traces( |
| 789 | + traces=traces, |
| 790 | + config=request.config, |
| 791 | + eval_set=eval_set, |
| 792 | + ) |
| 793 | + return StandardResponse(data=_camel_keys(result.model_dump(by_alias=True))) |
| 794 | + except Exception as exc: |
| 795 | + logger.exception("JSON evaluation failed") |
| 796 | + raise HTTPException(status_code=500, detail=f"Internal error: {exc!s}") from exc |
| 797 | + |
| 798 | + |
| 799 | +@router.post("/evaluate/json/stream") |
| 800 | +async def evaluate_traces_json_stream(request: EvaluateJsonRequest, raw_request: Request): |
| 801 | + """Evaluate OTLP JSON traces with real-time progress via SSE.""" |
| 802 | + _check_json_body_size(raw_request) |
| 803 | + |
| 804 | + async def event_generator(): |
| 805 | + try: |
| 806 | + try: |
| 807 | + traces, eval_set = _parse_json_request(request) |
| 808 | + except HTTPException as exc: |
| 809 | + yield _sse_error(exc.detail) |
| 810 | + return |
| 811 | + |
| 812 | + for trace in traces: |
| 813 | + try: |
| 814 | + extractor = get_extractor(trace) |
| 815 | + perf_metrics = _camel_keys(extract_performance_metrics(trace, extractor)) |
| 816 | + trace_metadata = _camel_keys(extract_trace_metadata(trace, extractor)) |
| 817 | + evt = SSEPerformanceMetricsEvent( |
| 818 | + trace_id=trace.trace_id, |
| 819 | + performance_metrics=perf_metrics, |
| 820 | + trace_metadata=trace_metadata, |
| 821 | + ) |
| 822 | + yield f"event: performance_metrics\ndata: {evt.model_dump_json(by_alias=True)}\n\n" |
| 823 | + except Exception as e: |
| 824 | + logger.error(f"Failed to extract early performance metrics: {e}") |
| 825 | + |
| 826 | + queue: asyncio.Queue = asyncio.Queue() |
| 827 | + |
| 828 | + async def progress_callback(message: str): |
| 829 | + await queue.put(("progress", message)) |
| 830 | + |
| 831 | + async def trace_progress_callback(trace_result): |
| 832 | + await queue.put(("trace_progress", trace_result)) |
| 833 | + |
| 834 | + async def run_with_progress(): |
| 835 | + result = await run_evaluation_from_traces( |
| 836 | + traces=traces, |
| 837 | + config=request.config, |
| 838 | + eval_set=eval_set, |
| 839 | + progress_callback=progress_callback, |
| 840 | + trace_progress_callback=trace_progress_callback, |
| 841 | + ) |
| 842 | + await queue.put(("done", result)) |
| 843 | + |
| 844 | + eval_task = asyncio.create_task(run_with_progress()) |
| 845 | + |
| 846 | + try: |
| 847 | + while True: |
| 848 | + msg = await queue.get() |
| 849 | + tag, payload = msg |
| 850 | + |
| 851 | + if tag == "done": |
| 852 | + evt = SSEDoneEvent( |
| 853 | + result=_camel_keys(payload.model_dump(by_alias=True)), |
| 854 | + ) |
| 855 | + yield f"data: {evt.model_dump_json(by_alias=True)}\n\n" |
| 856 | + break |
| 857 | + elif tag == "trace_progress": |
| 858 | + evt = SSETraceProgressEvent( |
| 859 | + trace_progress=SSETraceProgress( |
| 860 | + trace_id=payload.trace_id, |
| 861 | + partial_result=_camel_keys(payload.model_dump(by_alias=True)), |
| 862 | + ) |
| 863 | + ) |
| 864 | + yield f"data: {evt.model_dump_json(by_alias=True)}\n\n" |
| 865 | + elif tag == "progress": |
| 866 | + evt = SSEProgressEvent(message=payload) |
| 867 | + yield f"data: {evt.model_dump_json(by_alias=True)}\n\n" |
| 868 | + finally: |
| 869 | + if not eval_task.done(): |
| 870 | + eval_task.cancel() |
| 871 | + try: |
| 872 | + await eval_task |
| 873 | + except asyncio.CancelledError: |
| 874 | + pass |
| 875 | + |
| 876 | + except Exception as exc: |
| 877 | + logger.exception("JSON evaluation stream failed") |
| 878 | + yield _sse_error(str(exc)) |
| 879 | + |
| 880 | + return StreamingResponse( |
| 881 | + event_generator(), |
| 882 | + media_type="text/event-stream", |
| 883 | + headers={ |
| 884 | + "Cache-Control": "no-cache", |
| 885 | + "Connection": "keep-alive", |
| 886 | + }, |
| 887 | + ) |
0 commit comments