Skip to content

Commit d87a28e

Browse files
committed
chore: address latest review follow-ups
1 parent 3a3bb04 commit d87a28e

7 files changed

Lines changed: 174 additions & 33 deletions

File tree

docs/ReviewProtocol.md

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,20 @@ request and review feedback needs to be handled systematically.
1010
- keep fixes narrow and scoped to the verified issue
1111
- resolve handled review threads and request a fresh review cycle
1212

13+
## Required Latest-Page Query
14+
15+
Use this command exactly as written for metadata-first traversal to the latest
16+
review-thread page, followed by a narrow detail fetch only for unresolved
17+
threads found on that last page. Do not rewrite, broaden, or replace it with an
18+
equivalent query.
19+
20+
If this command fails for any reason, stop and report the failure explicitly
21+
before taking any further review-handling action.
22+
23+
```bash
24+
deno eval 'const o="OWNER",r="REPO",n="PR_NUMBER",mq="query($o:String!,$r:String!,$n:Int!,$a:String){repository(owner:$o,name:$r){pullRequest(number:$n){reviewThreads(first:20,after:$a){pageInfo{hasNextPage endCursor}nodes{id isResolved isOutdated path}}}}}",dq="query($ids:[ID!]!){nodes(ids:$ids){... on PullRequestReviewThread{id path isResolved isOutdated comments(first:10){nodes{author{login}body url createdAt}}}}}";let a=null,t;for(;;){const c=new Deno.Command("gh",{args:["api","graphql","-f",`query=${mq}`,"-F",`o=${o}`,"-F",`r=${r}`,"-F",`n=${n}`,...(a?["-F",`a=${a}`]:[])]});const x=await c.output();if(!x.success){console.error(new TextDecoder().decode(x.stderr));Deno.exit(x.code)}t=JSON.parse(new TextDecoder().decode(x.stdout)).data.repository.pullRequest.reviewThreads;if(!t.pageInfo.hasNextPage)break;a=t.pageInfo.endCursor}const u=t.nodes.filter(x=>!x.isResolved),ids=u.map(x=>x.id);let d=[];if(ids.length){const c=new Deno.Command("gh",{args:["api","graphql","-f",`query=${dq}`,...ids.flatMap(id=>["-F",`ids[]=${id}`])]});const x=await c.output();if(!x.success){console.error(new TextDecoder().decode(x.stderr));Deno.exit(x.code)}d=JSON.parse(new TextDecoder().decode(x.stdout)).data.nodes.filter(Boolean)}console.log(JSON.stringify({pageInfo:t.pageInfo,unresolved:d.length?d:u}))'
25+
```
26+
1327
## Workflow
1428

1529
1. Detect the active PR.
@@ -21,22 +35,20 @@ request and review feedback needs to be handled systematically.
2135
- Use GraphQL `reviewThreads` as the source of truth for unresolved state;
2236
REST review comments do not expose thread resolution and cannot be filtered
2337
to unresolved-only.
38+
- Run the required latest-page query command exactly as written in
39+
`Required Latest-Page Query`.
40+
- If the command fails, stop and report the failure explicitly.
2441
- Keep GraphQL payloads narrow: request small pages (`first: 20` or similar)
2542
and fetch only thread metadata first (`id`, `isResolved`, `isOutdated`,
26-
`path`, `pageInfo`). Do not request full comment bodies for every thread in
27-
the first pass.
28-
- Filter unresolved threads locally from that metadata page set, then run a
29-
second narrow GraphQL query only for the unresolved thread IDs you still
30-
need details for.
43+
`path`, `pageInfo`) while paginating. Do not request full comment bodies
44+
for every thread in the first pass.
45+
- Traverse metadata pages sequentially until you reach the latest page, then
46+
filter unresolved threads locally from that latest page result.
47+
- If unresolved items are found there, run a second narrow GraphQL query only
48+
for those unresolved thread IDs to fetch review contents.
3149
- If you need latest-first lightweight browsing, use REST review comments as
3250
a secondary view (`/pulls/{number}/comments?sort=updated&direction=desc`),
3351
but do not use REST as the authoritative unresolved-thread source.
34-
- Minified unresolved-only page pattern:
35-
`gh api graphql -f query='query($o:String!,$r:String!,$n:Int!,$a:String){repository(owner:$o,name:$r){pullRequest(number:$n){reviewThreads(first:20,after:$a){pageInfo{hasNextPage endCursor}nodes{id isResolved isOutdated path}}}}}' -F o=OWNER -F r=REPO -F n=PR_NUMBER > /tmp/review-page.json && python3 - <<'PY'
36-
import json
37-
with open('/tmp/review-page.json') as f:t=json.load(f)['data']['repository']['pullRequest']['reviewThreads']
38-
print(json.dumps({'pageInfo':t['pageInfo'],'unresolved':[x for x in t['nodes'] if not x['isResolved']]},separators=(',',':')))
39-
PY`
4052
- Normalize unresolved items into discrete review claims with:
4153
- thread id
4254
- file/path

src/config.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ function makeAdapter(options?: {
1414
searchResult?: unknown | null;
1515
loadResult?: unknown | null;
1616
searchError?: Error;
17+
loadError?: Error;
1718
}): ConfigExplorerAdapter {
1819
return {
1920
search() {
@@ -23,6 +24,7 @@ function makeAdapter(options?: {
2324
: { config: options.searchResult };
2425
},
2526
load() {
27+
if (options?.loadError) throw options.loadError;
2628
return options?.loadResult == null
2729
? null
2830
: { config: options.loadResult };
@@ -243,6 +245,23 @@ describe("config", () => {
243245
assertEquals(config.redis.batchSize, 20);
244246
});
245247

248+
it("fails open to defaults when the legacy config file cannot be loaded", () => {
249+
using _homedir = stub(os, "homedir", () => "/users/tester");
250+
setConfigExplorerAdapterForTesting(() =>
251+
makeAdapter({
252+
loadError: new Error("legacy load failed"),
253+
})
254+
);
255+
256+
const config = loadConfig();
257+
258+
assertEquals(config.graphiti.endpoint, "http://localhost:8000/mcp");
259+
assertEquals(config.graphiti.groupIdPrefix, "opencode");
260+
assertEquals(config.graphiti.driftThreshold, 0.5);
261+
assertEquals(config.redis.endpoint, "redis://localhost:6379");
262+
assertEquals(config.redis.batchSize, 20);
263+
});
264+
246265
it("fails open to defaults when config discovery initialization fails", () => {
247266
setConfigExplorerAdapterForTesting(() => {
248267
throw new Error("cosmiconfig unavailable");

src/config.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -305,10 +305,11 @@ const loadLegacyConfig = (
305305
);
306306
};
307307

308-
const isDiscoveryInfrastructureFailure = (error: unknown): boolean =>
308+
const isRecoverableConfigLoadFailure = (error: unknown): boolean =>
309309
error instanceof ConfigLoadError &&
310310
(error.code === "config-discovery-init" ||
311-
error.code === "config-discovery-search");
311+
error.code === "config-discovery-search" ||
312+
error.code === "config-file-load");
312313

313314
export function loadConfig(directory?: string): GraphitiConfig {
314315
try {
@@ -320,7 +321,7 @@ export function loadConfig(directory?: string): GraphitiConfig {
320321
} catch (error) {
321322
if (
322323
!(error instanceof ConfigLoadError) ||
323-
!isDiscoveryInfrastructureFailure(error)
324+
!isRecoverableConfigLoadFailure(error)
324325
) {
325326
throw error;
326327
}

src/services/batch-drain.test.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,41 @@ describe("batch drain", () => {
335335
}
336336
});
337337

338+
it("serializes claim heartbeat refreshes so they never overlap", async () => {
339+
const { events, drain } = await createDeps({
340+
events: { claimLockTtlSeconds: 2 },
341+
drain: { batchSize: 1, claimHeartbeatIntervalMs: 250 },
342+
});
343+
const event = createSessionEvent("message", "user", {
344+
summary: "long running",
345+
body: "long running",
346+
});
347+
await events.recordEvent("session-1", "group-1", event);
348+
349+
const originalRefreshClaimLease = events.refreshClaimLease.bind(events);
350+
let inFlight = 0;
351+
let maxInFlight = 0;
352+
let refreshCalls = 0;
353+
events.refreshClaimLease = async (...args) => {
354+
refreshCalls += 1;
355+
inFlight += 1;
356+
maxInFlight = Math.max(maxInFlight, inFlight);
357+
await new Promise((resolve) => setTimeout(resolve, 300));
358+
inFlight -= 1;
359+
return await originalRefreshClaimLease(...args);
360+
};
361+
362+
const result = await drain.drainGroup("group-1", {
363+
async addMemory() {
364+
await new Promise((resolve) => setTimeout(resolve, 650));
365+
},
366+
} as never);
367+
368+
assertEquals(result, { status: "success", drained: 1 });
369+
assertEquals(refreshCalls >= 3, true);
370+
assertEquals(maxInFlight, 1);
371+
});
372+
338373
it("limits batches using serialized Graphiti episode bodies", async () => {
339374
const first = createSessionEvent("message", "user", {
340375
summary: "first",

src/services/batch-drain.ts

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -191,22 +191,41 @@ export class BatchDrainService {
191191
}
192192

193193
let lostClaim = false;
194-
const refreshClaimOwnership = async (): Promise<boolean> => {
195-
if (lostClaim) return false;
196-
try {
197-
const refreshed = await this.events.refreshClaimLease(
198-
groupId,
199-
claimed.claimToken,
200-
claimed.lockTtlSeconds,
201-
);
202-
if (!refreshed) lostClaim = true;
203-
} catch {
204-
lostClaim = true;
205-
}
206-
return !lostClaim;
194+
let claimRefreshChain: Promise<void> = Promise.resolve();
195+
let heartbeatTimer: number | null = null;
196+
let refreshClaimHeartbeatRunning = false;
197+
const refreshClaimOwnership = (): Promise<boolean> => {
198+
const refreshTask = claimRefreshChain.then(async () => {
199+
if (lostClaim) return false;
200+
try {
201+
const refreshed = await this.events.refreshClaimLease(
202+
groupId,
203+
claimed.claimToken,
204+
claimed.lockTtlSeconds,
205+
);
206+
if (!refreshed) lostClaim = true;
207+
} catch {
208+
lostClaim = true;
209+
}
210+
return !lostClaim;
211+
});
212+
claimRefreshChain = refreshTask.then(() => undefined, () => undefined);
213+
return refreshTask;
207214
};
208215
const refreshClaimHeartbeat = async (): Promise<void> => {
209-
await refreshClaimOwnership();
216+
if (refreshClaimHeartbeatRunning) return;
217+
refreshClaimHeartbeatRunning = true;
218+
try {
219+
await refreshClaimOwnership();
220+
} finally {
221+
refreshClaimHeartbeatRunning = false;
222+
if (!lostClaim) {
223+
heartbeatTimer = setTimeout(
224+
refreshClaimHeartbeat,
225+
this.getClaimHeartbeatIntervalMs(claimed.lockTtlSeconds),
226+
) as unknown as number;
227+
}
228+
}
210229
};
211230
const confirmClaimOwnership = (): Promise<boolean> =>
212231
refreshClaimOwnership();
@@ -215,9 +234,10 @@ export class BatchDrainService {
215234
throw new DrainClaimLostError();
216235
}
217236
};
218-
const heartbeatInterval = setInterval(() => {
219-
void refreshClaimHeartbeat();
220-
}, this.getClaimHeartbeatIntervalMs(claimed.lockTtlSeconds));
237+
heartbeatTimer = setTimeout(
238+
refreshClaimHeartbeat,
239+
this.getClaimHeartbeatIntervalMs(claimed.lockTtlSeconds),
240+
) as unknown as number;
221241
let checkpointedCount = 0;
222242

223243
try {
@@ -300,7 +320,8 @@ export class BatchDrainService {
300320
logger.warn("Drain batch failed; will retry later", { groupId, err });
301321
return { status: "retry", drained: 0 };
302322
} finally {
303-
clearInterval(heartbeatInterval);
323+
if (heartbeatTimer !== null) clearTimeout(heartbeatTimer);
324+
await claimRefreshChain;
304325
}
305326
}
306327
}

src/services/graphiti-mcp.test.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@ import { assertEquals, assertRejects } from "jsr:@std/assert@^1.0.0";
22
import { describe, it } from "jsr:@std/testing@^1.0.0/bdd";
33
import { GraphitiOfflineError } from "./connection-manager.ts";
44
import { GraphitiMcpClient } from "./graphiti-mcp.ts";
5+
import {
6+
setOpenCodeClient,
7+
setSuppressConsoleWarningsDuringTestsOverride,
8+
setWarningTaskScheduler,
9+
} from "./opencode-warning.ts";
510

611
describe("GraphitiMcpClient", () => {
712
it("connect rejects explicitly after stop", async () => {
@@ -57,4 +62,52 @@ describe("GraphitiMcpClient", () => {
5762
degraded: true,
5863
});
5964
});
65+
66+
it("reports searchNodesWithStatus availability warnings with the correct operation name", async () => {
67+
const scheduledTasks: Array<() => void> = [];
68+
const appLogCalls: unknown[] = [];
69+
setSuppressConsoleWarningsDuringTestsOverride(true);
70+
setWarningTaskScheduler((callback) => {
71+
scheduledTasks.push(callback);
72+
});
73+
setOpenCodeClient({
74+
app: {
75+
log(input: unknown) {
76+
appLogCalls.push(input);
77+
},
78+
},
79+
});
80+
81+
try {
82+
const client = new GraphitiMcpClient({
83+
start() {},
84+
stop() {
85+
return Promise.resolve();
86+
},
87+
ready() {
88+
return Promise.resolve(true);
89+
},
90+
callTool() {
91+
return Promise.reject(new GraphitiOfflineError("offline", "offline"));
92+
},
93+
});
94+
95+
assertEquals(await client.searchNodesWithStatus({ query: "test" }), {
96+
nodes: [],
97+
degraded: true,
98+
});
99+
assertEquals(scheduledTasks.length, 1);
100+
assertEquals(appLogCalls.length, 0);
101+
for (const task of scheduledTasks) task();
102+
assertEquals(
103+
(appLogCalls[0] as { body: { extra: { operation: string } } }).body
104+
.extra.operation,
105+
"searchNodesWithStatus",
106+
);
107+
} finally {
108+
setOpenCodeClient(undefined);
109+
setWarningTaskScheduler(undefined);
110+
setSuppressConsoleWarningsDuringTestsOverride(undefined);
111+
}
112+
});
60113
});

src/services/graphiti-mcp.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ export class GraphitiMcpClient {
214214
notifyGraphitiAvailabilityIssue(
215215
"Graphiti unavailable; continuing without memory nodes.",
216216
{
217-
operation: "searchNodes",
217+
operation: "searchNodesWithStatus",
218218
err,
219219
},
220220
);

0 commit comments

Comments
 (0)