Skip to content

Commit ad015fa

Browse files
authored
[Agent Builder] Add new property to public events contract to filter events by conversation id (#268440)
1 parent 816ff81 commit ad015fa

8 files changed

Lines changed: 193 additions & 15 deletions

File tree

x-pack/platform/packages/shared/agent-builder/agent-builder-browser/events/contract.ts

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,42 @@ export interface ChatUiEventsContract {
3737
*/
3838
export interface EventsServiceStartContract {
3939
/**
40-
* (hot) observable of all chat events.
40+
* Hot observable of chat events from every conversation, interleaved.
41+
*
42+
* @deprecated With concurrent per-conversation streams, events from multiple
43+
* in-flight conversations are interleaved here and consumers cannot reliably
44+
* attribute an event to its source conversation. Use `getChatEvents$(conversationId)`
45+
* to scope a subscription to a single conversation. Will be removed in a future
46+
* release once known consumers (dashboard_agent, workflows_management) have migrated.
4147
*/
4248
chat$: Observable<BrowserChatEvent>;
49+
50+
/**
51+
* Returns a hot observable of chat events scoped to a single conversation. Subscribe
52+
* at any point during the events service's lifetime; events emit live as the agent
53+
* runs that conversation. Subscribers only see events tagged with the matching id.
54+
*
55+
* Use this in preference to `chat$` whenever your consumer cares about a specific
56+
* conversation. Concurrent streams mean `chat$` is no longer single-conversation,
57+
* and most chat event payloads do not carry a `conversation_id` field that you can
58+
* filter on directly.
59+
*
60+
* Migration patterns (these are recommendations):
61+
*
62+
* // Before — assumes only one conversation streams at a time:
63+
* events.chat$.subscribe((event) => { ... });
64+
*
65+
* // After — scope to a known conversation:
66+
* events.getChatEvents$(conversationId).subscribe((event) => { ... });
67+
*
68+
* // After — scope to whichever conversation the UI is currently focused on:
69+
* events.ui.activeConversation$.pipe(
70+
* filter((c): c is ActiveConversation => c?.id != null),
71+
* switchMap((c) => events.getChatEvents$(c.id!))
72+
* ).subscribe((event) => { ... });
73+
*/
74+
getChatEvents$: (conversationId: string) => Observable<BrowserChatEvent>;
75+
4376
/**
4477
* Chat UI-shell state observables.
4578
*/

x-pack/platform/plugins/shared/agent_builder/CONTRIBUTOR_GUIDE.md

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -924,6 +924,46 @@ class MyPlugin {
924924
}
925925
```
926926

927+
### Chat events: per-conversation streams
928+
929+
`events.chat$` exposes a hot observable of every chat event from every in-flight
930+
conversation. Concurrent per-conversation streams are now supported, so this stream
931+
interleaves events across conversations, and most event payloads do not carry a
932+
`conversation_id` you can filter on directly. **`chat$` is deprecated.**
933+
934+
Use `events.getChatEvents$(conversationId)` instead. It returns a hot observable
935+
scoped to a single conversation — every event is tagged with the conversation that
936+
produced it, and the returned stream filters to the matching tag.
937+
938+
```ts
939+
// Before — works only when a single conversation streams at a time.
940+
agentBuilder.events.chat$
941+
.pipe(filter(isRoundCompleteEvent))
942+
.subscribe((event) => { ... });
943+
944+
// After — scope to a known conversation id.
945+
agentBuilder.events
946+
.getChatEvents$(conversationId)
947+
.pipe(filter(isRoundCompleteEvent))
948+
.subscribe((event) => { ... });
949+
950+
// After — scope to whichever conversation the chat surface is currently focused on.
951+
agentBuilder.events.ui.activeConversation$
952+
.pipe(
953+
filter((c): c is ActiveConversation => c?.id != null),
954+
switchMap((c) => agentBuilder.events.getChatEvents$(c.id!))
955+
)
956+
.subscribe((event) => { ... });
957+
```
958+
959+
`switchMap` cancels the previous per-conversation subscription whenever the active
960+
conversation changes — e.g. when a consumer follows the user's focus.
961+
962+
All chat in Kibana is initiated through the agent_builder UI (sidebar, embeddable,
963+
or full-page chat), and the UI generates a client-side conversation UUID before
964+
each request. Plugins consuming this contract have access to that id via
965+
`events.ui.activeConversation$` and can pass it straight to `getChatEvents$`.
966+
927967
## Registering skills
928968

929969
**Note**: Skills are currently an experimental feature. You need to enable the `agentBuilder:experimentalFeatures` uiSetting to enable and use them.
@@ -1016,7 +1056,7 @@ does; the skill tells it *when* to do it.
10161056

10171057
### Marking a skill as experimental
10181058

1019-
Individual built-in skills can be flagged as experimental by setting `experimental: true` on their definition.
1059+
Individual built-in skills can be flagged as experimental by setting `experimental: true` on their definition.
10201060
Experimental skills are only visible and usable when the `agentBuilder:experimentalFeatures` uiSetting is enabled.
10211061

10221062
**Example:**

x-pack/platform/plugins/shared/agent_builder/public/mocks.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ const createStartContractMock = (): AgentBuilderPluginStartMock => {
6161
tools: createToolStartMock(),
6262
events: {
6363
chat$: EMPTY,
64+
getChatEvents$: jest.fn().mockReturnValue(EMPTY),
6465
ui: {
6566
activeConversation$: new BehaviorSubject(null),
6667
},

x-pack/platform/plugins/shared/agent_builder/public/services/chat/chat_service.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ interface BaseConverseParams {
2626
signal?: AbortSignal;
2727
agentId?: string;
2828
connectorId?: string;
29-
conversationId?: string;
29+
conversationId: string;
3030
browserApiTools?: BrowserApiToolMetadata[];
3131
capabilities?: AgentCapabilities;
3232
}
@@ -37,13 +37,16 @@ export type ChatParams = BaseConverseParams & {
3737
};
3838

3939
export type ResumeRoundParams = BaseConverseParams & {
40-
conversationId: string;
4140
prompts: Record<string, PromptResponse>;
4241
};
4342

44-
export type RegenerateParams = BaseConverseParams & {
45-
conversationId: string;
46-
};
43+
export type RegenerateParams = BaseConverseParams;
44+
45+
/**
46+
* Wire payload for `converse()` with `conversation_id` narrowed to required. Every
47+
* Agent Builder UI caller passes a client-generated UUID before chat fires.
48+
*/
49+
type ConversePayload = ChatRequestBodyPayload & { conversation_id: string };
4750

4851
export class ChatService {
4952
private readonly http: HttpSetup;
@@ -105,7 +108,7 @@ export class ChatService {
105108
);
106109
}
107110

108-
private converse(signal: AbortSignal | undefined, payload: ChatRequestBodyPayload) {
111+
private converse(signal: AbortSignal | undefined, payload: ConversePayload) {
109112
return defer(() => {
110113
return this.http.post(`${publicApiPath}/converse/async`, {
111114
signal,
@@ -117,7 +120,10 @@ export class ChatService {
117120
// @ts-expect-error SseEvent mixin issue
118121
httpResponseIntoObservable<ChatEvent>(),
119122
unwrapAgentBuilderErrors(),
120-
propagateEvents({ eventsService: this.events })
123+
propagateEvents({
124+
eventsService: this.events,
125+
conversationId: payload.conversation_id,
126+
})
121127
);
122128
}
123129
}

x-pack/platform/plugins/shared/agent_builder/public/services/chat/propagate_events.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,19 @@ import { tap } from 'rxjs';
1010
import type { ChatEvent } from '@kbn/agent-builder-common';
1111
import type { EventsService } from '../events';
1212

13+
/**
14+
* Forwards each event in the converse() stream to the public events service, tagged with
15+
* the conversation id so per-conversation subscribers (`getChatEvents$`) can filter to
16+
* just their conversation.
17+
*/
1318
export function propagateEvents({
1419
eventsService,
20+
conversationId,
1521
}: {
1622
eventsService: EventsService;
23+
conversationId: string;
1724
}): OperatorFunction<ChatEvent, ChatEvent> {
1825
return tap((event) => {
19-
eventsService.propagateChatEvent(event);
26+
eventsService.propagateChatEvent(conversationId, event);
2027
});
2128
}

x-pack/platform/plugins/shared/agent_builder/public/services/events/create_public_contract.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export const createPublicEventsContract = ({
1515
}): EventsServiceStartContract => {
1616
return {
1717
chat$: eventsService.obs$,
18+
getChatEvents$: (conversationId: string) => eventsService.getChatEvents$(conversationId),
1819
ui: {
1920
activeConversation$: eventsService.activeConversation$,
2021
},
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
import { ChatEventType, type ChatEvent } from '@kbn/agent-builder-common';
9+
import { EventsService } from './events_service';
10+
11+
const messageChunkEvent = (chunk: string): ChatEvent =>
12+
({
13+
type: ChatEventType.messageChunk,
14+
data: { message_id: 'm1', text_chunk: chunk },
15+
} as ChatEvent);
16+
17+
describe('EventsService', () => {
18+
describe('getChatEvents$', () => {
19+
it('only emits events tagged with the matching conversation id', () => {
20+
const service = new EventsService();
21+
const fromA: ChatEvent[] = [];
22+
const fromB: ChatEvent[] = [];
23+
24+
service.getChatEvents$('A').subscribe((event) => fromA.push(event));
25+
service.getChatEvents$('B').subscribe((event) => fromB.push(event));
26+
27+
service.propagateChatEvent('A', messageChunkEvent('a1'));
28+
service.propagateChatEvent('B', messageChunkEvent('b1'));
29+
service.propagateChatEvent('A', messageChunkEvent('a2'));
30+
31+
expect(fromA.map((e) => (e.data as any).text_chunk)).toEqual(['a1', 'a2']);
32+
expect(fromB.map((e) => (e.data as any).text_chunk)).toEqual(['b1']);
33+
});
34+
35+
it('returns a hot stream — events emitted before subscription are not replayed', () => {
36+
const service = new EventsService();
37+
service.propagateChatEvent('A', messageChunkEvent('missed'));
38+
39+
const received: ChatEvent[] = [];
40+
service.getChatEvents$('A').subscribe((event) => received.push(event));
41+
42+
service.propagateChatEvent('A', messageChunkEvent('seen'));
43+
44+
expect(received.map((e) => (e.data as any).text_chunk)).toEqual(['seen']);
45+
});
46+
});
47+
48+
describe('obs$ (deprecated)', () => {
49+
it('still emits every event regardless of conversation id', () => {
50+
const service = new EventsService();
51+
const received: ChatEvent[] = [];
52+
53+
service.obs$.subscribe((event) => received.push(event));
54+
55+
service.propagateChatEvent('A', messageChunkEvent('a'));
56+
service.propagateChatEvent('B', messageChunkEvent('b'));
57+
58+
expect(received.map((e) => (e.data as any).text_chunk)).toEqual(['a', 'b']);
59+
});
60+
});
61+
});

x-pack/platform/plugins/shared/agent_builder/public/services/events/events_service.ts

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,50 @@
55
* 2.0.
66
*/
77

8-
import { BehaviorSubject, Subject, share } from 'rxjs';
8+
import type { Observable } from 'rxjs';
9+
import { BehaviorSubject, Subject, filter, map, share } from 'rxjs';
910
import type { ChatEvent } from '@kbn/agent-builder-common';
1011
import type { ActiveConversation, BrowserChatEvent } from '@kbn/agent-builder-browser/events';
1112

13+
interface TaggedChatEvent {
14+
/** The conversation that produced this event. */
15+
conversationId: string;
16+
event: BrowserChatEvent;
17+
}
18+
1219
export class EventsService {
13-
private readonly events$ = new Subject<BrowserChatEvent>();
14-
public readonly obs$ = this.events$.asObservable().pipe(share());
20+
private readonly events$ = new Subject<TaggedChatEvent>();
21+
22+
/**
23+
* @deprecated Backed by a single shared `Subject` that interleaves events from every
24+
* conversation. With concurrent per-conversation streams, consumers cannot reliably
25+
* attribute an event to its source conversation from this stream alone. Use
26+
* `getChatEvents$(conversationId)` for per-conversation scoping.
27+
*/
28+
public readonly obs$: Observable<BrowserChatEvent> = this.events$.pipe(
29+
map(({ event }) => event),
30+
share()
31+
);
1532

1633
private readonly activeConversationState$ = new BehaviorSubject<ActiveConversation | null>(null);
1734
public readonly activeConversation$ = this.activeConversationState$.asObservable();
1835

1936
constructor() {}
2037

21-
propagateChatEvent(event: ChatEvent) {
22-
this.events$.next(event);
38+
propagateChatEvent(conversationId: string, event: ChatEvent) {
39+
this.events$.next({ conversationId, event });
40+
}
41+
42+
/**
43+
* Returns a hot observable of chat events scoped to a single conversation. Subscribe
44+
* any time during the events service's lifetime; events emit live as the agent runs
45+
* the conversation. Subscribers only see events tagged with the matching id.
46+
*/
47+
getChatEvents$(conversationId: string): Observable<BrowserChatEvent> {
48+
return this.events$.pipe(
49+
filter((tagged) => tagged.conversationId === conversationId),
50+
map(({ event }) => event)
51+
);
2352
}
2453

2554
setActiveConversation(activeConversation: ActiveConversation | null) {

0 commit comments

Comments
 (0)