Skip to content

Commit c151aa4

Browse files
nishenghao.nsh and claude committed
feat(memory_case): vector storage with lazy init, partial upsert, and auto backfill
- LazyCandidateCaseVectorIndex: defer ChromaDB creation until the first tool call, so WorkerManagerFactory has time to register during startup.
- Partial field update in MemoryCaseDao.upsert: only overwrite fields with meaningful values, and merge metadata_json to prevent context loss.
- Best-effort vector upsert: a ChromaDB failure does not block the MySQL write.
- Lazy backfill during search: DB-only results get auto-reindexed.
- Fix similar_search_with_scores() parameter names (query→text, +score_threshold).
- Tool description: clarify that scope is for routing isolation, not business context.
- 14 new tests covering DAO upsert, vector search, scope filtering, stress, lazy init.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent a459934 commit c151aa4

5 files changed

Lines changed: 628 additions & 21 deletions

File tree

packages/derisk-ext/src/derisk_ext/plugin/memory_case/service.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,11 @@ def list_tools(self) -> List[MemoryToolSpec]:
7272
"scope": {
7373
"type": "object",
7474
"description": (
75-
"Optional narrowing on metadata.case_context only (no DB columns): "
76-
"app_code, environment, tenant_id, team_id. "
77-
"app_code/environment omitted or 'default' → no filter on that key."
75+
"Routing isolation ONLY (app_code/tenant_id/team_id for "
76+
"multi-tenant; environment for deploy env prod/staging). "
77+
"Cloud-vendor or region info belongs in case metadata "
78+
"(region/tags), NOT in scope. "
79+
"Omit or set to 'default' for wildcard (recommended)."
7880
),
7981
},
8082
"query": {
@@ -236,6 +238,13 @@ async def _search(self, args: Dict[str, Any]) -> Dict[str, Any]:
236238
match = self._dao.get_by_case_id(case_id)
237239
if match:
238240
case_by_id[case_id] = match
241+
# Lazy backfill: DB hits missing from vector index get reindexed
242+
for case in lexical_cases:
243+
if case.case_id not in set(semantic_case_ids):
244+
try:
245+
await self._vector_index.upsert(case)
246+
except Exception:
247+
pass
239248
ordered_cases = sorted(
240249
case_by_id.values(),
241250
key=lambda item: (item.confidence, item.updated_at or datetime.min),
@@ -275,7 +284,14 @@ async def _upsert(self, args: Dict[str, Any]) -> Dict[str, Any]:
275284
if not case.markdown_summary:
276285
case.markdown_summary = render_case_markdown(case)
277286
saved = self._dao.upsert(case)
278-
await self._vector_index.upsert(saved)
287+
try:
288+
await self._vector_index.upsert(saved)
289+
except Exception:
290+
logger.warning(
291+
"memory_case vector upsert failed for %s, will retry later",
292+
saved.case_id,
293+
exc_info=True,
294+
)
279295
return {"code": "OK", "case": saved.model_dump(mode="json")}
280296

281297
async def _feedback(self, args: Dict[str, Any]) -> Dict[str, Any]:

packages/derisk-ext/src/derisk_ext/plugin/memory_case/sqlalchemy_dao.py

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from derisk.storage.metadata import BaseDao, Model
1414

1515
from .case_context import (
16+
CASE_CONTEXT_KEY,
1617
FULLTEXT_LEXICAL_COLUMNS,
1718
is_memory_search_scope_app_wildcard,
1819
is_memory_search_scope_env_wildcard,
@@ -192,21 +193,52 @@ def upsert(self, case: CandidateCase) -> CandidateCase:
192193
if entity is None:
193194
entity = MemoryCaseEntity(case_id=case.case_id)
194195
session.add(entity)
195-
entity.fingerprint = case.fingerprint
196-
entity.incident_title = case.incident_title or None
197-
entity.symptom_summary = case.symptom_summary
198-
entity.hypotheses = json.dumps(case.hypotheses, ensure_ascii=False)
199-
entity.actions = json.dumps(case.actions, ensure_ascii=False)
200-
entity.resolution = case.resolution
201-
entity.handling_path = case.handling_path or None
202-
entity.root_cause = case.root_cause or None
203-
entity.effectiveness = case.effectiveness
204-
entity.confidence = case.confidence
205-
entity.lifecycle = case.lifecycle.value
206-
entity.source_conv_id = case.source_conv_id
207-
entity.source_session_id = case.source_session_id
208-
entity.markdown_summary = case.markdown_summary
209-
entity.metadata_json = json.dumps(case.metadata or {}, ensure_ascii=False)
196+
entity.fingerprint = case.fingerprint
197+
entity.incident_title = case.incident_title or None
198+
entity.symptom_summary = case.symptom_summary
199+
entity.hypotheses = json.dumps(case.hypotheses, ensure_ascii=False)
200+
entity.actions = json.dumps(case.actions, ensure_ascii=False)
201+
entity.resolution = case.resolution
202+
entity.handling_path = case.handling_path or None
203+
entity.root_cause = case.root_cause or None
204+
entity.effectiveness = case.effectiveness
205+
entity.confidence = case.confidence
206+
entity.lifecycle = case.lifecycle.value
207+
entity.source_conv_id = case.source_conv_id
208+
entity.source_session_id = case.source_session_id
209+
entity.markdown_summary = case.markdown_summary
210+
entity.metadata_json = json.dumps(case.metadata or {}, ensure_ascii=False)
211+
else:
212+
entity.fingerprint = case.fingerprint
213+
if case.incident_title:
214+
entity.incident_title = case.incident_title
215+
if case.symptom_summary:
216+
entity.symptom_summary = case.symptom_summary
217+
if case.hypotheses:
218+
entity.hypotheses = json.dumps(case.hypotheses, ensure_ascii=False)
219+
if case.actions:
220+
entity.actions = json.dumps(case.actions, ensure_ascii=False)
221+
if case.resolution:
222+
entity.resolution = case.resolution
223+
if case.handling_path:
224+
entity.handling_path = case.handling_path
225+
if case.root_cause:
226+
entity.root_cause = case.root_cause
227+
if case.effectiveness:
228+
entity.effectiveness = case.effectiveness
229+
if case.markdown_summary:
230+
entity.markdown_summary = case.markdown_summary
231+
entity.confidence = case.confidence
232+
entity.lifecycle = case.lifecycle.value
233+
entity.source_conv_id = case.source_conv_id
234+
entity.source_session_id = case.source_session_id
235+
existing_meta = json.loads(entity.metadata_json) if entity.metadata_json else {}
236+
for k, v in (case.metadata or {}).items():
237+
if k == CASE_CONTEXT_KEY and isinstance(v, dict) and isinstance(existing_meta.get(k), dict):
238+
existing_meta[k].update(v)
239+
else:
240+
existing_meta[k] = v
241+
entity.metadata_json = json.dumps(existing_meta, ensure_ascii=False)
210242
session.commit()
211243
session.refresh(entity)
212244
return self.to_model(entity)
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
"""Test MemoryCaseDao.upsert() with a real SQLite database — verify that inserting
2+
new records does NOT delete existing ones.
3+
"""
4+
5+
import pytest
6+
from sqlalchemy import create_engine
7+
8+
from derisk.storage.metadata.db_manager import db as global_db
9+
from derisk.storage.metadata import Model as GlobalModel
10+
11+
from derisk_ext.plugin.memory_case.models import CandidateCase, CandidateCaseLifecycle
12+
from derisk_ext.plugin.memory_case.sqlalchemy_dao import MemoryCaseDao, MemoryCaseEntity
13+
14+
15+
@pytest.fixture
16+
def dao():
17+
"""Create a MemoryCaseDao backed by a throwaway SQLite in-memory database.
18+
19+
Since MemoryCaseEntity inherits from the global db.Model, we must
20+
temporarily repoint the global db to a test engine and create tables there.
21+
"""
22+
old_engine = global_db._engine
23+
old_session = global_db._session
24+
25+
test_engine = create_engine("sqlite:///:memory:")
26+
global_db._engine = test_engine
27+
global_db._session = None # force re-init below
28+
29+
from sqlalchemy.orm import sessionmaker, Session
30+
from derisk.storage.metadata.db_manager import BaseQuery
31+
session_factory = sessionmaker(
32+
bind=test_engine, class_=Session, query_cls=BaseQuery
33+
)
34+
global_db._session = session_factory
35+
36+
# Create the table on the test engine
37+
GlobalModel.metadata.create_all(test_engine)
38+
39+
try:
40+
yield MemoryCaseDao()
41+
finally:
42+
global_db._engine = old_engine
43+
global_db._session = old_session
44+
test_engine.dispose()
45+
46+
47+
def _make_case(case_id: str, symptom: str, confidence: float = 0.5) -> CandidateCase:
    """Build a CandidateCase from an id, a symptom summary and a confidence."""
    fields = {
        "case_id": case_id,
        "symptom_summary": symptom,
        "confidence": confidence,
    }
    return CandidateCase(**fields)
53+
54+
55+
def test_upsert_five_records_keeps_previous_four(dao):
    """Inserting a fifth case must leave the four earlier cases untouched."""
    # Seed four cases; each upsert must echo back the id it was given.
    for n in (1, 2, 3, 4):
        expected_id = f"case-{n}"
        stored = dao.upsert(
            _make_case(case_id=expected_id, symptom=f"issue #{n}", confidence=0.6)
        )
        assert stored.case_id == expected_id

    # All four seeded cases are retrievable.
    for seeded_id in ("case-1", "case-2", "case-3", "case-4"):
        assert dao.get_by_case_id(seeded_id) is not None

    # Add the fifth case.
    dao.upsert(_make_case(case_id="case-5", symptom="new issue #5", confidence=0.7))

    # Every case, old and new, must still be present.
    for n in range(1, 6):
        lookup = dao.get_by_case_id(f"case-{n}")
        assert lookup is not None, f"case-{n} should still exist after inserting case-5"

    # Spot-check that an old record kept its field values.
    first = dao.get_by_case_id("case-1")
    assert first.symptom_summary == "issue #1"
    assert first.confidence == 0.6
81+
82+
83+
def test_upsert_existing_record_does_not_delete_others(dao):
    """Re-upserting an existing case_id merges fields and leaves siblings alone."""
    for n in (1, 2, 3):
        dao.upsert(_make_case(case_id=f"case-{n}", symptom=f"old #{n}"))

    # Merge into case-2: resolution and confidence carry new values, while the
    # empty symptom_summary must not overwrite the stored one.
    dao.upsert(
        CandidateCase(
            case_id="case-2",
            symptom_summary="",
            resolution="merged resolution",
            confidence=0.8,
        )
    )

    # The untouched siblings are still there.
    for sibling_id in ("case-1", "case-3"):
        assert dao.get_by_case_id(sibling_id) is not None

    # The merged record keeps its symptom and picks up the new values.
    merged_row = dao.get_by_case_id("case-2")
    assert merged_row.symptom_summary == "old #2", "existing symptom should be preserved"
    assert merged_row.resolution == "merged resolution", "new resolution should be set"
    assert merged_row.confidence == 0.8, "confidence should be updated"
106+
107+
108+
def test_upsert_concurrent_like_scenario(dao):
    """Simulate the Agent writing many cases in rapid succession with different metadata."""
    # NOTE(review): the seed rows previously used the key "symptom"; every
    # other construction in this module passes ``symptom_summary``, so that key
    # is used here to make sure the symptom text actually reaches the model.
    base_data = [
        {
            "case_id": "case-a",
            "symptom_summary": "CPU 飙升",
            "metadata": {"case_context": {"app_code": "app1", "environment": "prod"}},
        },
        {
            "case_id": "case-b",
            "symptom_summary": "OOM Kill",
            "metadata": {"case_context": {"app_code": "app1", "environment": "prod"}},
        },
        {
            "case_id": "case-c",
            "symptom_summary": "Disk full",
            "metadata": {"case_context": {"app_code": "app2", "environment": "prod"}},
        },
        {
            "case_id": "case-d",
            "symptom_summary": "Latency spike",
            "metadata": {"case_context": {"app_code": "app1", "environment": "staging"}},
        },
    ]

    for item in base_data:
        dao.upsert(CandidateCase(**item))

    # The "default" app_code scope is used as a wildcard: all 4 rows are visible.
    results = dao.search(scope={"app_code": "default"}, limit=50)
    assert len(results) == 4, f"Expected 4 records before new insert, got {len(results)}"

    # Insert a 5th case.
    new_case = CandidateCase(
        case_id="case-e",
        symptom_summary="Network timeout",
        metadata={"case_context": {"app_code": "app1", "environment": "prod"}},
    )
    dao.upsert(new_case)

    # All 5 must still be visible through search ...
    results_after = dao.search(scope={"app_code": "default"}, limit=50)
    assert len(results_after) == 5, (
        f"Expected 5 records after insert, got {len(results_after)}. "
        f"Found case_ids: {[r.case_id for r in results_after]}"
    )

    # ... and through individual lookup.
    for cid in ["case-a", "case-b", "case-c", "case-d", "case-e"]:
        assert dao.get_by_case_id(cid) is not None, f"{cid} should exist"
145+
146+
147+
def test_stress_many_upserts_never_delete_old_records(dao):
    """Stress-test: insert 20 records one by one, each time verify all previous exist."""
    seen = []
    for n in range(20):
        current = f"stress-{n}"
        dao.upsert(_make_case(case_id=current, symptom=f"stress symptom {n}"))
        seen.append(current)
        # After every insert, re-check each id written so far.
        for earlier in seen:
            assert dao.get_by_case_id(earlier) is not None, (
                f"{earlier} disappeared after inserting {current}"
            )

    # Final verification: a search returns all 20 rows.
    everything = dao.search(scope={"app_code": "default"}, limit=100)
    total = len(everything)
    assert total == 20, f"Expected 20, got {total}"
162+
163+
164+
def test_full_service_upsert_flow_with_real_dao(dao):
    """Exercise the complete service._upsert → dao path (no vector store)."""
    import asyncio

    from derisk_ext.plugin.memory_case.service import MemoryCasePluginService

    class _DummySystemApp:
        config = {}

    class _FakeVector:
        """No-op stand-in for the vector index."""

        async def upsert(self, case):
            pass

        async def search(self, query, scope, top_k):
            return []

        async def invalidate(self, case_id):
            pass

    service = MemoryCasePluginService(
        system_app=_DummySystemApp(),
        dao=dao,
        vector_index=_FakeVector(),
    )

    def _upsert_via_service(case_payload):
        # asyncio.run replaces get_event_loop().run_until_complete(): calling
        # get_event_loop() with no running loop is deprecated, whereas
        # asyncio.run manages a fresh loop for each call.
        return asyncio.run(
            service.call_tool("memory_case_upsert", {"case": case_payload})
        )

    # Write 4 cases through the service (same path the Agent uses).
    for i in range(1, 5):
        result = _upsert_via_service(
            {
                "case_id": f"svc-{i}",
                "symptom_summary": f"service issue {i}",
                "resolution": f"fix {i}",
                "metadata": {"case_context": {"app_code": "demo", "environment": "prod"}},
            }
        )
        assert result["code"] == "OK"
        assert result["case"]["case_id"] == f"svc-{i}"

    # Verify 4 exist.
    results = dao.search(scope={"app_code": "default"}, limit=50)
    assert len(results) == 4, f"Expected 4, got {len(results)}"

    # Write the 5th.
    result = _upsert_via_service(
        {
            "case_id": "svc-5",
            "symptom_summary": "new service issue",
            "metadata": {"case_context": {"app_code": "demo", "environment": "prod"}},
        }
    )
    assert result["code"] == "OK"

    # Verify ALL 5 still exist.
    results_after = dao.search(scope={"app_code": "default"}, limit=50)
    assert len(results_after) == 5, (
        f"CRITICAL: Expected 5 records, got {len(results_after)}. "
        f"Found: {[r.case_id for r in results_after]}"
    )
    for i in range(1, 6):
        found = dao.get_by_case_id(f"svc-{i}")
        assert found is not None, f"svc-{i} disappeared after 5th insert"

    # Spot-check: old record field values preserved.
    c1 = dao.get_by_case_id("svc-1")
    assert c1.symptom_summary == "service issue 1"
    assert c1.resolution == "fix 1"
228+
229+
230+
def test_search_with_narrow_scope_does_not_mistake_scoping_for_deletion(dao):
    """Scope filters narrow results — this is expected, not data loss."""
    seed = (("c1", "A", "team-x"), ("c2", "B", "team-x"), ("c3", "C", "team-y"))
    cases = [
        CandidateCase(
            case_id=case_id,
            symptom_summary=summary,
            metadata={"case_context": {"app_code": app_code}},
        )
        for case_id, summary, app_code in seed
    ]
    for candidate in cases:
        dao.upsert(candidate)

    # team-x scope sees its two cases.
    team_x_hits = dao.search(scope={"app_code": "team-x"}, limit=10)
    assert len(team_x_hits) == 2, f"team-x should see 2 cases, got {len(team_x_hits)}"

    # team-y scope sees its single case.
    team_y_hits = dao.search(scope={"app_code": "team-y"}, limit=10)
    assert len(team_y_hits) == 1

    # The default scope sees all three cases.
    all_hits = dao.search(scope={"app_code": "default"}, limit=10)
    assert len(all_hits) == 3, f"default scope should see all 3, got {len(all_hits)}"

0 commit comments

Comments
 (0)