Skip to content

Commit 0d24c7d

Browse files
committed
fix: address latest review follow-ups
1 parent d50c88a commit 0d24c7d

7 files changed

Lines changed: 148 additions & 17 deletions

src/services/batch-drain.test.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,37 @@ describe("batch drain", () => {
559559
assertEquals(await redis.getListLength(drainPendingKey("group-1")), 0);
560560
});
561561

562+
it("clears parsed but invalid retry state before retrying a batch", async () => {
563+
const invalidStates = [
564+
{ attempts: -1, nextAttemptAt: 0 },
565+
{ attempts: 1, nextAttemptAt: "later" },
566+
];
567+
568+
for (const invalidState of invalidStates) {
569+
const { redis, events, drain } = await createDeps();
570+
const event = createSessionEvent("message", "user", {
571+
summary: "recover invalid retry state",
572+
body: "recover invalid retry state",
573+
});
574+
await events.recordEvent("session-1", "group-1", event);
575+
576+
const retryKey = drainRetryKey("group-1", `${event.id}:${event.id}`);
577+
await redis.setString(retryKey, JSON.stringify(invalidState), 60);
578+
579+
let calls = 0;
580+
const result = await drain.drainGroup("group-1", {
581+
addMemory() {
582+
calls += 1;
583+
},
584+
} as never);
585+
586+
assertEquals(result, { status: "success", drained: 1 });
587+
assertEquals(calls, 1);
588+
assertEquals(await redis.getString(retryKey), null);
589+
assertEquals(await redis.getListLength(drainPendingKey("group-1")), 0);
590+
}
591+
});
592+
562593
it("reports only successfully ingested events when a batch dead-letters mid-batch", async () => {
563594
const { redis, events, drain } = await createDeps({
564595
drain: { batchSize: 2 },

src/services/batch-drain.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,16 @@ export interface BatchDrainServiceOptions {
2323

2424
type RetryState = { attempts: number; nextAttemptAt: number };
2525

26+
const isValidRetryState = (value: unknown): value is RetryState => {
27+
if (!value || typeof value !== "object") return false;
28+
const state = value as Partial<RetryState>;
29+
return typeof state.attempts === "number" &&
30+
Number.isFinite(state.attempts) &&
31+
state.attempts >= 0 &&
32+
typeof state.nextAttemptAt === "number" &&
33+
Number.isFinite(state.nextAttemptAt);
34+
};
35+
2636
class DrainClaimLostError extends Error {
2737
constructor() {
2838
super("Drain claim lease lost during batch processing");
@@ -150,7 +160,14 @@ export class BatchDrainService {
150160
const raw = await this.redis.getString(key);
151161
if (!raw) return null;
152162
try {
153-
return JSON.parse(raw) as RetryState;
163+
const parsed = JSON.parse(raw);
164+
if (isValidRetryState(parsed)) return parsed;
165+
await this.redis.deleteKey(key);
166+
logger.warn("Cleared invalid drain retry state", {
167+
groupId,
168+
batchKey,
169+
});
170+
return null;
154171
} catch {
155172
await this.redis.deleteKey(key);
156173
logger.warn("Cleared corrupted drain retry state", {

src/services/connection-manager.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -924,7 +924,7 @@ describe("connection manager", () => {
924924
}),
925925
}),
926926
Error,
927-
"Invalid URL",
927+
'Invalid Graphiti endpoint: "not a valid url"',
928928
);
929929
});
930930

src/services/connection-manager.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,18 @@ const validateEndpoint = (endpoint: string): string => {
120120
if (!normalized) {
121121
throw new Error("Graphiti endpoint must not be empty");
122122
}
123-
new URL(normalized);
123+
124+
try {
125+
new URL(normalized);
126+
} catch (cause) {
127+
throw new Error(
128+
`Invalid Graphiti endpoint: ${JSON.stringify(normalized)}`,
129+
{
130+
cause,
131+
},
132+
);
133+
}
134+
124135
return normalized;
125136
};
126137

src/services/graphiti-async.test.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,77 @@ describe("GraphitiAsyncService", () => {
248248
assertEquals(timers.clearedTimers.length, 1);
249249
});
250250

251+
it("preserves fact-only cache refreshes when node search degrades", async () => {
252+
const cacheSets: Array<{
253+
query: string;
254+
nodes: Array<{ uuid: string; name: string; summary: string }>;
255+
episodeSummaries?: string[];
256+
nodeRefs: string[];
257+
}> = [];
258+
259+
const service = new GraphitiAsyncService(
260+
{
261+
getEpisodes() {
262+
return Promise.resolve([]);
263+
},
264+
searchMemoryFacts() {
265+
return Promise.resolve([
266+
{
267+
fact: "fact:alpha",
268+
source_node: { name: "Source" },
269+
target_node: { name: "alpha" },
270+
},
271+
]);
272+
},
273+
searchNodesWithStatus() {
274+
return Promise.resolve({
275+
nodes: [{ uuid: "node:alpha", name: "alpha", summary: "unused" }],
276+
degraded: true,
277+
});
278+
},
279+
} as never,
280+
{
281+
get() {
282+
return Promise.resolve(null);
283+
},
284+
getMeta() {
285+
return Promise.resolve({ lastQuery: "alpha" });
286+
},
287+
rememberRefreshQuery() {
288+
return Promise.resolve();
289+
},
290+
set(
291+
_groupId: string,
292+
entry: {
293+
query: string;
294+
refreshedAt: number;
295+
nodes: Array<{ uuid: string; name: string; summary: string }>;
296+
episodeSummaries?: string[];
297+
nodeRefs: string[];
298+
},
299+
) {
300+
cacheSets.push(entry);
301+
return Promise.resolve();
302+
},
303+
} as never,
304+
{
305+
drainGroup: () => Promise.resolve({ status: "idle" as const }),
306+
} as never,
307+
);
308+
309+
service.scheduleCacheRefresh("group-1", "alpha");
310+
await flushMicrotasks();
311+
await service.dispose();
312+
313+
assertEquals(cacheSets.length, 1);
314+
assertEquals(cacheSets[0]?.query, "alpha");
315+
assertEquals(cacheSets[0]?.nodes, []);
316+
assertEquals(cacheSets[0]?.episodeSummaries, [
317+
"Source → alpha: fact:alpha",
318+
]);
319+
assertEquals(cacheSets[0]?.nodeRefs, []);
320+
});
321+
251322
it("does not start a second drain while a slow drain is still in flight", async () => {
252323
const timers = createFakeTimers();
253324
const drainDeferred = deferred<{ status: "idle" }>();

src/services/graphiti-async.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,6 @@ export class GraphitiAsyncService {
186186
maxNodes: 12,
187187
}),
188188
]);
189-
if (result.degraded) return;
190189
if (this.stopped) return;
191190

192191
const [meta, current] = await Promise.all([
@@ -202,7 +201,7 @@ export class GraphitiAsyncService {
202201
}
203202
if (this.stopped) return;
204203

205-
const { nodes } = result;
204+
const nodes = result.degraded ? [] : result.nodes;
206205
await this.cache.set(groupId, {
207206
query: normalized,
208207
refreshedAt: Date.now(),

src/services/hot-tier-slice.test.ts

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1450,7 +1450,7 @@ describe("hot-tier vertical slice", () => {
14501450
}
14511451
});
14521452

1453-
it("preserves warm cache entry when refresh degrades during node search", async () => {
1453+
it("stores fact-only refreshes with empty nodes when node search degrades", async () => {
14541454
const redis = new RedisClient({
14551455
endpoint: "redis://unused",
14561456
runtimeFactory: () => new ReconnectingRedisRuntime({ available: true }),
@@ -1478,7 +1478,13 @@ describe("hot-tier vertical slice", () => {
14781478
return Promise.resolve([]);
14791479
},
14801480
searchMemoryFacts() {
1481-
return Promise.resolve([]);
1481+
return Promise.resolve([
1482+
{
1483+
fact: "fact:outage",
1484+
source_node: { name: "WarmNode" },
1485+
target_node: { name: "OutageTopic" },
1486+
},
1487+
]);
14821488
},
14831489
searchNodesWithStatus() {
14841490
return Promise.resolve({ nodes: [], degraded: true });
@@ -1499,16 +1505,12 @@ describe("hot-tier vertical slice", () => {
14991505
const cached = await redisCache.get("group-1");
15001506
const meta = await redisCache.getMeta("group-1");
15011507

1502-
assertEquals(cached, {
1503-
query: "warm query",
1504-
refreshedAt: 111,
1505-
nodes: [{
1506-
uuid: "warm-node",
1507-
name: "WarmNode",
1508-
summary: "Existing warm cache entry",
1509-
}],
1510-
nodeRefs: ["warm-node"],
1511-
});
1508+
assertEquals(cached?.query, "outage query");
1509+
assertEquals(cached?.nodes, []);
1510+
assertEquals(cached?.nodeRefs, []);
1511+
assertEquals(cached?.episodeSummaries, [
1512+
"WarmNode → OutageTopic: fact:outage",
1513+
]);
15121514
assertEquals(meta?.lastQuery, "outage query");
15131515
});
15141516

0 commit comments

Comments
 (0)