Skip to content

Commit 2bcd190

Browse files
authored
Merge branch 'main' into fix/1846-default-effort-level
2 parents 2845b8e + 6e616e0 commit 2bcd190

3 files changed

Lines changed: 249 additions & 26 deletions

File tree

apps/code/src/main/services/focus/sync-service.ts

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { ApplyPatchSaga } from "@posthog/git/sagas/patch";
1010
import ignore, { type Ignore } from "ignore";
1111
import { inject, injectable } from "inversify";
1212
import { MAIN_TOKENS } from "../../di/tokens";
13+
import { subscribeWithTimeout } from "../../utils/async";
1314
import { logger } from "../../utils/logger";
1415
import type { WatcherRegistryService } from "../watcher-registry/service";
1516

@@ -100,12 +101,15 @@ export class FocusSyncService {
100101
}
101102

102103
const watcherIgnore = ALWAYS_IGNORE.map((p) => `**/${p}/**`);
104+
const mainWatcherId = `focus-sync:main:${mainRepoPath}`;
105+
const worktreeWatcherId = `focus-sync:worktree:${worktreePath}`;
103106

107+
let mainRegistered = false;
104108
try {
105109
const mainSubPromise = watcher.subscribe(
106110
mainRepoPath,
107111
(err, events) => {
108-
if (this.watcherRegistry.isShutdown) return;
112+
if (!mainRegistered || this.watcherRegistry.isShutdown) return;
109113
if (err) {
110114
log.error("Main repo watcher error:", err);
111115
return;
@@ -115,28 +119,32 @@ export class FocusSyncService {
115119
{ ignore: watcherIgnore },
116120
);
117121

118-
const mainSubResult = await Promise.race([
119-
mainSubPromise.then((sub) => ({ sub, timeout: false })),
120-
new Promise<{ sub: null; timeout: true }>((resolve) =>
121-
setTimeout(() => resolve({ sub: null, timeout: true }), 5000),
122-
),
123-
]);
122+
const mainSubResult = await subscribeWithTimeout(
123+
mainSubPromise,
124+
5000,
125+
mainWatcherId,
126+
);
124127

125-
if (mainSubResult.timeout) {
128+
if (mainSubResult.result === "timeout") {
126129
log.warn("Main repo watcher subscription timed out");
127-
} else if (mainSubResult.sub) {
128-
this.mainWatcherId = `focus-sync:main:${mainRepoPath}`;
129-
this.watcherRegistry.register(this.mainWatcherId, mainSubResult.sub);
130+
} else {
131+
mainRegistered = true;
132+
this.mainWatcherId = mainWatcherId;
133+
this.watcherRegistry.register(
134+
this.mainWatcherId,
135+
mainSubResult.subscription,
136+
);
130137
}
131138
} catch (error) {
132139
log.error("Failed to subscribe to main repo watcher:", error);
133140
}
134141

142+
let worktreeRegistered = false;
135143
try {
136144
const worktreeSubPromise = watcher.subscribe(
137145
worktreePath,
138146
(err, events) => {
139-
if (this.watcherRegistry.isShutdown) return;
147+
if (!worktreeRegistered || this.watcherRegistry.isShutdown) return;
140148
if (err) {
141149
log.error("Worktree watcher error:", err);
142150
return;
@@ -146,20 +154,20 @@ export class FocusSyncService {
146154
{ ignore: watcherIgnore },
147155
);
148156

149-
const worktreeSubResult = await Promise.race([
150-
worktreeSubPromise.then((sub) => ({ sub, timeout: false })),
151-
new Promise<{ sub: null; timeout: true }>((resolve) =>
152-
setTimeout(() => resolve({ sub: null, timeout: true }), 5000),
153-
),
154-
]);
157+
const worktreeSubResult = await subscribeWithTimeout(
158+
worktreeSubPromise,
159+
5000,
160+
worktreeWatcherId,
161+
);
155162

156-
if (worktreeSubResult.timeout) {
163+
if (worktreeSubResult.result === "timeout") {
157164
log.warn("Worktree watcher subscription timed out");
158-
} else if (worktreeSubResult.sub) {
159-
this.worktreeWatcherId = `focus-sync:worktree:${worktreePath}`;
165+
} else {
166+
worktreeRegistered = true;
167+
this.worktreeWatcherId = worktreeWatcherId;
160168
this.watcherRegistry.register(
161169
this.worktreeWatcherId,
162-
worktreeSubResult.sub,
170+
worktreeSubResult.subscription,
163171
);
164172
}
165173
} catch (error) {
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
2+
3+
const warn = vi.hoisted(() => vi.fn());
4+
5+
vi.mock("./logger", () => ({
6+
logger: {
7+
scope: () => ({
8+
info: vi.fn(),
9+
error: vi.fn(),
10+
warn,
11+
debug: vi.fn(),
12+
}),
13+
},
14+
}));
15+
16+
import { subscribeWithTimeout, withTimeout } from "./async";
17+
18+
interface FakeSubscription {
19+
unsubscribe: () => Promise<unknown>;
20+
}
21+
22+
const makeSubscription = (
23+
unsubscribeImpl: () => Promise<unknown> = () => Promise.resolve(),
24+
): FakeSubscription & { unsubscribe: ReturnType<typeof vi.fn> } => {
25+
const unsubscribe = vi.fn(
26+
unsubscribeImpl,
27+
) as unknown as (() => Promise<unknown>) & ReturnType<typeof vi.fn>;
28+
return { unsubscribe };
29+
};
30+
31+
const deferred = <T>() => {
32+
let resolve!: (value: T) => void;
33+
let reject!: (reason?: unknown) => void;
34+
const promise = new Promise<T>((res, rej) => {
35+
resolve = res;
36+
reject = rej;
37+
});
38+
return { promise, resolve, reject };
39+
};
40+
41+
describe("withTimeout", () => {
42+
beforeEach(() => {
43+
vi.useFakeTimers();
44+
warn.mockClear();
45+
});
46+
47+
afterEach(() => {
48+
vi.useRealTimers();
49+
});
50+
51+
it("returns success with the value when the operation resolves first", async () => {
52+
const result = await withTimeout(Promise.resolve("done"), 1000);
53+
expect(result).toEqual({ result: "success", value: "done" });
54+
});
55+
56+
it("returns timeout when the operation is slower than the deadline", async () => {
57+
const { promise } = deferred<string>();
58+
const racePromise = withTimeout(promise, 1000);
59+
await vi.advanceTimersByTimeAsync(1000);
60+
expect(await racePromise).toEqual({ result: "timeout" });
61+
});
62+
63+
it("clears the timeout timer on success", async () => {
64+
const clearSpy = vi.spyOn(globalThis, "clearTimeout");
65+
await withTimeout(Promise.resolve("done"), 1000);
66+
expect(clearSpy).toHaveBeenCalledTimes(1);
67+
});
68+
});
69+
70+
describe("subscribeWithTimeout", () => {
71+
beforeEach(() => {
72+
vi.useFakeTimers();
73+
warn.mockClear();
74+
});
75+
76+
afterEach(() => {
77+
vi.useRealTimers();
78+
});
79+
80+
it("returns success and the subscription when subscribe resolves first", async () => {
81+
const sub = makeSubscription();
82+
const result = await subscribeWithTimeout(
83+
Promise.resolve(sub),
84+
1000,
85+
"test",
86+
);
87+
expect(result).toEqual({ result: "success", subscription: sub });
88+
expect(sub.unsubscribe).not.toHaveBeenCalled();
89+
});
90+
91+
it("clears the timeout timer on success", async () => {
92+
const clearSpy = vi.spyOn(globalThis, "clearTimeout");
93+
await subscribeWithTimeout(
94+
Promise.resolve(makeSubscription()),
95+
1000,
96+
"test",
97+
);
98+
expect(clearSpy).toHaveBeenCalledTimes(1);
99+
});
100+
101+
it("returns timeout and unsubscribes the late subscription", async () => {
102+
const sub = makeSubscription();
103+
const { promise, resolve } = deferred<FakeSubscription>();
104+
105+
const racePromise = subscribeWithTimeout(promise, 1000, "late-sub");
106+
await vi.advanceTimersByTimeAsync(1000);
107+
expect(await racePromise).toEqual({ result: "timeout" });
108+
109+
resolve(sub);
110+
await vi.runAllTimersAsync();
111+
await Promise.resolve();
112+
113+
expect(sub.unsubscribe).toHaveBeenCalledTimes(1);
114+
});
115+
116+
it("logs a warning when the late unsubscribe rejects", async () => {
117+
const sub = makeSubscription(() => Promise.reject(new Error("nope")));
118+
const { promise, resolve } = deferred<FakeSubscription>();
119+
120+
const racePromise = subscribeWithTimeout(promise, 1000, "boom");
121+
await vi.advanceTimersByTimeAsync(1000);
122+
await racePromise;
123+
124+
resolve(sub);
125+
await vi.runAllTimersAsync();
126+
await Promise.resolve();
127+
await Promise.resolve();
128+
129+
expect(warn).toHaveBeenCalledWith(
130+
expect.stringContaining("Failed to tear down late subscription (boom)"),
131+
expect.any(Error),
132+
);
133+
});
134+
135+
it("logs a warning when the subscribe promise rejects after the timeout", async () => {
136+
const { promise, reject } = deferred<FakeSubscription>();
137+
138+
const racePromise = subscribeWithTimeout(promise, 1000, "rejected-late");
139+
await vi.advanceTimersByTimeAsync(1000);
140+
await racePromise;
141+
142+
reject(new Error("subscribe blew up"));
143+
await vi.runAllTimersAsync();
144+
await Promise.resolve();
145+
await Promise.resolve();
146+
147+
expect(warn).toHaveBeenCalledWith(
148+
expect.stringContaining(
149+
"Late subscribe rejected after timeout (rejected-late)",
150+
),
151+
expect.any(Error),
152+
);
153+
});
154+
155+
it("propagates a subscribe rejection that beats the timeout", async () => {
156+
const failing = Promise.reject(new Error("immediate fail"));
157+
await expect(
158+
subscribeWithTimeout(failing, 1000, "early-fail"),
159+
).rejects.toThrow("immediate fail");
160+
});
161+
});

apps/code/src/main/utils/async.ts

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
import { logger } from "./logger";
2+
3+
const log = logger.scope("async-utils");
4+
15
/**
26
* Races an operation against a timeout.
37
* Returns success with the value if the operation completes in time,
@@ -7,12 +11,62 @@ export async function withTimeout<T>(
711
operation: Promise<T>,
812
timeoutMs: number,
913
): Promise<{ result: "success"; value: T } | { result: "timeout" }> {
10-
const timeoutPromise = new Promise<{ result: "timeout" }>((resolve) =>
11-
setTimeout(() => resolve({ result: "timeout" }), timeoutMs),
12-
);
14+
let timeoutHandle!: ReturnType<typeof setTimeout>;
15+
const timeoutPromise = new Promise<{ result: "timeout" }>((resolve) => {
16+
timeoutHandle = setTimeout(() => resolve({ result: "timeout" }), timeoutMs);
17+
});
1318
const operationPromise = operation.then((value) => ({
1419
result: "success" as const,
1520
value,
1621
}));
17-
return Promise.race([operationPromise, timeoutPromise]);
22+
try {
23+
return await Promise.race([operationPromise, timeoutPromise]);
24+
} finally {
25+
clearTimeout(timeoutHandle);
26+
}
27+
}
28+
29+
/**
30+
* Races a subscribe-style promise against a timeout. If the timeout wins,
31+
* any late-arriving subscription is torn down via its `unsubscribe()` method
32+
* so the underlying resource (e.g. FSEvents/inotify fd, callback closure)
33+
* does not leak.
34+
*
35+
* The late teardown is fire-and-forget: the caller does not await it. Errors
36+
* during teardown (or a late rejection of the subscribe promise) are logged
37+
* at warn level with `label` for diagnostic context.
38+
*/
39+
export async function subscribeWithTimeout<
40+
T extends { unsubscribe(): Promise<unknown> },
41+
>(
42+
subscribePromise: Promise<T>,
43+
timeoutMs: number,
44+
label: string,
45+
): Promise<{ result: "success"; subscription: T } | { result: "timeout" }> {
46+
let timeoutHandle!: ReturnType<typeof setTimeout>;
47+
const timeoutPromise = new Promise<{ result: "timeout" }>((resolve) => {
48+
timeoutHandle = setTimeout(() => resolve({ result: "timeout" }), timeoutMs);
49+
});
50+
const successPromise = subscribePromise.then((subscription) => ({
51+
result: "success" as const,
52+
subscription,
53+
}));
54+
55+
const race = await Promise.race([successPromise, timeoutPromise]);
56+
clearTimeout(timeoutHandle);
57+
58+
if (race.result === "timeout") {
59+
subscribePromise
60+
.then((sub) =>
61+
sub.unsubscribe().catch((err) => {
62+
log.warn(`Failed to tear down late subscription (${label}):`, err);
63+
}),
64+
)
65+
.catch((err) => {
66+
log.warn(`Late subscribe rejected after timeout (${label}):`, err);
67+
});
68+
return { result: "timeout" };
69+
}
70+
71+
return race;
1872
}

0 commit comments

Comments
 (0)