Skip to content

Commit c4ff96e

Browse files
Migrate mergeSampleDocumentsWithFieldCaps to mergeSampleDocumentsWithSchema, replacing field caps logic with ES|QL schema retrieval, updating related helpers, tests, and integrations.
1 parent 388330e commit c4ff96e

13 files changed

Lines changed: 480 additions & 106 deletions

x-pack/platform/packages/shared/kbn-ai-tools/index.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ export { describeDataset } from './src/tools/describe_dataset';
99

1010
export { formatDocumentAnalysis } from './src/tools/describe_dataset/format_document_analysis';
1111

12-
export { mergeSampleDocumentsWithFieldCaps } from './src/tools/describe_dataset/merge_sample_documents_with_field_caps';
12+
export { mergeSampleDocumentsWithSchema } from './src/tools/describe_dataset/merge_sample_documents_with_schema';
1313
export {
1414
getSampleDocuments,
1515
getSampleDocumentsEsql,
@@ -30,5 +30,10 @@ export {
3030
P_VALUE_SIGNIFICANCE_HIGH,
3131
P_VALUE_SIGNIFICANCE_MEDIUM,
3232
} from './src/utils/p_value_to_label';
33+
export {
34+
getEsqlColumnSchema,
35+
type EsqlColumnSchema,
36+
type GetEsqlColumnSchemaParams,
37+
} from './src/utils/get_esql_column_schema';
3338

3439
export { executeAsEsqlAgent } from './src/tools/esql';

x-pack/platform/packages/shared/kbn-ai-tools/src/tools/describe_dataset/get_diverse_sample_documents.test.ts

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,10 @@ jest.mock('./get_sample_documents', () => ({
1717
const getSampleDocumentsEsqlMock = jest.mocked(getSampleDocumentsEsql);
1818

1919
const createEsClient = () => {
20-
const fieldCaps = jest.fn().mockResolvedValue({
21-
fields: {
22-
message: {},
23-
},
24-
});
2520
const query = jest.fn();
2621

2722
return {
28-
esClient: { fieldCaps, esql: { query } } as unknown as ElasticsearchClient,
29-
fieldCaps,
23+
esClient: { esql: { query } } as unknown as ElasticsearchClient,
3024
query,
3125
};
3226
};
@@ -40,6 +34,13 @@ const countResponse = (total: number) => ({
4034
values: [[total]],
4135
});
4236

37+
const schemaResponse = (
38+
columns: Array<{ name: string; type: string }> = [{ name: 'message', type: 'text' }]
39+
) => ({
40+
columns,
41+
values: [],
42+
});
43+
4344
const pass1Response = (
4445
values: unknown[][] = [
4546
['logs-a:doc-1', 10, 'error'],
@@ -76,6 +77,7 @@ describe('getDiverseSampleDocuments', () => {
7677
it('uses ES|QL count, categorize pass, and composite-key source fetch', async () => {
7778
const { esClient, query } = createEsClient();
7879
query
80+
.mockResolvedValueOnce(schemaResponse())
7981
.mockResolvedValueOnce(countResponse(10))
8082
.mockResolvedValueOnce(pass1Response())
8183
.mockResolvedValueOnce(pass2Response());
@@ -90,11 +92,12 @@ describe('getDiverseSampleDocuments', () => {
9092
logger,
9193
});
9294

93-
expect(query.mock.calls[0][0].query).toBe('FROM logs-a, logs-b | STATS total = COUNT(*)');
94-
expect(query.mock.calls[1][0].query).toBe(
95+
expect(query.mock.calls[0][0].query).toBe('FROM logs-a, logs-b | LIMIT 0');
96+
expect(query.mock.calls[1][0].query).toBe('FROM logs-a, logs-b | STATS total = COUNT(*)');
97+
expect(query.mock.calls[2][0].query).toBe(
9598
'FROM logs-a, logs-b METADATA _index, _id | EVAL doc_key = CONCAT(_index, ":", _id) | STATS representative_key = TOP(doc_key, 1, "desc"), count = COUNT(*) BY pattern = CATEGORIZE(message) | SORT count DESC | LIMIT 3'
9699
);
97-
expect(query.mock.calls[2][0].query).toBe(
100+
expect(query.mock.calls[3][0].query).toBe(
98101
'FROM logs-a, logs-b METADATA _index, _id, _source | EVAL doc_key = CONCAT(_index, ":", _id) | WHERE doc_key IN ("logs-b:doc-2") | KEEP _index, _id, _source | LIMIT 1'
99102
);
100103
expect(result.hits).toEqual([
@@ -105,6 +108,7 @@ describe('getDiverseSampleDocuments', () => {
105108
it('adds SAMPLE when the population is large', async () => {
106109
const { esClient, query } = createEsClient();
107110
query
111+
.mockResolvedValueOnce(schemaResponse())
108112
.mockResolvedValueOnce(countResponse(10_000_000))
109113
.mockResolvedValueOnce(pass1Response([['logs-a:doc-1', 10, 'error']]))
110114
.mockResolvedValueOnce(pass2Response([['logs-a', 'doc-1', { message: 'error one' }]]));
@@ -119,12 +123,12 @@ describe('getDiverseSampleDocuments', () => {
119123
logger,
120124
});
121125

122-
expect(query.mock.calls[1][0].query).toContain('| SAMPLE 0.01 |');
126+
expect(query.mock.calls[2][0].query).toContain('| SAMPLE 0.01 |');
123127
});
124128

125129
it('short-circuits when the count query returns zero', async () => {
126130
const { esClient, query } = createEsClient();
127-
query.mockResolvedValueOnce(countResponse(0));
131+
query.mockResolvedValueOnce(schemaResponse()).mockResolvedValueOnce(countResponse(0));
128132

129133
const result = await getDiverseSampleDocuments({
130134
esClient,
@@ -136,14 +140,15 @@ describe('getDiverseSampleDocuments', () => {
136140
logger,
137141
});
138142

139-
expect(query).toHaveBeenCalledTimes(1);
143+
expect(query).toHaveBeenCalledTimes(2);
140144
expect(result).toEqual({ hits: [] });
141145
});
142146

143147
it('falls back to random ES|QL sampling when no message field exists', async () => {
144-
const { esClient, fieldCaps, query } = createEsClient();
145-
fieldCaps.mockResolvedValueOnce({ fields: {} });
146-
query.mockResolvedValueOnce(countResponse(10));
148+
const { esClient, query } = createEsClient();
149+
query
150+
.mockResolvedValueOnce(schemaResponse([{ name: 'host.name', type: 'keyword' }]))
151+
.mockResolvedValueOnce(countResponse(10));
147152
getSampleDocumentsEsqlMock.mockResolvedValueOnce({
148153
hits: [{ _index: 'logs-a', _id: 'doc-1', _source: { event: 'one' } }],
149154
total: 1,
@@ -166,13 +171,42 @@ describe('getDiverseSampleDocuments', () => {
166171
end: 200,
167172
sampleSize: 1,
168173
});
169-
expect(query).toHaveBeenCalledTimes(1);
174+
expect(query).toHaveBeenCalledTimes(2);
170175
expect(result.hits).toEqual([{ _index: 'logs-a', _id: 'doc-1', _source: { event: 'one' } }]);
171176
});
172177

178+
it('uses body.text when it is the first available text field candidate', async () => {
179+
const { esClient, query } = createEsClient();
180+
query
181+
.mockResolvedValueOnce(
182+
schemaResponse([
183+
{ name: 'message', type: 'keyword' },
184+
{ name: 'body.text', type: 'text' },
185+
])
186+
)
187+
.mockResolvedValueOnce(countResponse(10))
188+
.mockResolvedValueOnce(pass1Response([['logs-a:doc-1', 10, 'body pattern']]))
189+
.mockResolvedValueOnce(
190+
pass2Response([['logs-a', 'doc-1', { body: { text: 'body value' } }]])
191+
);
192+
193+
await getDiverseSampleDocuments({
194+
esClient,
195+
index: 'logs-*',
196+
start: 100,
197+
end: 200,
198+
size: 1,
199+
offset: 0,
200+
logger,
201+
});
202+
203+
expect(query.mock.calls[2][0].query).toContain('CATEGORIZE(body.text)');
204+
});
205+
173206
it('drops patterns whose composite key is missing from pass 2', async () => {
174207
const { esClient, query } = createEsClient();
175208
query
209+
.mockResolvedValueOnce(schemaResponse())
176210
.mockResolvedValueOnce(countResponse(10))
177211
.mockResolvedValueOnce(pass1Response())
178212
.mockResolvedValueOnce(pass2Response([['logs-a', 'doc-1', { message: 'error one' }]]));

x-pack/platform/packages/shared/kbn-ai-tools/src/tools/describe_dataset/get_diverse_sample_documents.ts

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import type { ElasticsearchClient } from '@kbn/core/server';
1111
import type { ESQLSearchResponse } from '@kbn/es-types';
1212
import { dateRangeQuery } from '@kbn/es-query';
1313
import type { Logger } from '@kbn/logging';
14+
import { getEsqlColumnSchema } from '../../utils/get_esql_column_schema';
1415
import { getSampleDocumentsEsql } from './get_sample_documents';
1516

1617
const MESSAGE_FIELD_CANDIDATES = ['message', 'body.text'];
@@ -39,9 +40,8 @@ export async function getDiverseSampleDocuments({
3940
const filter = { bool: { filter: timeRangeFilter } };
4041
const indices = Array.isArray(index) ? index : [index];
4142

42-
// TODO: migrate this fieldCaps probe to ES|QL in https://github.com/elastic/streams-program/issues/1220.
4343
const [messageField, totalDocs] = await Promise.all([
44-
detectMessageField({ esClient, index, timeRangeFilter }),
44+
detectMessageField({ esClient, index, start, end }),
4545
runEsqlCount({ esClient, indices, filter }),
4646
]);
4747

@@ -119,27 +119,21 @@ export async function getDiverseSampleDocuments({
119119
async function detectMessageField({
120120
esClient,
121121
index,
122-
timeRangeFilter,
122+
start,
123+
end,
123124
}: {
124125
esClient: ElasticsearchClient;
125126
index: string | string[];
126-
timeRangeFilter: ReturnType<typeof dateRangeQuery>;
127+
start: number;
128+
end: number;
127129
}): Promise<string | undefined> {
128-
const fieldCapsResponse = await esClient.fieldCaps({
129-
index,
130-
fields: MESSAGE_FIELD_CANDIDATES,
131-
index_filter: {
132-
bool: {
133-
filter: timeRangeFilter,
134-
},
135-
},
136-
types: ['text', 'match_only_text'],
137-
});
138-
139-
const fieldsFound = Object.keys(fieldCapsResponse.fields);
130+
const columns = await getEsqlColumnSchema({ esClient, index, start, end });
131+
const textColumnNames = new Set(
132+
columns.filter((column) => column.type === 'text').map((column) => column.name)
133+
);
140134

141135
for (const candidate of MESSAGE_FIELD_CANDIDATES) {
142-
if (fieldsFound.includes(candidate)) {
136+
if (textColumnNames.has(candidate)) {
143137
return candidate;
144138
}
145139
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
import type { ElasticsearchClient } from '@kbn/core/server';
9+
import { describeDataset } from '.';
10+
11+
const createEsClient = () => {
12+
const query = jest.fn();
13+
return {
14+
esClient: { esql: { query } } as unknown as ElasticsearchClient,
15+
query,
16+
};
17+
};
18+
19+
const schemaResponse = ({
20+
columns = [
21+
{ name: '@timestamp', type: 'date' },
22+
{ name: 'message', type: 'text' },
23+
{ name: 'host.name', type: 'keyword' },
24+
],
25+
}: {
26+
columns?: Array<{ name: string; type: string; original_types?: string[] }>;
27+
} = {}) => ({
28+
columns,
29+
values: [],
30+
});
31+
32+
const hitsResponse = ({
33+
values = [
34+
['doc-1', { message: 'hello', host: { name: 'host-a' } }],
35+
['doc-2', { message: 'hello', host: { name: 'host-b' } }],
36+
],
37+
}: {
38+
values?: unknown[][];
39+
} = {}) => ({
40+
columns: [
41+
{ name: '_id', type: 'keyword' },
42+
{ name: '_source', type: 'object' },
43+
],
44+
values,
45+
});
46+
47+
const countResponse = (total: number) => ({
48+
columns: [{ name: 'total', type: 'long' }],
49+
values: [[total]],
50+
});
51+
52+
describe('describeDataset', () => {
53+
beforeEach(() => {
54+
jest.clearAllMocks();
55+
});
56+
57+
it('merges ES|QL schema columns with sampled documents', async () => {
58+
const { esClient, query } = createEsClient();
59+
query
60+
.mockResolvedValueOnce(schemaResponse())
61+
.mockResolvedValueOnce(hitsResponse())
62+
.mockResolvedValueOnce(countResponse(2));
63+
64+
const analysis = await describeDataset({
65+
esClient,
66+
index: 'logs-*',
67+
start: 100,
68+
end: 200,
69+
});
70+
71+
expect(query.mock.calls[0][0].query).toBe('FROM logs-* | LIMIT 0');
72+
expect(query.mock.calls[1][0].query).toBe('FROM logs-* METADATA _id, _source | LIMIT 1000');
73+
expect(query.mock.calls[2][0].query).toBe('FROM logs-* | STATS total = COUNT(*)');
74+
expect(analysis.total).toBe(2);
75+
expect(analysis.sampled).toBe(2);
76+
expect(analysis.fields).toEqual(
77+
expect.arrayContaining([
78+
expect.objectContaining({
79+
name: 'message',
80+
types: ['text'],
81+
cardinality: 1,
82+
documentsWithValue: 2,
83+
values: [{ value: 'hello', count: 2 }],
84+
}),
85+
expect.objectContaining({
86+
name: 'host.name',
87+
types: ['keyword'],
88+
cardinality: 2,
89+
documentsWithValue: 2,
90+
}),
91+
])
92+
);
93+
});
94+
95+
it('uses the population count instead of sampled hit count for total', async () => {
96+
const { esClient, query } = createEsClient();
97+
query
98+
.mockResolvedValueOnce(schemaResponse())
99+
.mockResolvedValueOnce(hitsResponse({ values: [['doc-1', { message: 'hello' }]] }))
100+
.mockResolvedValueOnce(countResponse(15_000));
101+
102+
const analysis = await describeDataset({
103+
esClient,
104+
index: 'logs-*',
105+
start: 100,
106+
end: 200,
107+
});
108+
109+
expect(analysis.sampled).toBe(1);
110+
expect(analysis.total).toBe(15_000);
111+
});
112+
113+
it('uses original types for unsupported cross-index columns', async () => {
114+
const { esClient, query } = createEsClient();
115+
query
116+
.mockResolvedValueOnce(
117+
schemaResponse({
118+
columns: [{ name: 'host.name', type: 'unsupported', original_types: ['ip', 'keyword'] }],
119+
})
120+
)
121+
.mockResolvedValueOnce(hitsResponse({ values: [['doc-1', { host: { name: '127.0.0.1' } }]] }))
122+
.mockResolvedValueOnce(countResponse(1));
123+
124+
const analysis = await describeDataset({
125+
esClient,
126+
index: ['logs-a', 'logs-b'],
127+
start: 100,
128+
end: 200,
129+
});
130+
131+
expect(analysis.fields).toEqual(
132+
expect.arrayContaining([
133+
expect.objectContaining({
134+
name: 'host.name',
135+
types: ['ip', 'keyword'],
136+
}),
137+
])
138+
);
139+
});
140+
});

0 commit comments

Comments
 (0)