[sigevents][kis] Unify features and queries in the same data stream#270979
[sigevents][kis] Unify features and queries in the same data stream#270979klacabane wants to merge 37 commits into
Conversation
|
@klacabane, this PR increases one or more page-load bundle sizes by 15% or more:
Large bundle size increases can affect page load performance. Consider whether dependencies can be lazy-loaded or code split to reduce the bundle. See the bundle optimization guide for tips. |
This reverts commit 188f879.
| excluded_at: z.string().optional(), | ||
| run_id: z.string().optional(), | ||
| excluded: z.boolean().optional(), | ||
| updated_at: z.string().optional(), |
| run_id: z.string().optional(), | ||
| excluded: z.boolean().optional(), | ||
| updated_at: z.string().optional(), | ||
| expires_at: z.string().optional(), |
|
|
||
| const featureBulkOperationSchema = z.union([ | ||
| z.object({ index: z.object({ feature: featureSchema }) }), | ||
| z.object({ delete: z.object({ id: z.string() }) }), |
| const featureBulkOperationSchema = z.union([ | ||
| z.object({ index: z.object({ feature: featureSchema }) }), | ||
| z.object({ delete: z.object({ id: z.string() }) }), | ||
| z.object({ exclude: z.object({ id: z.string() }) }), |
| z.object({ index: z.object({ feature: featureSchema }) }), | ||
| z.object({ delete: z.object({ id: z.string() }) }), | ||
| z.object({ exclude: z.object({ id: z.string() }) }), | ||
| z.object({ restore: z.object({ id: z.string() }) }), |
|
|
||
| const featureBulkAcrossStreamsOperationSchema = z.union([ | ||
| z.object({ delete: z.object({ id: z.string(), stream_name: z.string() }) }), | ||
| z.object({ exclude: z.object({ id: z.string(), stream_name: z.string() }) }), |
| const featureBulkAcrossStreamsOperationSchema = z.union([ | ||
| z.object({ delete: z.object({ id: z.string(), stream_name: z.string() }) }), | ||
| z.object({ exclude: z.object({ id: z.string(), stream_name: z.string() }) }), | ||
| z.object({ restore: z.object({ id: z.string(), stream_name: z.string() }) }), |
| const featureBulkAcrossStreamsOperationSchema = z.union([ | ||
| z.object({ delete: z.object({ id: z.string(), stream_name: z.string() }) }), | ||
| z.object({ exclude: z.object({ id: z.string(), stream_name: z.string() }) }), | ||
| z.object({ restore: z.object({ id: z.string(), stream_name: z.string() }) }), |
| }, | ||
| params: z.object({ | ||
| path: z.object({ name: z.string(), uuid: z.string() }), | ||
| path: z.object({ name: z.string(), id: z.string() }), |
| }, | ||
| params: z.object({ | ||
| path: z.object({ name: z.string(), uuid: z.string() }), | ||
| path: z.object({ name: z.string(), id: z.string() }), |
💔 Build Failed
Failed CI StepsMetrics [docs]Module Count
Async chunks
History
|
| const deletableOps: Array<Extract<KIBulkOperation, { delete: unknown }>> = []; | ||
| let deleteSkipped = 0; | ||
| for (const op of deleteOps) { | ||
| if (deleteLatest.find((doc) => doc.id === op.delete.id)) { |
There was a problem hiding this comment.
| if (deleteLatest.find((doc) => doc.id === op.delete.id)) { | |
| if (deleteLatest.some((doc) => doc.id === op.delete.id)) { |
| const latest = docById.get(key); | ||
| if ( | ||
| !latest || | ||
| new Date(latest['@timestamp']).getTime() !== new Date(source['@timestamp']).getTime() |
There was a problem hiding this comment.
q: We don't need a tiebreaker here by _id, right?
| query = withSort(query, sort); | ||
| // Cap at REVISION_SIZE_LIMIT regardless of the requested limit so a large | ||
| // caller-supplied value can't fetch an unbounded result set. | ||
| query = query.keep('_source').limit(Math.min(limit, REVISION_SIZE_LIMIT)); |
There was a problem hiding this comment.
q: should we log a warning in case limit happens to be greater than REVISION_SIZE_LIMIT?
| const docs: StoredKnowledgeIndicator[] = []; | ||
| for (const op of operations) { | ||
| if ('index' in op) { | ||
| if ('feature' in op.index) { |
There was a problem hiding this comment.
nit: we're doing this again a few lines above
| private readonly ttlDays: number | ||
| ) {} | ||
|
|
||
| async bulk( |
There was a problem hiding this comment.
this function seems to be doing a lot of things. I wonder if its content should be broken into smaller functions
| query = withTimeRange(query, options); | ||
| if (where) query = withWhere(query, where); | ||
| query = pickLatestPerGroup(query, groupBy); | ||
| const sortArgs: ComposerSortShorthand[] = sort ?? [['@timestamp', 'DESC']]; |
There was a problem hiding this comment.
is there a constant for @timestamp?
| query = query.keep('_source'); | ||
| const query = latestSourceFrom(index, space).where`${esql.col(idField)} == ${esql.str(idValue)}` | ||
| .sort(['@timestamp', 'ASC']) | ||
| .keep('_source'); |
There was a problem hiding this comment.
Is there a constant for _source?
| const wildcard = (field: string, boost?: number) => ({ | ||
| wildcard: { | ||
| [field]: { | ||
| value: `*${escaped}*`, |
There was a problem hiding this comment.
I guess leading * will perform a full term scan, and could have performance implications
| } | ||
|
|
||
| function computeExpiresAt(timestamp: string, ttlDays: number): string { | ||
| return new Date(new Date(timestamp).getTime() + ttlDays * 24 * 60 * 60 * 1000).toISOString(); |
There was a problem hiding this comment.
nit: Could we extract this 24 * 60 * 60 * 1000 in to a constant?
| const parts: string[] = [`Stream: ${streamName}`]; | ||
| if (feature.title) parts.push(`Title: ${feature.title}`); | ||
| if (feature.description) parts.push(`Description: ${feature.description}`); | ||
| if (feature.type) parts.push(`Type: ${feature.type}`); | ||
| if (feature.subtype) parts.push(`Subtype: ${feature.subtype}`); |
There was a problem hiding this comment.
nit: we could have constants for these texts "Stream" , "Title", etc.
|
Closing as this will be split up in two smaller changes |
## Summary Additive foundation for #270979 — introduces the **unified Knowledge Indicators (KI) data stream** as a new storage backend for features and queries, without touching any existing code. Existing `FeatureClient` and `QueryClient` paths remain fully active. **Model** - One hidden data stream (`.significant_events-knowledge_indicators`) will hold both `feature` and `query` documents, discriminated by `type`. - Identity is `(stream.name, type, id)`. State is reconstructed by selecting the **latest revision per group** (two-stage `INLINE STATS` — `MAX(@timestamp)` then `MAX(_id)` tiebreak). - Writes are append-only: updates add a revision; deletes append a tombstone (`deleted: true`). - Reads filter on the latest revision (drop tombstoned / excluded / expired) after the per-group reduction. - Supports **keyword + semantic hybrid search** across indicators. The index template is installed at Kibana startup so the data stream is ready when callers are migrated over. --- ## Testing This PR has **no behavior change**. All existing routes continue to use `FeatureClient` and `QueryClient`. - verify the index template was installed at startup: GET /_index_template/.significant_events-knowledge_indicators --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
## Summary Additive foundation for elastic#270979 — introduces the **unified Knowledge Indicators (KI) data stream** as a new storage backend for features and queries, without touching any existing code. Existing `FeatureClient` and `QueryClient` paths remain fully active. **Model** - One hidden data stream (`.significant_events-knowledge_indicators`) will hold both `feature` and `query` documents, discriminated by `type`. - Identity is `(stream.name, type, id)`. State is reconstructed by selecting the **latest revision per group** (two-stage `INLINE STATS` — `MAX(@timestamp)` then `MAX(_id)` tiebreak). - Writes are append-only: updates add a revision; deletes append a tombstone (`deleted: true`). - Reads filter on the latest revision (drop tombstoned / excluded / expired) after the per-group reduction. - Supports **keyword + semantic hybrid search** across indicators. The index template is installed at Kibana startup so the data stream is ready when callers are migrated over. --- ## Testing This PR has **no behavior change**. All existing routes continue to use `FeatureClient` and `QueryClient`. - verify the index template was installed at startup: GET /_index_template/.significant_events-knowledge_indicators --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
## Summary Additive foundation for elastic#270979 — introduces the **unified Knowledge Indicators (KI) data stream** as a new storage backend for features and queries, without touching any existing code. Existing `FeatureClient` and `QueryClient` paths remain fully active. **Model** - One hidden data stream (`.significant_events-knowledge_indicators`) will hold both `feature` and `query` documents, discriminated by `type`. - Identity is `(stream.name, type, id)`. State is reconstructed by selecting the **latest revision per group** (two-stage `INLINE STATS` — `MAX(@timestamp)` then `MAX(_id)` tiebreak). - Writes are append-only: updates add a revision; deletes append a tombstone (`deleted: true`). - Reads filter on the latest revision (drop tombstoned / excluded / expired) after the per-group reduction. - Supports **keyword + semantic hybrid search** across indicators. The index template is installed at Kibana startup so the data stream is ready when callers are migrated over. --- ## Testing This PR has **no behavior change**. All existing routes continue to use `FeatureClient` and `QueryClient`. - verify the index template was installed at startup: GET /_index_template/.significant_events-knowledge_indicators --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Summary
Ports features and queries from two separate mutable storage backends to a single unified, append-only Knowledge Indicators (KI) data stream, maintaining feature parity. The old
FeatureClientandQueryClientare removed and replaced by a singleKnowledgeIndicatorClient.Model
.significant_events-knowledge_indicators) holds bothfeatureandquerydocuments, discriminated bytype.(stream.name, type, id). State is reconstructed by selecting the latest revision per group (two-stageINLINE STATS—MAX(@timestamp)thenMAX(_id)tiebreak).deleted: true).uuid/status/last_seen/excluded_at(timestamp) are gone; identity isid, withupdated_at(revision time) andexcluded(boolean).Testing
Streams: significant events(Advanced Settings) in a space with an Enterprise license; confirm the Significant events tab loads.idacross streams, confirm they render as distinct rows, applying/removing filters produces no duplicate/ghost rows, and bulk exclude/restore/delete/promote target the correct stream.