Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,42 @@ export interface ChatUiEventsContract {
*/
export interface EventsServiceStartContract {
/**
* (hot) observable of all chat events.
* Hot observable of chat events from every conversation, interleaved.
*
* @deprecated With concurrent per-conversation streams, events from multiple
* in-flight conversations are interleaved here and consumers cannot reliably
* attribute an event to its source conversation. Use `getChatEvents$(conversationId)`
* to scope a subscription to a single conversation. Will be removed in a future
* release once known consumers (dashboard_agent, workflows_management) have migrated.
*/
chat$: Observable<BrowserChatEvent>;

/**
* Returns a hot observable of chat events scoped to a single conversation. Subscribe
* at any point during the events service's lifetime; events emit live as the agent
* runs that conversation. Subscribers only see events tagged with the matching id.
*
* Use this in preference to `chat$` whenever your consumer cares about a specific
* conversation. Concurrent streams mean `chat$` is no longer single-conversation,
* and most chat event payloads do not carry a `conversation_id` field that you can
* filter on directly.
*
* Migration patterns (these are recommendations):
*
* // Before — assumes only one conversation streams at a time:
* events.chat$.subscribe((event) => { ... });
*
* // After — scope to a known conversation:
* events.getChatEvents$(conversationId).subscribe((event) => { ... });
*
* // After — scope to whichever conversation the UI is currently focused on:
* events.ui.activeConversation$.pipe(
* filter((c): c is ActiveConversation => c?.id != null),
* switchMap((c) => events.getChatEvents$(c.id!))
* ).subscribe((event) => { ... });
Comment thread
chrisbmar marked this conversation as resolved.
*/
getChatEvents$: (conversationId: string) => Observable<BrowserChatEvent>;

/**
* Chat UI-shell state observables.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,46 @@ class MyPlugin {
}
```

### Chat events: per-conversation streams

`events.chat$` exposes a hot observable of every chat event from every in-flight
conversation. Concurrent per-conversation streams are now supported, so this stream
interleaves events across conversations, and most event payloads do not carry a
`conversation_id` you can filter on directly. **`chat$` is deprecated.**

Use `events.getChatEvents$(conversationId)` instead. It returns a hot observable
scoped to a single conversation — every event is tagged with the conversation that
produced it, and the returned stream filters to the matching tag.

```ts
// Before — works only when a single conversation streams at a time.
agentBuilder.events.chat$
.pipe(filter(isRoundCompleteEvent))
.subscribe((event) => { ... });

// After — scope to a known conversation id.
agentBuilder.events
.getChatEvents$(conversationId)
.pipe(filter(isRoundCompleteEvent))
.subscribe((event) => { ... });

// After — scope to whichever conversation the chat surface is currently focused on.
agentBuilder.events.ui.activeConversation$
.pipe(
filter((c): c is ActiveConversation => c?.id != null),
switchMap((c) => agentBuilder.events.getChatEvents$(c.id!))
)
.subscribe((event) => { ... });
```

`switchMap` cancels the previous per-conversation subscription whenever the active
conversation changes — e.g. when a consumer follows the user's focus.

All chat in Kibana is initiated through the agent_builder UI (sidebar, embeddable,
or full-page chat), and the UI generates a client-side conversation UUID before
each request. Plugins consuming this contract have access to that id via
`events.ui.activeConversation$` and can pass it straight to `getChatEvents$`.

## Registering skills

**Note**: Skills are currently an experimental feature. You need to enable the `agentBuilder:experimentalFeatures` uiSetting to enable and use them.
Expand Down Expand Up @@ -1016,7 +1056,7 @@ does; the skill tells it *when* to do it.

### Marking a skill as experimental

Individual built-in skills can be flagged as experimental by setting `experimental: true` on their definition.
Individual built-in skills can be flagged as experimental by setting `experimental: true` on their definition.
Experimental skills are only visible and usable when the `agentBuilder:experimentalFeatures` uiSetting is enabled.

**Example:**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const createStartContractMock = (): AgentBuilderPluginStartMock => {
tools: createToolStartMock(),
events: {
chat$: EMPTY,
getChatEvents$: jest.fn().mockReturnValue(EMPTY),
ui: {
activeConversation$: new BehaviorSubject(null),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ interface BaseConverseParams {
signal?: AbortSignal;
agentId?: string;
connectorId?: string;
conversationId?: string;
conversationId: string;
browserApiTools?: BrowserApiToolMetadata[];
capabilities?: AgentCapabilities;
}
Expand All @@ -37,13 +37,16 @@ export type ChatParams = BaseConverseParams & {
};

export type ResumeRoundParams = BaseConverseParams & {
conversationId: string;
prompts: Record<string, PromptResponse>;
};

export type RegenerateParams = BaseConverseParams & {
conversationId: string;
};
export type RegenerateParams = BaseConverseParams;

/**
* Wire payload for `converse()` with `conversation_id` narrowed to required. Every
* Agent Builder UI caller passes a client-generated UUID before chat fires.
*/
type ConversePayload = ChatRequestBodyPayload & { conversation_id: string };

export class ChatService {
private readonly http: HttpSetup;
Expand Down Expand Up @@ -105,7 +108,7 @@ export class ChatService {
);
}

private converse(signal: AbortSignal | undefined, payload: ChatRequestBodyPayload) {
private converse(signal: AbortSignal | undefined, payload: ConversePayload) {
return defer(() => {
return this.http.post(`${publicApiPath}/converse/async`, {
signal,
Expand All @@ -117,7 +120,10 @@ export class ChatService {
// @ts-expect-error SseEvent mixin issue
httpResponseIntoObservable<ChatEvent>(),
unwrapAgentBuilderErrors(),
propagateEvents({ eventsService: this.events })
propagateEvents({
eventsService: this.events,
conversationId: payload.conversation_id,
})
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@ import { tap } from 'rxjs';
import type { ChatEvent } from '@kbn/agent-builder-common';
import type { EventsService } from '../events';

/**
* Forwards each event in the converse() stream to the public events service, tagged with
* the conversation id so per-conversation subscribers (`getChatEvents$`) can filter to
* just their conversation.
*/
export function propagateEvents({
eventsService,
conversationId,
}: {
eventsService: EventsService;
conversationId: string;
}): OperatorFunction<ChatEvent, ChatEvent> {
return tap((event) => {
eventsService.propagateChatEvent(event);
eventsService.propagateChatEvent(conversationId, event);
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export const createPublicEventsContract = ({
}): EventsServiceStartContract => {
return {
chat$: eventsService.obs$,
getChatEvents$: (conversationId: string) => eventsService.getChatEvents$(conversationId),
ui: {
activeConversation$: eventsService.activeConversation$,
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { ChatEventType, type ChatEvent } from '@kbn/agent-builder-common';
import { EventsService } from './events_service';

const messageChunkEvent = (chunk: string): ChatEvent =>
({
type: ChatEventType.messageChunk,
data: { message_id: 'm1', text_chunk: chunk },
} as ChatEvent);

describe('EventsService', () => {
describe('getChatEvents$', () => {
it('only emits events tagged with the matching conversation id', () => {
const service = new EventsService();
const fromA: ChatEvent[] = [];
const fromB: ChatEvent[] = [];

service.getChatEvents$('A').subscribe((event) => fromA.push(event));
service.getChatEvents$('B').subscribe((event) => fromB.push(event));

service.propagateChatEvent('A', messageChunkEvent('a1'));
service.propagateChatEvent('B', messageChunkEvent('b1'));
service.propagateChatEvent('A', messageChunkEvent('a2'));

expect(fromA.map((e) => (e.data as any).text_chunk)).toEqual(['a1', 'a2']);
expect(fromB.map((e) => (e.data as any).text_chunk)).toEqual(['b1']);
});

it('returns a hot stream — events emitted before subscription are not replayed', () => {
const service = new EventsService();
service.propagateChatEvent('A', messageChunkEvent('missed'));

const received: ChatEvent[] = [];
service.getChatEvents$('A').subscribe((event) => received.push(event));

service.propagateChatEvent('A', messageChunkEvent('seen'));

expect(received.map((e) => (e.data as any).text_chunk)).toEqual(['seen']);
});
});

describe('obs$ (deprecated)', () => {
it('still emits every event regardless of conversation id', () => {
const service = new EventsService();
const received: ChatEvent[] = [];

service.obs$.subscribe((event) => received.push(event));

service.propagateChatEvent('A', messageChunkEvent('a'));
service.propagateChatEvent('B', messageChunkEvent('b'));

expect(received.map((e) => (e.data as any).text_chunk)).toEqual(['a', 'b']);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,50 @@
* 2.0.
*/

import { BehaviorSubject, Subject, share } from 'rxjs';
import type { Observable } from 'rxjs';
import { BehaviorSubject, Subject, filter, map, share } from 'rxjs';
import type { ChatEvent } from '@kbn/agent-builder-common';
import type { ActiveConversation, BrowserChatEvent } from '@kbn/agent-builder-browser/events';

interface TaggedChatEvent {
/** The conversation that produced this event. */
conversationId: string;
event: BrowserChatEvent;
}

export class EventsService {
private readonly events$ = new Subject<BrowserChatEvent>();
public readonly obs$ = this.events$.asObservable().pipe(share());
private readonly events$ = new Subject<TaggedChatEvent>();

/**
* @deprecated Backed by a single shared `Subject` that interleaves events from every
* conversation. With concurrent per-conversation streams, consumers cannot reliably
* attribute an event to its source conversation from this stream alone. Use
* `getChatEvents$(conversationId)` for per-conversation scoping.
*/
public readonly obs$: Observable<BrowserChatEvent> = this.events$.pipe(
map(({ event }) => event),
share()
);

private readonly activeConversationState$ = new BehaviorSubject<ActiveConversation | null>(null);
public readonly activeConversation$ = this.activeConversationState$.asObservable();

constructor() {}

propagateChatEvent(event: ChatEvent) {
this.events$.next(event);
propagateChatEvent(conversationId: string, event: ChatEvent) {
this.events$.next({ conversationId, event });
}

/**
* Returns a hot observable of chat events scoped to a single conversation. Subscribe
* any time during the events service's lifetime; events emit live as the agent runs
* the conversation. Subscribers only see events tagged with the matching id.
*/
getChatEvents$(conversationId: string): Observable<BrowserChatEvent> {
return this.events$.pipe(
filter((tagged) => tagged.conversationId === conversationId),
map(({ event }) => event)
);
}

setActiveConversation(activeConversation: ActiveConversation | null) {
Expand Down
Loading