Skip to content

Commit 90f832a

Browse files
authored
[Agent Builder] Lift streaming provider + remove 'new' cache key (elastic#267324)
1 parent f085474 commit 90f832a

35 files changed

Lines changed: 1060 additions & 723 deletions

x-pack/platform/plugins/shared/agent_builder/CONTRIBUTOR_GUIDE.md

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1280,3 +1280,98 @@ setupDeps.agentBuilder.sml.registerType(visualizationSmlType);
12801280

12811281
The full implementation is ~130 lines and serves as the reference for new types.
12821282

1283+
## Streams lifecycle (frontend)
1284+
1285+
The chat streaming layer lives in `public/application/context/send_message/`. This section
1286+
documents how mutations, lifted state, and the React Query cache fit together. Read this
1287+
before touching any of:
1288+
`send_message_context.tsx`, `use_send_message.ts`, `use_send_message_mutation.ts`,
1289+
`use_resume_round_mutation.ts`, `use_subscribe_to_chat_events.ts`,
1290+
`use_is_any_conversation_streaming.ts`.
1291+
1292+
### The lift
1293+
1294+
`<SendMessageProvider>` is mounted **once** above the routes/sidebar:
1295+
1296+
- Routed app: in `mount.tsx`, above `<AgentBuilderRoutes>`. The sidebar is part of the
1297+
routes, so it can read streaming state directly.
1298+
- Embeddable: in `embeddable_conversations_provider.tsx`, one provider per embeddable
1299+
instance because each instance has its own `QueryClient`.
1300+
1301+
The sidebar uses `useIsAnyConversationStreaming()` and `useSendMessageContext()` directly.
1302+
Anything inside the conversation tree should use the per-conversation scoped hook,
1303+
`useSendMessage()` (in `use_send_message.ts`).
1304+
1305+
### Lifted state
1306+
1307+
`SendMessageProvider` owns:
1308+
1309+
- `activeStream: { conversationId, type, agentReasoning } | undefined` — points at the
1310+
conversation that is currently streaming. Set synchronously when each mutation kicks
1311+
off; cleared in the mutation's `finally`.
1312+
- `byConversationId: Record<string, StreamRecord>` — per-conversation `pendingMessage`,
1313+
`error`, `errorSteps`. Persists across stream end so a user can hit Retry after a
1314+
failure.
1315+
1316+
### Mutations: single-scope `mutationFn`
1317+
1318+
`useSendMessageMutation` and `useResumeRoundMutation` use a **single-scope `mutationFn`
1319+
with `try / catch / finally`**, not React Query's lifecycle methods (`onMutate`,
1320+
`onSuccess`, `onError`, `onSettled`). The shape is:
1321+
1322+
```ts
1323+
mutationFn: async (vars) => {
1324+
// setup phase (sync, before any await): seed the optimistic round, set pending message.
1325+
// Note: `activeStream` is set by the provider's `mutateSendMessage` wrapper *before*
1326+
// `mutate()` returns — not here.
1327+
const streamActions = createConversationActions({ conversationId: vars.conversationId, ... });
1328+
1329+
try {
1330+
await subscribeToChatEvents({ events$, conversationActions: streamActions, ... });
1331+
// success cleanup
1332+
} catch (err) {
1333+
// error cleanup
1334+
throw err;
1335+
} finally {
1336+
// cleanup: invalidate cache (skipped if round paused on HITL),
1337+
// clear `activeStream`, clear abort ref.
1338+
}
1339+
}
1340+
```
1341+
1342+
**Why not lifecycle methods?** Streams aren't typical mutations — the bulk of the work
1343+
happens *during* `mutationFn`, with state mutations flowing for many seconds. Splitting
1344+
the work across lifecycle callbacks forces you to bridge state between scopes via refs
1345+
or React Query's `context` return — neither is clean for streaming. With single-scope,
1346+
`streamActions` and `vars` are visible throughout. No refs to bridge phases. Reads
1347+
top-to-bottom.
1348+
1349+
### Each conversation owns its streaming lifecycle
1350+
1351+
Every `mutationFn` invocation builds its **own** `streamActions` instance via
1352+
`createConversationActions({ conversationId: vars.conversationId, ... })`. That instance
1353+
is closure-bound to the mutation's conversation id. **Stream events target the
1354+
conversation the mutation was started for, regardless of where the user has navigated.**
1355+
1356+
If the user submits on conversation A and immediately switches to B, the stream events
1357+
keep writing to A's cache. B loads cleanly from the server.
1358+
1359+
### Per-conversation `useConversation` gate
1360+
1361+
`useConversation` is disabled for a conversation when (a) a stream is currently writing
1362+
to its cache, or (b) the cache shows it's paused on a HITL prompt. The cache is
1363+
authoritative in both cases, so a refetch would race with optimistic chunks (streaming)
1364+
or with the resume mutation about to fire on Approve (HITL). Other conversations stay
1365+
free to refetch — switch to conversation B while A streams and B loads cleanly. See the
1366+
inline comment on the `enabled` predicate for details.
1367+
1368+
### Single-stream vs concurrent streams
1369+
1370+
Today the app enforces single-stream-at-a-time. The global gates (HITL Approve,
1371+
submit button, page-leave guard) all read `useIsAnyConversationStreaming()`.
1372+
1373+
The architecture supports concurrent streams in principle — per-conversation cache,
1374+
mutation-scoped `streamActions`, lifted `byConversationId`. The follow-up PR removes
1375+
the global gates, moves the abort controller into a per-conversation slot so each
1376+
stream can be cancelled independently, and enables concurrent streams.
1377+

x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation.tsx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import { NewConversationPrompt } from './new_conversation_prompt';
2626
import { useConversationId } from '../../context/conversation/use_conversation_id';
2727
import { useShouldStickToBottom } from '../../context/conversation/use_should_stick_to_bottom';
2828
import { useSendMessage } from '../../context/send_message/send_message_context';
29+
import { useIsAnyConversationStreaming } from '../../hooks/use_is_any_conversation_streaming';
2930
import { useConversationScrollActions } from '../../hooks/use_conversation_scroll_actions';
3031
import { useConversationStatus } from '../../hooks/use_conversation';
3132
import { useSendPredefinedInitialMessage } from '../../hooks/use_initial_message';
@@ -53,6 +54,7 @@ export const Conversation: React.FC<{}> = () => {
5354
const conversationId = useConversationId();
5455
const hasActiveConversation = useHasActiveConversation();
5556
const { isResponseLoading } = useSendMessage();
57+
const isAnyStreaming = useIsAnyConversationStreaming();
5658
const conversationRounds = useConversationRounds();
5759
const lastRound = conversationRounds.at(-1);
5860
const { isFetched } = useConversationStatus();
@@ -65,9 +67,10 @@ export const Conversation: React.FC<{}> = () => {
6567
const [dismissStaleAttachments, setDismissStaleAttachments] = useState(false);
6668
useSendPredefinedInitialMessage();
6769

70+
// Page-leave guard fires for any in-flight stream, not just this conversation's.
6871
useNavigationAbort({
6972
onAppLeave,
70-
isResponseLoading,
73+
isResponseLoading: isAnyStreaming,
7174
});
7275

7376
const scrollContainerRef = useRef<HTMLDivElement | null>(null);

x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/conversation_input.tsx

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@ import type { PropsWithChildren } from 'react';
1818
import React, { useEffect, useMemo } from 'react';
1919
import { useConversationId } from '../../../context/conversation/use_conversation_id';
2020
import { useSendMessage } from '../../../context/send_message/send_message_context';
21+
import { useSubmitMessage } from '../../../hooks/use_submit_message';
2122
import { useAgentBuilderAgents } from '../../../hooks/agents/use_agents';
2223
import { useValidateAgentId } from '../../../hooks/agents/use_validate_agent_id';
23-
import { useIsSendingMessage } from '../../../hooks/use_is_sending_message';
24+
// Submit is gated globally on any-conversation streaming until concurrent streams are
25+
// unblocked in a future PR — at which point it becomes a per-conversation check.
26+
import { useIsAnyConversationStreaming } from '../../../hooks/use_is_any_conversation_streaming';
2427
import {
2528
useAgentId,
2629
useConversationTitle,
@@ -144,8 +147,8 @@ export const ConversationInput: React.FC<ConversationInputProps> = ({
144147
onSubmit,
145148
onEditorFocus,
146149
}) => {
147-
const isSendingMessage = useIsSendingMessage();
148-
const { sendMessage, pendingMessage, error, isResuming } = useSendMessage();
150+
const isSendingMessage = useIsAnyConversationStreaming();
151+
const { pendingMessage, error, isResuming } = useSendMessage();
149152
const { isFetched } = useAgentBuilderAgents();
150153
const agentId = useAgentId();
151154
const conversationId = useConversationId();
@@ -158,6 +161,7 @@ export const ConversationInput: React.FC<ConversationInputProps> = ({
158161
const isAwaitingPrompt = useIsAwaitingPrompt();
159162
const { attachments, initialMessage, autoSendInitialMessage, resetInitialMessage } =
160163
useConversationContext();
164+
const submitMessage = useSubmitMessage();
161165

162166
const validateAgentId = useValidateAgentId();
163167
const isAgentIdValid = validateAgentId(agentId);
@@ -241,7 +245,7 @@ export const ConversationInput: React.FC<ConversationInputProps> = ({
241245
}
242246
return;
243247
}
244-
sendMessage({ message: content });
248+
submitMessage(content);
245249
messageEditorController.clear();
246250
onSubmit?.();
247251
};

x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/input_actions/connector_selector/connector_selector.test.tsx

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ jest.mock('../../../../../hooks/use_kibana', () => ({
1818
useKibana: jest.fn(),
1919
}));
2020

21-
jest.mock('../../../../../context/send_message/send_message_context', () => ({
22-
useSendMessage: jest.fn(),
21+
jest.mock('../../../../../hooks/chat/use_connector_selection', () => ({
22+
useConnectorSelection: jest.fn(),
2323
}));
2424

2525
jest.mock('../../../../../hooks/chat/use_default_connector', () => ({
@@ -74,13 +74,15 @@ jest.mock('./connector_icon', () => ({
7474

7575
import { useLoadConnectors } from '@kbn/inference-connectors';
7676
import { useKibana } from '../../../../../hooks/use_kibana';
77-
import { useSendMessage } from '../../../../../context/send_message/send_message_context';
77+
import { useConnectorSelection } from '../../../../../hooks/chat/use_connector_selection';
7878
import { useDefaultConnector } from '../../../../../hooks/chat/use_default_connector';
7979
import { ConnectorSelector } from './connector_selector';
8080

8181
const mockUseLoadConnectors = useLoadConnectors as jest.MockedFunction<typeof useLoadConnectors>;
8282
const mockUseKibana = useKibana as jest.MockedFunction<typeof useKibana>;
83-
const mockUseSendMessage = useSendMessage as jest.MockedFunction<typeof useSendMessage>;
83+
const mockUseConnectorSelection = useConnectorSelection as jest.MockedFunction<
84+
typeof useConnectorSelection
85+
>;
8486
const mockUseDefaultConnector = useDefaultConnector as jest.MockedFunction<
8587
typeof useDefaultConnector
8688
>;
@@ -137,14 +139,12 @@ const setup = ({
137139

138140
const selectConnector = jest.fn();
139141

140-
mockUseSendMessage.mockReturnValue({
141-
connectorSelection: {
142-
selectedConnector,
143-
selectConnector,
144-
defaultConnectorId,
145-
defaultConnectorOnly,
146-
},
147-
} as any);
142+
mockUseConnectorSelection.mockReturnValue({
143+
selectedConnector,
144+
selectConnector,
145+
defaultConnectorId,
146+
defaultConnectorOnly,
147+
});
148148

149149
const utils = render(
150150
<IntlProvider locale="en">
@@ -154,17 +154,15 @@ const setup = ({
154154
return {
155155
...utils,
156156
selectConnector,
157-
// Helper to re-render with a new send-message context (simulates admin changing a setting).
157+
// Helper to re-render with a new connector selection (simulates admin changing a setting).
158158
updateContext: (next: Partial<RenderOptions>) => {
159-
mockUseSendMessage.mockReturnValue({
160-
connectorSelection: {
161-
selectedConnector: next.selectedConnector ?? selectedConnector,
162-
selectConnector,
163-
defaultConnectorId:
164-
'defaultConnectorId' in next ? next.defaultConnectorId : defaultConnectorId,
165-
defaultConnectorOnly: next.defaultConnectorOnly ?? defaultConnectorOnly,
166-
},
167-
} as any);
159+
mockUseConnectorSelection.mockReturnValue({
160+
selectedConnector: next.selectedConnector ?? selectedConnector,
161+
selectConnector,
162+
defaultConnectorId:
163+
'defaultConnectorId' in next ? next.defaultConnectorId : defaultConnectorId,
164+
defaultConnectorOnly: next.defaultConnectorOnly ?? defaultConnectorOnly,
165+
});
168166
act(() => {
169167
utils.rerender(
170168
<IntlProvider locale="en">

x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_input/input_actions/connector_selector/connector_selector.tsx

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { FormattedMessage } from '@kbn/i18n-react';
2121
import React, { useEffect, useMemo, useRef, useState } from 'react';
2222
import { useUiPrivileges } from '../../../../../hooks/use_ui_privileges';
2323
import { useNavigation } from '../../../../../hooks/use_navigation';
24-
import { useSendMessage } from '../../../../../context/send_message/send_message_context';
24+
import { useConnectorSelection } from '../../../../../hooks/chat/use_connector_selection';
2525
import { useDefaultConnector } from '../../../../../hooks/chat/use_default_connector';
2626
import { useKibana } from '../../../../../hooks/use_kibana';
2727
import {
@@ -165,13 +165,11 @@ export const ConnectorSelector: React.FC<{}> = () => {
165165
services: { http, settings },
166166
} = useKibana();
167167
const {
168-
connectorSelection: {
169-
selectConnector: onSelectConnector,
170-
selectedConnector: selectedConnectorId,
171-
defaultConnectorId,
172-
defaultConnectorOnly,
173-
},
174-
} = useSendMessage();
168+
selectConnector: onSelectConnector,
169+
selectedConnector: selectedConnectorId,
170+
defaultConnectorId,
171+
defaultConnectorOnly,
172+
} = useConnectorSelection();
175173
const [isPopoverOpen, setIsPopoverOpen] = useState(false);
176174

177175
const { data: aiConnectors, isLoading } = useLoadConnectors({

x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_layout.tsx

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { RoundInput } from './round_input';
2121
import { RoundThinking } from './round_thinking/round_thinking';
2222
import { RoundResponse } from './round_response/round_response';
2323
import { useSendMessage } from '../../../context/send_message/send_message_context';
24+
import { useIsAnyConversationStreaming } from '../../../hooks/use_is_any_conversation_streaming';
2425
import { RoundError } from './round_error/round_error';
2526
import { ConfirmationPrompt } from './round_prompt';
2627
import { RoundAttachmentReferences } from './round_attachment_references';
@@ -88,6 +89,12 @@ export const RoundLayout: React.FC<RoundLayoutProps> = ({
8889
resumeRound,
8990
isResuming,
9091
} = useSendMessage();
92+
// Approve / Cancel for HITL must be gated on global streaming state: while ANY other
93+
// conversation is streaming, racing two mutations against the same single-stream
94+
// backend would corrupt cache state. This becomes a per-conversation check in the
95+
// concurrent-streams follow-up PR.
96+
const isAnyStreaming = useIsAnyConversationStreaming();
97+
const isHitlDisabled = isAnyStreaming && !isResuming;
9198

9299
const isLoadingCurrentRound = isResponseLoading && isCurrentRound;
93100
const isErrorCurrentRound = Boolean(error) && isCurrentRound;
@@ -193,6 +200,7 @@ export const RoundLayout: React.FC<RoundLayoutProps> = ({
193200
onConfirm={() => handlePromptResponse(prompt.id, true)}
194201
onCancel={() => handlePromptResponse(prompt.id, false)}
195202
isLoading={isResuming}
203+
isDisabled={isHitlDisabled}
196204
isAnswered={promptResponses[prompt.id] !== undefined}
197205
answeredValue={promptResponses[prompt.id]?.allow}
198206
/>

x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_prompt/confirmation_prompt.tsx

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ export interface ConfirmationPromptProps {
5050
onConfirm: () => void;
5151
onCancel: () => void;
5252
isLoading?: boolean;
53+
isDisabled?: boolean;
5354
isAnswered?: boolean;
5455
answeredValue?: boolean;
5556
}
@@ -59,6 +60,7 @@ export const ConfirmationPrompt: React.FC<ConfirmationPromptProps> = ({
5960
onConfirm,
6061
onCancel,
6162
isLoading = false,
63+
isDisabled = false,
6264
isAnswered = false,
6365
answeredValue,
6466
}) => {
@@ -120,7 +122,7 @@ export const ConfirmationPrompt: React.FC<ConfirmationPromptProps> = ({
120122
<EuiFlexItem grow={false}>
121123
<EuiButtonEmpty
122124
onClick={onCancel}
123-
disabled={isLoading || isAnswered}
125+
disabled={isDisabled || isLoading || isAnswered}
124126
size="s"
125127
color={isAnswered && answeredValue === false ? 'danger' : 'text'}
126128
data-test-subj="agentBuilderConfirmationPromptCancelButton"
@@ -132,7 +134,7 @@ export const ConfirmationPrompt: React.FC<ConfirmationPromptProps> = ({
132134
<EuiButton
133135
onClick={onConfirm}
134136
isLoading={isLoading}
135-
disabled={isAnswered}
137+
disabled={isDisabled || isAnswered}
136138
fill={!isAnswered || answeredValue === true}
137139
size="s"
138140
color={isAnswered && answeredValue === true ? 'success' : color}

x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_response/chat_message_text.test.tsx

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,9 @@ describe('chat_message_text', () => {
112112
isEmbeddedContext: false,
113113
browserApiTools: undefined,
114114
conversationActions: {
115-
removeNewConversationQuery: jest.fn(),
116115
invalidateConversation: jest.fn(),
117116
addOptimisticRound: jest.fn(),
118117
removeOptimisticRound: jest.fn(),
119-
setAgentId: jest.fn(),
120118
addReasoningStep: jest.fn(),
121119
addToolCall: jest.fn(),
122120
setToolCallProgress: jest.fn(),

x-pack/platform/plugins/shared/agent_builder/public/application/components/conversations/conversation_rounds/round_response/streaming_text.tsx

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@ export const StreamingText = ({
3131
attachmentRefs,
3232
conversationId,
3333
}: StreamingTextProps) => {
34-
const [displayedText, setDisplayedText] = useState('');
34+
// Initial state derives from the content already in the cache so navigating away and back
35+
// mid-stream doesn't replay the full text. Only chunks arriving AFTER mount get animated.
36+
const [displayedText, setDisplayedText] = useState(content);
3537
const tokenQueueRef = useRef<string[]>([]);
3638
const intervalRef = useRef<NodeJS.Timeout | null>(null);
37-
const previousContentLengthRef = useRef(0);
39+
const previousContentLengthRef = useRef(content.length);
3840

3941
useEffect(() => {
4042
const previousContentLength = previousContentLengthRef.current;

0 commit comments

Comments
 (0)