Skip to content

Commit e14f630

Browse files
Merge pull request #107 from ajimenez1503/feat/StreamingTraceManager
Create StreamingTraceManager once
2 parents 81bf034 + a098e40 commit e14f630

7 files changed

Lines changed: 231 additions & 145 deletions

File tree

src/agentevals/api/app.py

Lines changed: 117 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
"""FastAPI application for agentevals REST API."""
22

3+
from __future__ import annotations
4+
35
import asyncio
46
import json
57
import logging
68
import os
79
from contextlib import asynccontextmanager
810
from pathlib import Path
11+
from typing import TYPE_CHECKING
912

1013
from fastapi import FastAPI
1114
from fastapi.middleware.cors import CORSMiddleware
@@ -17,6 +20,9 @@
1720
from .debug_routes import debug_router
1821
from .routes import router
1922

23+
if TYPE_CHECKING:
24+
from ..streaming.ws_server import StreamingTraceManager
25+
2026
try:
2127
from dotenv import load_dotenv
2228

@@ -27,107 +33,122 @@
2733
pass
2834

2935

30-
@asynccontextmanager
31-
async def lifespan(app: FastAPI):
32-
log_level_str = os.getenv("AGENTEVALS_LOG_LEVEL", "INFO").upper()
33-
log_level = getattr(logging, log_level_str, logging.INFO)
34-
logging.basicConfig(
35-
level=log_level,
36-
format="%(levelname)s:%(name)s:%(message)s",
37-
force=True,
36+
def _build_lifespan():
37+
@asynccontextmanager
38+
async def lifespan(app: FastAPI):
39+
log_level_str = os.getenv("AGENTEVALS_LOG_LEVEL", "INFO").upper()
40+
log_level = getattr(logging, log_level_str, logging.INFO)
41+
logging.basicConfig(
42+
level=log_level,
43+
format="%(levelname)s:%(name)s:%(message)s",
44+
force=True,
45+
)
46+
ae_logger = logging.getLogger("agentevals")
47+
ae_logger.setLevel(log_level)
48+
if log_buffer not in ae_logger.handlers:
49+
log_buffer.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))
50+
ae_logger.addHandler(log_buffer)
51+
mgr = getattr(app.state, "trace_manager", None)
52+
if mgr:
53+
mgr.start_cleanup_task()
54+
yield
55+
if mgr:
56+
await mgr.shutdown()
57+
ae_logger.removeHandler(log_buffer)
58+
59+
return lifespan
60+
61+
62+
def create_app(
63+
*,
64+
trace_manager: StreamingTraceManager | None = None,
65+
enable_streaming: bool = False,
66+
) -> FastAPI:
67+
"""Create the main agentevals API app."""
68+
app = FastAPI(
69+
title="agentevals API",
70+
version=__version__,
71+
description="REST API for evaluating agent traces using ADK's scoring framework",
72+
lifespan=_build_lifespan(),
3873
)
39-
ae_logger = logging.getLogger("agentevals")
40-
ae_logger.setLevel(log_level)
41-
if log_buffer not in ae_logger.handlers:
42-
log_buffer.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))
43-
ae_logger.addHandler(log_buffer)
44-
mgr = getattr(app.state, "trace_manager", None)
45-
if mgr:
46-
mgr.start_cleanup_task()
47-
yield
48-
if mgr:
49-
await mgr.shutdown()
50-
ae_logger.removeHandler(log_buffer)
51-
52-
53-
app = FastAPI(
54-
title="agentevals API",
55-
version=__version__,
56-
description="REST API for evaluating agent traces using ADK's scoring framework",
57-
lifespan=lifespan,
58-
)
59-
60-
app.add_middleware(
61-
CORSMiddleware,
62-
allow_origins=["http://localhost:5173", "http://localhost:5174"],
63-
allow_credentials=True,
64-
allow_methods=["*"],
65-
allow_headers=["*"],
66-
expose_headers=["*"],
67-
)
68-
69-
app.include_router(router, prefix="/api")
70-
app.include_router(debug_router, prefix="/api/debug")
71-
72-
_live_mode = os.getenv("AGENTEVALS_LIVE") == "1"
73-
74-
if _live_mode:
75-
from fastapi import Request as _Request
76-
from fastapi import WebSocket
7774

78-
from ..streaming.ws_server import StreamingTraceManager
79-
from .streaming_routes import streaming_router
80-
81-
app.include_router(streaming_router, prefix="/api/streaming")
82-
app.state.trace_manager = StreamingTraceManager()
83-
84-
@app.websocket("/ws/traces")
85-
async def websocket_endpoint(websocket: WebSocket):
86-
await websocket.app.state.trace_manager.handle_connection(websocket)
87-
88-
@app.get("/stream/ui-updates")
89-
async def ui_updates_stream(request: _Request):
90-
mgr = request.app.state.trace_manager
91-
92-
async def event_generator():
93-
queue = mgr.register_sse_client()
94-
try:
95-
while True:
96-
event = await queue.get()
97-
if event is None:
98-
break
99-
yield f"data: {json.dumps(event)}\n\n"
100-
except asyncio.CancelledError:
101-
pass
102-
finally:
103-
mgr.unregister_sse_client(queue)
104-
105-
return StreamingResponse(
106-
event_generator(),
107-
media_type="text/event-stream",
108-
headers={
109-
"Cache-Control": "no-cache",
110-
"Connection": "keep-alive",
111-
},
112-
)
75+
app.add_middleware(
76+
CORSMiddleware,
77+
allow_origins=["http://localhost:5173", "http://localhost:5174"],
78+
allow_credentials=True,
79+
allow_methods=["*"],
80+
allow_headers=["*"],
81+
expose_headers=["*"],
82+
)
83+
84+
app.include_router(router, prefix="/api")
85+
app.include_router(debug_router, prefix="/api/debug")
86+
87+
if trace_manager is not None:
88+
app.state.trace_manager = trace_manager
89+
90+
if enable_streaming:
91+
if trace_manager is None:
92+
raise ValueError("enable_streaming requires a trace_manager")
93+
94+
from fastapi import Request as _Request
95+
from fastapi import WebSocket
96+
97+
from .streaming_routes import streaming_router
98+
99+
app.include_router(streaming_router, prefix="/api/streaming")
100+
101+
@app.websocket("/ws/traces")
102+
async def websocket_endpoint(websocket: WebSocket):
103+
await websocket.app.state.trace_manager.handle_connection(websocket)
104+
105+
@app.get("/stream/ui-updates")
106+
async def ui_updates_stream(request: _Request):
107+
mgr = request.app.state.trace_manager
108+
109+
async def event_generator():
110+
queue = mgr.register_sse_client()
111+
try:
112+
while True:
113+
event = await queue.get()
114+
if event is None:
115+
break
116+
yield f"data: {json.dumps(event)}\n\n"
117+
except asyncio.CancelledError:
118+
pass
119+
finally:
120+
mgr.unregister_sse_client(queue)
121+
122+
return StreamingResponse(
123+
event_generator(),
124+
media_type="text/event-stream",
125+
headers={
126+
"Cache-Control": "no-cache",
127+
"Connection": "keep-alive",
128+
},
129+
)
130+
131+
static_dir = Path(__file__).parent.parent / "_static"
132+
has_ui = static_dir.is_dir() and (static_dir / "index.html").exists()
133+
134+
if has_ui and not os.getenv("AGENTEVALS_HEADLESS"):
135+
from fastapi.responses import FileResponse
136+
from fastapi.staticfiles import StaticFiles
113137

138+
app.mount("/assets", StaticFiles(directory=static_dir / "assets"), name="ui-assets")
114139

115-
_static_dir = Path(__file__).parent.parent / "_static"
116-
_has_ui = _static_dir.is_dir() and (_static_dir / "index.html").exists()
140+
@app.get("/")
141+
async def root():
142+
return FileResponse(static_dir / "index.html")
117143

118-
if _has_ui and not os.getenv("AGENTEVALS_HEADLESS"):
119-
from fastapi.responses import FileResponse
120-
from fastapi.staticfiles import StaticFiles
144+
@app.get("/{path:path}")
145+
async def spa_fallback(path: str):
146+
file_path = static_dir / path
147+
if file_path.is_file():
148+
return FileResponse(file_path)
149+
return FileResponse(static_dir / "index.html")
121150

122-
app.mount("/assets", StaticFiles(directory=_static_dir / "assets"), name="ui-assets")
151+
return app
123152

124-
@app.get("/")
125-
async def root():
126-
return FileResponse(_static_dir / "index.html")
127153

128-
@app.get("/{path:path}")
129-
async def spa_fallback(path: str):
130-
file_path = _static_dir / path
131-
if file_path.is_file():
132-
return FileResponse(file_path)
133-
return FileResponse(_static_dir / "index.html")
154+
app = create_app()

src/agentevals/api/dependencies.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,3 @@ def require_trace_manager(request: Request) -> StreamingTraceManager:
2626
if mgr is None:
2727
raise HTTPException(status_code=503, detail="Live mode not enabled")
2828
return mgr
29-
30-
31-
def require_trace_manager_from_app(app: Any) -> StreamingTraceManager:
32-
"""Return the StreamingTraceManager from app, raising RuntimeError if missing."""
33-
mgr = get_trace_manager_from_app(app)
34-
if mgr is None:
35-
raise RuntimeError("Live mode not enabled")
36-
return mgr

src/agentevals/api/otlp_app.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,27 @@
11
"""Minimal FastAPI app for the OTLP HTTP receiver on port 4318.
22
3-
Shares the StreamingTraceManager with the main app (port 8001).
43
Mounts only the /v1/traces and /v1/logs endpoints.
54
"""
65

7-
from contextlib import asynccontextmanager
6+
from __future__ import annotations
7+
8+
from typing import TYPE_CHECKING
89

910
from fastapi import FastAPI
1011

1112
from .otlp_routes import otlp_router
1213

14+
if TYPE_CHECKING:
15+
from ..streaming.ws_server import StreamingTraceManager
1316

14-
@asynccontextmanager
15-
async def lifespan(app: FastAPI):
16-
from .app import app as main_app
1717

18-
mgr = getattr(main_app.state, "trace_manager", None)
19-
if mgr:
20-
app.state.trace_manager = mgr
21-
yield
18+
def create_otlp_app(*, trace_manager: StreamingTraceManager | None = None) -> FastAPI:
19+
"""Create the OTLP HTTP receiver app."""
20+
app = FastAPI(title="agentevals OTLP receiver")
21+
if trace_manager is not None:
22+
app.state.trace_manager = trace_manager
23+
app.include_router(otlp_router)
24+
return app
2225

2326

24-
otlp_app = FastAPI(title="agentevals OTLP receiver", lifespan=lifespan)
25-
otlp_app.include_router(otlp_router)
27+
otlp_app = create_otlp_app()

src/agentevals/cli.py

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -534,27 +534,26 @@ async def _run_servers(
534534
otlp_grpc_port: int,
535535
*,
536536
mcp_port: int | None = None,
537-
reload: bool = False,
538-
reload_dirs: list[str] | None = None,
539537
log_level: str = "warning",
540538
) -> None:
541539
"""Start API, OTLP HTTP+gRPC receivers, and optional MCP (Streamable HTTP)."""
542540
import uvicorn
543541

542+
from .api.app import create_app
543+
from .api.otlp_app import create_otlp_app
544+
from .streaming.ws_server import StreamingTraceManager
545+
544546
shared_kwargs: dict = {
545547
"host": host,
546-
"reload": reload,
547548
"log_level": log_level,
548549
}
549-
if reload_dirs:
550-
shared_kwargs["reload_dirs"] = reload_dirs
551550

552-
# TODO #99 Create the manager and pass it into the Server constructors instead of injecting it into the app state.
551+
mgr = StreamingTraceManager()
552+
main_app = create_app(trace_manager=mgr, enable_streaming=True)
553+
otlp_app = create_otlp_app(trace_manager=mgr)
553554

554-
main_server = uvicorn.Server(uvicorn.Config("agentevals.api.app:app", port=port, **shared_kwargs))
555-
otlp_http_server = uvicorn.Server(
556-
uvicorn.Config("agentevals.api.otlp_app:otlp_app", port=otlp_http_port, **shared_kwargs)
557-
)
555+
main_server = uvicorn.Server(uvicorn.Config(main_app, port=port, **shared_kwargs))
556+
otlp_http_server = uvicorn.Server(uvicorn.Config(otlp_app, port=otlp_http_port, **shared_kwargs))
558557
uvicorn_servers: list = [main_server, otlp_http_server]
559558

560559
if mcp_port is not None:
@@ -571,10 +570,6 @@ async def _run_servers(
571570
mcp_uvicorn = uvicorn.Server(uvicorn.Config(mcp_app, **mcp_kwargs))
572571
uvicorn_servers.append(mcp_uvicorn)
573572

574-
from .api.app import app as main_app
575-
from .api.dependencies import require_trace_manager_from_app
576-
577-
mgr = require_trace_manager_from_app(main_app)
578573
otlp_grpc_server = create_otlp_grpc_server(host=host, port=otlp_grpc_port, manager=mgr)
579574
await otlp_grpc_server.start()
580575

@@ -703,17 +698,13 @@ def serve(
703698
click.echo("Waiting for agent connections...")
704699
click.echo()
705700

706-
src_path = Path(__file__).parent.parent
707-
reload_dirs = [str(src_path)]
708701
asyncio.run(
709702
_run_servers(
710703
host,
711704
port,
712705
otlp_http_port,
713706
otlp_grpc_port,
714707
mcp_port=mcp_port,
715-
reload=True,
716-
reload_dirs=reload_dirs,
717708
log_level="info",
718709
)
719710
)

tests/integration/conftest.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -101,18 +101,12 @@ def live_servers():
101101
os.environ["AGENTEVALS_LIVE"] = "1"
102102
os.environ["AGENTEVALS_HEADLESS"] = "1"
103103

104-
import importlib
104+
from agentevals.api.app import create_app
105+
from agentevals.api.otlp_app import create_otlp_app
105106

106-
from agentevals.api import app as app_module
107-
108-
importlib.reload(app_module)
109-
110-
from agentevals.api.app import app
111-
from agentevals.api.otlp_app import otlp_app
112-
113-
mgr = getattr(app.state, "trace_manager", None)
114-
if mgr:
115-
otlp_app.state.trace_manager = mgr
107+
mgr = StreamingTraceManager()
108+
app = create_app(trace_manager=mgr, enable_streaming=True)
109+
otlp_app = create_otlp_app(trace_manager=mgr)
116110

117111
main_config = uvicorn.Config(app, host="127.0.0.1", port=main_port, log_level="warning")
118112
otlp_config = uvicorn.Config(otlp_app, host="127.0.0.1", port=otlp_http_port, log_level="warning")

0 commit comments

Comments
 (0)