Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion ddapm_test_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1888,6 +1888,8 @@ def make_app(
dd_api_key: str | None,
disable_llmobs_data_forwarding: bool,
enable_web_ui: bool = False,
persist_llmobs_traces: bool = False,
persist_llmobs_db_path: Optional[str] = None,
) -> web.Application:
agent = Agent()

Expand Down Expand Up @@ -1977,7 +1979,11 @@ def make_app(

# Add LLM Observability Event Platform API routes
# These provide Datadog Event Platform compatible endpoints for local development
llmobs_event_platform_api = LLMObsEventPlatformAPI(agent)
llmobs_event_platform_api = LLMObsEventPlatformAPI(
agent,
persist_llmobs_traces=persist_llmobs_traces,
persist_llmobs_db_path=persist_llmobs_db_path,
)
app["llmobs_event_platform_api"] = llmobs_event_platform_api
app.add_routes(llmobs_event_platform_api.get_routes())

Expand All @@ -1996,6 +2002,11 @@ async def _cleanup_claude_proxy(app: web.Application) -> None:

app.on_cleanup.append(_cleanup_claude_proxy)

async def _close_llmobs_persist(app: web.Application) -> None:
llmobs_event_platform_api.close()

app.on_cleanup.append(_close_llmobs_persist)

checks = Checks(
checks=[
CheckMetaTracerVersionHeader,
Expand Down Expand Up @@ -2352,6 +2363,18 @@ def main(args: Optional[List[str]] = None) -> None:
default=os.environ.get("ENABLE_CLAUDE_CODE_HOOKS", "").lower() in ("true", "1", "yes"),
help="Enable writing Claude Code hooks to ~/.claude/settings.json",
)
parser.add_argument(
"--persist-llmobs-traces",
action="store_true",
default=os.environ.get("PERSIST_LLMOBS_TRACES", "").lower() in ("true", "1", "yes"),
help="Persist LLMObs spans to a local SQLite DB and load them on startup.",
)
parser.add_argument(
"--persist-llmobs-db",
type=str,
default=os.environ.get("PERSIST_LLMOBS_DB", ""),
help="Path to SQLite DB for LLMObs persistence (default: ~/.ddapm-test-agent/llmobs.db).",
)
parsed_args = parser.parse_args(args=args)
logging.basicConfig(level=parsed_args.log_level)

Expand Down Expand Up @@ -2381,6 +2404,12 @@ def main(args: Optional[List[str]] = None) -> None:
"default snapshot directory %r does not exist or is not readable. Snapshotting will not work.",
os.path.abspath(parsed_args.snapshot_dir),
)
persist_llmobs_db_path = parsed_args.persist_llmobs_db or None
if parsed_args.persist_llmobs_traces and not persist_llmobs_db_path:
from .llmobs_persistence import DEFAULT_DB_PATH

persist_llmobs_db_path = str(DEFAULT_DB_PATH)

app = make_app(
enabled_checks=parsed_args.enabled_checks,
log_span_fmt=parsed_args.log_span_fmt,
Expand All @@ -2402,6 +2431,8 @@ def main(args: Optional[List[str]] = None) -> None:
dd_api_key=parsed_args.dd_api_key,
disable_llmobs_data_forwarding=parsed_args.disable_llmobs_data_forwarding,
enable_web_ui=parsed_args.web_ui_port > 0,
persist_llmobs_traces=parsed_args.persist_llmobs_traces,
persist_llmobs_db_path=persist_llmobs_db_path,
)

# Validate port configuration
Expand Down
8 changes: 7 additions & 1 deletion ddapm_test_agent/lapdog_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,13 @@ def _start_lapdog(port: int, extra_args: Optional[List[str]] = None) -> None:
"""Start lapdog in background with logs to the log file; wait until ready or exit on timeout. Return (process, log_path)."""
log_path = _log_file_path()
os.makedirs(os.path.dirname(log_path), exist_ok=True)
args = [sys.executable, "-m", "ddapm_test_agent.agent", "--enable-claude-code-hooks"]
args = [
sys.executable,
"-m",
"ddapm_test_agent.agent",
"--enable-claude-code-hooks",
"--persist-llmobs-traces",
]
if extra_args:
args += extra_args
with open(log_path, "w") as log_file:
Expand Down
56 changes: 43 additions & 13 deletions ddapm_test_agent/llmobs_event_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import json
import logging
import re
import sqlite3
import time
from typing import Any
from typing import Awaitable
Expand All @@ -20,6 +21,9 @@
import msgpack

from . import llmobs_query_parser
from .llmobs_persistence import init_llmobs_db
from .llmobs_persistence import load_all_spans
from .llmobs_persistence import upsert_spans

if TYPE_CHECKING:
from .agent import Agent
Expand Down Expand Up @@ -649,18 +653,35 @@ def build_event_platform_list_response(
class LLMObsEventPlatformAPI:
"""Handler for Event Platform API requests."""

def __init__(self, agent: "Agent"):
def __init__(
self,
agent: "Agent",
persist_llmobs_traces: bool = False,
persist_llmobs_db_path: Optional[str] = None,
):
self.agent = agent
self._query_results: Dict[str, Dict[str, Any]] = {}
self.decoded_llmobs_span_events: Dict[int, List[Dict[str, Any]]] = {}
self._claude_hooks_api: Optional["ClaudeHooksAPI"] = None
self._persist_conn: Optional[sqlite3.Connection] = None
self._persisted_spans: List[Dict[str, Any]] = []

if persist_llmobs_traces and persist_llmobs_db_path:
self._persist_conn = init_llmobs_db(persist_llmobs_db_path)
self._persisted_spans = load_all_spans(self._persist_conn)

def set_claude_hooks_api(self, api: "ClaudeHooksAPI") -> None:
"""Wire up the Claude hooks API so its spans appear in LLMObs queries."""
self._claude_hooks_api = api

def close(self) -> None:
"""Close the SQLite persistence connection, if open."""
if self._persist_conn is not None:
self._persist_conn.close()
self._persist_conn = None

def get_llmobs_spans(self, token: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get all LLMObs spans from stored requests."""
"""Get all LLMObs spans from stored requests and persisted DB."""
requests = self.agent._requests_by_session(token) if token else self.agent._requests
all_spans = []

Expand All @@ -674,14 +695,21 @@ def get_llmobs_spans(self, token: Optional[str] = None) -> List[Dict[str, Any]]:
events = decode_llmobs_payload(data, content_type)
spans = extract_spans_from_events(events)
self.decoded_llmobs_span_events[req_id] = spans
if self._persist_conn is not None and spans:
upsert_spans(self._persist_conn, spans)
else:
spans = self.decoded_llmobs_span_events[req_id]
all_spans.extend(spans)
except Exception as e:
log.warning(f"Failed to extract spans from request: {e}")

all_spans.extend(self._persisted_spans)

if self._claude_hooks_api:
all_spans.extend(self._claude_hooks_api._assembled_spans)
assembled = self._claude_hooks_api._assembled_spans
if self._persist_conn is not None and assembled:
upsert_spans(self._persist_conn, assembled)
all_spans.extend(assembled)

all_spans.sort(key=lambda s: s.get("start_ns", 0), reverse=True)
return all_spans
Expand All @@ -691,26 +719,28 @@ def update_spans(self, update_data: bytes, content_type: str) -> int:
events = decode_llmobs_payload(update_data, content_type)
update_span_list = extract_spans_from_events(events)

# Build index of all existing spans (stored requests + hooks assembled)
# Build index of all existing spans (stored requests + persisted + hooks)
# Use str(span_id) as key so int/str don't create duplicate entries
all_spans = self.get_llmobs_spans()
span_index = {s.get("span_id"): s for s in all_spans}
span_index = {}
for s in all_spans:
sid = s.get("span_id")
if sid is not None:
span_index[str(sid)] = s

updated = 0
updated_spans: List[Dict[str, Any]] = []
for update in update_span_list:
sid = update.get("span_id")
existing = span_index.get(sid)
existing = span_index.get(str(sid)) if sid is not None else None
if existing:
_deep_merge(update, existing)
updated_spans.append(existing)
updated += 1
if self._persist_conn is not None and updated_spans:
upsert_spans(self._persist_conn, updated_spans)
return updated

async def handle_llmobs_update(self, request: Request) -> web.Response:
"""Handle POST /evp_proxy/v2/api/v2/llmobs/update — update existing spans."""
data = await request.read()
content_type = request.content_type or ""
updated = self.update_spans(data, content_type)
return web.json_response({"updated": updated})

async def handle_logs_analytics_list(self, request: Request) -> web.Response:
"""Handle POST /api/unstable/llm-obs-query-rewriter/list endpoint."""
try:
Expand Down
89 changes: 89 additions & 0 deletions ddapm_test_agent/llmobs_persistence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""SQLite persistence for LLMObs spans (load on startup, append on decode)."""

import json
import logging
import sqlite3
import time
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

log = logging.getLogger(__name__)

DEFAULT_DB_PATH = Path.home() / ".ddapm-test-agent" / "llmobs.db"


def _ensure_db_dir(path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)


def init_llmobs_db(db_path: Optional[str] = None) -> sqlite3.Connection:
"""Create or open SQLite DB and ensure llmobs_spans table exists."""
path = Path(db_path) if db_path else DEFAULT_DB_PATH
_ensure_db_dir(path)
conn = sqlite3.connect(str(path))
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS llmobs_spans (
id INTEGER PRIMARY KEY AUTOINCREMENT,
span_id TEXT UNIQUE,
span_json TEXT NOT NULL,
created_at REAL
)
"""
)
conn.commit()
log.info("LLMObs persistence initialized db_path=%s", path.resolve())
return conn


def upsert_spans(conn: sqlite3.Connection, spans: List[Dict[str, Any]]) -> None:
"""Insert or update spans by span_id. Updates existing rows with merged content."""
if not spans:
return
now = time.time()
updated_count = 0
inserted_count = 0
cur = conn.cursor()
for s in spans:
span_id = s.get("span_id")
if span_id is None:
continue
span_id_str = str(span_id)
duration = s.get("duration")
span_json = json.dumps(s)
cur.execute(
"UPDATE llmobs_spans SET span_json = ?, created_at = ? WHERE span_id = ?",
(span_json, now, span_id_str),
)
if cur.rowcount == 0:
cur.execute(
"INSERT INTO llmobs_spans (span_id, span_json, created_at) VALUES (?, ?, ?)",
(span_id_str, span_json, now),
)
inserted_count += 1
log.debug("upsert span_id=%s INSERT duration=%s", span_id_str, duration)
else:
updated_count += 1
log.debug("upsert span_id=%s UPDATE duration=%s", span_id_str, duration)
conn.commit()
if updated_count or inserted_count:
log.info("upsert_spans total=%d updated=%d inserted=%d", len(spans), updated_count, inserted_count)


def load_all_spans(conn: sqlite3.Connection) -> List[Dict[str, Any]]:
"""Load all persisted spans from the DB (for startup)."""
cur = conn.cursor()
cur.execute("SELECT span_json FROM llmobs_spans ORDER BY id")
rows = cur.fetchall()
result: List[Dict[str, Any]] = []
for (span_json,) in rows:
try:
result.append(json.loads(span_json))
except (json.JSONDecodeError, TypeError):
continue
log.info("load_all_spans count=%d", len(result))
return result
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
features:
- |
Adds new options ``--persist-llmobs-traces`` (``PERSIST_LLMOBS_TRACES``) and ``--persist-llmobs-db`` (``PERSIST_LLMOBS_DB``) that will persist LLM Observability events sent to the test agent locally to the given path (defaulting to ~/.ddapm-test-agent/llmobs.db).
- |
`lapdog start` and `lapdog claude` now enable the test agent with --persist-llmobs-traces
14 changes: 14 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ def disable_llmobs_data_forwarding() -> Generator[bool, None, None]:
yield True


@pytest.fixture
def persist_llmobs_traces() -> Generator[bool, None, None]:
yield False


@pytest.fixture
def persist_llmobs_db_path() -> Generator[Optional[str], None, None]:
yield None


@pytest.fixture
async def agent_app(
aiohttp_server,
Expand All @@ -175,6 +185,8 @@ async def agent_app(
dd_site,
dd_api_key,
disable_llmobs_data_forwarding,
persist_llmobs_traces,
persist_llmobs_db_path,
):
app = await aiohttp_server(
make_app(
Expand All @@ -197,6 +209,8 @@ async def agent_app(
dd_site=dd_site,
dd_api_key=dd_api_key,
disable_llmobs_data_forwarding=disable_llmobs_data_forwarding,
persist_llmobs_traces=persist_llmobs_traces,
persist_llmobs_db_path=persist_llmobs_db_path,
)
)
yield app
Expand Down
Loading
Loading