Skip to content

Commit 1030088

Browse files
committed
fix: harden session memory and drain safety
Tighten session-memory rewriting signals, improve warning fallbacks, dedupe repeated event text, and clamp unsafe drain heartbeat settings against the claim TTL.
1 parent 9fee54a commit 1030088

10 files changed

Lines changed: 244 additions & 29 deletions

README.md

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ This plugin uses a two-layer memory architecture:
7373
- Saves compaction summaries as episodes so knowledge survives across session
7474
boundaries
7575

76-
No Graphiti call ever blocks a hook return.
76+
Graphiti stays off the steady-state hook path for cached recall, but fresh
77+
long-term fact lookups can still run during session memory preparation.
7778

7879
## Prerequisites
7980

@@ -255,8 +256,9 @@ sources:
255256
- Cached long-term facts from Graphiti
256257

257258
These are composed into a `<session_memory>` envelope and staged for the
258-
transform hook. All reads are local (sub-ms); no Graphiti call is made at this
259-
stage.
259+
transform hook. The baseline reads are local (sub-ms), and Graphiti is only
260+
consulted here when the plugin asks for fresh long-term facts for the current
261+
request.
260262

261263
### User Message Injection (`experimental.chat.messages.transform`)
262264

@@ -275,8 +277,8 @@ that produced the cached memory. When Jaccard similarity on cached fact UUIDs
275277
drops below `driftThreshold` (default 0.5), a background cache refresh is
276278
scheduled via Graphiti. The current cached context is still injected
277279
immediately; the refreshed cache becomes available on the next message. This
278-
trades one message of staleness for keeping Graphiti entirely off the
279-
response-time path.
280+
trades one message of staleness for keeping most long-term memory refresh work
281+
off the response-time path.
280282

281283
### Event Extraction and Buffering (`event`)
282284

src/handlers/messages.test.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { assertEquals, assertStringIncludes } from "jsr:@std/assert@^1.0.0";
22
import { describe, it } from "jsr:@std/testing@^1.0.0/bdd";
3+
import { spy } from "jsr:@std/testing@^1.0.0/mock";
4+
import { logger } from "../services/logger.ts";
35
import { createMessagesHandler } from "./messages.ts";
46

57
describe("messages handler", () => {
@@ -322,6 +324,63 @@ describe("messages handler", () => {
322324
);
323325
});
324326

327+
it("reports rewroteExistingMemory when canonical or legacy blocks were scrubbed", async () => {
328+
const state = {
329+
isMain: true,
330+
visibleFactUuids: new Set<string>(),
331+
pendingInjection: {
332+
envelope:
333+
'<session_memory version="1"><last_request>next</last_request></session_memory>',
334+
factUuids: [],
335+
nodeRefs: [],
336+
refreshDecision: {
337+
classification: "aligned",
338+
shouldRefresh: false,
339+
similarity: 1,
340+
threshold: 0.5,
341+
cachedQuery: "next",
342+
},
343+
},
344+
};
345+
const infoSpy = spy(logger, "info");
346+
try {
347+
const handler = createMessagesHandler({
348+
sessionManager: {
349+
getState() {
350+
return state;
351+
},
352+
prepareInjection() {
353+
return state.pendingInjection;
354+
},
355+
} as never,
356+
});
357+
358+
const output = {
359+
messages: [{
360+
info: { role: "user", sessionID: "session-1" },
361+
parts: [{
362+
type: "text",
363+
text: '<memory data-uuids="fact-legacy-1"></memory>\n\nnext',
364+
}],
365+
}],
366+
};
367+
368+
await handler({}, output as never);
369+
370+
const call = infoSpy.calls.find((entry) =>
371+
entry.args[0] === "Injected canonical session_memory block"
372+
);
373+
assertEquals(Boolean(call), true);
374+
assertEquals(
375+
(call?.args[1] as { rewroteExistingMemory: boolean })
376+
.rewroteExistingMemory,
377+
true,
378+
);
379+
} finally {
380+
infoSpy.restore();
381+
}
382+
});
383+
325384
it("passes current-turn visible fact uuids into prepareInjection", async () => {
326385
const state = {
327386
isMain: true,

src/handlers/messages.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,17 @@ export function createMessagesHandler(deps: MessagesHandlerDeps) {
7979
if (!state?.isMain) return;
8080

8181
const allVisibleUuids = new Set<string>();
82+
let rewroteExistingMemory = false;
8283
for (const entry of output.messages) {
8384
for (const part of entry.parts) {
8485
if (isTextPart(part)) {
86+
if (
87+
!rewroteExistingMemory &&
88+
(part.text.includes("<session_memory") ||
89+
part.text.includes("<memory"))
90+
) {
91+
rewroteExistingMemory = true;
92+
}
8593
for (const uuid of extractVisibleUuids(part.text)) {
8694
allVisibleUuids.add(uuid);
8795
}
@@ -106,8 +114,6 @@ export function createMessagesHandler(deps: MessagesHandlerDeps) {
106114

107115
const textPart = lastUserEntry.parts.find(isTextPart);
108116
if (!textPart) return;
109-
const hadCanonical = textPart.text.includes("<session_memory");
110-
const hadLegacy = textPart.text.includes("<memory");
111117
const effectiveUserText = sanitizeMemoryInput(
112118
stripInjectedMemoryBlocks(textPart.text),
113119
);
@@ -121,7 +127,7 @@ export function createMessagesHandler(deps: MessagesHandlerDeps) {
121127
logger.info("Injected canonical session_memory block", {
122128
sessionID,
123129
factCount: prepared.factUuids.length,
124-
rewroteExistingMemory: hadCanonical || hadLegacy,
130+
rewroteExistingMemory,
125131
});
126132
if (state.pendingInjection === prepared) {
127133
state.pendingInjection = undefined;

src/services/batch-drain.test.ts

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { assertEquals } from "jsr:@std/assert@^1.0.0";
22
import { describe, it } from "jsr:@std/testing@^1.0.0/bdd";
3+
import { spy } from "jsr:@std/testing@^1.0.0/mock";
4+
import { logger } from "./logger.ts";
35
import { BatchDrainService } from "./batch-drain.ts";
46
import { createSessionEvent } from "./event-extractor.ts";
57
import { RedisClient } from "./redis-client.ts";
@@ -13,22 +15,69 @@ import {
1315
RedisEventsService,
1416
} from "./redis-events.ts";
1517

16-
const createDeps = () => {
18+
const createDeps = (options?: {
19+
events?: { claimLockTtlSeconds?: number };
20+
drain?: { claimHeartbeatIntervalMs?: number | null };
21+
}) => {
1722
const redis = new RedisClient({ endpoint: "redis://unused" });
1823
const events = new RedisEventsService(redis, {
1924
sessionTtlSeconds: 60,
20-
claimLockTtlSeconds: 1,
25+
claimLockTtlSeconds: options?.events?.claimLockTtlSeconds ?? 1,
2126
});
22-
const drain = new BatchDrainService(redis, events, {
27+
const drainOptions = {
2328
batchSize: 2,
2429
batchMaxBytes: 20_000,
2530
drainRetryMax: 2,
26-
claimHeartbeatIntervalMs: 100,
27-
});
31+
};
32+
const heartbeatIntervalMs = options?.drain?.claimHeartbeatIntervalMs;
33+
const drain = new BatchDrainService(
34+
redis,
35+
events,
36+
heartbeatIntervalMs === null ? drainOptions : {
37+
...drainOptions,
38+
claimHeartbeatIntervalMs: heartbeatIntervalMs ?? 100,
39+
},
40+
);
2841
return { redis, events, drain };
2942
};
3043

3144
describe("batch drain", () => {
45+
it("uses a sub-TTL default heartbeat when the claim TTL is small", () => {
46+
const { drain } = createDeps({
47+
events: { claimLockTtlSeconds: 0.2 },
48+
drain: { claimHeartbeatIntervalMs: null },
49+
});
50+
51+
const heartbeatIntervalMs = (drain as unknown as {
52+
getClaimHeartbeatIntervalMs: (ttl: number) => number;
53+
}).getClaimHeartbeatIntervalMs(1);
54+
55+
assertEquals(heartbeatIntervalMs, 333);
56+
});
57+
58+
it("warns and clamps an explicit heartbeat interval that exceeds the claim TTL budget", () => {
59+
const warnSpy = spy(logger, "warn");
60+
try {
61+
const { drain } = createDeps({
62+
events: { claimLockTtlSeconds: 1 },
63+
drain: { claimHeartbeatIntervalMs: 1_500 },
64+
});
65+
66+
const heartbeatIntervalMs = (drain as unknown as {
67+
getClaimHeartbeatIntervalMs: (ttl: number) => number;
68+
}).getClaimHeartbeatIntervalMs(1);
69+
70+
assertEquals(heartbeatIntervalMs, 500);
71+
assertEquals(warnSpy.calls.length, 1);
72+
assertEquals(
73+
warnSpy.calls[0].args[0],
74+
"Clamped drain heartbeat interval to stay below claim TTL",
75+
);
76+
} finally {
77+
warnSpy.restore();
78+
}
79+
});
80+
3281
it("claims oldest events, drains them FIFO, and leaves newer items pending", async () => {
3382
const { redis, events, drain } = createDeps();
3483
const added: string[] = [];

src/services/batch-drain.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,23 @@ export class BatchDrainService {
8282
) {}
8383

8484
private getClaimHeartbeatIntervalMs(lockTtlSeconds: number): number {
85-
return this.options.claimHeartbeatIntervalMs ??
86-
Math.max(1_000, Math.floor((lockTtlSeconds * 1000) / 3));
85+
const ttlMs = Math.max(1_000, Math.floor(lockTtlSeconds * 1000));
86+
const defaultIntervalMs = Math.max(250, Math.floor(ttlMs / 3));
87+
const configuredIntervalMs = this.options.claimHeartbeatIntervalMs;
88+
const requestedIntervalMs = configuredIntervalMs ?? defaultIntervalMs;
89+
const maxSafeIntervalMs = Math.max(250, Math.floor(ttlMs / 2));
90+
91+
if (requestedIntervalMs <= maxSafeIntervalMs) {
92+
return requestedIntervalMs;
93+
}
94+
95+
logger.warn("Clamped drain heartbeat interval to stay below claim TTL", {
96+
claimLockTtlSeconds: lockTtlSeconds,
97+
requestedHeartbeatIntervalMs: requestedIntervalMs,
98+
effectiveHeartbeatIntervalMs: maxSafeIntervalMs,
99+
configuredHeartbeatIntervalMs: configuredIntervalMs,
100+
});
101+
return maxSafeIntervalMs;
87102
}
88103

89104
private async getRetryState(

src/services/event-extractor.test.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,24 @@ describe("event-extractor", () => {
115115
);
116116
});
117117

118+
it("dedupes repeated detail fragments in compactParts-backed task updates", () => {
119+
const [event] = extractStructuredEvents({
120+
eventType: "task.updated",
121+
properties: {
122+
task: {
123+
id: "t1",
124+
summary:
125+
"yes, keep the review-refine loop until no more issues are found.",
126+
},
127+
},
128+
});
129+
130+
assertEquals(
131+
event.detail,
132+
"Task update — yes, keep the review-refine loop until no more issues are found.",
133+
);
134+
});
135+
118136
it("extracts rules, environment, and subagent signals while filtering assistant operational blocker chatter", () => {
119137
const rules = extractStructuredEvents({
120138
eventType: "rules.loaded",

src/services/event-extractor.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -286,10 +286,17 @@ const hasKeyword = (
286286
const compactParts = (
287287
...parts: Array<string | undefined>
288288
): string | undefined => {
289-
const compact = parts
290-
.map((part) => part ? normalizeWhitespace(part) : "")
291-
.filter(Boolean)
292-
.join(" — ");
289+
const fragments: string[] = [];
290+
for (const part of parts) {
291+
const value = part ? normalizeWhitespace(part) : "";
292+
if (!value) continue;
293+
const normalized = value.toLowerCase();
294+
if (fragments.some((fragment) => fragment.toLowerCase() === normalized)) {
295+
continue;
296+
}
297+
fragments.push(value);
298+
}
299+
const compact = fragments.join(" — ");
293300
return compact || undefined;
294301
};
295302

src/services/logger.test.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,32 @@ describe("logger", () => {
139139
}]);
140140
});
141141

142+
it("falls back to console.warn when structured warn logging rejects later", async () => {
143+
const scheduledTasks: Array<() => void> = [];
144+
setWarningTaskScheduler((callback) => {
145+
scheduledTasks.push(callback);
146+
});
147+
setOpenCodeClient({
148+
app: {
149+
log: () => Promise.reject(new Error("structured warn failed")),
150+
},
151+
});
152+
153+
const { logger } = await import("./logger.ts");
154+
logger.warn("warning", { code: 42 });
155+
156+
assertEquals(consoleWarnSpy.calls.length, 0);
157+
assertEquals(scheduledTasks.length, 1);
158+
for (const task of scheduledTasks) task();
159+
await Promise.resolve();
160+
assertEquals(consoleWarnSpy.calls.length, 1);
161+
assertEquals(consoleWarnSpy.calls[0].args[0], "[graphiti]");
162+
assertEquals(consoleWarnSpy.calls[0].args[1], "warning");
163+
assertEquals(consoleWarnSpy.calls[0].args[2], {
164+
data: [{ code: 42 }],
165+
});
166+
});
167+
142168
it("should forward multiple arguments to error", async () => {
143169
const { logger } = await import("./logger.ts");
144170
const error = new Error("test");

src/services/opencode-warning.ts

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,23 @@ export const logStructuredWarning = (
7878
const client = getClient();
7979
if (!client?.app?.log) return false;
8080

81-
runSafely(() =>
82-
client.app!.log({
83-
body: {
84-
service: "graphiti",
85-
level: "warn",
86-
message,
87-
...(extra === undefined ? {} : { extra: asRecord(extra) }),
88-
},
89-
})
81+
runSafely(
82+
() =>
83+
client.app!.log({
84+
body: {
85+
service: "graphiti",
86+
level: "warn",
87+
message,
88+
...(extra === undefined ? {} : { extra: asRecord(extra) }),
89+
},
90+
}),
91+
(error) => {
92+
if (extra === undefined) {
93+
console.warn(PREFIX, message, error);
94+
return;
95+
}
96+
console.warn(PREFIX, message, extra, error);
97+
},
9098
);
9199
return true;
92100
};

0 commit comments

Comments
 (0)