Skip to content

Commit 200c809

Browse files
Shawclaude
andcommitted
fix(app-control): dedupe verification verdicts + harden bridge unsubscribe
VerificationRoomBridgeService now deduplicates verdict broadcasts keyed by `${sessionId}:${verdict}` with a 10-minute TTL so a replayed task_complete or escalation event cannot post a duplicate continuation message into the originating chat room. Sweep is opportunistic on each insert. stop() now reads the stored unsubscribe locally, clears the field first (so a retry of stop() is a no-op), runtime-checks that the value is a function, and wraps the call in a single-purpose try/catch that warn-logs a misbehaving coordinator without crashing service teardown. Adds five tests: - dedupes duplicate verdict events for the same session - emits separately for the same session when verdict differs - re-emits a verdict after the dedupe TTL expires (fake timers) - stop() handles a non-function unsubscribe gracefully - stop() handles an unsubscribe that throws Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent af29d3d commit 200c809

3 files changed

Lines changed: 219 additions & 4 deletions

File tree

plugins/plugin-app-control/typescript/src/services/__tests__/verification-room-bridge.retry.test.ts

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
*/
3939

4040
import type { IAgentRuntime } from "@elizaos/core";
41-
import { beforeEach, describe, expect, it, vi } from "vitest";
41+
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
4242
import { VerificationRoomBridgeService } from "../verification-room-bridge.js";
4343

4444
/**
@@ -172,6 +172,39 @@ function escalationEvent(args: {
172172
};
173173
}
174174

175+
/**
176+
* Shape of the broadcast `task_complete` (pass-verdict) event from
177+
* `swarm-decision-loop.ts`. Mirrors `passEvent` in the unit suite but
178+
* keeps a sessionId argument so dedupe tests can pin it explicitly.
179+
*/
180+
function passEvent(args: {
181+
sessionId: string;
182+
originRoomId: string;
183+
appName: string;
184+
}): SwarmEventLike {
185+
return {
186+
type: "task_complete",
187+
sessionId: args.sessionId,
188+
timestamp: Date.now(),
189+
data: {
190+
reasoning: "validator pass",
191+
verification: {
192+
source: "custom-validator",
193+
verdict: "pass",
194+
validator: { service: "app-verification", method: "verifyApp" },
195+
params: {
196+
workdir: "/tmp/wd",
197+
appName: args.appName,
198+
profile: "full",
199+
},
200+
},
201+
originRoomId: args.originRoomId,
202+
label: `create-app:${args.appName}`,
203+
workdir: "/tmp/wd",
204+
},
205+
};
206+
}
207+
175208
describe("VerificationRoomBridgeService — retry-loop integration", () => {
176209
let coord: FakeCoordinator;
177210

@@ -288,3 +321,85 @@ describe("VerificationRoomBridgeService — retry-loop integration", () => {
288321
expect(memory.content.text).toContain("2/3");
289322
});
290323
});
324+
325+
describe("VerificationRoomBridgeService — verdict dedupe", () => {
326+
let coord: FakeCoordinator;
327+
328+
beforeEach(() => {
329+
coord = createFakeCoordinator();
330+
});
331+
332+
afterEach(() => {
333+
// Always restore real timers in case a test installed fakes.
334+
vi.useRealTimers();
335+
});
336+
337+
it("dedupes duplicate verdict events for the same session", async () => {
338+
const { runtime, createMemory } = createRuntime(coord);
339+
await VerificationRoomBridgeService.start(runtime);
340+
341+
const evt = passEvent({
342+
sessionId: "sess-dedupe-1",
343+
originRoomId: "room-dedupe",
344+
appName: "notes-app",
345+
});
346+
coord.emit(evt);
347+
coord.emit(evt);
348+
await flushMicrotasks();
349+
350+
expect(createMemory).toHaveBeenCalledTimes(1);
351+
});
352+
353+
it("emits separately for the same session when verdict differs", async () => {
354+
const { runtime, createMemory } = createRuntime(coord);
355+
await VerificationRoomBridgeService.start(runtime);
356+
357+
const sessionId = "sess-dedupe-mixed";
358+
const room = "room-dedupe-mixed";
359+
coord.emit(
360+
passEvent({ sessionId, originRoomId: room, appName: "notes-app" }),
361+
);
362+
coord.emit(
363+
escalationEvent({
364+
sessionId,
365+
retryCount: 3,
366+
maxRetries: 3,
367+
originRoomId: room,
368+
appName: "notes-app",
369+
}),
370+
);
371+
await flushMicrotasks();
372+
373+
expect(createMemory).toHaveBeenCalledTimes(2);
374+
const verdicts = createMemory.mock.calls.map((call) => {
375+
const [memory] = call as [{ content: { text: string } }, string];
376+
return memory.content.text.includes("verification failure")
377+
? "fail"
378+
: "pass";
379+
});
380+
expect(verdicts.sort()).toEqual(["fail", "pass"]);
381+
});
382+
383+
it("re-emits a verdict after the dedupe TTL expires", async () => {
384+
vi.useFakeTimers();
385+
const { runtime, createMemory } = createRuntime(coord);
386+
await VerificationRoomBridgeService.start(runtime);
387+
388+
const evt = passEvent({
389+
sessionId: "sess-dedupe-ttl",
390+
originRoomId: "room-ttl",
391+
appName: "notes-app",
392+
});
393+
coord.emit(evt);
394+
await flushMicrotasks();
395+
expect(createMemory).toHaveBeenCalledTimes(1);
396+
397+
// Advance past the 10-minute TTL. The next identical event should
398+
// pass the dedupe check and produce a second memory.
399+
vi.advanceTimersByTime(10 * 60 * 1000 + 1);
400+
coord.emit(evt);
401+
await flushMicrotasks();
402+
403+
expect(createMemory).toHaveBeenCalledTimes(2);
404+
});
405+
});

plugins/plugin-app-control/typescript/src/services/__tests__/verification-room-bridge.test.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
import type { IAgentRuntime } from "@elizaos/core";
18+
import { logger } from "@elizaos/core";
1819
import { beforeEach, describe, expect, it, vi } from "vitest";
1920
import { VerificationRoomBridgeService } from "../verification-room-bridge.js";
2021

@@ -272,4 +273,50 @@ describe("VerificationRoomBridgeService", () => {
272273
expect(coord.listeners.length).toBe(0);
273274
expect(coord.unsubscribed).toBe(1);
274275
});
276+
277+
it("stop() handles a non-function unsubscribe gracefully", async () => {
278+
const { runtime } = createRuntime(coord);
279+
const service = await VerificationRoomBridgeService.start(runtime);
280+
// Force the stored unsubscribe into an invalid runtime shape — what
281+
// would happen if a future coordinator surface returned an object
282+
// instead of the documented `() => void`.
283+
(service as unknown as { unsubscribe: unknown }).unsubscribe = {
284+
not: "callable",
285+
};
286+
const warnSpy = vi.spyOn(logger, "warn").mockImplementation(() => {});
287+
await expect(service.stop()).resolves.toBeUndefined();
288+
expect(warnSpy).toHaveBeenCalledWith(
289+
expect.stringContaining(
290+
"[VerificationRoomBridge] stored unsubscribe was not a function",
291+
),
292+
);
293+
// Field cleared so a second stop() is a no-op.
294+
expect(
295+
(service as unknown as { unsubscribe: unknown }).unsubscribe,
296+
).toBeNull();
297+
warnSpy.mockRestore();
298+
});
299+
300+
it("stop() handles an unsubscribe that throws", async () => {
301+
const { runtime } = createRuntime(coord);
302+
const service = await VerificationRoomBridgeService.start(runtime);
303+
const boom = new Error("coordinator exploded");
304+
(service as unknown as { unsubscribe: () => void }).unsubscribe = () => {
305+
throw boom;
306+
};
307+
const warnSpy = vi.spyOn(logger, "warn").mockImplementation(() => {});
308+
await expect(service.stop()).resolves.toBeUndefined();
309+
expect(warnSpy).toHaveBeenCalledWith(
310+
expect.stringContaining(
311+
"[VerificationRoomBridge] unsubscribe threw during stop()",
312+
),
313+
);
314+
expect(warnSpy).toHaveBeenCalledWith(
315+
expect.stringContaining("coordinator exploded"),
316+
);
317+
expect(
318+
(service as unknown as { unsubscribe: unknown }).unsubscribe,
319+
).toBeNull();
320+
warnSpy.mockRestore();
321+
});
275322
});

plugins/plugin-app-control/typescript/src/services/verification-room-bridge.ts

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,16 @@ const APP_VERIFICATION_SERVICE = "app-verification";
4242
const VERIFY_APP_METHOD = "verifyApp";
4343
const VERIFY_PLUGIN_METHOD = "verifyPlugin";
4444

45+
/**
46+
* Dedupe TTL for verdict events keyed by `${sessionId}:${verdict}`.
47+
*
48+
* The broadcast bus may replay events under network blips, supervisor
49+
* retries, or a future multi-listener architecture. A real verdict for
50+
* a given session lands once, within seconds; 10 minutes is well past
51+
* the window where a duplicate is anything other than a replay.
52+
*/
53+
const VERDICT_DEDUPE_TTL_MS = 10 * 60 * 1000;
54+
4555
/**
4656
* Minimal shape of the SwarmCoordinator service surface this bridge
4757
* depends on. We only need `subscribe`; declared locally so we don't
@@ -181,6 +191,15 @@ export class VerificationRoomBridgeService extends Service {
181191

182192
private unsubscribe: (() => void) | null = null;
183193

194+
/**
195+
* Dedupe map: `${sessionId}:${verdict}` -> expiresAt epoch ms. Drops
196+
* replayed verdict events that would otherwise post duplicate chat
197+
* memories. Entries age out via `VERDICT_DEDUPE_TTL_MS`; we sweep
198+
* opportunistically on each insert (single-digit concurrent verdicts
199+
* in practice).
200+
*/
201+
private readonly verdictDedupe: Map<string, number> = new Map();
202+
184203
static override async start(
185204
runtime: IAgentRuntime,
186205
): Promise<VerificationRoomBridgeService> {
@@ -190,9 +209,25 @@ export class VerificationRoomBridgeService extends Service {
190209
}
191210

192211
override async stop(): Promise<void> {
193-
if (this.unsubscribe) {
194-
this.unsubscribe();
195-
this.unsubscribe = null;
212+
const unsub = this.unsubscribe;
213+
// Always clear the field first so a retry of stop() can't double-call.
214+
this.unsubscribe = null;
215+
if (unsub === null) return;
216+
if (typeof unsub !== "function") {
217+
logger.warn(
218+
"[VerificationRoomBridge] stored unsubscribe was not a function; skipping",
219+
);
220+
return;
221+
}
222+
// Single-purpose catch: a misbehaving coordinator must not crash
223+
// service teardown. Translate the failure into a structured warn
224+
// log and continue.
225+
try {
226+
unsub();
227+
} catch (err) {
228+
logger.warn(
229+
`[VerificationRoomBridge] unsubscribe threw during stop(): ${err instanceof Error ? err.message : String(err)}`,
230+
);
196231
}
197232
}
198233

@@ -226,6 +261,18 @@ export class VerificationRoomBridgeService extends Service {
226261
const payload = decodeEvent(event);
227262
if (!payload) return;
228263

264+
const dedupeKey = `${event.sessionId}:${payload.verdict}`;
265+
const now = Date.now();
266+
this.sweepExpiredDedupe(now);
267+
const existingExpiry = this.verdictDedupe.get(dedupeKey);
268+
if (existingExpiry !== undefined && existingExpiry > now) {
269+
logger.debug(
270+
`[VerificationRoomBridge] dedupe drop sessionId=${event.sessionId} verdict=${payload.verdict}`,
271+
);
272+
return;
273+
}
274+
this.verdictDedupe.set(dedupeKey, now + VERDICT_DEDUPE_TTL_MS);
275+
229276
const text =
230277
payload.verdict === "pass"
231278
? buildPassMessage(payload)
@@ -248,6 +295,12 @@ export class VerificationRoomBridgeService extends Service {
248295
`[VerificationRoomBridge] posted ${payload.verdict} verdict for ${payload.targetName} into room=${payload.originRoomId}`,
249296
);
250297
}
298+
299+
private sweepExpiredDedupe(now: number): void {
300+
for (const [key, expiresAt] of this.verdictDedupe) {
301+
if (expiresAt <= now) this.verdictDedupe.delete(key);
302+
}
303+
}
251304
}
252305

253306
export default VerificationRoomBridgeService;

0 commit comments

Comments
 (0)