Skip to content

Commit cc34f09

Browse files
authored
[Dashboard Agent] Scope Agent Builder event subscriptions by conversation (#268766)
## Summary Rewrite after merging #268440 Migrates Dashboard Agent off the deprecated shared `agentBuilder.events.chat$` stream and onto the new per-conversation `agentBuilder.events.getChatEvents$(conversationId)` API. This prevents dashboard live updates from reacting to events emitted by other concurrent Agent Builder conversations and adjusts our code to the new api.
1 parent 8e9fbd3 commit cc34f09

4 files changed

Lines changed: 143 additions & 70 deletions

File tree

x-pack/platform/plugins/shared/dashboard_agent/public/attachment_types/dashboard_integration/agent_live_updates_subscription.ts

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
* 2.0.
66
*/
77

8-
import { filter, type Subscription } from 'rxjs';
8+
import { EMPTY, filter, switchMap, type Subscription } from 'rxjs';
99
import { isRoundCompleteEvent } from '@kbn/agent-builder-common';
1010
import { ATTACHMENT_REF_OPERATION, getLatestVersion } from '@kbn/agent-builder-common/attachments';
1111
import type { AgentBuilderPluginStart } from '@kbn/agent-builder-browser';
@@ -28,53 +28,60 @@ export const createAgentLiveUpdatesSubscription = ({
2828
api,
2929
setAttachments,
3030
}: AgentLiveUpdatesSubscriptionParams): Subscription =>
31-
agentBuilder.events.chat$.pipe(filter(isRoundCompleteEvent)).subscribe((event) => {
32-
const dashboardAttachments = event.data.attachments?.filter(isDashboardAttachment) ?? [];
33-
const incomingAttachments = dashboardAttachments.filter((attachment) => {
34-
return (
35-
event.data.round.input.attachment_refs?.some(
36-
(ref) =>
37-
ref.attachment_id === attachment.id &&
38-
(ref.operation === ATTACHMENT_REF_OPERATION.updated ||
39-
ref.operation === ATTACHMENT_REF_OPERATION.created)
40-
) === true
41-
);
42-
});
31+
agentBuilder.events.ui.activeConversation$
32+
.pipe(
33+
switchMap((conversation) =>
34+
conversation?.id ? agentBuilder.events.getChatEvents$(conversation.id) : EMPTY
35+
),
36+
filter(isRoundCompleteEvent)
37+
)
38+
.subscribe((event) => {
39+
const dashboardAttachments = event.data.attachments?.filter(isDashboardAttachment) ?? [];
40+
const incomingAttachments = dashboardAttachments.filter((attachment) => {
41+
return (
42+
event.data.round.input.attachment_refs?.some(
43+
(ref) =>
44+
ref.attachment_id === attachment.id &&
45+
(ref.operation === ATTACHMENT_REF_OPERATION.updated ||
46+
ref.operation === ATTACHMENT_REF_OPERATION.created)
47+
) === true
48+
);
49+
});
4350

44-
setAttachments(
45-
dashboardAttachments
46-
.map((attachment): DashboardAttachment | undefined => {
47-
const latestVersionData = getLatestVersion(attachment)?.data;
48-
return latestVersionData
49-
? {
50-
id: attachment.id,
51-
type: attachment.type,
52-
data: latestVersionData,
53-
origin: attachment.origin,
54-
}
55-
: undefined;
56-
})
57-
.filter((attachment): attachment is DashboardAttachment => attachment !== undefined)
58-
);
51+
setAttachments(
52+
dashboardAttachments
53+
.map((attachment): DashboardAttachment | undefined => {
54+
const latestVersionData = getLatestVersion(attachment)?.data;
55+
return latestVersionData
56+
? {
57+
id: attachment.id,
58+
type: attachment.type,
59+
data: latestVersionData,
60+
origin: attachment.origin,
61+
}
62+
: undefined;
63+
})
64+
.filter((attachment): attachment is DashboardAttachment => attachment !== undefined)
65+
);
5966

60-
// TODO: we're assuming only one attachment is coming in at a time
61-
const incomingAttachment = incomingAttachments?.at(0);
62-
if (!incomingAttachment) {
63-
return;
64-
}
67+
// TODO: we're assuming only one attachment is coming in at a time
68+
const incomingAttachment = incomingAttachments?.at(0);
69+
if (!incomingAttachment) {
70+
return;
71+
}
6572

66-
const currentSavedObjectId = api.savedObjectId$.getValue();
73+
const currentSavedObjectId = api.savedObjectId$.getValue();
6774

68-
// Skip if viewing a saved dashboard that differs from the attachment's linked dashboard
69-
if (currentSavedObjectId && incomingAttachment.origin !== currentSavedObjectId) {
70-
return;
71-
}
75+
// Skip if viewing a saved dashboard that differs from the attachment's linked dashboard
76+
if (currentSavedObjectId && incomingAttachment.origin !== currentSavedObjectId) {
77+
return;
78+
}
7279

73-
const latestVersionData = getLatestVersion(incomingAttachment)?.data;
80+
const latestVersionData = getLatestVersion(incomingAttachment)?.data;
7481

75-
if (!latestVersionData) {
76-
return;
77-
}
82+
if (!latestVersionData) {
83+
return;
84+
}
7885

79-
api.setState(attachmentDataToDashboardState(latestVersionData));
80-
});
86+
api.setState(attachmentDataToDashboardState(latestVersionData));
87+
});

x-pack/platform/plugins/shared/dashboard_agent/public/attachment_types/dashboard_integration/dashboard_app_integration.test.ts

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ const createActiveConversation = ({
188188

189189
describe('registerDashboardAppIntegration', () => {
190190
let mockApi: MockDashboardApi;
191-
let chat$: Subject<ChatEvent>;
191+
let chatEventsByConversationId: Map<string, Subject<ChatEvent>>;
192192
let addAttachment: jest.Mock;
193193
let updateAttachmentOrigin: jest.Mock;
194194
let getUpdateOrigin: jest.Mock;
@@ -203,7 +203,7 @@ describe('registerDashboardAppIntegration', () => {
203203
beforeEach(() => {
204204
jest.useFakeTimers();
205205
mockApi = createMockDashboardApi();
206-
chat$ = new Subject<ChatEvent>();
206+
chatEventsByConversationId = new Map();
207207
addAttachment = jest.fn();
208208
updateAttachmentOrigin = jest.fn().mockResolvedValue(undefined);
209209
getUpdateOrigin = jest.fn(
@@ -223,6 +223,10 @@ describe('registerDashboardAppIntegration', () => {
223223
};
224224
});
225225

226+
const emitChatEvent = (conversationId: string, event: ChatEvent) => {
227+
chatEventsByConversationId.get(conversationId)?.next(event);
228+
};
229+
226230
afterEach(() => {
227231
cleanup?.();
228232
jest.useRealTimers();
@@ -234,7 +238,16 @@ describe('registerDashboardAppIntegration', () => {
234238
addAttachment,
235239
updateAttachmentOrigin,
236240
events: {
237-
chat$,
241+
getChatEvents$: jest.fn((conversationId: string) => {
242+
let chatEvents$ = chatEventsByConversationId.get(conversationId);
243+
244+
if (!chatEvents$) {
245+
chatEvents$ = new Subject<ChatEvent>();
246+
chatEventsByConversationId.set(conversationId, chatEvents$);
247+
}
248+
249+
return chatEvents$.asObservable();
250+
}),
238251
ui: { activeConversation$: activeConversation$.asObservable() },
239252
},
240253
} as unknown as AgentBuilderPluginStart;
@@ -459,7 +472,9 @@ describe('registerDashboardAppIntegration', () => {
459472
);
460473

461474
addAttachment.mockClear();
462-
chat$.next(
475+
emitConversationChange({ id: 'conversation-1', attachments: [] });
476+
emitChatEvent(
477+
'conversation-1',
463478
createMockRoundCompleteEvent(
464479
[createVersionedAttachment(createDashboardAttachment({ id: firstDraftAttachment.id }))],
465480
[{ attachment_id: firstDraftAttachment.id, operation: ATTACHMENT_REF_OPERATION.created }]

x-pack/platform/plugins/shared/dashboard_agent/public/attachment_types/dashboard_integration/new_attachment_id_regeneration_subscription.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
* 2.0.
66
*/
77

8-
import { filter, type Subscription } from 'rxjs';
8+
import { EMPTY, filter, switchMap, type Subscription } from 'rxjs';
99
import { isRoundCompleteEvent } from '@kbn/agent-builder-common';
1010
import type { AgentBuilderPluginStart } from '@kbn/agent-builder-browser';
1111
import type { IdGenerator } from '..';
@@ -21,8 +21,15 @@ export const createNewAttachmentIdRegenerationSubscription = ({
2121
agentBuilder,
2222
draftAttachmentId,
2323
}: NewAttachmentIdRegenerationSubscriptionParams): Subscription =>
24-
agentBuilder.events.chat$.pipe(filter(isRoundCompleteEvent)).subscribe((event) => {
25-
if (event.data.attachments?.some(({ id }) => id === draftAttachmentId.current)) {
26-
draftAttachmentId.next();
27-
}
28-
});
24+
agentBuilder.events.ui.activeConversation$
25+
.pipe(
26+
switchMap((conversation) =>
27+
conversation?.id ? agentBuilder.events.getChatEvents$(conversation.id) : EMPTY
28+
),
29+
filter(isRoundCompleteEvent)
30+
)
31+
.subscribe((event) => {
32+
if (event.data.attachments?.some(({ id }) => id === draftAttachmentId.current)) {
33+
draftAttachmentId.next();
34+
}
35+
});

x-pack/platform/plugins/shared/dashboard_agent/public/attachment_types/index.test.tsx

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -193,12 +193,21 @@ describe('registerDashboardAttachmentUiDefinition', () => {
193193
let deps: ReturnType<typeof createMockDeps>;
194194
let uiDefinition: AttachmentUIDefinition<DashboardAttachment>;
195195
let unregister: () => void;
196-
let chat$: Subject<ChatEvent>;
197196
let currentAppId$: BehaviorSubject<string | null>;
198197

199198
const createMockDeps = () => {
200-
chat$ = new Subject<ChatEvent>();
201199
currentAppId$ = new BehaviorSubject<string | null>('agentBuilder');
200+
const chatEventsByConversationId = new Map<string, Subject<ChatEvent>>();
201+
const getChatEvents = (conversationId: string) => {
202+
let chatEvents$ = chatEventsByConversationId.get(conversationId);
203+
204+
if (!chatEvents$) {
205+
chatEvents$ = new Subject<ChatEvent>();
206+
chatEventsByConversationId.set(conversationId, chatEvents$);
207+
}
208+
209+
return chatEvents$;
210+
};
202211
const dashboardAppClientApi$ = new Subject<DashboardApi | undefined>();
203212
const addAttachmentType = jest.fn();
204213
const updateAttachmentOrigin = jest.fn().mockResolvedValue(undefined);
@@ -216,7 +225,9 @@ describe('registerDashboardAttachmentUiDefinition', () => {
216225
addAttachment: mockAddAttachment,
217226
updateAttachmentOrigin,
218227
events: {
219-
chat$,
228+
getChatEvents$: jest.fn((conversationId: string) =>
229+
getChatEvents(conversationId).asObservable()
230+
),
220231
ui: { activeConversation$: activeConversation$.asObservable() },
221232
},
222233
} as unknown as AgentBuilderPluginStart;
@@ -252,7 +263,9 @@ describe('registerDashboardAttachmentUiDefinition', () => {
252263
updateAttachmentOrigin,
253264
findDashboardsService,
254265
emitConversationChange,
255-
chat$,
266+
emitChatEvent: (conversationId: string, event: ChatEvent) => {
267+
getChatEvents(conversationId).next(event);
268+
},
256269
currentAppId$,
257270
};
258271
};
@@ -329,7 +342,7 @@ describe('registerDashboardAttachmentUiDefinition', () => {
329342
addAttachment: jest.fn(),
330343
updateAttachmentOrigin: jest.fn().mockResolvedValue(undefined),
331344
events: {
332-
chat$: new Subject<ChatEvent>(),
345+
getChatEvents$: jest.fn(() => new Subject<ChatEvent>().asObservable()),
333346
ui: {
334347
activeConversation$: new BehaviorSubject<ActiveConversation | null>(
335348
null
@@ -555,7 +568,7 @@ describe('registerDashboardAttachmentUiDefinition', () => {
555568
});
556569
});
557570

558-
describe('dashboard app integration - live changes from chat$', () => {
571+
describe('dashboard app integration - live changes from Agent Builder events', () => {
559572
beforeEach(() => {
560573
jest.useFakeTimers();
561574
});
@@ -575,7 +588,8 @@ describe('registerDashboardAttachmentUiDefinition', () => {
575588

576589
// Updated operation triggers state update
577590
const versionedAttachment = createMockVersionedAttachment('attachment-1');
578-
chat$.next(
591+
deps.emitChatEvent(
592+
'conversation-1',
579593
createMockRoundCompleteEvent(
580594
[versionedAttachment],
581595
[{ attachment_id: 'attachment-1', operation: ATTACHMENT_REF_OPERATION.updated }]
@@ -589,7 +603,8 @@ describe('registerDashboardAttachmentUiDefinition', () => {
589603

590604
// Created operation also triggers
591605
mockApi.setState.mockClear();
592-
chat$.next(
606+
deps.emitChatEvent(
607+
'conversation-1',
593608
createMockRoundCompleteEvent(
594609
[versionedAttachment],
595610
[{ attachment_id: 'attachment-1', operation: ATTACHMENT_REF_OPERATION.created }]
@@ -600,6 +615,28 @@ describe('registerDashboardAttachmentUiDefinition', () => {
600615
cleanup?.();
601616
});
602617

618+
it('ignores roundComplete events from other conversations', async () => {
619+
const { getAttachment } = createMockAttachment('attachment-1');
620+
const mockApi = createMockDashboardApi();
621+
622+
const cleanup = await mountAttachment({
623+
getAttachment,
624+
api: mockApi as unknown as DashboardApi,
625+
conversationId: 'conversation-1',
626+
});
627+
628+
deps.emitChatEvent(
629+
'conversation-2',
630+
createMockRoundCompleteEvent(
631+
[createMockVersionedAttachment('attachment-1')],
632+
[{ attachment_id: 'attachment-1', operation: ATTACHMENT_REF_OPERATION.updated }]
633+
)
634+
);
635+
636+
expect(mockApi.setState).not.toHaveBeenCalled();
637+
cleanup?.();
638+
});
639+
603640
it('does not update state for read-only operations or missing attachments', async () => {
604641
const { getAttachment } = createMockAttachment('attachment-1');
605642
const mockApi = createMockDashboardApi();
@@ -610,7 +647,8 @@ describe('registerDashboardAttachmentUiDefinition', () => {
610647
});
611648

612649
// Read operation - no update
613-
chat$.next(
650+
deps.emitChatEvent(
651+
'conversation-1',
614652
createMockRoundCompleteEvent(
615653
[createMockVersionedAttachment('attachment-1')],
616654
[{ attachment_id: 'attachment-1', operation: ATTACHMENT_REF_OPERATION.read }]
@@ -619,7 +657,8 @@ describe('registerDashboardAttachmentUiDefinition', () => {
619657
expect(mockApi.setState).not.toHaveBeenCalled();
620658

621659
// No attachment in event - no update
622-
chat$.next(
660+
deps.emitChatEvent(
661+
'conversation-1',
623662
createMockRoundCompleteEvent(
624663
[],
625664
[{ attachment_id: 'attachment-1', operation: ATTACHMENT_REF_OPERATION.updated }]
@@ -628,7 +667,8 @@ describe('registerDashboardAttachmentUiDefinition', () => {
628667
expect(mockApi.setState).not.toHaveBeenCalled();
629668

630669
// Attachment without versions - no update
631-
chat$.next(
670+
deps.emitChatEvent(
671+
'conversation-1',
632672
createMockRoundCompleteEvent(
633673
[createMockVersionedAttachment('attachment-1', undefined, false)],
634674
[{ attachment_id: 'attachment-1', operation: ATTACHMENT_REF_OPERATION.updated }]
@@ -652,7 +692,8 @@ describe('registerDashboardAttachmentUiDefinition', () => {
652692
api: mockApi1 as unknown as DashboardApi,
653693
});
654694

655-
chat$.next(
695+
deps.emitChatEvent(
696+
'conversation-1',
656697
createMockRoundCompleteEvent(
657698
[createMockVersionedAttachment('attachment-1', 'original-dashboard-id')],
658699
[{ attachment_id: 'attachment-1', operation: ATTACHMENT_REF_OPERATION.updated }]
@@ -678,7 +719,8 @@ describe('registerDashboardAttachmentUiDefinition', () => {
678719
api: mockApi2 as unknown as DashboardApi,
679720
});
680721

681-
chat$.next(
722+
deps.emitChatEvent(
723+
'conversation-1',
682724
createMockRoundCompleteEvent(
683725
[createMockVersionedAttachment('attachment-1', 'same-dashboard-id')],
684726
[{ attachment_id: 'attachment-1', operation: ATTACHMENT_REF_OPERATION.updated }]
@@ -688,7 +730,7 @@ describe('registerDashboardAttachmentUiDefinition', () => {
688730
cleanup2?.();
689731
});
690732

691-
it('cleans up chat$ subscription on cleanup or API unavailable', async () => {
733+
it('cleans up Agent Builder events subscription on cleanup or API unavailable', async () => {
692734
const { getAttachment } = createMockAttachment('attachment-1');
693735
const mockApi = createMockDashboardApi();
694736

@@ -699,7 +741,8 @@ describe('registerDashboardAttachmentUiDefinition', () => {
699741

700742
// API becomes unavailable
701743
deps.dashboardAppClientApi$.next(undefined);
702-
chat$.next(
744+
deps.emitChatEvent(
745+
'conversation-1',
703746
createMockRoundCompleteEvent(
704747
[createMockVersionedAttachment('attachment-1')],
705748
[{ attachment_id: 'attachment-1', operation: ATTACHMENT_REF_OPERATION.updated }]
@@ -710,7 +753,8 @@ describe('registerDashboardAttachmentUiDefinition', () => {
710753
// After cleanup
711754
deps.dashboardAppClientApi$.next(mockApi as unknown as DashboardApi);
712755
cleanup?.();
713-
chat$.next(
756+
deps.emitChatEvent(
757+
'conversation-1',
714758
createMockRoundCompleteEvent(
715759
[createMockVersionedAttachment('attachment-1')],
716760
[{ attachment_id: 'attachment-1', operation: ATTACHMENT_REF_OPERATION.updated }]

0 commit comments

Comments
 (0)