Skip to content

Commit c0b4519

Browse files
ddwanganerli
authored andcommitted
Add agent.pause() / agent.resume() for mid-execution control
Allows pausing the agent's action loop at clean boundaries (between actions and between batches) for human-in-the-loop review or conditional execution, then resuming without losing state. stop() while paused unblocks the loop to prevent deadlocks.
1 parent 8407878 commit c0b4519

3 files changed

Lines changed: 291 additions & 1 deletion

File tree

packages/magnitude-core/src/agent/index.ts

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ export class Agent {
7171

7272
//public readonly memory: AgentMemory;
7373
private doneActing: boolean;
74+
private _paused: boolean = false;
75+
private _pauseResolve: (() => void) | null = null;
7476

7577
protected latestTaskMemory: AgentMemory;// | null = null;
7678

@@ -116,6 +118,8 @@ export class Agent {
116118
this.models = new MultiModelHarness(llms);
117119
this.models.events.on('tokensUsed', (usage) => this.events.emit('tokensUsed', usage), this);
118120
this.doneActing = false;
121+
this._paused = false;
122+
this._pauseResolve = null;
119123

120124
this.memoryOptions = {
121125
// TODO: maybe do if Gemini or other prompt caching supported providers as well
@@ -387,6 +391,8 @@ export class Agent {
387391

388392
// Execute partial recipe
389393
for (const action of actions) {
394+
await this._waitIfPaused();
395+
if (this.doneActing) break;
390396
await this.exec(action, memory);
391397

392398
// const postActionScreenshot = await this.screenshot();
@@ -399,6 +405,7 @@ export class Agent {
399405
// if (finished) {
400406
// break;
401407
// }
408+
await this._waitIfPaused();
402409
if (this.doneActing) {
403410
break;
404411
}
@@ -420,12 +427,42 @@ export class Agent {
420427
this.doneActing = true;
421428
}
422429

430+
private async _waitIfPaused(): Promise<void> {
431+
if (!this._paused) return;
432+
this.events.emit('pause');
433+
logger.info("Agent: Paused");
434+
await new Promise<void>((resolve) => {
435+
this._pauseResolve = resolve;
436+
});
437+
}
438+
439+
pause(): void {
440+
this._paused = true;
441+
}
442+
443+
resume(): void {
444+
this._paused = false;
445+
if (this._pauseResolve) {
446+
this._pauseResolve();
447+
this._pauseResolve = null;
448+
}
449+
this.events.emit('resume');
450+
logger.info("Agent: Resumed");
451+
}
452+
453+
get paused(): boolean {
454+
return this._paused;
455+
}
456+
423457
async stop() {
424458
/**
425459
* Stop the agent and close the browser context.
426460
* May be called asynchronously and interrupt an agent in the middle of a action sequence.
427461
*/
428-
// set signal to cancelled?
462+
this.doneActing = true;
463+
if (this._paused) {
464+
this.resume(); // unblock so loop can see doneActing and exit
465+
}
429466
logger.info("Agent: Stopping connectors...");
430467
for (const connector of this.connectors) {
431468
try {
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
import { describe, expect, test } from 'bun:test';
2+
import EventEmitter from 'eventemitter3';
3+
import { AgentEvents } from '@/common/events';
4+
5+
// Minimal Agent-like class that isolates pause/resume logic for testing
6+
// without requiring LLM clients, connectors, or BAML dependencies
7+
class PauseableLoop {
8+
public readonly events: EventEmitter<AgentEvents> = new EventEmitter();
9+
private doneActing: boolean = false;
10+
private _paused: boolean = false;
11+
private _pauseResolve: (() => void) | null = null;
12+
13+
private async _waitIfPaused(): Promise<void> {
14+
if (!this._paused) return;
15+
this.events.emit('pause');
16+
await new Promise<void>((resolve) => {
17+
this._pauseResolve = resolve;
18+
});
19+
}
20+
21+
pause(): void {
22+
this._paused = true;
23+
}
24+
25+
resume(): void {
26+
this._paused = false;
27+
if (this._pauseResolve) {
28+
this._pauseResolve();
29+
this._pauseResolve = null;
30+
}
31+
this.events.emit('resume');
32+
}
33+
34+
get paused(): boolean {
35+
return this._paused;
36+
}
37+
38+
queueDone(): void {
39+
this.doneActing = true;
40+
}
41+
42+
stop(): void {
43+
this.doneActing = true;
44+
if (this._paused) {
45+
this.resume();
46+
}
47+
}
48+
49+
/**
50+
* Simulates the Agent._act() loop structure.
51+
* Each "batch" is an array of action labels. The loop processes batches
52+
* until doneActing is set, mirroring the real while(true) loop.
53+
*/
54+
async runLoop(batches: string[][], onAction: (label: string) => void): Promise<void> {
55+
this.doneActing = false;
56+
let batchIndex = 0;
57+
58+
while (true) {
59+
const actions = batches[batchIndex % batches.length];
60+
batchIndex++;
61+
62+
for (const action of actions) {
63+
await this._waitIfPaused();
64+
if (this.doneActing) return;
65+
onAction(action);
66+
}
67+
68+
await this._waitIfPaused();
69+
if (this.doneActing) return;
70+
}
71+
}
72+
}
73+
74+
describe('Agent pause/resume', () => {
75+
test('pause() sets paused flag', () => {
76+
const loop = new PauseableLoop();
77+
expect(loop.paused).toBe(false);
78+
loop.pause();
79+
expect(loop.paused).toBe(true);
80+
});
81+
82+
test('resume() clears paused flag', () => {
83+
const loop = new PauseableLoop();
84+
loop.pause();
85+
loop.resume();
86+
expect(loop.paused).toBe(false);
87+
});
88+
89+
test('resume() when not paused is a no-op (no error)', () => {
90+
const loop = new PauseableLoop();
91+
expect(() => loop.resume()).not.toThrow();
92+
expect(loop.paused).toBe(false);
93+
});
94+
95+
test('loop pauses before action and resumes on resume()', async () => {
96+
const loop = new PauseableLoop();
97+
const executed: string[] = [];
98+
99+
// Pause immediately so first action blocks
100+
loop.pause();
101+
102+
const loopPromise = loop.runLoop(
103+
[['a', 'b']],
104+
(label) => {
105+
executed.push(label);
106+
// After executing both actions, stop
107+
if (executed.length === 2) loop.queueDone();
108+
}
109+
);
110+
111+
// Give microtasks a chance to settle — loop should be blocked
112+
await new Promise(r => setTimeout(r, 20));
113+
expect(executed).toEqual([]);
114+
115+
// Resume — loop should execute actions then stop
116+
loop.resume();
117+
await loopPromise;
118+
expect(executed).toEqual(['a', 'b']);
119+
});
120+
121+
test('pause mid-batch stops before next action', async () => {
122+
const loop = new PauseableLoop();
123+
const executed: string[] = [];
124+
125+
const loopPromise = loop.runLoop(
126+
[['a', 'b', 'c']],
127+
(label) => {
128+
executed.push(label);
129+
if (label === 'a') loop.pause(); // pause after first action
130+
}
131+
);
132+
133+
// Wait for loop to pause after 'a'
134+
await new Promise(r => setTimeout(r, 20));
135+
expect(executed).toEqual(['a']);
136+
137+
// Resume and let it finish
138+
loop.resume();
139+
await new Promise(r => setTimeout(r, 20));
140+
// 'b' should now execute, pause again since flag was cleared by resume
141+
// Actually after resume, _paused is false, so b and c run
142+
expect(executed).toContain('b');
143+
expect(executed).toContain('c');
144+
145+
// Stop the loop
146+
loop.stop();
147+
await loopPromise;
148+
});
149+
150+
test('pause emits pause event, resume emits resume event', async () => {
151+
const loop = new PauseableLoop();
152+
const events: string[] = [];
153+
154+
loop.events.on('pause', () => events.push('pause'));
155+
loop.events.on('resume', () => events.push('resume'));
156+
157+
loop.pause();
158+
159+
const loopPromise = loop.runLoop(
160+
[['a']],
161+
() => loop.queueDone()
162+
);
163+
164+
// Wait for loop to hit _waitIfPaused and emit 'pause'
165+
await new Promise(r => setTimeout(r, 20));
166+
expect(events).toEqual(['pause']);
167+
168+
loop.resume();
169+
await loopPromise;
170+
expect(events).toEqual(['pause', 'resume']);
171+
});
172+
173+
test('stop() while paused unblocks the loop', async () => {
174+
const loop = new PauseableLoop();
175+
const executed: string[] = [];
176+
177+
loop.pause();
178+
179+
const loopPromise = loop.runLoop(
180+
[['a', 'b']],
181+
(label) => executed.push(label)
182+
);
183+
184+
await new Promise(r => setTimeout(r, 20));
185+
expect(executed).toEqual([]);
186+
187+
// stop() should set doneActing and resume so loop exits
188+
loop.stop();
189+
await loopPromise;
190+
191+
// No actions should have executed — loop exits immediately after unblocking
192+
expect(executed).toEqual([]);
193+
});
194+
195+
test('pause between batches blocks next iteration', async () => {
196+
const loop = new PauseableLoop();
197+
const executed: string[] = [];
198+
let batchCount = 0;
199+
200+
const loopPromise = loop.runLoop(
201+
[['a'], ['b']],
202+
(label) => {
203+
executed.push(label);
204+
batchCount++;
205+
// After first batch completes, pause before second batch
206+
if (batchCount === 1) loop.pause();
207+
if (batchCount === 2) loop.queueDone();
208+
}
209+
);
210+
211+
// Let first batch run, then loop should pause at between-batch check
212+
await new Promise(r => setTimeout(r, 20));
213+
expect(executed).toEqual(['a']);
214+
215+
loop.resume();
216+
await loopPromise;
217+
expect(executed).toEqual(['a', 'b']);
218+
});
219+
220+
test('multiple pause/resume cycles work correctly', async () => {
221+
const loop = new PauseableLoop();
222+
const executed: string[] = [];
223+
let actionCount = 0;
224+
225+
const loopPromise = loop.runLoop(
226+
[['x']],
227+
(label) => {
228+
actionCount++;
229+
executed.push(`${label}${actionCount}`);
230+
if (actionCount < 3) {
231+
loop.pause(); // pause after each action
232+
} else {
233+
loop.queueDone(); // done after 3rd
234+
}
235+
}
236+
);
237+
238+
// First action runs, then pauses
239+
await new Promise(r => setTimeout(r, 20));
240+
expect(executed).toEqual(['x1']);
241+
242+
loop.resume();
243+
await new Promise(r => setTimeout(r, 20));
244+
expect(executed).toEqual(['x1', 'x2']);
245+
246+
loop.resume();
247+
await loopPromise;
248+
expect(executed).toEqual(['x1', 'x2', 'x3']);
249+
});
250+
});

packages/magnitude-core/src/common/events.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,8 @@ export interface AgentEvents {
1919
'actionStarted': (action: Action) => void;
2020
'actionDone': (action: Action) => void;
2121

22+
'pause': () => void;
23+
'resume': () => void;
24+
2225
'tokensUsed': (usage: ModelUsage) => void;
2326
}

0 commit comments

Comments
 (0)