-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathllmobs_persistence.py
More file actions
89 lines (78 loc) · 2.85 KB
/
llmobs_persistence.py
File metadata and controls
89 lines (78 loc) · 2.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""SQLite persistence for LLMObs spans (load on startup, append on decode)."""
import json
import logging
import sqlite3
import time
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
log = logging.getLogger(__name__)
DEFAULT_DB_PATH = Path.home() / ".ddapm-test-agent" / "llmobs.db"
def _ensure_db_dir(path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
def init_llmobs_db(db_path: Optional[str] = None) -> sqlite3.Connection:
"""Create or open SQLite DB and ensure llmobs_spans table exists."""
path = Path(db_path) if db_path else DEFAULT_DB_PATH
_ensure_db_dir(path)
conn = sqlite3.connect(str(path))
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS llmobs_spans (
id INTEGER PRIMARY KEY AUTOINCREMENT,
span_id TEXT UNIQUE,
span_json TEXT NOT NULL,
created_at REAL
)
"""
)
conn.commit()
log.info("LLMObs persistence initialized db_path=%s", path.resolve())
return conn
def upsert_spans(conn: sqlite3.Connection, spans: List[Dict[str, Any]]) -> None:
"""Insert or update spans by span_id. Updates existing rows with merged content."""
if not spans:
return
now = time.time()
updated_count = 0
inserted_count = 0
cur = conn.cursor()
for s in spans:
span_id = s.get("span_id")
if span_id is None:
continue
span_id_str = str(span_id)
duration = s.get("duration")
span_json = json.dumps(s)
cur.execute(
"UPDATE llmobs_spans SET span_json = ?, created_at = ? WHERE span_id = ?",
(span_json, now, span_id_str),
)
if cur.rowcount == 0:
cur.execute(
"INSERT INTO llmobs_spans (span_id, span_json, created_at) VALUES (?, ?, ?)",
(span_id_str, span_json, now),
)
inserted_count += 1
log.debug("upsert span_id=%s INSERT duration=%s", span_id_str, duration)
else:
updated_count += 1
log.debug("upsert span_id=%s UPDATE duration=%s", span_id_str, duration)
conn.commit()
if updated_count or inserted_count:
log.info("upsert_spans total=%d updated=%d inserted=%d", len(spans), updated_count, inserted_count)
def load_all_spans(conn: sqlite3.Connection) -> List[Dict[str, Any]]:
"""Load all persisted spans from the DB (for startup)."""
cur = conn.cursor()
cur.execute("SELECT span_json FROM llmobs_spans ORDER BY id")
rows = cur.fetchall()
result: List[Dict[str, Any]] = []
for (span_json,) in rows:
try:
result.append(json.loads(span_json))
except (json.JSONDecodeError, TypeError):
continue
log.info("load_all_spans count=%d", len(result))
return result