-
Notifications
You must be signed in to change notification settings - Fork 65
Expand file tree
/
Copy pathresult-watcher.ts
More file actions
92 lines (84 loc) · 2.62 KB
/
result-watcher.ts
File metadata and controls
92 lines (84 loc) · 2.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import * as fs from "node:fs";
import * as path from "node:path";
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import { buildCompletionKey, markSeenWithTtl } from "./completion-dedupe.js";
import { createFileCoalescer } from "./file-coalescer.js";
import type { SubagentState } from "./types.js";
export function createResultWatcher(
pi: ExtensionAPI,
state: SubagentState,
resultsDir: string,
completionTtlMs: number,
): {
startResultWatcher: () => void;
primeExistingResults: () => void;
stopResultWatcher: () => void;
} {
const handleResult = (file: string) => {
const resultPath = path.join(resultsDir, file);
if (!fs.existsSync(resultPath)) return;
try {
const data = JSON.parse(fs.readFileSync(resultPath, "utf-8")) as {
sessionId?: string;
cwd?: string;
};
if (data.sessionId && data.sessionId !== state.currentSessionId) return;
if (!data.sessionId && data.cwd && data.cwd !== state.baseCwd) return;
const now = Date.now();
const completionKey = buildCompletionKey(data, `result:${file}`);
if (markSeenWithTtl(state.completionSeen, completionKey, now, completionTtlMs)) {
try {
fs.unlinkSync(resultPath);
} catch {}
return;
}
pi.events.emit("subagent:complete", data);
fs.unlinkSync(resultPath);
} catch {}
};
state.resultFileCoalescer = createFileCoalescer(handleResult, 50);
const startResultWatcher = () => {
state.watcherRestartTimer = null;
try {
state.watcher = fs.watch(resultsDir, (ev, file) => {
if (ev !== "rename" || !file) return;
const fileName = file.toString();
if (!fileName.endsWith(".json")) return;
state.resultFileCoalescer.schedule(fileName);
});
state.watcher.on("error", () => {
state.watcher = null;
state.watcherRestartTimer = setTimeout(() => {
try {
fs.mkdirSync(resultsDir, { recursive: true });
startResultWatcher();
} catch {}
}, 3000);
});
state.watcher.unref?.();
} catch {
state.watcher = null;
state.watcherRestartTimer = setTimeout(() => {
try {
fs.mkdirSync(resultsDir, { recursive: true });
startResultWatcher();
} catch {}
}, 3000);
}
};
const primeExistingResults = () => {
fs.readdirSync(resultsDir)
.filter((f) => f.endsWith(".json"))
.forEach((file) => state.resultFileCoalescer.schedule(file, 0));
};
const stopResultWatcher = () => {
state.watcher?.close();
state.watcher = null;
if (state.watcherRestartTimer) {
clearTimeout(state.watcherRestartTimer);
}
state.watcherRestartTimer = null;
state.resultFileCoalescer.clear();
};
return { startResultWatcher, primeExistingResults, stopResultWatcher };
}