Skip to content

Commit 44c3ff9

Browse files
committed
chore: address final review follow-ups
1 parent e09eaa8 commit 44c3ff9

3 files changed

Lines changed: 103 additions & 15 deletions

File tree

docs/ReviewProtocol.md

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,19 @@ request and review feedback needs to be handled systematically.
1010
- keep fixes narrow and scoped to the verified issue
1111
- resolve handled review threads and request a fresh review cycle
1212

13-
## Required Latest-Page Query
13+
## Required Unresolved-Batch Query
1414

15-
Use this command exactly as written for metadata-first traversal to the latest
16-
review-thread page, followed by a narrow detail fetch only for unresolved
17-
threads found on that last page. Do not rewrite, broaden, or replace it with an
18-
equivalent query.
15+
Use this command exactly as written for metadata-first traversal across
16+
review-thread pages until it collects the first 10 unresolved threads. The 10
17+
unresolved items may be sparse, non-contiguous, and spread across multiple
18+
pages. After that metadata pass, fetch narrow details only for that unresolved
19+
batch. Do not rewrite, broaden, or replace it with an equivalent query.
1920

2021
If this command fails for any reason, stop and report the failure explicitly
2122
before taking any further review-handling action.
2223

2324
```bash
24-
deno eval 'const o="OWNER",r="REPO",n="PR_NUMBER",mq="query($o:String!,$r:String!,$n:Int!,$a:String){repository(owner:$o,name:$r){pullRequest(number:$n){reviewThreads(first:20,after:$a){pageInfo{hasNextPage endCursor}nodes{id isResolved isOutdated path}}}}}",dq="query($ids:[ID!]!){nodes(ids:$ids){... on PullRequestReviewThread{id path isResolved isOutdated comments(first:10){nodes{author{login}body url createdAt}}}}}";let a=null,t;for(;;){const c=new Deno.Command("gh",{args:["api","graphql","-f",`query=${mq}`,"-F",`o=${o}`,"-F",`r=${r}`,"-F",`n=${n}`,...(a?["-F",`a=${a}`]:[])]});const x=await c.output();if(!x.success){console.error(new TextDecoder().decode(x.stderr));Deno.exit(x.code)}t=JSON.parse(new TextDecoder().decode(x.stdout)).data.repository.pullRequest.reviewThreads;if(!t.pageInfo.hasNextPage)break;a=t.pageInfo.endCursor}const u=t.nodes.filter(x=>!x.isResolved),ids=u.map(x=>x.id);let d=[];if(ids.length){const c=new Deno.Command("gh",{args:["api","graphql","-f",`query=${dq}`,...ids.flatMap(id=>["-F",`ids[]=${id}`])]});const x=await c.output();if(!x.success){console.error(new TextDecoder().decode(x.stderr));Deno.exit(x.code)}d=JSON.parse(new TextDecoder().decode(x.stdout)).data.nodes.filter(Boolean)}console.log(JSON.stringify({pageInfo:t.pageInfo,unresolved:d.length?d:u}))'
25+
deno eval 'const o="OWNER",r="REPO",n="PR_NUMBER",maxUnresolved=10,mq="query($o:String!,$r:String!,$n:Int!,$a:String){repository(owner:$o,name:$r){pullRequest(number:$n){reviewThreads(first:20,after:$a){pageInfo{hasNextPage endCursor}nodes{id isResolved isOutdated path}}}}}",dq="query($ids:[ID!]!){nodes(ids:$ids){... on PullRequestReviewThread{id path isResolved isOutdated comments(first:10){nodes{author{login}body url createdAt}}}}}";let a=null,t={pageInfo:{hasNextPage:false,endCursor:null},nodes:[]},u=[];for(;;){const c=new Deno.Command("gh",{args:["api","graphql","-f",`query=${mq}`,"-F",`o=${o}`,"-F",`r=${r}`,"-F",`n=${n}`,...(a?["-F",`a=${a}`]:[])]});const x=await c.output();if(!x.success){console.error(new TextDecoder().decode(x.stderr));Deno.exit(x.code)}t=JSON.parse(new TextDecoder().decode(x.stdout)).data.repository.pullRequest.reviewThreads;for(const node of t.nodes){if(!node.isResolved)u.push(node);if(u.length===maxUnresolved)break}if(u.length===maxUnresolved||!t.pageInfo.hasNextPage)break;a=t.pageInfo.endCursor}const ids=u.slice(0,maxUnresolved).map(x=>x.id);let d=[];if(ids.length){const c=new Deno.Command("gh",{args:["api","graphql","-f",`query=${dq}`,...ids.flatMap(id=>["-F",`ids[]=${id}`])]});const x=await c.output();if(!x.success){console.error(new TextDecoder().decode(x.stderr));Deno.exit(x.code)}d=JSON.parse(new TextDecoder().decode(x.stdout)).data.nodes.filter(Boolean)}console.log(JSON.stringify({pageInfo:t.pageInfo,batchSize:ids.length,exhausted:!t.pageInfo.hasNextPage&&ids.length<maxUnresolved,unresolved:d.length?d:u.slice(0,maxUnresolved)}))'
2526
```
2627

2728
## Workflow
@@ -35,17 +36,21 @@ deno eval 'const o="OWNER",r="REPO",n="PR_NUMBER",mq="query($o:String!,$r:String
3536
- Use GraphQL `reviewThreads` as the source of truth for unresolved state;
3637
REST review comments do not expose thread resolution and cannot be filtered
3738
to unresolved-only.
38-
- Run the required latest-page query command exactly as written in
39-
`Required Latest-Page Query`.
39+
- Run the required unresolved-batch query command exactly as written in
40+
`Required Unresolved-Batch Query`.
4041
- If the command fails, stop and report the failure explicitly.
4142
- Keep GraphQL payloads narrow: request small pages (`first: 20` or similar)
4243
and fetch only thread metadata first (`id`, `isResolved`, `isOutdated`,
4344
`path`, `pageInfo`) while paginating. Do not request full comment bodies
4445
for every thread in the first pass.
45-
- Traverse metadata pages sequentially until you reach the latest page, then
46-
filter unresolved threads locally from that latest page result.
47-
- If unresolved items are found there, run a second narrow GraphQL query only
48-
for those unresolved thread IDs to fetch review contents.
46+
- Traverse metadata pages sequentially until you either collect 10 unresolved
47+
threads or exhaust pagination.
48+
- Expect unresolved threads to be sparse and split across multiple pages; do
49+
not assume they are contiguous on one page.
50+
- If unresolved items are found, run a second narrow GraphQL query only for
51+
those unresolved thread IDs in the current batch to fetch review contents.
52+
- Re-run the same query after resolving a batch so the protocol can process
53+
the next 10 unresolved items in a pseudo while loop until none remain.
4954
- If you need latest-first lightweight browsing, use REST review comments as
5055
a secondary view (`/pulls/{number}/comments?sort=updated&direction=desc`),
5156
but do not use REST as the authoritative unresolved-thread source.

src/services/graphiti-async.test.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,4 +230,82 @@ describe("GraphitiAsyncService", () => {
230230
globalThis.clearTimeout = originalClearTimeout;
231231
}
232232
});
233+
234+
it("does not start a second drain while a slow drain is still in flight", async () => {
235+
const originalSetTimeout = globalThis.setTimeout;
236+
const originalClearTimeout = globalThis.clearTimeout;
237+
238+
let recoveryHandler: (() => void) | undefined;
239+
const clearedTimers: unknown[] = [];
240+
const drainDeferred = deferred<{ status: "idle" }>();
241+
let drainCalls = 0;
242+
243+
globalThis.setTimeout = ((handler: TimerHandler, timeout?: number) => {
244+
if (timeout === 50) {
245+
recoveryHandler = handler as () => void;
246+
}
247+
return { handler, timeout } as unknown as ReturnType<typeof setTimeout>;
248+
}) as typeof setTimeout;
249+
globalThis.clearTimeout = ((timer: unknown) => {
250+
clearedTimers.push(timer);
251+
}) as typeof clearTimeout;
252+
253+
try {
254+
const service = new GraphitiAsyncService(
255+
{
256+
getEpisodes() {
257+
return Promise.resolve([]);
258+
},
259+
searchMemoryFacts() {
260+
return Promise.resolve([]);
261+
},
262+
searchNodesWithStatus() {
263+
return Promise.resolve({ nodes: [], degraded: true });
264+
},
265+
} as never,
266+
{
267+
get() {
268+
return Promise.resolve(null);
269+
},
270+
getMeta() {
271+
return Promise.resolve(null);
272+
},
273+
rememberRefreshQuery() {
274+
return Promise.resolve();
275+
},
276+
set() {
277+
return Promise.resolve();
278+
},
279+
} as never,
280+
{
281+
drainGroup() {
282+
drainCalls += 1;
283+
return drainDeferred.promise;
284+
},
285+
} as never,
286+
25,
287+
50,
288+
);
289+
290+
service.scheduleDrain("group-1");
291+
await Promise.resolve();
292+
service.scheduleDrain("group-1");
293+
294+
assertEquals(drainCalls, 1);
295+
assert(recoveryHandler);
296+
297+
recoveryHandler();
298+
await Promise.resolve();
299+
300+
assertEquals(drainCalls, 1);
301+
302+
drainDeferred.resolve({ status: "idle" });
303+
await service.dispose();
304+
305+
assertEquals(drainCalls, 1);
306+
} finally {
307+
globalThis.setTimeout = originalSetTimeout;
308+
globalThis.clearTimeout = originalClearTimeout;
309+
}
310+
});
233311
});

src/services/graphiti-async.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { logger } from "./logger.ts";
55
import type { RedisCacheService } from "./redis-cache.ts";
66

77
export class GraphitiAsyncService {
8+
private static readonly DEFAULT_DRAIN_RECOVERY_DELAY_MS = 30_000;
89
private readonly drainInFlight = new Map<string, Promise<void>>();
910
private readonly drainRetryTimers = new Map<
1011
string,
@@ -26,6 +27,8 @@ export class GraphitiAsyncService {
2627
private readonly cache: RedisCacheService,
2728
private readonly drain: BatchDrainService,
2829
private readonly drainRetryDelayMs = 1_000,
30+
private readonly drainRecoveryDelayMs =
31+
GraphitiAsyncService.DEFAULT_DRAIN_RECOVERY_DELAY_MS,
2932
) {}
3033

3134
async flushPendingGroups(groupIds: Iterable<string>): Promise<void> {
@@ -97,9 +100,11 @@ export class GraphitiAsyncService {
97100
if (!recovery || recovery.run !== run) return;
98101
this.drainRecoveryTimers.delete(groupId);
99102
if (this.drainInFlight.get(groupId) !== run) return;
100-
this.drainInFlight.delete(groupId);
101-
this.scheduleDrain(groupId);
102-
}, this.drainRetryDelayMs);
103+
logger.warn(
104+
"Graphiti drain recovery timeout exceeded; leaving in-flight drain intact",
105+
{ groupId, timeoutMs: this.drainRecoveryDelayMs },
106+
);
107+
}, this.drainRecoveryDelayMs);
103108

104109
this.drainRecoveryTimers.set(groupId, { run, timer });
105110
}

0 commit comments

Comments
 (0)