diff --git a/src/models/conversational-agent/conversational-agent.models.ts b/src/models/conversational-agent/conversational-agent.models.ts index 07eca99c5..c0ff40e5b 100644 --- a/src/models/conversational-agent/conversational-agent.models.ts +++ b/src/models/conversational-agent/conversational-agent.models.ts @@ -38,6 +38,7 @@ import type { ConnectionStatus } from '@/core/websocket'; * S -->|onExchangeStart| E["ExchangeStream"] * S -->|onSessionEnd| SE(["session closed"]) * E -->|onMessageStart| M["MessageStream"] + * E -->|sendExchangeEnd| STOP(["stop response"]) * E -->|onExchangeEnd| EE(["exchange complete"]) * M -->|onContentPartStart| CP["ContentPartStream"] * M -->|onToolCallStart| TC["ToolCallStream"] @@ -81,10 +82,20 @@ import type { ConnectionStatus } from '@/core/websocket'; * exchange.sendMessageWithContentPart({ data: 'Hello!' }); * }); * - * // 5. End session when done + * // 5. Stop a response mid-stream + * // Use sendExchangeEnd() on any active exchange to stop the agent + * session.onSessionStarted(() => { + * const exchange = session.startExchange(); + * exchange.sendMessageWithContentPart({ data: 'Tell me a long story' }); + * + * // Stop after 5 seconds + * setTimeout(() => exchange.sendExchangeEnd(), 5000); + * }); + * + * // 6. End session when done * conversation.endSession(); * - * // 6. Retrieve conversation history (offline) + * // 7. Retrieve conversation history (offline) * const exchanges = await conversation.exchanges.getAll(); * ``` */ diff --git a/src/models/conversational-agent/conversations/types/events/exchange.types.ts b/src/models/conversational-agent/conversations/types/events/exchange.types.ts index 3b9e8522b..7b7aa5b13 100644 --- a/src/models/conversational-agent/conversations/types/events/exchange.types.ts +++ b/src/models/conversational-agent/conversations/types/events/exchange.types.ts @@ -228,12 +228,23 @@ export interface ExchangeStream { getMessage(messageId: string): MessageStream | undefined; /** - * Ends the exchange + * Ends the exchange. Stops further events for that exchange from being received. + * Use this to stop an in-progress agent response from the client side. * * @param endExchange - Optional end event data * - * @example Manually ending an exchange + * @example Manually ending an exchange and stopping a response mid-stream * ```typescript + * session.onExchangeStart((exchange) => { + * stopButton.addEventListener('click', () => exchange.sendExchangeEnd()); + * }); + * ``` + * + * @example End an exchange after sending a message + * ```typescript + * const exchange = session.startExchange(); + * exchange.sendMessageWithContentPart({ data: 'Hello!' }); + * // Later, stop the response * exchange.sendExchangeEnd(); * ``` */ diff --git a/src/services/conversational-agent/helpers/exchange-event-helper.ts b/src/services/conversational-agent/helpers/exchange-event-helper.ts index a250e9d8b..6fb8eb154 100644 --- a/src/services/conversational-agent/helpers/exchange-event-helper.ts +++ b/src/services/conversational-agent/helpers/exchange-event-helper.ts @@ -140,7 +140,9 @@ export abstract class ExchangeEventHelper extends ConversationEventHelperBase< } /** - * Ends the exchange with optional end event data. + * Ends the exchange. Stops further events for that exchange from being received. + * Can be called from either the client side (to stop an in-progress response) or + * the server/agent side (to signal natural completion). * @throws Error if exchange has already ended. */ public sendExchangeEnd(endExchange: ExchangeEndEvent = {}) { diff --git a/tests/unit/helpers/conversational-agent/exchange-event-helper.test.ts b/tests/unit/helpers/conversational-agent/exchange-event-helper.test.ts index d348c2f65..06c5d1228 100644 --- a/tests/unit/helpers/conversational-agent/exchange-event-helper.test.ts +++ b/tests/unit/helpers/conversational-agent/exchange-event-helper.test.ts @@ -680,6 +680,165 @@ describe('ExchangeEventHelper', () => { }); }); + describe('client-side stop (sendExchangeEnd mid-stream)', () => { + it('should stop exchange while assistant message is streaming', () => { + const { emitSpy, exchange } = createExchange(); + + // User sends a message + const userMessage = exchange.startMessage({ messageId: 'user-msg', role: MessageRole.User }); + userMessage.sendContentPart({ data: 'Tell me a long story' }); + userMessage.sendMessageEnd(); + + // Assistant starts responding (server dispatches events) + exchange.dispatch({ + exchangeId: EXCHANGE_ID, + message: { + messageId: 'assistant-msg', + startMessage: { role: MessageRole.Assistant }, + }, + }); + exchange.dispatch({ + exchangeId: EXCHANGE_ID, + message: { + messageId: 'assistant-msg', + contentPart: { + contentPartId: 'cp-1', + startContentPart: { mimeType: 'text/plain' }, + }, + }, + }); + exchange.dispatch({ + exchangeId: EXCHANGE_ID, + message: { + messageId: 'assistant-msg', + contentPart: { + contentPartId: 'cp-1', + chunk: { data: 'Once upon a time...' }, + }, + }, + }); + + emitSpy.mockClear(); + + // Client triggers stop mid-stream + exchange.sendExchangeEnd(); + + expect(exchange.ended).toBe(true); + expect(emitSpy).toHaveBeenCalledWith( + expect.objectContaining({ + exchange: expect.objectContaining({ + exchangeId: EXCHANGE_ID, + endExchange: {}, + }), + }) + ); + }); + + it('should prevent further message operations after client-side stop', () => { + const { exchange } = createExchange(); + + // Assistant is streaming + exchange.dispatch({ + exchangeId: EXCHANGE_ID, + message: { + messageId: 'assistant-msg', + startMessage: { role: MessageRole.Assistant }, + }, + }); + + // Client stops the exchange + exchange.sendExchangeEnd(); + + // No new messages can be started + expect(() => exchange.startMessage({ messageId: 'new-msg' })).toThrow( + ConversationEventInvalidOperationError + ); + }); + + it('should trigger onExchangeEnd handler on echo after client sends stop', () => { + const { emitSpy, exchange } = createExchange(); + const endSpy = vi.fn(); + exchange.onExchangeEnd(endSpy); + + // Simulate assistant streaming + exchange.dispatch({ + exchangeId: EXCHANGE_ID, + message: { + messageId: 'assistant-msg', + startMessage: { role: MessageRole.Assistant }, + }, + }); + exchange.dispatch({ + exchangeId: EXCHANGE_ID, + message: { + messageId: 'assistant-msg', + contentPart: { + contentPartId: 'cp-1', + chunk: { data: 'partial response...' }, + }, + }, + }); + + // Client sends stop + exchange.sendExchangeEnd(); + + expect(emitSpy).toHaveBeenCalledWith( + expect.objectContaining({ + exchange: expect.objectContaining({ + exchangeId: EXCHANGE_ID, + endExchange: {}, + }), + }) + ); + + // Server echoes the endExchange back — handler fires + exchange.dispatch({ + exchangeId: EXCHANGE_ID, + endExchange: {}, + }); + + expect(endSpy).toHaveBeenCalledTimes(1); + }); + + it('should stop exchange during tool call execution', () => { + const { emitSpy, exchange } = createExchange(); + + // Assistant starts a tool call + exchange.dispatch({ + exchangeId: EXCHANGE_ID, + message: { + messageId: 'assistant-msg', + startMessage: { role: MessageRole.Assistant }, + }, + }); + exchange.dispatch({ + exchangeId: EXCHANGE_ID, + message: { + messageId: 'assistant-msg', + toolCall: { + toolCallId: 'tc-1', + startToolCall: { toolName: 'search' }, + }, + }, + }); + + emitSpy.mockClear(); + + // Client triggers stop while tool call is in progress + exchange.sendExchangeEnd(); + + expect(exchange.ended).toBe(true); + expect(emitSpy).toHaveBeenCalledWith( + expect.objectContaining({ + exchange: expect.objectContaining({ + exchangeId: EXCHANGE_ID, + endExchange: {}, + }), + }) + ); + }); + }); + describe('replay', () => { it('should generate correct event sequence', () => { const events = Array.from(ExchangeEventHelperImpl.replay({