diff --git a/ddapm_test_agent/agent.py b/ddapm_test_agent/agent.py index bf86cbf6..c9eb53e1 100644 --- a/ddapm_test_agent/agent.py +++ b/ddapm_test_agent/agent.py @@ -1888,6 +1888,8 @@ def make_app( dd_api_key: str | None, disable_llmobs_data_forwarding: bool, enable_web_ui: bool = False, + persist_llmobs_traces: bool = False, + persist_llmobs_db_path: Optional[str] = None, ) -> web.Application: agent = Agent() @@ -1977,7 +1979,11 @@ def make_app( # Add LLM Observability Event Platform API routes # These provide Datadog Event Platform compatible endpoints for local development - llmobs_event_platform_api = LLMObsEventPlatformAPI(agent) + llmobs_event_platform_api = LLMObsEventPlatformAPI( + agent, + persist_llmobs_traces=persist_llmobs_traces, + persist_llmobs_db_path=persist_llmobs_db_path, + ) app["llmobs_event_platform_api"] = llmobs_event_platform_api app.add_routes(llmobs_event_platform_api.get_routes()) @@ -1996,6 +2002,11 @@ async def _cleanup_claude_proxy(app: web.Application) -> None: app.on_cleanup.append(_cleanup_claude_proxy) + async def _close_llmobs_persist(app: web.Application) -> None: + llmobs_event_platform_api.close() + + app.on_cleanup.append(_close_llmobs_persist) + checks = Checks( checks=[ CheckMetaTracerVersionHeader, @@ -2352,6 +2363,18 @@ def main(args: Optional[List[str]] = None) -> None: default=os.environ.get("ENABLE_CLAUDE_CODE_HOOKS", "").lower() in ("true", "1", "yes"), help="Enable writing Claude Code hooks to ~/.claude/settings.json", ) + parser.add_argument( + "--persist-llmobs-traces", + action="store_true", + default=os.environ.get("PERSIST_LLMOBS_TRACES", "").lower() in ("true", "1", "yes"), + help="Persist LLMObs spans to a local SQLite DB and load them on startup.", + ) + parser.add_argument( + "--persist-llmobs-db", + type=str, + default=os.environ.get("PERSIST_LLMOBS_DB", ""), + help="Path to SQLite DB for LLMObs persistence (default: ~/.ddapm-test-agent/llmobs.db).", + ) parsed_args = parser.parse_args(args=args) logging.basicConfig(level=parsed_args.log_level) @@ -2381,6 +2404,12 @@ def main(args: Optional[List[str]] = None) -> None: "default snapshot directory %r does not exist or is not readable. Snapshotting will not work.", os.path.abspath(parsed_args.snapshot_dir), ) + persist_llmobs_db_path = parsed_args.persist_llmobs_db or None + if parsed_args.persist_llmobs_traces and not persist_llmobs_db_path: + from .llmobs_persistence import DEFAULT_DB_PATH + + persist_llmobs_db_path = str(DEFAULT_DB_PATH) + app = make_app( enabled_checks=parsed_args.enabled_checks, log_span_fmt=parsed_args.log_span_fmt, @@ -2402,6 +2431,8 @@ def main(args: Optional[List[str]] = None) -> None: dd_api_key=parsed_args.dd_api_key, disable_llmobs_data_forwarding=parsed_args.disable_llmobs_data_forwarding, enable_web_ui=parsed_args.web_ui_port > 0, + persist_llmobs_traces=parsed_args.persist_llmobs_traces, + persist_llmobs_db_path=persist_llmobs_db_path, ) # Validate port configuration diff --git a/ddapm_test_agent/lapdog_cli.py b/ddapm_test_agent/lapdog_cli.py index 78806a64..daee0bdc 100644 --- a/ddapm_test_agent/lapdog_cli.py +++ b/ddapm_test_agent/lapdog_cli.py @@ -96,7 +96,13 @@ def _start_lapdog(port: int, extra_args: Optional[List[str]] = None) -> None: """Start lapdog in background with logs to the log file; wait until ready or exit on timeout. Return (process, log_path).""" log_path = _log_file_path() os.makedirs(os.path.dirname(log_path), exist_ok=True) - args = [sys.executable, "-m", "ddapm_test_agent.agent", "--enable-claude-code-hooks"] + args = [ + sys.executable, + "-m", + "ddapm_test_agent.agent", + "--enable-claude-code-hooks", + "--persist-llmobs-traces", + ] if extra_args: args += extra_args with open(log_path, "w") as log_file: diff --git a/ddapm_test_agent/llmobs_event_platform.py b/ddapm_test_agent/llmobs_event_platform.py index 11d23117..50db6117 100644 --- a/ddapm_test_agent/llmobs_event_platform.py +++ b/ddapm_test_agent/llmobs_event_platform.py @@ -5,6 +5,7 @@ import json import logging import re +import sqlite3 import time from typing import Any from typing import Awaitable @@ -20,6 +21,9 @@ import msgpack from . import llmobs_query_parser +from .llmobs_persistence import init_llmobs_db +from .llmobs_persistence import load_all_spans +from .llmobs_persistence import upsert_spans if TYPE_CHECKING: from .agent import Agent @@ -649,18 +653,35 @@ def build_event_platform_list_response( class LLMObsEventPlatformAPI: """Handler for Event Platform API requests.""" - def __init__(self, agent: "Agent"): + def __init__( + self, + agent: "Agent", + persist_llmobs_traces: bool = False, + persist_llmobs_db_path: Optional[str] = None, + ): self.agent = agent self._query_results: Dict[str, Dict[str, Any]] = {} self.decoded_llmobs_span_events: Dict[int, List[Dict[str, Any]]] = {} self._claude_hooks_api: Optional["ClaudeHooksAPI"] = None + self._persist_conn: Optional[sqlite3.Connection] = None + self._persisted_spans: List[Dict[str, Any]] = [] + + if persist_llmobs_traces and persist_llmobs_db_path: + self._persist_conn = init_llmobs_db(persist_llmobs_db_path) + self._persisted_spans = load_all_spans(self._persist_conn) def set_claude_hooks_api(self, api: "ClaudeHooksAPI") -> None: """Wire up the Claude hooks API so its spans appear in LLMObs queries.""" self._claude_hooks_api = api + def close(self) -> None: + """Close the SQLite persistence connection, if open.""" + if self._persist_conn is not None: + self._persist_conn.close() + self._persist_conn = None + def get_llmobs_spans(self, token: Optional[str] = None) -> List[Dict[str, Any]]: - """Get all LLMObs spans from stored requests.""" + """Get all LLMObs spans from stored requests and persisted DB.""" requests = self.agent._requests_by_session(token) if token else self.agent._requests all_spans = [] @@ -674,14 +695,21 @@ def get_llmobs_spans(self, token: Optional[str] = None) -> List[Dict[str, Any]]: events = decode_llmobs_payload(data, content_type) spans = extract_spans_from_events(events) self.decoded_llmobs_span_events[req_id] = spans + if self._persist_conn is not None and spans: + upsert_spans(self._persist_conn, spans) else: spans = self.decoded_llmobs_span_events[req_id] all_spans.extend(spans) except Exception as e: log.warning(f"Failed to extract spans from request: {e}") + all_spans.extend(self._persisted_spans) + if self._claude_hooks_api: - all_spans.extend(self._claude_hooks_api._assembled_spans) + assembled = self._claude_hooks_api._assembled_spans + if self._persist_conn is not None and assembled: + upsert_spans(self._persist_conn, assembled) + all_spans.extend(assembled) all_spans.sort(key=lambda s: s.get("start_ns", 0), reverse=True) return all_spans @@ -691,26 +719,28 @@ def update_spans(self, update_data: bytes, content_type: str) -> int: events = decode_llmobs_payload(update_data, content_type) update_span_list = extract_spans_from_events(events) - # Build index of all existing spans (stored requests + hooks assembled) + # Build index of all existing spans (stored requests + persisted + hooks) + # Use str(span_id) as key so int/str don't create duplicate entries all_spans = self.get_llmobs_spans() - span_index = {s.get("span_id"): s for s in all_spans} + span_index = {} + for s in all_spans: + sid = s.get("span_id") + if sid is not None: + span_index[str(sid)] = s updated = 0 + updated_spans: List[Dict[str, Any]] = [] for update in update_span_list: sid = update.get("span_id") - existing = span_index.get(sid) + existing = span_index.get(str(sid)) if sid is not None else None if existing: _deep_merge(update, existing) + updated_spans.append(existing) updated += 1 + if self._persist_conn is not None and updated_spans: + upsert_spans(self._persist_conn, updated_spans) return updated - async def handle_llmobs_update(self, request: Request) -> web.Response: - """Handle POST /evp_proxy/v2/api/v2/llmobs/update — update existing spans.""" - data = await request.read() - content_type = request.content_type or "" - updated = self.update_spans(data, content_type) - return web.json_response({"updated": updated}) - async def handle_logs_analytics_list(self, request: Request) -> web.Response: """Handle POST /api/unstable/llm-obs-query-rewriter/list endpoint.""" try: diff --git a/ddapm_test_agent/llmobs_persistence.py b/ddapm_test_agent/llmobs_persistence.py new file mode 100644 index 00000000..4ba1cb13 --- /dev/null +++ b/ddapm_test_agent/llmobs_persistence.py @@ -0,0 +1,89 @@ +"""SQLite persistence for LLMObs spans (load on startup, append on decode).""" + +import json +import logging +import sqlite3 +import time +from pathlib import Path +from typing import Any +from typing import Dict +from typing import List +from typing import Optional + +log = logging.getLogger(__name__) + +DEFAULT_DB_PATH = Path.home() / ".ddapm-test-agent" / "llmobs.db" + + +def _ensure_db_dir(path: Path) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + + +def init_llmobs_db(db_path: Optional[str] = None) -> sqlite3.Connection: + """Create or open SQLite DB and ensure llmobs_spans table exists.""" + path = Path(db_path) if db_path else DEFAULT_DB_PATH + _ensure_db_dir(path) + conn = sqlite3.connect(str(path)) + cur = conn.cursor() + cur.execute( + """ + CREATE TABLE IF NOT EXISTS llmobs_spans ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + span_id TEXT UNIQUE, + span_json TEXT NOT NULL, + created_at REAL + ) + """ + ) + conn.commit() + log.info("LLMObs persistence initialized db_path=%s", path.resolve()) + return conn + + +def upsert_spans(conn: sqlite3.Connection, spans: List[Dict[str, Any]]) -> None: + """Insert or update spans by span_id. Updates existing rows with merged content.""" + if not spans: + return + now = time.time() + updated_count = 0 + inserted_count = 0 + cur = conn.cursor() + for s in spans: + span_id = s.get("span_id") + if span_id is None: + continue + span_id_str = str(span_id) + duration = s.get("duration") + span_json = json.dumps(s) + cur.execute( + "UPDATE llmobs_spans SET span_json = ?, created_at = ? WHERE span_id = ?", + (span_json, now, span_id_str), + ) + if cur.rowcount == 0: + cur.execute( + "INSERT INTO llmobs_spans (span_id, span_json, created_at) VALUES (?, ?, ?)", + (span_id_str, span_json, now), + ) + inserted_count += 1 + log.debug("upsert span_id=%s INSERT duration=%s", span_id_str, duration) + else: + updated_count += 1 + log.debug("upsert span_id=%s UPDATE duration=%s", span_id_str, duration) + conn.commit() + if updated_count or inserted_count: + log.info("upsert_spans total=%d updated=%d inserted=%d", len(spans), updated_count, inserted_count) + + +def load_all_spans(conn: sqlite3.Connection) -> List[Dict[str, Any]]: + """Load all persisted spans from the DB (for startup).""" + cur = conn.cursor() + cur.execute("SELECT span_json FROM llmobs_spans ORDER BY id") + rows = cur.fetchall() + result: List[Dict[str, Any]] = [] + for (span_json,) in rows: + try: + result.append(json.loads(span_json)) + except (json.JSONDecodeError, TypeError): + continue + log.info("load_all_spans count=%d", len(result)) + return result diff --git a/releasenotes/notes/persisting-llmobs-traces-68f33d108d9a9441.yaml b/releasenotes/notes/persisting-llmobs-traces-68f33d108d9a9441.yaml new file mode 100644 index 00000000..9d9273c0 --- /dev/null +++ b/releasenotes/notes/persisting-llmobs-traces-68f33d108d9a9441.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + Adds new options ``--persist-llmobs-traces`` (``PERSIST_LLMOBS_TRACES``) and ``--persist-llmobs-db`` (``PERSIST_LLMOBS_DB``) that will persist LLM Observability events sent to the test agent locally to the given path (defaulting to ~/.ddapm-test-agent/llmobs.db). + - | + `lapdog start` and `lapdog claude` now enable the test agent with --persist-llmobs-traces diff --git a/tests/conftest.py b/tests/conftest.py index d78bddce..f5a898fb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -153,6 +153,16 @@ def disable_llmobs_data_forwarding() -> Generator[bool, None, None]: yield True +@pytest.fixture +def persist_llmobs_traces() -> Generator[bool, None, None]: + yield False + + +@pytest.fixture +def persist_llmobs_db_path() -> Generator[Optional[str], None, None]: + yield None + + @pytest.fixture async def agent_app( aiohttp_server, @@ -175,6 +185,8 @@ async def agent_app( dd_site, dd_api_key, disable_llmobs_data_forwarding, + persist_llmobs_traces, + persist_llmobs_db_path, ): app = await aiohttp_server( make_app( @@ -197,6 +209,8 @@ async def agent_app( dd_site=dd_site, dd_api_key=dd_api_key, disable_llmobs_data_forwarding=disable_llmobs_data_forwarding, + persist_llmobs_traces=persist_llmobs_traces, + persist_llmobs_db_path=persist_llmobs_db_path, ) ) yield app diff --git a/tests/test_llmobs_event_platform.py b/tests/test_llmobs_event_platform.py index 59b6f5cf..601538ac 100644 --- a/tests/test_llmobs_event_platform.py +++ b/tests/test_llmobs_event_platform.py @@ -1,10 +1,26 @@ import gzip import time +from typing import Any +from typing import Dict +from typing import List +from typing import Optional +from typing import cast import msgpack import pytest +# Override persistence fixtures for this file so agent uses a temp DB for LLMObs spans +@pytest.fixture +def persist_llmobs_traces(): + return True + + +@pytest.fixture +def persist_llmobs_db_path(tmp_path): + return str(tmp_path / "llmobs.db") + + @pytest.fixture def llmobs_payload(): return { @@ -383,3 +399,199 @@ async def test_facet_range_info_with_filter_query(agent): # Should only include app-1 spans (1s and 2s) assert data["result"]["min"] == 1000000000 assert data["result"]["max"] == 2000000000 + + +class _MockAgentEmptyRequests: + """Minimal agent with no stored requests (simulates post-restart).""" + + _requests: List[Any] = [] + + def _requests_by_session(self, token: Optional[str]) -> List[Any]: + return [] + + def _request_data(self, req: Any) -> bytes: + return b"" + + +async def _llmobs_list(agent_client: Any) -> Dict[str, Any]: + """POST list and return JSON.""" + resp = await agent_client.post( + "/api/unstable/llm-obs-query-rewriter/list?type=llmobs", + json={"list": {"search": {"query": ""}, "limit": 50}}, + ) + assert resp.status == 200 + return cast(Dict[str, Any], await resp.json()) + + +async def test_persistence_restart_sees_persisted_spans( + agent, persist_llmobs_db_path, llmobs_payload +): + """After submit + list, a new API instance loading from same DB sees the spans.""" + await _submit_llmobs_payload(agent, llmobs_payload) + list_data = await _llmobs_list(agent) + assert list_data["hitCount"] == 2 + + from ddapm_test_agent.llmobs_event_platform import LLMObsEventPlatformAPI + + mock_agent = _MockAgentEmptyRequests() + api_restart = LLMObsEventPlatformAPI( + mock_agent, persist_llmobs_traces=True, persist_llmobs_db_path=persist_llmobs_db_path + ) + spans = api_restart.get_llmobs_spans() + api_restart.close() + assert len(spans) == 2 + span_ids = {s.get("span_id") for s in spans} + assert span_ids == {"span-root", "span-child"} + + +async def test_persistence_upsert_updates_duration_after_restart( + agent, persist_llmobs_db_path +): + """Second payload with same span_id but new duration is upserted; restart sees new duration.""" + payload_v1 = { + "ml_app": "test-app", + "tags": [], + "spans": [ + { + "span_id": "span-one", + "trace_id": "t1", + "name": "x", + "status": "ok", + "duration": 0, + "start_ns": int(time.time() * 1_000_000_000), + "meta": {"span": {"kind": "llm"}}, + "tags": [], + }, + ], + } + payload_v2 = { + "ml_app": "test-app", + "tags": [], + "spans": [ + { + "span_id": "span-one", + "trace_id": "t1", + "name": "x", + "status": "ok", + "duration": 5_000_000_000, + "start_ns": int(time.time() * 1_000_000_000), + "meta": {"span": {"kind": "llm"}}, + "tags": [], + }, + ], + } + await _submit_llmobs_payload(agent, payload_v1) + await _llmobs_list(agent) + await _submit_llmobs_payload(agent, payload_v2) + await _llmobs_list(agent) + + from ddapm_test_agent.llmobs_event_platform import LLMObsEventPlatformAPI + + mock_agent = _MockAgentEmptyRequests() + api_restart = LLMObsEventPlatformAPI( + mock_agent, persist_llmobs_traces=True, persist_llmobs_db_path=persist_llmobs_db_path + ) + spans = api_restart.get_llmobs_spans() + api_restart.close() + assert len(spans) == 1 + assert spans[0]["duration"] == 5_000_000_000 + + +async def test_persistence_update_spans_persists_merged_duration( + agent, persist_llmobs_db_path +): + """update_spans (merge + upsert) persists so restart sees updated duration.""" + payload_initial = { + "ml_app": "test-app", + "tags": [], + "spans": [ + { + "span_id": "span-update-me", + "trace_id": "t1", + "name": "x", + "status": "ok", + "duration": 0, + "start_ns": int(time.time() * 1_000_000_000), + "meta": {"span": {"kind": "llm"}}, + "tags": [], + }, + ], + } + await _submit_llmobs_payload(agent, payload_initial) + await _llmobs_list(agent) + + update_payload = { + "ml_app": "test-app", + "tags": [], + "spans": [ + { + "span_id": "span-update-me", + "duration": 10_000_000_000, + }, + ], + } + data = gzip.compress(msgpack.packb(update_payload)) + resp = await agent.post( + "/evp_proxy/v2/api/v2/llmobs/update", + headers={"Content-Type": "application/msgpack", "Content-Encoding": "gzip"}, + data=data, + ) + assert resp.status == 200 + + from ddapm_test_agent.llmobs_event_platform import LLMObsEventPlatformAPI + + mock_agent = _MockAgentEmptyRequests() + api_restart = LLMObsEventPlatformAPI( + mock_agent, persist_llmobs_traces=True, persist_llmobs_db_path=persist_llmobs_db_path + ) + spans = api_restart.get_llmobs_spans() + api_restart.close() + assert len(spans) == 1 + assert spans[0]["duration"] == 10_000_000_000 + + +async def test_persistence_span_id_int_str_same_row( + agent, persist_llmobs_db_path +): + """Span with span_id int then update with str (or vice versa) updates same row.""" + payload_str_id = { + "ml_app": "test-app", + "tags": [], + "spans": [ + { + "span_id": "12345", + "trace_id": "t1", + "name": "x", + "status": "ok", + "duration": 1, + "start_ns": int(time.time() * 1_000_000_000), + "meta": {"span": {"kind": "llm"}}, + "tags": [], + }, + ], + } + await _submit_llmobs_payload(agent, payload_str_id) + await _llmobs_list(agent) + + update_payload = { + "ml_app": "test-app", + "tags": [], + "spans": [{"span_id": 12345, "duration": 99}], + } + data = gzip.compress(msgpack.packb(update_payload)) + await agent.post( + "/evp_proxy/v2/api/v2/llmobs/update", + headers={"Content-Type": "application/msgpack", "Content-Encoding": "gzip"}, + data=data, + ) + + from ddapm_test_agent.llmobs_event_platform import LLMObsEventPlatformAPI + + mock_agent = _MockAgentEmptyRequests() + api_restart = LLMObsEventPlatformAPI( + mock_agent, persist_llmobs_traces=True, persist_llmobs_db_path=persist_llmobs_db_path + ) + spans = api_restart.get_llmobs_spans() + api_restart.close() + assert len(spans) == 1 + assert spans[0]["duration"] == 99 diff --git a/tests/test_llmobs_persistence.py b/tests/test_llmobs_persistence.py new file mode 100644 index 00000000..fb87fac4 --- /dev/null +++ b/tests/test_llmobs_persistence.py @@ -0,0 +1,94 @@ +"""Unit tests for LLMObs SQLite persistence.""" + +import json +import sqlite3 +from pathlib import Path + +import pytest + +from ddapm_test_agent.llmobs_persistence import init_llmobs_db +from ddapm_test_agent.llmobs_persistence import load_all_spans +from ddapm_test_agent.llmobs_persistence import upsert_spans + + +@pytest.fixture +def db_path(tmp_path: Path) -> Path: + return tmp_path / "llmobs.db" + + +@pytest.fixture +def conn(db_path: Path) -> sqlite3.Connection: + return init_llmobs_db(str(db_path)) + + +def test_init_creates_db_and_table(conn: sqlite3.Connection, db_path: Path) -> None: + assert db_path.exists() + cur = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='llmobs_spans'") + assert cur.fetchone() is not None + cur = conn.execute("PRAGMA table_info(llmobs_spans)") + columns = {row[1] for row in cur.fetchall()} + assert columns >= {"id", "span_id", "span_json", "created_at"} + + +def test_init_idempotent(db_path: Path) -> None: + c1 = init_llmobs_db(str(db_path)) + c2 = init_llmobs_db(str(db_path)) + c1.close() + c2.close() + + +def test_upsert_spans_empty(conn: sqlite3.Connection) -> None: + upsert_spans(conn, []) + rows = conn.execute("SELECT COUNT(*) FROM llmobs_spans").fetchone() + assert rows[0] == 0 + + +def test_upsert_spans_skips_missing_span_id(conn: sqlite3.Connection) -> None: + upsert_spans(conn, [{"name": "x", "duration": 1}]) + rows = conn.execute("SELECT COUNT(*) FROM llmobs_spans").fetchone() + assert rows[0] == 0 + + +def test_upsert_new_span_insert(conn: sqlite3.Connection) -> None: + upsert_spans(conn, [{"span_id": "s1", "duration": 0}]) + loaded = load_all_spans(conn) + assert len(loaded) == 1 + assert loaded[0]["span_id"] == "s1" + assert loaded[0]["duration"] == 0 + + +def test_upsert_same_span_id_updates(conn: sqlite3.Connection) -> None: + upsert_spans(conn, [{"span_id": "s1", "duration": 0}]) + upsert_spans(conn, [{"span_id": "s1", "duration": 5}]) + loaded = load_all_spans(conn) + assert len(loaded) == 1 + assert loaded[0]["duration"] == 5 + + +def test_upsert_span_id_int_and_str_same_row(conn: sqlite3.Connection) -> None: + upsert_spans(conn, [{"span_id": 123, "x": 1}]) + upsert_spans(conn, [{"span_id": "123", "x": 2}]) + loaded = load_all_spans(conn) + assert len(loaded) == 1 + assert loaded[0]["x"] == 2 + + +def test_load_all_spans_skips_bad_json(conn: sqlite3.Connection) -> None: + conn.execute( + "INSERT INTO llmobs_spans (span_id, span_json, created_at) VALUES (?, ?, ?)", + ("bad", "not valid json", 0.0), + ) + conn.execute( + "INSERT INTO llmobs_spans (span_id, span_json, created_at) VALUES (?, ?, ?)", + ("ok", json.dumps({"span_id": "ok", "duration": 1}), 0.0), + ) + conn.commit() + loaded = load_all_spans(conn) + assert len(loaded) == 1 + assert loaded[0]["span_id"] == "ok" + + +def test_load_all_spans_order(conn: sqlite3.Connection) -> None: + upsert_spans(conn, [{"span_id": "s1"}, {"span_id": "s2"}, {"span_id": "s3"}]) + loaded = load_all_spans(conn) + assert [s["span_id"] for s in loaded] == ["s1", "s2", "s3"]