Commit c5c09d2

[SigEvents] Add KI queries deduplication (elastic#263690)
## 📓 Summary

Closes elastic/streams-program#852

Significant Events query generation can produce duplicates across successive runs or within a single batch when the LLM re-emits queries with equivalent ES|QL. This PR adds multi-layer deduplication — commutative-aware ES|QL normalization, deterministic duplicate checks at both the AI validation layer and the persistence layer, and an LLM `replaces` mechanism so the model can update stale queries instead of creating near-duplicates. Rule-backed replacements are routed through `syncQueries` to keep Kibana alerting rules in sync.

- Adds `normalizeEsqlSafe` / `hasSameEsql` in `kbn-streams-schema` with AST-based commutative AND/OR sorting so `WHERE a AND b` and `WHERE b AND a` are recognized as duplicates.
- Introduces `replaces` field in the `add_queries` tool schema and system prompt, with three-case evaluation (SKIP / REPLACE / NEW) and conservative-bias rules that prefer duplicates over lost queries.
- Feeds existing stored queries as LLM context (up to 50, severity-sorted) and rejects server-side duplicates via normalized ES|QL comparison before the query reaches the tool response.
- Splits `persistQueries` into two-phase `queryClient.bulk` calls: standard ops with `createRules: false` (link-only), rule-backed replacements through default `bulk` (triggers `syncQueries` for rule update/uninstall).
- Exports `QueryClientBulkOperation` / `QueryClientBulkIndexOperation` types from `query_client.ts`, resolving the naming collision with internal storage bulk types.
- Adds unit tests for `normalizeEsqlSafe` / `hasSameEsql` (9 cases) and `persistQueries` (10 cases covering dedup, replaces routing, intra-batch dedup, hallucinated ID fallback, and two-phase bulk).

## 🧪 Testing

- Trigger Significant Events generation on a stream that already has queries; verify that existing queries are not re-emitted as duplicates and new detection dimensions are added.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
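The two-phase persistence described in the summary can be pictured as a partition step that runs before the two `queryClient.bulk` calls. The sketch below is only an illustration under stated assumptions: `PendingQuery`, `partitionForPersist`, and the `ruleBackedIds` set are hypothetical names, and the real `persistQueries` code is not shown on this page.

```typescript
// Hypothetical shape of a query waiting to be persisted (not the PR's type).
interface PendingQuery {
  id: string;
  // ID of the stored query this one supersedes, if any.
  replaces?: string;
}

// Split a batch into the two groups the commit message describes:
// - standard: persisted link-only (the summary mentions `createRules: false`)
// - ruleBackedReplacements: replacements of queries that back an alerting rule,
//   which need the default path so rules get updated or uninstalled.
// `ruleBackedIds` is an assumed input listing stored queries that have a rule.
function partitionForPersist(
  pending: PendingQuery[],
  ruleBackedIds: Set<string>
): { standard: PendingQuery[]; ruleBackedReplacements: PendingQuery[] } {
  const standard: PendingQuery[] = [];
  const ruleBackedReplacements: PendingQuery[] = [];
  for (const query of pending) {
    if (query.replaces !== undefined && ruleBackedIds.has(query.replaces)) {
      ruleBackedReplacements.push(query);
    } else {
      standard.push(query);
    }
  }
  return { standard, ruleBackedReplacements };
}
```

Keeping the partition separate from the bulk calls makes the routing decision easy to unit test without an Elasticsearch client.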
1 parent 983d331 commit c5c09d2

15 files changed

Lines changed: 779 additions & 33 deletions


x-pack/platform/packages/shared/kbn-streams-ai/index.ts

Lines changed: 4 additions & 1 deletion
@@ -19,7 +19,10 @@ export {
   type SuggestProcessingPipelineResult,
   type SuggestPipelineAgentSchema,
 } from './workflows/suggest_processing_pipeline';
-export { generateSignificantEvents } from './src/significant_events/generate_significant_events';
+export {
+  generateSignificantEvents,
+  type ExistingQuerySummary,
+} from './src/significant_events/generate_significant_events';
 export {
   createDefaultSignificantEventsToolUsage,
   type SignificantEventsToolUsage,

x-pack/platform/packages/shared/kbn-streams-ai/src/significant_events/generate_significant_events.ts

Lines changed: 38 additions & 0 deletions
@@ -13,6 +13,7 @@ import {
   ensureMetadata,
   getSourcesForStream,
   getStatsQueryHints,
+  normalizeEsqlSafe,
   replaceFromSources,
 } from '@kbn/streams-schema';
 import type { ElasticsearchClient, Logger } from '@kbn/core/server';
@@ -40,6 +41,17 @@ import {
   type SignificantEventsToolUsage,
 } from './tools/tool_usage';

+const MAX_EXISTING_QUERIES_FOR_CONTEXT = 50;
+
+export interface ExistingQuerySummary {
+  id: string;
+  title: string;
+  type: string;
+  severity_score?: number;
+  description: string;
+  esql: string;
+}
+
 /**
  * Intermediate representation of a query as produced by the LLM tool output.
  * Uses a flat `esql` string (vs the wrapped `EsqlQuery` in the wire type)
@@ -53,6 +65,7 @@ interface ParsedToolQuery {
   category: SignificantEventType;
   severity_score: number;
   evidence?: string[];
+  replaces?: string;
 }

 function getErrorMessage(error: unknown): string {
@@ -73,6 +86,7 @@ export async function generateSignificantEvents({
   logger,
   additionalTools,
   additionalToolCallbacks,
+  existingQueries,
 }: {
   stream: Streams.all.Definition;
   esClient: ElasticsearchClient;
@@ -87,6 +101,7 @@ export async function generateSignificantEvents({
   systemPrompt: string;
   additionalTools?: Record<string, ToolDefinition>;
   additionalToolCallbacks?: Record<string, ToolCallback>;
+  existingQueries?: ExistingQuerySummary[];
 }): Promise<{
   queries: ParsedToolQuery[];
   tokensUsed: ChatCompletionTokenCount;
@@ -99,6 +114,18 @@ export async function generateSignificantEvents({
   const prompt = createGenerateSignificantEventsPrompt({ systemPrompt, additionalTools });
   const targetSources = getSourcesForStream(stream);

+  const existingQueriesList = existingQueries ?? [];
+
+  const normalizedStoredEsqls = new Set(existingQueriesList.map((q) => normalizeEsqlSafe(q.esql)));
+
+  const existingQueriesContext = existingQueriesList.length
+    ? JSON.stringify(
+        [...existingQueriesList]
+          .sort((a, b) => (b.severity_score ?? 0) - (a.severity_score ?? 0))
+          .slice(0, MAX_EXISTING_QUERIES_FOR_CONTEXT)
+      )
+    : '';
+
   logger.trace('Generating significant events via reasoning agent');
   const response = await withSpan('generate_significant_events', () =>
     executeAsReasoningAgent({
@@ -107,6 +134,7 @@ export async function generateSignificantEvents({
         description: stream.description,
         available_feature_types: SIGNIFICANT_EVENTS_FEATURE_TOOL_TYPES.join(', '),
         computed_feature_instructions: getComputedFeatureInstructions(),
+        existing_queries: existingQueriesContext,
       },
       maxSteps: additionalToolCallbacks ? 6 : 4,
       prompt,
@@ -185,6 +213,16 @@ export async function generateSignificantEvents({
         ? sourceRewritten
         : ensureMetadata(sourceRewritten);

+      if (normalizedStoredEsqls.has(normalizeEsqlSafe(rewritten))) {
+        return {
+          query: { ...query, type: derivedType, esql: rewritten },
+          valid: false,
+          status: 'Duplicate',
+          error: 'This query already exists for this stream.',
+          hints: undefined,
+        };
+      }
+
       const hints = getStatsQueryHints(rewritten);

       await esClient.esql.query({
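One detail in the hunks above is easy to miss: the duplicate set is built from every existing query, while the LLM context is capped at the 50 highest-severity entries. The following sketch isolates that logic so the difference can be checked. `buildDedupInputs` and `normalizeForDemo` are hypothetical names, and `normalizeForDemo` only collapses whitespace as a stand-in for `normalizeEsqlSafe`, whose implementation is not shown here.

```typescript
interface ExistingQuerySummary {
  id: string;
  title: string;
  type: string;
  severity_score?: number;
  description: string;
  esql: string;
}

const MAX_EXISTING_QUERIES_FOR_CONTEXT = 50;

// Stand-in for normalizeEsqlSafe (assumption): whitespace collapsing only.
function normalizeForDemo(esql: string): string {
  return esql.replace(/\s+/g, ' ').trim();
}

function buildDedupInputs(existingQueries: ExistingQuerySummary[]) {
  // Every stored query takes part in the server-side duplicate check.
  const normalizedStoredEsqls = new Set(existingQueries.map((q) => normalizeForDemo(q.esql)));

  // Only the highest-severity entries are shown to the model.
  // A missing severity_score sorts as 0, matching the `?? 0` in the diff.
  const contextForLlm = [...existingQueries]
    .sort((a, b) => (b.severity_score ?? 0) - (a.severity_score ?? 0))
    .slice(0, MAX_EXISTING_QUERIES_FOR_CONTEXT);

  return { normalizedStoredEsqls, contextForLlm };
}
```

With more than 50 stored queries, a low-severity query is absent from the prompt yet still present in the set, so re-emitting it is rejected with the `Duplicate` status.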

x-pack/platform/packages/shared/kbn-streams-ai/src/significant_events/prompt.ts

Lines changed: 6 additions & 0 deletions
@@ -34,6 +34,7 @@ export function createGenerateSignificantEventsPrompt({
         description: z.string(),
         available_feature_types: z.string(),
         computed_feature_instructions: z.string(),
+        existing_queries: z.string(),
       }),
     })
     .version({
@@ -121,6 +122,11 @@ export function createGenerateSignificantEventsPrompt({
                       type: 'string',
                     },
                   },
+                  replaces: {
+                    type: 'string',
+                    description:
+                      'If this query replaces an existing one (same detection intent but updated ES|QL), set this to the ID of the existing query from `existing_queries`.',
+                  },
                 },
                 required: ['esql', 'title', 'description', 'category', 'severity_score'],
               },
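To make the new `replaces` property concrete, here is an illustrative list of `add_queries` items, one replacement and one new query. The field names follow the schema above; everything else is an assumption made for the example: the `AddQueriesItem` type name, the `logs` source, the category strings, and the ID `def-456` (borrowed from the sample monologue in the system prompt).

```typescript
// Mirrors the tool schema's item fields; the type name itself is hypothetical.
interface AddQueriesItem {
  esql: string;
  title: string;
  description: string;
  category: string;
  severity_score: number;
  type?: 'match' | 'stats';
  evidence?: string[];
  replaces?: string;
}

const exampleQueries: AddQueriesItem[] = [
  {
    // Same detection intent as the stored query, updated entity scope.
    title: 'Database Timeout Detection',
    description: 'Timeout messages from the database service; can indicate pool exhaustion.',
    esql: 'FROM logs | WHERE body.text:"timeout" AND service.name == "new-db"',
    category: 'error', // hypothetical category value
    severity_score: 70,
    replaces: 'def-456',
  },
  {
    // A new detection dimension: `replaces` stays unset.
    title: 'Authentication Failures',
    description: 'Failed authentication events; can indicate credential stuffing.',
    esql: 'FROM logs | WHERE event.outcome == "failure"',
    category: 'security', // hypothetical category value
    severity_score: 60,
  },
];

// Separate replacements from new queries, as a consumer of the tool output might.
function splitByReplaces(items: AddQueriesItem[]) {
  return {
    replacements: items.filter((item) => item.replaces !== undefined),
    fresh: items.filter((item) => item.replaces === undefined),
  };
}
```

Because `replaces` is not in the `required` list, older tool outputs without the field remain valid.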

x-pack/platform/packages/shared/kbn-streams-ai/src/significant_events/system_prompt.text

Lines changed: 114 additions & 5 deletions
@@ -121,12 +121,121 @@ Schema features indicate the **log schema family** (ecs, otel, or custom) detect
 | Tool | Function | Notes |
 | :--- | :--- | :--- |
 | `get_stream_features` | Fetches stream features for this stream. | **MUST be called first before any `add_queries` call.** Call without filters to get all features at once. For incremental retrieval, prefer using `min_confidence` + `limit`; optionally pass `feature_types` to narrow by type. Results include computed features (dataset_analysis, log_samples, log_patterns, error_logs) and inferred features. Supported values: `{{{available_feature_types}}}`. |
-| `add_queries` | Submits one or more ES|QL queries for the user. | Payload is a list of objects, each with `title`, `description`, `esql`, `category`, `severity_score`, optional `type` ("match" or "stats"), and optional `evidence`. |
+| `add_queries` | Submits one or more ES|QL queries for the user. | Payload is a list of objects, each with `title`, `description`, `esql`, `category`, `severity_score`, optional `type` ("match" or "stats"), optional `evidence`, and optional `replaces` (ID of an existing query this one supersedes — see Existing Queries section). |
 | `reason()` | **Begin a Reasoning Monologue** | Outputs your private thoughts. Must use sentinel tags (`<<<BEGIN_INTERNAL>>>`...`<<<END_INTERNAL>>>`). |
 | `complete()` | Declare readiness to answer | Ends the loop and triggers the **Definitive Output**. |

 ---

+## Existing Queries
+
+If `existing_queries` are provided in the input, they list queries already
+created for this stream in previous generation runs. Each entry includes an
+`id`, `title`, `type`, `severity_score`, `description`, and `esql`.
+
+You MUST evaluate every existing query against the current stream features
+before generating any new queries. For each existing query, determine which
+of these three cases applies:
+
+### Case 1: Query is still valid — SKIP
+
+The query's detection intent is still relevant AND its ES|QL references fields
+that are still present and meaningful in the current `dataset_analysis`.
+
+Apply this case whenever your candidate ES|QL is **functionally equivalent**
+to an existing query (matches the same documents), not just when it is
+textually identical. See "Rules" below for the equivalence patterns you must
+recognize.
+
+**Action:** Do NOT re-emit this query. Do not include it in any `add_queries`
+call. It is already stored and working correctly.
+
+### Case 2: Query needs updated ES|QL — REPLACE
+
+The query's detection intent is still relevant, but the underlying stream
+fields have changed since the query was last generated. Examples:
+
+- A field the query references has been removed, renamed, or is no longer
+  populated
+- The field value distribution has changed (e.g., `log.level` was multi-valued
+  but is now uniform — the query needs an alternate error signal)
+- A better or more specific field has become available for the same detection
+  goal (e.g., `error.type` appeared where only message patterns existed before)
+- The query's thresholds or structure no longer match the current data patterns
+
+**Action:** Re-emit the query in `add_queries` with:
+- Updated ES|QL reflecting the current field landscape
+- `replaces` set to the `id` of the existing query it supersedes
+- The same title (or improved), updated description if needed
+- Recalibrated `severity_score` if the signal strength changed
+
+### Case 3: Novel detection — NEW QUERY
+
+The stream features reveal a detection opportunity not covered by any existing
+query. This is a genuinely new signal dimension.
+
+**Action:** Emit the query in `add_queries` normally, without setting
+`replaces`. Focus on dimensions not already covered by existing queries.
+
+### Rules
+
+- **Functional equivalence counts as Case 1 (SKIP).** Two queries are
+  functionally equivalent when they match the same documents, regardless of
+  syntax. Treat the following as the same query and do not re-emit:
+  - `f:"x" AND f:"y"` vs `f:"y" AND f:"x"` (operand order in AND/OR)
+  - `f IN ("a", "b")` vs `f == "a" OR f == "b"` (IN vs equality OR chain)
+  - Same predicate with a redundant filter added or removed (e.g., scoping to
+    `service.name == "X"` when the stream only contains service "X")
+  - Literal-formatting differences that don't change the comparison
+    (`error_rate > 5` vs `error_rate > 5.0`, `duration > 1000` vs
+    `duration > 1_000`)
+  - Boundary changes are NOT equivalent: `> 50` and `>= 50` match different
+    documents at the boundary value, so do **not** treat them as the same
+    query. The same applies to `<` vs `<=` and to threshold value changes
+    (e.g., `> 50` vs `> 100`) — these are Case 2 (REPLACE) or Case 3 (NEW)
+    candidates, never Case 1 (SKIP).
+- **Strict subset/superset is also a SKIP.** If your candidate query adds a
+  filter to an existing query (or removes one), it matches a subset/superset
+  of the existing documents — not a new signal. In particular,
+  `MATCH_PHRASE(f, "a b c")` is a strict subset of `f:"a" AND f:"b" AND f:"c"`
+  (the phrase requires adjacency and order; the AND chain does not). Pick one
+  form per concept and do not emit both.
+- **Prefer SKIP over REPLACE when uncertain.** Use `replaces` only when you
+  are confident the new query preserves the same detection intent AND the
+  existing ES|QL is clearly outdated. When unsure, skip (Case 1) rather than
+  emit a near-duplicate.
+- Never set `replaces` on a genuinely new query.
+- Never emit two queries with `replaces` pointing to the same existing query
+  ID — pick the best replacement and skip the other.
+- **Intra-batch dedup is your responsibility.** Do not emit two queries in the
+  same `add_queries` call that are functionally equivalent to each other. The
+  automatic duplicate check only rejects exact ES|QL matches after
+  normalization; near-duplicates from the patterns above slip through.
+
+### Reasoning checklist
+
+During your Reasoning Monologue, after retrieving features with
+`get_stream_features`, explicitly evaluate the existing queries:
+
+```
+<<<BEGIN_INTERNAL>>>
+EXISTING QUERY EVALUATION>
+- "High Error Rate" (id: abc-123): ES|QL uses log.level IN ("ERROR", ...).
+  dataset_analysis shows log.level is present with 3% error distribution.
+  Fields still valid → SKIP.
+- "Database Timeout Detection" (id: def-456): ES|QL uses body.text:"timeout"
+  scoped to service.name == "old-db-service". dataset_analysis shows
+  service.name no longer contains "old-db-service"; replaced by "new-db".
+  Intent valid, field changed → REPLACE with updated entity scope.
+- No existing query covers auth failure patterns, but event.outcome is present
+  with 10% failure rate → NEW QUERY opportunity.
+<<<END_INTERNAL>>>
+```
+
+If `existing_queries` is empty or absent, skip this evaluation entirely.
+
+---
+
 ## 2. Core Loop — Act/Gather ➜ **Reason** ➜ Decide (continue or complete)

 **Mandatory first step:** Your very first action must be to call `get_stream_features` (without filters) to retrieve all available features. Do not call `add_queries` until you have reviewed the feature results and grounded your queries in them.
@@ -407,7 +516,7 @@ Alternatively, you can use the function form `MATCH(field, "query")` which is eq
 - **All terms required (3+ terms):** `MATCH(body.text, "connection timeout error", {"operator": "AND"})` — cleaner for many terms.
 - **Exact phrase (word order matters):** `MATCH_PHRASE(body.text, "connection timeout")` — use only for well-known phrases where word order and adjacency are semantically important (e.g., `"Failed password for"`, `"Out of memory"`, `"Started Application in"`).

-**Default to separate `:` terms with `AND`** for multi-term queries. Reserve `MATCH_PHRASE` for known phrases where order matters.
+**Default to separate `:` terms with `AND`** for multi-term queries. Reserve `MATCH_PHRASE` for known phrases where word order is semantic (e.g., `"Failed password for"`, `"Out of memory"`). **Pick ONE form per concept** — never emit both the `AND`-of-`:` variant and the `MATCH_PHRASE` variant as separate queries. The `AND` chain is strictly broader than the phrase (it does not require adjacency or word order), so emitting both creates a subset/superset redundancy for the same signal — pick the form that matches the detection intent and skip the other.

 **On keyword fields**, `:` performs exact matching (the field is not analyzed):
 - `log.level:"ERROR"` → exact match on keyword field
@@ -653,10 +762,10 @@ FROM <stream>
 ### Guardrails

 1. **Precision first:** Higher threshold is safer. Prefer zero firings over false positives.
-2. **Signal diversity:** Each STATS query covers a distinct failure dimension (error rate, latency, throughput, auth, cardinality). Consolidate overlapping signals.
+2. **Signal diversity:** Emit **at most one STATS query per distinct failure dimension** (error rate, latency, throughput, auth, cardinality). If multiple candidates target the same dimension, pick the strongest and skip the others — do not emit variants that differ only in threshold, entity scope, or aggregation shape.
 3. **Bucket sizing:** Default 5 min. Low traffic (<100/min): 10–15 min. High traffic (>1000/min): 1–2 min.
 4. **Descriptions must answer:** (1) what user-visible problem, (2) what action, (3) threshold reasoning vs baseline, (4) thresholds may need adjustment, (5) include a "can indicate..." clause naming likely root causes — investigation agents use this as the hypothesis claim.
 5. **Severity:** Weigh relative deviation AND absolute impact. 2× from 0.1%→0.2% is low (~40). 2× from 20%→40% is critical (~85).
-6. **Complementarity:** For each important signal, generate a **detection + evidence pair**: a STATS query (BY bucket) for aggregate detection and a match query for evidence retrieval. The STATS query gets higher severity (tested first by investigators). Optionally add a per-entity STATS variant (BY <entity_field>, bucket) when entity cardinality is 2–50 and isolation adds diagnostic value — this counts as a separate dimension toward the >5 cap. Do not generate entity variants for single-entity streams.
+6. **Complementarity:** For each important signal, emit **at most three queries**: one STATS query (BY bucket) for aggregate detection, one match query for evidence retrieval, and optionally one per-entity STATS variant (BY <entity_field>, bucket) when entity cardinality is 2–50 and isolation adds diagnostic value. Pick a single entity field — never emit multiple per-entity variants for the same signal. The STATS query gets higher severity (tested first by investigators). Do not generate entity variants for single-entity streams.
 7. **Forbidden:** No `CATEGORIZE`, `CHANGE_POINT`, nested `STATS`, `SORT`/`LIMIT`/`KEEP`, `EVAL` outside STATS queries, high-cardinality BY (>50 distinct).
-8. **Target:** Generate as many as the data justifies — zero is fine if no pattern has field evidence. If >5, verify each adds a distinct dimension. Quality over quantity.
+8. **Target:** Generate as many as the data justifies — **do not pad**. Zero is fine if no pattern has field evidence. Every query must cite a concrete field, value distribution, log pattern, or feature from `dataset_analysis`/`get_stream_features`. Stop as soon as every remaining candidate would restate evidence you've already used or would be functionally equivalent to another emitted query (see the Rules in "Existing Queries"). The total count is bounded by Guardrails 2 and 6 (one per dimension, at most three per signal) — there is no fixed quota. Quality over quantity.
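The prompt's claim that a phrase match is a strict subset of an AND chain can be checked with a toy model. The sketch below assumes lowercase whitespace tokenization, which is a simplification and not the analyzer Elasticsearch applies; `matchesAllTerms` and `matchesPhrase` are hypothetical helpers, not ES|QL functions.

```typescript
// Toy analyzer (assumption): lowercase, split on whitespace.
const tokenize = (text: string): string[] =>
  text.toLowerCase().split(/\s+/).filter(Boolean);

// Models `f:"a" AND f:"b"`: every term appears somewhere, in any order.
function matchesAllTerms(doc: string, terms: string[]): boolean {
  const tokens = new Set(tokenize(doc));
  return terms.every((term) => tokens.has(term.toLowerCase()));
}

// Models MATCH_PHRASE: the terms appear adjacent and in the given order.
function matchesPhrase(doc: string, phrase: string): boolean {
  const tokens = tokenize(doc);
  const wanted = tokenize(phrase);
  if (wanted.length === 0) {
    return false;
  }
  for (let start = 0; start + wanted.length <= tokens.length; start++) {
    if (wanted.every((word, offset) => tokens[start + offset] === word)) {
      return true;
    }
  }
  return false;
}
```

In this model, `"timeout while opening connection"` satisfies the AND chain for `connection` and `timeout` but not the phrase `"connection timeout"`, while any document containing the phrase also satisfies the AND chain. That asymmetry is why emitting both forms adds no new signal.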

x-pack/platform/packages/shared/kbn-streams-ai/src/significant_events/user_prompt.text

Lines changed: 5 additions & 0 deletions
@@ -3,3 +3,8 @@

 `description`:
 {{{description}}}
+
+{{#existing_queries}}
+`existing_queries`:
+{{{existing_queries}}}
+{{/existing_queries}}

x-pack/platform/packages/shared/kbn-streams-schema/index.ts

Lines changed: 2 additions & 0 deletions
@@ -74,6 +74,8 @@ export {
   hasStatsCommand,
   MS_PER_UNIT,
   normalizeEsqlQuery,
+  normalizeEsqlSafe,
+  hasSameEsql,
   replaceFromSources,
   rewriteFromSources,
 } from './src/helpers/esql_helpers';
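The implementation of `normalizeEsqlSafe` / `hasSameEsql` is not part of this excerpt. As a rough illustration of what the commit message calls commutative AND/OR sorting, the sketch below canonicalizes a toy expression tree. The `Expr` type and all function names are assumptions made for the example, and the real helpers work on the ES|QL AST instead.

```typescript
// Toy expression tree (assumption); leaves hold an already-printed predicate.
type Expr =
  | { kind: 'leaf'; text: string }
  | { kind: 'and' | 'or'; operands: Expr[] };

// Collect operands of nested nodes with the same operator:
// a AND (b AND c) yields [a, b, c].
function flatten(kind: 'and' | 'or', expr: Expr): Expr[] {
  if (expr.kind !== 'leaf' && expr.kind === kind) {
    return expr.operands.flatMap((operand) => flatten(kind, operand));
  }
  return [expr];
}

// Print the tree with AND/OR operands sorted, so operand order is irrelevant.
function canonicalize(expr: Expr): string {
  if (expr.kind === 'leaf') {
    return expr.text.trim();
  }
  const parts = flatten(expr.kind, expr).map(canonicalize).sort();
  return `(${parts.join(expr.kind === 'and' ? ' AND ' : ' OR ')})`;
}

function sameCondition(a: Expr, b: Expr): boolean {
  return canonicalize(a) === canonicalize(b);
}

// Small builders to keep examples readable.
const leaf = (text: string): Expr => ({ kind: 'leaf', text });
const and = (...operands: Expr[]): Expr => ({ kind: 'and', operands });
const or = (...operands: Expr[]): Expr => ({ kind: 'or', operands });
```

Under this scheme `a AND b` and `b AND a` compare equal, while `x > 50` and `x >= 50` stay distinct because the leaves differ. That matches the prompt's rule that boundary changes are not duplicates.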

x-pack/platform/packages/shared/kbn-streams-schema/src/api/significant_events/index.ts

Lines changed: 1 addition & 0 deletions
@@ -82,6 +82,7 @@ interface GeneratedSignificantEventQuery {
   severity_score: number;
   evidence?: string[];
   description: string;
+  replaces?: string;
 }

 type SignificantEventsGenerateResponse = Observable<
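Since `replaces` arrives from the model, a consumer of this wire type has to cope with IDs that do not exist and with two queries claiming the same target. The sketch below shows one conservative way to handle both. It is an assumption for illustration only: `resolveReplaces` is a hypothetical helper, and the fallback behavior the PR's `persistQueries` applies is not shown on this page.

```typescript
// Reduced form of the generated query; only the fields the sketch needs.
interface GeneratedQuery {
  title: string;
  esql: string;
  replaces?: string;
}

// Keep `replaces` only when it points at a known stored query that no earlier
// query in the batch has already claimed. Otherwise drop the field so the
// query is treated as new (assumed fallback, favoring a possible duplicate
// over losing a query).
function resolveReplaces(batch: GeneratedQuery[], existingIds: Set<string>): GeneratedQuery[] {
  const claimed = new Set<string>();
  return batch.map((query) => {
    const target = query.replaces;
    if (target === undefined) {
      return query;
    }
    if (!existingIds.has(target) || claimed.has(target)) {
      return { title: query.title, esql: query.esql };
    }
    claimed.add(target);
    return query;
  });
}
```

The first query to claim an ID wins here. Picking the higher-severity candidate instead would be an equally reasonable policy.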
