Skip to content

Commit 8cbfc61

Browse files
authored
[SigEvents] Migrate continuous extraction workflow inputs under manual trigger (elastic#270116)
1 parent 19abd49 commit 8cbfc61

3 files changed

Lines changed: 36 additions & 30 deletions

File tree

x-pack/platform/plugins/shared/streams/server/lib/workflows/continuous_extraction_workflow.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,21 @@ describe('continuous_extraction_workflow.yaml stays in sync with constants', ()
2727

2828
it('uses the correct maxScheduledStreams input', () => {
2929
assertYamlContains(
30-
`name: maxScheduledStreams\n type: number\n default: ${MAX_SCHEDULED_STREAMS}`
30+
`name: maxScheduledStreams\n type: number\n default: ${MAX_SCHEDULED_STREAMS}`
3131
);
3232
});
3333

3434
it('uses the correct lookbackHours input', () => {
35-
assertYamlContains(`name: lookbackHours\n type: number\n default: 24`);
35+
assertYamlContains(`name: lookbackHours\n type: number\n default: 24`);
3636
});
3737

3838
it('declares extractionIntervalHours as an optional input without default', () => {
39-
assertYamlContains('name: extractionIntervalHours\n type: number\n description:');
39+
assertYamlContains('name: extractionIntervalHours\n type: number\n description:');
4040
expect(WORKFLOW_YAML).not.toMatch(/name: extractionIntervalHours[\s\S]*?default:/m);
4141
});
4242

4343
it('declares excludedStreamPatterns as an optional input without default', () => {
44-
assertYamlContains('name: excludedStreamPatterns\n type: string\n description:');
44+
assertYamlContains('name: excludedStreamPatterns\n type: string\n description:');
4545
expect(WORKFLOW_YAML).not.toMatch(/name: excludedStreamPatterns[\s\S]*?default:/m);
4646
});
4747

x-pack/platform/plugins/shared/streams/server/lib/workflows/continuous_extraction_workflow.yaml

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@
1414
version: "1"
1515
name: ".streams-continuous-ki-extraction"
1616
description: "This workflow is used by the system and should not be modified."
17+
tags:
18+
- observability
19+
- streams
20+
- knowledge-indicators
21+
- continuous-extraction
1722
settings:
1823
# Timeout is 1 minute shorter than the schedule to avoid overlapping runs.
1924
timeout: "9m"
@@ -26,21 +31,22 @@ triggers:
2631
- type: scheduled
2732
with:
2833
every: "10m"
29-
inputs:
30-
- name: maxScheduledStreams
31-
type: number
32-
default: 5
33-
description: "Max streams to schedule per run; remaining eligible streams are skipped."
34-
- name: lookbackHours
35-
type: number
36-
default: 24
37-
description: "Hours of data to sample when identifying KIs."
38-
- name: extractionIntervalHours
39-
type: number
40-
description: "Override for the min hours between extractions per stream. When omitted the Advanced Setting is used."
41-
- name: excludedStreamPatterns
42-
type: string
43-
description: "Override for the comma-separated exclude patterns. When omitted the Advanced Setting is used."
34+
- type: manual
35+
inputs:
36+
- name: maxScheduledStreams
37+
type: number
38+
default: 5
39+
description: "Max streams to schedule per run; remaining eligible streams are skipped."
40+
- name: lookbackHours
41+
type: number
42+
default: 24
43+
description: "Hours of data to sample when identifying KIs."
44+
- name: extractionIntervalHours
45+
type: number
46+
description: "Override for the min hours between extractions per stream. When omitted the Advanced Setting is used."
47+
- name: excludedStreamPatterns
48+
type: string
49+
description: "Override for the comma-separated exclude patterns. When omitted the Advanced Setting is used."
4450
steps:
4551
# Step 1: Fetch eligible streams from the classification endpoint.
4652
- name: get_eligible
@@ -51,7 +57,6 @@ steps:
5157
/internal/streams/_extraction/_eligible?maxScheduledStreams={{ inputs.maxScheduledStreams }}&lookbackHours={{ inputs.lookbackHours }}
5258
{%- if inputs.extractionIntervalHours %}&extractionIntervalHours={{ inputs.extractionIntervalHours }}{% endif -%}
5359
{%- if inputs.excludedStreamPatterns %}&excludedStreamPatterns={{ inputs.excludedStreamPatterns }}{% endif -%}
54-
use_server_info: true
5560
headers:
5661
x-elastic-internal-origin: "kibana"
5762
on-failure:
@@ -77,7 +82,6 @@ steps:
7782
with:
7883
method: POST
7984
path: "/internal/streams/{{ foreach.item.streamName }}/features/_task"
80-
use_server_info: true
8185
headers:
8286
x-elastic-internal-origin: "kibana"
8387
body:
@@ -98,7 +102,6 @@ steps:
98102
with:
99103
method: GET
100104
path: "/internal/streams/{{ foreach.item.streamName }}/features/_status"
101-
use_server_info: true
102105
headers:
103106
x-elastic-internal-origin: "kibana"
104107
- name: if_not_started_delay
@@ -126,7 +129,6 @@ steps:
126129
with:
127130
method: GET
128131
path: "/internal/streams/{{ foreach.item.streamName }}/features/_status"
129-
use_server_info: true
130132
headers:
131133
x-elastic-internal-origin: "kibana"
132134
on-failure:

x-pack/platform/plugins/shared/streams/server/routes/internal/sig_events/extraction/eligible_streams_route.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,17 @@ export interface EligibleStreamsResponse {
5353
};
5454
}
5555

56-
const NumberFromString = z.string().transform((value) => {
57-
const trimmed = value.trim();
58-
if (trimmed === '') {
59-
return undefined;
60-
}
61-
return Number(trimmed);
62-
});
56+
const NumberFromString = z
57+
.string()
58+
.optional()
59+
.transform((value) => {
60+
if (value === undefined) return undefined;
61+
const trimmed = value.trim();
62+
if (trimmed === '') {
63+
return undefined;
64+
}
65+
return Number(trimmed);
66+
});
6367

6468
const eligibleStreamsRoute = createServerRoute({
6569
endpoint: 'GET /internal/streams/_extraction/_eligible',

0 commit comments

Comments
 (0)