Skip to content

Commit a9c9cb3

Browse files
authored
Merge pull request #2754 from nanocoai/oss/exchange-hook
feat(runner): onExchangeComplete provider hook + slash-command interruption
2 parents 11afc64 + a619fc1 commit a9c9cb3

4 files changed

Lines changed: 245 additions & 8 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ All notable changes to NanoClaw will be documented in this file.
55
## [Unreleased]
66

77
- [BREAKING] **`@onecli-sh/sdk` 0.5.0 -> 2.2.1 — requires a OneCLI server with the `/v1` API** (older servers 404 every SDK call). The sanctioned gateway and CLI versions are pinned in `versions.json`; the `onecli` setup step enforces them. **Migration:** [docs/onecli-upgrades.md](docs/onecli-upgrades.md).
8+
- **Slash commands now interrupt an in-flight turn.** A runner-handled command (`/clear`, `/compact`, `/cost`, …) arriving mid-turn aborts the active stream and runs immediately instead of waiting out the turn.
89

910
## [2.1.0] - 2026-06-07
1011

container/agent-runner/src/integration.test.ts

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { getUndeliveredMessages } from './db/messages-out.js';
55
import { getPendingMessages } from './db/messages-in.js';
66
import { getContinuation, setContinuation } from './db/session-state.js';
77
import { MockProvider } from './providers/mock.js';
8+
import type { ProviderExchange } from './providers/types.js';
89
import { runPollLoop } from './poll-loop.js';
910

1011
beforeEach(() => {
@@ -304,6 +305,7 @@ async function runPollLoopWithTimeout(provider: MockProvider, signal: AbortSigna
304305
provider,
305306
providerName: 'mock',
306307
cwd: '/tmp',
308+
signal,
307309
}),
308310
new Promise<void>((_, reject) => {
309311
signal.addEventListener('abort', () => reject(new Error('aborted')));
@@ -324,6 +326,86 @@ function sleep(ms: number): Promise<void> {
324326
return new Promise((resolve) => setTimeout(resolve, ms));
325327
}
326328

329+
describe('poll loop — exchange hook (onExchangeComplete)', () => {
330+
// A provider that declares the per-exchange hook. The hook call is the
331+
// wiring under test — these tests go red if the poll-loop seam is severed.
332+
// What the provider DOES with an exchange (e.g. write markdown into
333+
// conversations/) ships with the provider, not the runner.
334+
class HookedMockProvider extends MockProvider {
335+
readonly exchanges: ProviderExchange[] = [];
336+
onExchangeComplete(exchange: ProviderExchange): void {
337+
this.exchanges.push(exchange);
338+
}
339+
}
340+
341+
it('reports each exchange to a provider that declares the hook', async () => {
342+
insertMessage('m1', { sender: 'Alice', text: 'please archive this' }, { platformId: 'chan-1', channelType: 'discord' });
343+
344+
const provider = new HookedMockProvider({}, () => '<message to="discord-test">archived answer</message>');
345+
const controller = new AbortController();
346+
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
347+
348+
await waitFor(() => provider.exchanges.length > 0, 2000);
349+
controller.abort();
350+
351+
expect(provider.exchanges.length).toBe(1);
352+
const exchange = provider.exchanges[0];
353+
expect(exchange.prompt).toContain('please archive this');
354+
expect(exchange.result).toContain('archived answer');
355+
expect(exchange.continuation).toStartWith('mock-session-');
356+
expect(exchange.status).toBe('completed');
357+
358+
await loopPromise.catch(() => {});
359+
});
360+
361+
it('does not report the internal wrapping-retry nudge as a user prompt', async () => {
362+
insertMessage('m1', { sender: 'Alice', text: 'wrap this later' }, { platformId: 'chan-1', channelType: 'discord' });
363+
364+
let calls = 0;
365+
const provider = new HookedMockProvider({}, () => {
366+
calls += 1;
367+
// First result is unwrapped (triggers the retry nudge), second is wrapped.
368+
return calls === 1 ? 'unwrapped text' : '<message to="discord-test">wrapped now</message>';
369+
});
370+
const controller = new AbortController();
371+
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 3000);
372+
373+
await waitFor(() => provider.exchanges.length >= 2, 3000);
374+
controller.abort();
375+
376+
// Both exchanges attribute themselves to the real user prompt, never the nudge.
377+
for (const exchange of provider.exchanges) {
378+
expect(exchange.prompt).not.toContain('Your response was not delivered');
379+
expect(exchange.prompt).toContain('wrap this later');
380+
}
381+
expect(provider.exchanges.map((e) => e.status)).toEqual(['undelivered', 'completed']);
382+
383+
await loopPromise.catch(() => {});
384+
});
385+
386+
it('a throwing hook never breaks delivery', async () => {
387+
insertMessage('m1', { sender: 'Alice', text: 'still deliver this' }, { platformId: 'chan-1', channelType: 'discord' });
388+
389+
class ThrowingHookProvider extends MockProvider {
390+
onExchangeComplete(): void {
391+
throw new Error('hook exploded');
392+
}
393+
}
394+
const provider = new ThrowingHookProvider({}, () => '<message to="discord-test">delivered anyway</message>');
395+
const controller = new AbortController();
396+
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
397+
398+
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
399+
controller.abort();
400+
401+
const out = getUndeliveredMessages();
402+
expect(out.length).toBe(1);
403+
expect(out[0].content).toContain('delivered anyway');
404+
405+
await loopPromise.catch(() => {});
406+
});
407+
});
408+
327409
describe('poll loop — provider error recovery', () => {
328410
it('writes error to outbound and continues loop on provider throw', async () => {
329411
insertMessage('m1', { sender: 'Alice', text: 'trigger error' }, { platformId: 'chan-1', channelType: 'discord' });
@@ -462,3 +544,76 @@ class InvalidSessionProvider {
462544
};
463545
}
464546
}
547+
548+
describe('poll loop — slash command during active query', () => {
549+
it('aborts the active query when /clear arrives as a follow-up', async () => {
550+
insertMessage('m-active', { sender: 'Alice', text: 'long running request' }, { platformId: 'chan-1', channelType: 'discord' });
551+
552+
const provider = new BlockingProvider();
553+
const controller = new AbortController();
554+
const loopPromise = runPollLoopWithTimeout(provider as unknown as MockProvider, controller.signal, 3000);
555+
556+
await waitFor(() => provider.queries === 1, 2000);
557+
insertMessage('m-clear-active', { sender: 'Alice', text: '/clear' }, { platformId: 'chan-1', channelType: 'discord' });
558+
559+
await waitFor(() => provider.aborts === 1, 2000);
560+
await waitFor(
561+
() => getUndeliveredMessages().some((msg) => JSON.parse(msg.content).text === 'Session cleared.'),
562+
2000,
563+
);
564+
controller.abort();
565+
566+
expect(provider.ends).toBe(0);
567+
expect(getContinuation('mock')).toBeUndefined();
568+
expect(getPendingMessages()).toHaveLength(0);
569+
570+
await loopPromise.catch(() => {});
571+
});
572+
});
573+
574+
/**
575+
* Provider whose query never completes until ended/aborted — for testing how
576+
* the loop interrupts an active stream.
577+
*/
578+
class BlockingProvider {
579+
readonly supportsNativeSlashCommands = false;
580+
queries = 0;
581+
aborts = 0;
582+
ends = 0;
583+
584+
isSessionInvalid(): boolean {
585+
return false;
586+
}
587+
588+
query() {
589+
const owner = this;
590+
this.queries += 1;
591+
let wake: (() => void) | null = null;
592+
let ended = false;
593+
let aborted = false;
594+
595+
return {
596+
push() {},
597+
end: () => {
598+
owner.ends += 1;
599+
ended = true;
600+
wake?.();
601+
},
602+
abort: () => {
603+
owner.aborts += 1;
604+
aborted = true;
605+
wake?.();
606+
},
607+
events: (async function* () {
608+
yield { type: 'activity' as const };
609+
yield { type: 'init' as const, continuation: 'blocking-session' };
610+
while (!ended && !aborted) {
611+
await new Promise<void>((resolve) => {
612+
wake = resolve;
613+
});
614+
wake = null;
615+
}
616+
})(),
617+
};
618+
}
619+
}

container/agent-runner/src/poll-loop.ts

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {
1414
type RoutingContext,
1515
} from './formatter.js';
1616
import { isUploadTraceCommand, uploadTrace } from './upload-trace.js';
17-
import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types.js';
17+
import type { AgentProvider, AgentQuery, ProviderEvent, ProviderExchange } from './providers/types.js';
1818

1919
const POLL_INTERVAL_MS = 1000;
2020
const ACTIVE_POLL_INTERVAL_MS = 500;
@@ -63,6 +63,12 @@ export interface PollLoopConfig {
6363
systemContext?: {
6464
instructions?: string;
6565
};
66+
/**
67+
* Optional stop signal. In production the loop runs until the container
68+
* dies; tests pass a signal so an abandoned loop actually exits instead of
69+
* polling forever and stealing messages from the next test's DB.
70+
*/
71+
signal?: AbortSignal;
6672
}
6773

6874
/**
@@ -107,6 +113,7 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
107113
let pollCount = 0;
108114
let isFirstPoll = true;
109115
while (true) {
116+
if (config.signal?.aborted) return;
110117
// Skip system messages — they're responses for MCP tools (e.g., ask_user_question)
111118
const messages = getPendingMessages(isFirstPoll).filter((m) => m.kind !== 'system');
112119
isFirstPoll = false;
@@ -232,7 +239,15 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
232239
// can stamp it on outbound rows — needed for a2a return-path routing.
233240
setCurrentInReplyTo(routing.inReplyTo);
234241
try {
235-
const result = await processQuery(query, routing, processingIds, config.providerName);
242+
const result = await processQuery(
243+
query,
244+
routing,
245+
processingIds,
246+
config.providerName,
247+
config.provider.onExchangeComplete?.bind(config.provider),
248+
prompt,
249+
continuation,
250+
);
236251
if (result.continuation && result.continuation !== continuation) {
237252
continuation = result.continuation;
238253
setContinuation(config.providerName, continuation);
@@ -313,10 +328,18 @@ async function processQuery(
313328
routing: RoutingContext,
314329
initialBatchIds: string[],
315330
providerName: string,
331+
onExchangeComplete: ((exchange: ProviderExchange) => void) | undefined,
332+
initialPrompt: string,
333+
initialContinuation: string | undefined,
316334
): Promise<QueryResult> {
317335
let queryContinuation: string | undefined;
318336
let done = false;
319337
let unwrappedNudged = false;
338+
// Prompt queue for the exchange hook — each result event consumes the
339+
// oldest unanswered prompt, except a wrapping-retry result, which answers
340+
// the same prompt again. Unused (and unmaintained) when the provider
341+
// doesn't implement `onExchangeComplete`.
342+
const archivePrompts: string[] = [initialPrompt];
320343

321344
// Concurrent polling: push follow-ups into the active query as they arrive.
322345
// We do NOT force-end the stream on silence — keeping the query open avoids
@@ -342,13 +365,16 @@ async function processQuery(
342365
// resume id (fixed at sdkQuery() time); admin/passthrough commands
343366
// (/compact, /cost, …) only dispatch when they're the first input
344367
// of a query — pushed mid-stream they arrive as plain text and
345-
// the SDK never runs them. End the stream and leave the rows
346-
// pending; the outer loop handles them on next iteration via the
347-
// canonical command path + formatMessagesWithCommands.
368+
// the SDK never runs them. Abort the active stream and leave the
369+
// rows pending; the outer loop handles them on next iteration via
370+
// the canonical command path + formatMessagesWithCommands. Abort,
371+
// not end: end() lets an in-flight turn run to completion, which
372+
// can block the command (e.g. /clear during a long task) for as
373+
// long as the turn takes.
348374
if (pending.some((m) => isRunnerCommand(m))) {
349-
log('Pending slash command — ending stream so outer loop can process');
375+
log('Pending slash command — aborting active stream so outer loop can process');
350376
endedForCommand = true;
351-
query.end();
377+
query.abort();
352378
return;
353379
}
354380

@@ -393,6 +419,7 @@ async function processQuery(
393419
log(`Pushing ${keep.length} follow-up message(s) into active query`);
394420
unwrappedNudged = false;
395421
query.push(prompt);
422+
archivePrompts.push(prompt);
396423
markCompleted(keptIds);
397424
} catch (err) {
398425
// Without this catch the rejection escapes the void IIFE and Node
@@ -456,7 +483,14 @@ async function processQuery(
456483
markCompleted(initialBatchIds);
457484
if (event.text) {
458485
const { hasUnwrapped } = dispatchResultText(event.text, routing);
459-
if (hasUnwrapped && !unwrappedNudged) {
486+
const willRetryWrapping = hasUnwrapped && !unwrappedNudged;
487+
notifyExchangeComplete(onExchangeComplete, {
488+
prompt: archivePrompts[0] ?? initialPrompt,
489+
result: event.text,
490+
continuation: queryContinuation ?? initialContinuation,
491+
status: hasUnwrapped ? 'undelivered' : 'completed',
492+
});
493+
if (willRetryWrapping) {
460494
unwrappedNudged = true;
461495
const destinations = getAllDestinations();
462496
const names = destinations.map((d) => d.name).join(', ');
@@ -467,9 +501,23 @@ async function processQuery(
467501
`Please re-send your response with the correct wrapping.</system>`,
468502
);
469503
}
504+
// The wrapping-retry result answers the SAME user prompt — keep it
505+
// queued so the retry archives against it, not the nudge text.
506+
if (!willRetryWrapping) archivePrompts.shift();
507+
} else {
508+
archivePrompts.shift();
470509
}
471510
}
472511
}
512+
} catch (err) {
513+
const errMsg = err instanceof Error ? err.message : String(err);
514+
notifyExchangeComplete(onExchangeComplete, {
515+
prompt: archivePrompts[0] ?? initialPrompt,
516+
result: `Error: ${errMsg}`,
517+
continuation: queryContinuation ?? initialContinuation,
518+
status: 'error',
519+
});
520+
throw err;
473521
} finally {
474522
done = true;
475523
clearInterval(pollHandle);
@@ -478,6 +526,18 @@ async function processQuery(
478526
return { continuation: queryContinuation };
479527
}
480528

529+
function notifyExchangeComplete(
530+
hook: ((exchange: ProviderExchange) => void) | undefined,
531+
exchange: ProviderExchange,
532+
): void {
533+
if (!hook) return;
534+
try {
535+
hook(exchange);
536+
} catch (err) {
537+
log(`onExchangeComplete failed: ${err instanceof Error ? err.message : String(err)}`);
538+
}
539+
}
540+
481541
function handleEvent(event: ProviderEvent, _routing: RoutingContext): void {
482542
switch (event.type) {
483543
case 'init':

container/agent-runner/src/providers/types.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,17 @@ export interface AgentProvider {
1414
*/
1515
readonly usesMemoryScaffold?: boolean;
1616

17+
/**
18+
* Optional. Called by the poll-loop after each completed exchange (a
19+
* result, a wrapping retry, or an error). Providers whose harness keeps no
20+
* on-disk transcript implement this to persist exchanges themselves (e.g.
21+
* markdown into the agent's `conversations/` dir); providers that persist
22+
* and archive their own transcript (e.g. the Claude Agent SDK's `.jsonl`)
23+
* omit it. Best-effort: the loop catches and logs anything it throws. The
24+
* implementation lives with the provider, never in the runner.
25+
*/
26+
onExchangeComplete?(exchange: ProviderExchange): void;
27+
1728
/** Start a new query. Returns a handle for streaming input and output. */
1829
query(input: QueryInput): AgentQuery;
1930

@@ -39,6 +50,16 @@ export interface AgentProvider {
3950
maybeRotateContinuation?(continuation: string, cwd: string): string | null;
4051
}
4152

53+
/** One prompt/result round-trip, as reported to `onExchangeComplete`. */
54+
export interface ProviderExchange {
55+
/** The user prompt this exchange answers (never an internal retry nudge). */
56+
prompt: string;
57+
result: string | null;
58+
/** Continuation/thread id in effect for the exchange, if any. */
59+
continuation?: string;
60+
status: 'completed' | 'undelivered' | 'error';
61+
}
62+
4263
/**
4364
* Options passed to provider constructors. Fields are common to most
4465
* providers; individual providers may ignore any they don't need.

0 commit comments

Comments
 (0)