Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions src/models/conversational-agent/conversational-agent.models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import type { ConnectionStatus } from '@/core/websocket';
* S -->|onExchangeStart| E["ExchangeStream"]
* S -->|onSessionEnd| SE(["session closed"])
* E -->|onMessageStart| M["MessageStream"]
* E -->|sendExchangeEnd| STOP(["stop response"])
* E -->|onExchangeEnd| EE(["exchange complete"])
* M -->|onContentPartStart| CP["ContentPartStream"]
* M -->|onToolCallStart| TC["ToolCallStream"]
Expand Down Expand Up @@ -81,10 +82,20 @@ import type { ConnectionStatus } from '@/core/websocket';
* exchange.sendMessageWithContentPart({ data: 'Hello!' });
* });
*
* // 5. End session when done
* // 5. Stop a response mid-stream
* // Use sendExchangeEnd() on any active exchange to stop the agent
* session.onSessionStarted(() => {
* const exchange = session.startExchange();
* exchange.sendMessageWithContentPart({ data: 'Tell me a long story' });
*
* // Stop after 5 seconds
* setTimeout(() => exchange.sendExchangeEnd(), 5000);
* });
*
* // 6. End session when done
* conversation.endSession();
*
* // 6. Retrieve conversation history (offline)
* // 7. Retrieve conversation history (offline)
* const exchanges = await conversation.exchanges.getAll();
* ```
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,23 @@ export interface ExchangeStream {
getMessage(messageId: string): MessageStream | undefined;

/**
* Ends the exchange
* Ends the exchange. Stops further events for that exchange from being received.
* Use this to stop an in-progress agent response from the client side.
*
* @param endExchange - Optional end event data
*
* @example Manually ending an exchange
* @example Manually ending an exchange and stopping a response mid-stream
* ```typescript
* session.onExchangeStart((exchange) => {
* stopButton.addEventListener('click', () => exchange.sendExchangeEnd());
* });
* ```
*
* @example End an exchange after sending a message
* ```typescript
* const exchange = session.startExchange();
* exchange.sendMessageWithContentPart({ data: 'Hello!' });
* // Later, stop the response
* exchange.sendExchangeEnd();
* ```
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ export abstract class ExchangeEventHelper extends ConversationEventHelperBase<
}

/**
* Ends the exchange with optional end event data.
* Ends the exchange. Stops further events for that exchange from being received.
* Can be called from either the client side (to stop an in-progress response) or
* the server/agent side (to signal natural completion).
* @throws Error if exchange has already ended.
*/
public sendExchangeEnd(endExchange: ExchangeEndEvent = {}) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,165 @@ describe('ExchangeEventHelper', () => {
});
});

describe('client-side stop (sendExchangeEnd mid-stream)', () => {
it('should stop exchange while assistant message is streaming', () => {
const { emitSpy, exchange } = createExchange();

// User sends a message
const userMessage = exchange.startMessage({ messageId: 'user-msg', role: MessageRole.User });
userMessage.sendContentPart({ data: 'Tell me a long story' });
userMessage.sendMessageEnd();

// Assistant starts responding (server dispatches events)
exchange.dispatch({
exchangeId: EXCHANGE_ID,
message: {
messageId: 'assistant-msg',
startMessage: { role: MessageRole.Assistant },
},
});
exchange.dispatch({
exchangeId: EXCHANGE_ID,
message: {
messageId: 'assistant-msg',
contentPart: {
contentPartId: 'cp-1',
startContentPart: { mimeType: 'text/plain' },
},
},
});
exchange.dispatch({
exchangeId: EXCHANGE_ID,
message: {
messageId: 'assistant-msg',
contentPart: {
contentPartId: 'cp-1',
chunk: { data: 'Once upon a time...' },
},
},
});

emitSpy.mockClear();

// Client triggers stop mid-stream
exchange.sendExchangeEnd();

expect(exchange.ended).toBe(true);
expect(emitSpy).toHaveBeenCalledWith(
expect.objectContaining({
exchange: expect.objectContaining({
exchangeId: EXCHANGE_ID,
endExchange: {},
}),
})
);
});

it('should prevent further message operations after client-side stop', () => {
const { exchange } = createExchange();

// Assistant is streaming
exchange.dispatch({
exchangeId: EXCHANGE_ID,
message: {
messageId: 'assistant-msg',
startMessage: { role: MessageRole.Assistant },
},
});

// Client stops the exchange
exchange.sendExchangeEnd();

// No new messages can be started
expect(() => exchange.startMessage({ messageId: 'new-msg' })).toThrow(
ConversationEventInvalidOperationError
);
});

it('should trigger onExchangeEnd handler on echo after client sends stop', () => {
const { emitSpy, exchange } = createExchange();
const endSpy = vi.fn();
exchange.onExchangeEnd(endSpy);

// Simulate assistant streaming
exchange.dispatch({
exchangeId: EXCHANGE_ID,
message: {
messageId: 'assistant-msg',
startMessage: { role: MessageRole.Assistant },
},
});
exchange.dispatch({
exchangeId: EXCHANGE_ID,
message: {
messageId: 'assistant-msg',
contentPart: {
contentPartId: 'cp-1',
chunk: { data: 'partial response...' },
},
},
});

// Client sends stop
exchange.sendExchangeEnd();

expect(emitSpy).toHaveBeenCalledWith(
expect.objectContaining({
exchange: expect.objectContaining({
exchangeId: EXCHANGE_ID,
endExchange: {},
}),
})
);

// Server echoes the endExchange back — handler fires
exchange.dispatch({
exchangeId: EXCHANGE_ID,
endExchange: {},
});

expect(endSpy).toHaveBeenCalledTimes(1);
});

it('should stop exchange during tool call execution', () => {
const { emitSpy, exchange } = createExchange();

// Assistant starts a tool call
exchange.dispatch({
exchangeId: EXCHANGE_ID,
message: {
messageId: 'assistant-msg',
startMessage: { role: MessageRole.Assistant },
},
});
exchange.dispatch({
exchangeId: EXCHANGE_ID,
message: {
messageId: 'assistant-msg',
toolCall: {
toolCallId: 'tc-1',
startToolCall: { toolName: 'search' },
},
},
});

emitSpy.mockClear();

// Client triggers stop while tool call is in progress
exchange.sendExchangeEnd();

expect(exchange.ended).toBe(true);
expect(emitSpy).toHaveBeenCalledWith(
expect.objectContaining({
exchange: expect.objectContaining({
exchangeId: EXCHANGE_ID,
endExchange: {},
}),
})
);
});
});

describe('replay', () => {
it('should generate correct event sequence', () => {
const events = Array.from(ExchangeEventHelperImpl.replay({
Expand Down
Loading