Skip to content

Commit 3605c53

Browse files
authored
[ESQL Editor] Optimize stream description fetching (#265103)
1 parent c5eec9c commit 3605c53

7 files changed

Lines changed: 330 additions & 102 deletions

File tree

packages/kbn-optimizer/limits.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ pageLoadAssetSize:
177177
spaces: 28871
178178
stackAlerts: 31499
179179
stackConnectors: 85421
180-
streams: 15433
180+
streams: 32595
181181
streamsApp: 25375
182182
synthetics: 31571
183183
telemetry: 25755

x-pack/platform/plugins/shared/streams/common/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,5 @@ export {
4646
type StreamsAppLocationParams,
4747
getStreamsLocation,
4848
} from './get_streams_location/get_streams_location';
49+
50+
export type { StreamSummary } from './stream_summary';
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
export interface StreamSummary {
9+
name: string;
10+
type: string;
11+
description: string;
12+
}

x-pack/platform/plugins/shared/streams/public/services/esql_source_enricher.test.ts

Lines changed: 199 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -8,57 +8,19 @@
88
import type { ApplicationStart } from '@kbn/core/public';
99
import type { ESQLSourceResult } from '@kbn/esql-types';
1010
import { SOURCES_TYPES } from '@kbn/esql-types';
11-
import type { Streams } from '@kbn/streams-schema';
11+
import type { StreamSummary } from '../../common';
1212
import type { StreamsRepositoryClient } from '../api';
1313
import { createStreamsSourceEnricher, STREAMS_CACHE_TTL_MS } from './esql_source_enricher';
1414

15-
const NOW = '2024-01-01T00:00:00.000Z';
16-
17-
const wiredStreamDefinition: Streams.WiredStream.Definition = {
18-
name: 'logs',
19-
type: 'wired',
20-
description: 'All logs',
21-
updated_at: NOW,
22-
ingest: {
23-
lifecycle: { inherit: {} },
24-
processing: { steps: [], updated_at: NOW },
25-
settings: {},
26-
failure_store: { inherit: {} },
27-
wired: { fields: {}, routing: [] },
28-
},
29-
};
30-
31-
const classicStreamDefinition: Streams.ClassicStream.Definition = {
32-
name: 'classic-logs',
33-
type: 'classic',
34-
description: 'Classic log stream',
35-
updated_at: NOW,
36-
ingest: {
37-
lifecycle: { inherit: {} },
38-
processing: { steps: [], updated_at: NOW },
39-
settings: {},
40-
failure_store: { inherit: {} },
41-
classic: {},
42-
},
43-
};
44-
45-
const wiredStreamNoDescription: Streams.WiredStream.Definition = {
46-
...wiredStreamDefinition,
47-
name: 'no-desc-stream',
48-
description: '',
49-
};
50-
5115
const makeSource = (name: string, extra: Partial<ESQLSourceResult> = {}): ESQLSourceResult => ({
5216
name,
5317
hidden: false,
5418
...extra,
5519
});
5620

57-
const makeRepositoryClient = (
58-
streams: Streams.all.Definition[]
59-
): jest.Mocked<StreamsRepositoryClient> =>
21+
const makeRepositoryClient = (summaries: StreamSummary[]): jest.Mocked<StreamsRepositoryClient> =>
6022
({
61-
fetch: jest.fn().mockResolvedValue({ streams }),
23+
fetch: jest.fn().mockResolvedValue({ summaries }),
6224
} as unknown as jest.Mocked<StreamsRepositoryClient>);
6325

6426
const makeApplication = (
@@ -70,7 +32,7 @@ const makeApplication = (
7032
describe('createStreamsSourceEnricher', () => {
7133
it('returns sources unchanged when none match a stream', async () => {
7234
const enricher = createStreamsSourceEnricher(
73-
makeRepositoryClient([wiredStreamDefinition]),
35+
makeRepositoryClient([{ name: 'logs', type: 'wired', description: 'All logs' }]),
7436
makeApplication()
7537
);
7638
const sources = [makeSource('unrelated-index')];
@@ -82,7 +44,7 @@ describe('createStreamsSourceEnricher', () => {
8244

8345
it('enriches a source matching a wired stream with type, description, and link', async () => {
8446
const enricher = createStreamsSourceEnricher(
85-
makeRepositoryClient([wiredStreamDefinition]),
47+
makeRepositoryClient([{ name: 'logs', type: 'wired', description: 'All logs' }]),
8648
makeApplication()
8749
);
8850
const sources = [makeSource('logs')];
@@ -98,7 +60,9 @@ describe('createStreamsSourceEnricher', () => {
9860

9961
it('enriches a source matching a classic stream with type, description, and link', async () => {
10062
const enricher = createStreamsSourceEnricher(
101-
makeRepositoryClient([classicStreamDefinition]),
63+
makeRepositoryClient([
64+
{ name: 'classic-logs', type: 'classic', description: 'Classic log stream' },
65+
]),
10266
makeApplication()
10367
);
10468
const sources = [makeSource('classic-logs')];
@@ -113,7 +77,7 @@ describe('createStreamsSourceEnricher', () => {
11377

11478
it('sets description to undefined when the stream has an empty description', async () => {
11579
const enricher = createStreamsSourceEnricher(
116-
makeRepositoryClient([wiredStreamNoDescription]),
80+
makeRepositoryClient([{ name: 'no-desc-stream', type: 'wired', description: '' }]),
11781
makeApplication()
11882
);
11983
const sources = [makeSource('no-desc-stream')];
@@ -125,7 +89,10 @@ describe('createStreamsSourceEnricher', () => {
12589

12690
it('enriches only matching sources in a mixed list', async () => {
12791
const enricher = createStreamsSourceEnricher(
128-
makeRepositoryClient([wiredStreamDefinition, classicStreamDefinition]),
92+
makeRepositoryClient([
93+
{ name: 'logs', type: 'wired', description: 'All logs' },
94+
{ name: 'classic-logs', type: 'classic', description: 'Classic' },
95+
]),
12996
makeApplication()
13097
);
13198
const sources = [makeSource('logs'), makeSource('other-index'), makeSource('classic-logs')];
@@ -139,7 +106,7 @@ describe('createStreamsSourceEnricher', () => {
139106

140107
it('preserves existing source fields when enriching', async () => {
141108
const enricher = createStreamsSourceEnricher(
142-
makeRepositoryClient([wiredStreamDefinition]),
109+
makeRepositoryClient([{ name: 'logs', type: 'wired', description: 'All logs' }]),
143110
makeApplication()
144111
);
145112
const source = makeSource('logs', { hidden: true, title: 'Logs title' });
@@ -174,7 +141,7 @@ describe('createStreamsSourceEnricher', () => {
174141

175142
it('returns an empty array when sources is empty', async () => {
176143
const enricher = createStreamsSourceEnricher(
177-
makeRepositoryClient([wiredStreamDefinition]),
144+
makeRepositoryClient([{ name: 'logs', type: 'wired', description: 'All logs' }]),
178145
makeApplication()
179146
);
180147

@@ -183,14 +150,71 @@ describe('createStreamsSourceEnricher', () => {
183150
expect(result).toEqual([]);
184151
});
185152

153+
it('does not call the API when sources is empty', async () => {
154+
const client = makeRepositoryClient([]);
155+
const enricher = createStreamsSourceEnricher(client, makeApplication());
156+
157+
await enricher([]);
158+
159+
expect(client.fetch).not.toHaveBeenCalled();
160+
});
161+
162+
it('only fetches names that are not yet cached', async () => {
163+
const client = makeRepositoryClient([
164+
{ name: 'logs', type: 'wired', description: 'All logs' },
165+
{ name: 'metrics', type: 'classic', description: 'Metrics' },
166+
]);
167+
const enricher = createStreamsSourceEnricher(client, makeApplication());
168+
169+
// First call: both are cache misses
170+
await enricher([makeSource('logs'), makeSource('metrics')]);
171+
expect(client.fetch).toHaveBeenCalledTimes(1);
172+
expect(client.fetch).toHaveBeenCalledWith(
173+
'POST /internal/streams/_bulk_get_summaries',
174+
expect.objectContaining({
175+
params: { body: { names: expect.arrayContaining(['logs', 'metrics']) } },
176+
})
177+
);
178+
179+
client.fetch.mockClear();
180+
181+
// Second call: both are cache hits — no fetch
182+
await enricher([makeSource('logs'), makeSource('metrics')]);
183+
expect(client.fetch).not.toHaveBeenCalled();
184+
});
185+
186+
it('only fetches new names when some are cached and some are not', async () => {
187+
const client = makeRepositoryClient([{ name: 'logs', type: 'wired', description: 'All logs' }]);
188+
const enricher = createStreamsSourceEnricher(client, makeApplication());
189+
190+
// Prime the cache with 'logs'
191+
await enricher([makeSource('logs')]);
192+
client.fetch.mockClear();
193+
194+
// Second call adds a new source 'metrics'
195+
client.fetch.mockResolvedValue({
196+
streams: [{ name: 'metrics', type: 'classic', description: 'Metrics' }],
197+
});
198+
await enricher([makeSource('logs'), makeSource('metrics')]);
199+
200+
expect(client.fetch).toHaveBeenCalledTimes(1);
201+
expect(client.fetch).toHaveBeenCalledWith(
202+
'POST /internal/streams/_bulk_get_summaries',
203+
expect.objectContaining({ params: { body: { names: ['metrics'] } } })
204+
);
205+
});
206+
186207
describe('caching', () => {
187208
const makeFakePerf = () => {
188-
let now = 0;
209+
// Start at 1 rather than 0: LRUCache uses !!starts[index] to detect whether a TTL
210+
// start was recorded, so a start value of 0 would cause entries to never be
211+
// considered stale. In practice performance.now() always returns a value > 0.
212+
let now = 1;
189213
return { now: () => now, tick: (ms: number) => (now += ms) };
190214
};
191215

192216
it('calls the streams API only once across multiple enricher invocations within the TTL', async () => {
193-
const client = makeRepositoryClient([wiredStreamDefinition]);
217+
const client = makeRepositoryClient([{ name: 'logs', type: 'wired', description: '' }]);
194218
const enricher = createStreamsSourceEnricher(client, makeApplication(), makeFakePerf());
195219

196220
await enricher([makeSource('logs')]);
@@ -200,7 +224,7 @@ describe('createStreamsSourceEnricher', () => {
200224
});
201225

202226
it('re-fetches streams from the API after the cache TTL expires', async () => {
203-
const client = makeRepositoryClient([wiredStreamDefinition]);
227+
const client = makeRepositoryClient([{ name: 'logs', type: 'wired', description: '' }]);
204228
const perf = makeFakePerf();
205229
const enricher = createStreamsSourceEnricher(client, makeApplication(), perf);
206230

@@ -214,25 +238,142 @@ describe('createStreamsSourceEnricher', () => {
214238
});
215239

216240
it('reflects updated stream data after the cache TTL expires', async () => {
217-
const updatedStream: Streams.WiredStream.Definition = {
218-
...wiredStreamDefinition,
219-
description: 'Updated description',
220-
};
221-
222-
const client = makeRepositoryClient([wiredStreamDefinition]);
241+
const client = makeRepositoryClient([
242+
{ name: 'logs', type: 'wired', description: 'All logs' },
243+
]);
223244
const perf = makeFakePerf();
224245
const enricher = createStreamsSourceEnricher(client, makeApplication(), perf);
225246

226247
const [firstResult] = await enricher([makeSource('logs')]);
227248
expect(firstResult.description).toBe('All logs');
228249

229-
client.fetch.mockResolvedValue({ streams: [updatedStream] });
250+
client.fetch.mockResolvedValue({
251+
summaries: [{ name: 'logs', type: 'wired', description: 'Updated description' }],
252+
});
230253

231254
perf.tick(STREAMS_CACHE_TTL_MS + 1);
232255

233256
const [secondResult] = await enricher([makeSource('logs')]);
234257
expect(secondResult.description).toBe('Updated description');
235258
expect(client.fetch).toHaveBeenCalledTimes(2);
236259
});
260+
261+
it('caches absent names so unmanaged indices do not trigger repeated API calls', async () => {
262+
const client = makeRepositoryClient([]);
263+
const enricher = createStreamsSourceEnricher(client, makeApplication());
264+
const sources = [makeSource('unmanaged-index')];
265+
266+
const first = await enricher(sources);
267+
const second = await enricher(sources);
268+
269+
// Both calls should leave the source unchanged.
270+
expect(first).toEqual(sources);
271+
expect(second).toEqual(sources);
272+
// The API should only have been hit once — the second call is a cache hit.
273+
expect(client.fetch).toHaveBeenCalledTimes(1);
274+
});
275+
276+
it('re-fetches absent names after the cache TTL expires', async () => {
277+
const client = makeRepositoryClient([]);
278+
const perf = makeFakePerf();
279+
const enricher = createStreamsSourceEnricher(client, makeApplication(), perf);
280+
const sources = [makeSource('unmanaged-index')];
281+
282+
await enricher(sources);
283+
284+
perf.tick(STREAMS_CACHE_TTL_MS + 1);
285+
286+
await enricher(sources);
287+
288+
expect(client.fetch).toHaveBeenCalledTimes(2);
289+
});
290+
});
291+
292+
describe('concurrent requests', () => {
293+
it('deduplicates concurrent requests for the same sources', async () => {
294+
const client = makeRepositoryClient([{ name: 'logs', type: 'wired', description: '' }]);
295+
const enricher = createStreamsSourceEnricher(client, makeApplication());
296+
297+
const [result1, result2] = await Promise.all([
298+
enricher([makeSource('logs')]),
299+
enricher([makeSource('logs')]),
300+
]);
301+
302+
expect(client.fetch).toHaveBeenCalledTimes(1);
303+
expect(result1[0].type).toBe(SOURCES_TYPES.WIRED_STREAM);
304+
expect(result2[0].type).toBe(SOURCES_TYPES.WIRED_STREAM);
305+
});
306+
307+
it('batches cache misses from concurrent calls into a single API request', async () => {
308+
const client = makeRepositoryClient([
309+
{ name: 'logs', type: 'wired', description: '' },
310+
{ name: 'metrics', type: 'classic', description: '' },
311+
{ name: 'traces', type: 'wired', description: '' },
312+
]);
313+
const enricher = createStreamsSourceEnricher(client, makeApplication());
314+
315+
await Promise.all([
316+
enricher([makeSource('logs'), makeSource('metrics')]),
317+
enricher([makeSource('metrics'), makeSource('traces')]),
318+
]);
319+
320+
expect(client.fetch).toHaveBeenCalledTimes(1);
321+
expect(client.fetch).toHaveBeenCalledWith(
322+
'POST /internal/streams/_bulk_get_summaries',
323+
expect.objectContaining({
324+
params: { body: { names: expect.arrayContaining(['logs', 'metrics', 'traces']) } },
325+
})
326+
);
327+
});
328+
329+
it('handles overlapping source sets across separate async calls correctly', async () => {
330+
const client = {
331+
fetch: jest
332+
.fn()
333+
.mockResolvedValueOnce({
334+
summaries: [
335+
{ name: 'logs', type: 'wired', description: '' },
336+
{ name: 'metrics', type: 'classic', description: '' },
337+
],
338+
})
339+
.mockResolvedValueOnce({
340+
summaries: [{ name: 'traces', type: 'wired', description: '' }],
341+
}),
342+
} as unknown as jest.Mocked<StreamsRepositoryClient>;
343+
const enricher = createStreamsSourceEnricher(client, makeApplication());
344+
345+
const promise1 = enricher([makeSource('logs'), makeSource('metrics')]);
346+
347+
// Yield to the microtask queue so the first enricher call's API request
348+
// goes in-flight before the second enricher call starts.
349+
await Promise.resolve();
350+
351+
// Second call overlaps: 'metrics' is already in-flight (deduplicated),
352+
// 'traces' is new and triggers a separate API request.
353+
const promise2 = enricher([makeSource('metrics'), makeSource('traces')]);
354+
355+
const [result1, result2] = await Promise.all([promise1, promise2]);
356+
357+
// 'metrics' is shared across calls but fetched only once; 'traces' requires
358+
// its own API call because it wasn't part of the first request.
359+
expect(client.fetch).toHaveBeenCalledTimes(2);
360+
expect(client.fetch).toHaveBeenNthCalledWith(
361+
1,
362+
'POST /internal/streams/_bulk_get_summaries',
363+
expect.objectContaining({
364+
params: { body: { names: expect.arrayContaining(['logs', 'metrics']) } },
365+
})
366+
);
367+
expect(client.fetch).toHaveBeenNthCalledWith(
368+
2,
369+
'POST /internal/streams/_bulk_get_summaries',
370+
expect.objectContaining({ params: { body: { names: ['traces'] } } })
371+
);
372+
373+
expect(result1[0].type).toBe(SOURCES_TYPES.WIRED_STREAM); // logs
374+
expect(result1[1].type).toBe(SOURCES_TYPES.CLASSIC_STREAM); // metrics
375+
expect(result2[0].type).toBe(SOURCES_TYPES.CLASSIC_STREAM); // metrics (deduplicated)
376+
expect(result2[1].type).toBe(SOURCES_TYPES.WIRED_STREAM); // traces
377+
});
237378
});
238379
});

0 commit comments

Comments
 (0)