Skip to content

Commit dc82d97

Browse files
committed
Fixing race condition in task updates
1 parent 4e17552 commit dc82d97

4 files changed

Lines changed: 181 additions & 16 deletions

File tree

packages/a2a-server/src/agent/task-event-driven.test.ts

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ describe('Task Event-Driven Scheduler', () => {
6666
handler({
6767
type: MessageBusType.TOOL_CALLS_UPDATE,
6868
toolCalls: [toolCall],
69+
schedulerId: 'task-id',
6970
});
7071

7172
expect(mockEventBus.publish).toHaveBeenCalledWith(
@@ -106,6 +107,7 @@ describe('Task Event-Driven Scheduler', () => {
106107
handler({
107108
type: MessageBusType.TOOL_CALLS_UPDATE,
108109
toolCalls: [toolCall],
110+
schedulerId: 'task-id',
109111
});
110112

111113
// Simulate A2A client confirmation
@@ -148,7 +150,11 @@ describe('Task Event-Driven Scheduler', () => {
148150
const handler = (messageBus.subscribe as Mock).mock.calls.find(
149151
(call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE,
150152
)?.[1];
151-
handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] });
153+
handler({
154+
type: MessageBusType.TOOL_CALLS_UPDATE,
155+
toolCalls: [toolCall],
156+
schedulerId: 'task-id',
157+
});
152158

153159
// Simulate Rejection (Cancel)
154160
const handled = await (
@@ -174,7 +180,11 @@ describe('Task Event-Driven Scheduler', () => {
174180
correlationId: 'corr-2',
175181
confirmationDetails: { type: 'info', title: 'test', prompt: 'test' },
176182
};
177-
handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall2] });
183+
handler({
184+
type: MessageBusType.TOOL_CALLS_UPDATE,
185+
toolCalls: [toolCall2],
186+
schedulerId: 'task-id',
187+
});
178188

179189
// Simulate ModifyWithEditor
180190
const handled2 = await (
@@ -215,7 +225,11 @@ describe('Task Event-Driven Scheduler', () => {
215225
const handler = (messageBus.subscribe as Mock).mock.calls.find(
216226
(call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE,
217227
)?.[1];
218-
handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] });
228+
handler({
229+
type: MessageBusType.TOOL_CALLS_UPDATE,
230+
toolCalls: [toolCall],
231+
schedulerId: 'task-id',
232+
});
219233

220234
// Simulate ProceedOnce for MCP
221235
const handled = await (
@@ -255,7 +269,11 @@ describe('Task Event-Driven Scheduler', () => {
255269
const handler = (messageBus.subscribe as Mock).mock.calls.find(
256270
(call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE,
257271
)?.[1];
258-
handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] });
272+
handler({
273+
type: MessageBusType.TOOL_CALLS_UPDATE,
274+
toolCalls: [toolCall],
275+
schedulerId: 'task-id',
276+
});
259277

260278
const handled = await (
261279
task as unknown as {
@@ -294,7 +312,11 @@ describe('Task Event-Driven Scheduler', () => {
294312
const handler = (messageBus.subscribe as Mock).mock.calls.find(
295313
(call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE,
296314
)?.[1];
297-
handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] });
315+
handler({
316+
type: MessageBusType.TOOL_CALLS_UPDATE,
317+
toolCalls: [toolCall],
318+
schedulerId: 'task-id',
319+
});
298320

299321
const handled = await (
300322
task as unknown as {
@@ -333,7 +355,11 @@ describe('Task Event-Driven Scheduler', () => {
333355
const handler = (messageBus.subscribe as Mock).mock.calls.find(
334356
(call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE,
335357
)?.[1];
336-
handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] });
358+
handler({
359+
type: MessageBusType.TOOL_CALLS_UPDATE,
360+
toolCalls: [toolCall],
361+
schedulerId: 'task-id',
362+
});
337363

338364
const handled = await (
339365
task as unknown as {
@@ -376,7 +402,11 @@ describe('Task Event-Driven Scheduler', () => {
376402
const handler = (yoloMessageBus.subscribe as Mock).mock.calls.find(
377403
(call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE,
378404
)?.[1];
379-
handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] });
405+
handler({
406+
type: MessageBusType.TOOL_CALLS_UPDATE,
407+
toolCalls: [toolCall],
408+
schedulerId: 'task-id',
409+
});
380410

381411
// Should NOT auto-publish ProceedOnce anymore, because PolicyEngine handles it directly
382412
expect(yoloMessageBus.publish).not.toHaveBeenCalledWith(
@@ -419,6 +449,7 @@ describe('Task Event-Driven Scheduler', () => {
419449
handler({
420450
type: MessageBusType.TOOL_CALLS_UPDATE,
421451
toolCalls: [toolCall],
452+
schedulerId: 'task-id',
422453
});
423454

424455
// Should publish artifact update for output
@@ -453,7 +484,11 @@ describe('Task Event-Driven Scheduler', () => {
453484
const handler = (messageBus.subscribe as Mock).mock.calls.find(
454485
(call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE,
455486
)?.[1];
456-
handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] });
487+
handler({
488+
type: MessageBusType.TOOL_CALLS_UPDATE,
489+
toolCalls: [toolCall],
490+
schedulerId: 'task-id',
491+
});
457492

458493
// The tool should be complete and registered appropriately, eventually
459494
// triggering the toolCompletionPromise resolution when all clear.
@@ -533,6 +568,7 @@ describe('Task Event-Driven Scheduler', () => {
533568
handler({
534569
type: MessageBusType.TOOL_CALLS_UPDATE,
535570
toolCalls: [toolCall1, toolCall2],
571+
schedulerId: 'task-id',
536572
});
537573

538574
// Confirm first tool call
@@ -600,6 +636,7 @@ describe('Task Event-Driven Scheduler', () => {
600636
handler({
601637
type: MessageBusType.TOOL_CALLS_UPDATE,
602638
toolCalls: [toolCall1, toolCall2],
639+
schedulerId: 'task-id',
603640
});
604641

605642
// Should NOT transition to input-required yet
@@ -621,6 +658,7 @@ describe('Task Event-Driven Scheduler', () => {
621658
handler({
622659
type: MessageBusType.TOOL_CALLS_UPDATE,
623660
toolCalls: [toolCall1Complete, toolCall2],
661+
schedulerId: 'task-id',
624662
});
625663

626664
// Now it should transition

packages/a2a-server/src/agent/task.test.ts

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,4 +460,110 @@ describe('Task', () => {
460460
expect(task.currentPromptId).toBe(expectedPromptId2);
461461
});
462462
});
463+
464+
describe('Race Condition Fix', () => {
465+
const mockConfig = createMockConfig();
466+
const mockEventBus: ExecutionEventBus = {
467+
publish: vi.fn(),
468+
on: vi.fn(),
469+
off: vi.fn(),
470+
once: vi.fn(),
471+
removeAllListeners: vi.fn(),
472+
finished: vi.fn(),
473+
};
474+
475+
beforeEach(() => {
476+
vi.clearAllMocks();
477+
});
478+
479+
it('should NOT transition to input-required if a tool is still validating', async () => {
480+
// @ts-expect-error - Calling private constructor
481+
const task = new Task(
482+
'task-id',
483+
'context-id',
484+
mockConfig as Config,
485+
mockEventBus,
486+
);
487+
488+
// Manually register two tool calls
489+
task['_registerToolCall']('tool-1', 'awaiting_approval');
490+
task['_registerToolCall']('tool-2', 'validating');
491+
492+
// Call checkInputRequiredState (private)
493+
task['checkInputRequiredState']();
494+
495+
// Verify task state did NOT change to input-required
496+
expect(task.taskState).not.toBe('input-required');
497+
expect(mockEventBus.publish).not.toHaveBeenCalledWith(
498+
expect.objectContaining({
499+
status: expect.objectContaining({ state: 'input-required' }),
500+
}),
501+
);
502+
});
503+
504+
it('should transition to input-required if all active tools are awaiting approval', async () => {
505+
// @ts-expect-error - Calling private constructor
506+
const task = new Task(
507+
'task-id',
508+
'context-id',
509+
mockConfig as Config,
510+
mockEventBus,
511+
);
512+
513+
// Transition from submitted to working first to simulate normal flow
514+
task.taskState = 'working';
515+
516+
// Manually register tool calls
517+
task['_registerToolCall']('tool-1', 'awaiting_approval');
518+
519+
// Call checkInputRequiredState
520+
task['checkInputRequiredState']();
521+
522+
// Verify task state changed to input-required
523+
expect(task.taskState).toBe('input-required');
524+
expect(mockEventBus.publish).toHaveBeenCalledWith(
525+
expect.objectContaining({
526+
status: expect.objectContaining({ state: 'input-required' }),
527+
}),
528+
);
529+
});
530+
531+
it('handleEventDrivenToolCallsUpdate should ignore events for other schedulers', async () => {
532+
// @ts-expect-error - Calling private constructor
533+
const task = new Task(
534+
'task-id',
535+
'context-id',
536+
mockConfig as Config,
537+
mockEventBus,
538+
);
539+
540+
const handleEventDrivenToolCallSpy = vi.spyOn(
541+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
542+
task as any,
543+
'handleEventDrivenToolCall',
544+
);
545+
546+
const otherEvent = {
547+
type: 'tool-calls-update',
548+
toolCalls: [{ request: { callId: '1' }, status: 'executing' }],
549+
schedulerId: 'other-task-id',
550+
};
551+
552+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
553+
task['handleEventDrivenToolCallsUpdate'](otherEvent as any);
554+
555+
expect(handleEventDrivenToolCallSpy).not.toHaveBeenCalled();
556+
557+
const ownEvent = {
558+
type: 'tool-calls-update',
559+
toolCalls: [{ request: { callId: '1' }, status: 'executing' }],
560+
schedulerId: 'task-id',
561+
};
562+
563+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
564+
task['handleEventDrivenToolCallsUpdate'](ownEvent as any);
565+
566+
expect(handleEventDrivenToolCallSpy).toHaveBeenCalled();
567+
});
568+
});
463569
});

packages/a2a-server/src/agent/task.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,10 @@ export class Task {
413413
private handleEventDrivenToolCallsUpdate(
414414
event: ToolCallsUpdateMessage,
415415
): void {
416-
if (event.type !== MessageBusType.TOOL_CALLS_UPDATE) {
416+
if (
417+
event.type !== MessageBusType.TOOL_CALLS_UPDATE ||
418+
event.schedulerId !== this.id
419+
) {
417420
return;
418421
}
419422

@@ -508,7 +511,11 @@ export class Task {
508511
let isExecuting = false;
509512

510513
for (const [callId, status] of this.pendingToolCalls.entries()) {
511-
if (status === 'executing' || status === 'scheduled') {
514+
if (
515+
status === 'executing' ||
516+
status === 'scheduled' ||
517+
status === 'validating'
518+
) {
512519
isExecuting = true;
513520
} else if (
514521
status === 'awaiting_approval' &&

packages/a2a-server/src/http/app.test.ts

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -381,11 +381,11 @@ describe('E2E Tests', () => {
381381
]);
382382

383383
// 6. Tool 1 is awaiting approval.
384-
const toolCallAwaitEvent = events[5].result as TaskStatusUpdateEvent;
385-
expect(toolCallAwaitEvent.metadata?.['coderAgent']).toMatchObject({
384+
const toolCallAwaitEvent1 = events[5].result as TaskStatusUpdateEvent;
385+
expect(toolCallAwaitEvent1.metadata?.['coderAgent']).toMatchObject({
386386
kind: 'tool-call-confirmation',
387387
});
388-
expect(toolCallAwaitEvent.status.message?.parts).toMatchObject([
388+
expect(toolCallAwaitEvent1.status.message?.parts).toMatchObject([
389389
{
390390
data: {
391391
request: { callId: 'test-call-id-1' },
@@ -394,14 +394,28 @@ describe('E2E Tests', () => {
394394
},
395395
]);
396396

397-
// 7. The final event is "input-required".
398-
const finalEvent = events[6].result as TaskStatusUpdateEvent;
397+
// 7. Tool 2 is awaiting approval.
398+
const toolCallAwaitEvent2 = events[6].result as TaskStatusUpdateEvent;
399+
expect(toolCallAwaitEvent2.metadata?.['coderAgent']).toMatchObject({
400+
kind: 'tool-call-confirmation',
401+
});
402+
expect(toolCallAwaitEvent2.status.message?.parts).toMatchObject([
403+
{
404+
data: {
405+
request: { callId: 'test-call-id-2' },
406+
status: 'awaiting_approval',
407+
},
408+
},
409+
]);
410+
411+
// 8. The final event is "input-required".
412+
const finalEvent = events[7].result as TaskStatusUpdateEvent;
399413
expect(finalEvent.final).toBe(true);
400414
expect(finalEvent.status.state).toBe('input-required');
401415

402416
// The scheduler now waits for approval, so no more events are sent.
403417
assertUniqueFinalEventIsLast(events);
404-
expect(events.length).toBe(7);
418+
expect(events.length).toBe(8);
405419
});
406420

407421
it('should handle multiple tool calls sequentially in YOLO mode', async () => {

0 commit comments

Comments
 (0)