Skip to content

Commit fd9180a

Browse files
teresaromeropaulinashakirova
authored andcommitted
[Fleet] Support data_stream.type variable in simplified policy API (elastic#269895)
## Summary Closes elastic#266321 The simplified package policy API did not support `data_stream.type` as an overridable variable. Input-only packages define a default data stream type in their manifest (for example, `metrics`), but there was no way to override it via the simplified API — any attempt threw a `Variable not found` error because the variable was not synthesised into the policy's stream vars. This blocked elastic-package policy tests that rely on the simplified API to set a non-default `data_stream.type`. This PR adds full support for `data_stream.type` as a top-level variable in input-only package policies, following the same pattern as `data_stream.dataset` and `use_apm`. ### What changed **`policy_template.ts`** - Adds `DATA_STREAM_TYPE_VAR` — a synthetic `RegistryVarsEntry` definition for `data_stream.type`. - Adds `addDataStreamTypeVarIfNotPresent` and `shouldIncludeDataStreamTypeVar` helpers, mirroring the existing dataset helpers. - `getNormalizedDataStreams` now injects the synthetic `data_stream.type` var into every input-only stream's vars (with the package's default type as the default value), unless the policy template declares `dynamic_signal_types: true` — in which case the type is resolved at runtime and the var is intentionally omitted. **`simplified_package_policy_helper.ts`** - Adds `syncDataStreamTypeFromVar`: iterates all streams and, when `stream.vars['data_stream.type'].value` differs from `stream.data_stream.type`, updates the field in place. This is what makes the compiled agent YAML and index template lookup use the user-supplied type rather than the package default. - `simplifiedPackagePolicytoNewPackagePolicy` calls `syncDataStreamTypeFromVar` before returning, so the returned `NewPackagePolicy` is consistent. **`package_policy.ts` (server)** - Calls `syncDataStreamTypeFromVar` before `preflightCheckPackagePolicy` in both `create()` and `bulkCreate()`, covering the direct Fleet API path in addition to the simplified API. **`package_policies_to_agent_permissions.ts`** - When computing index permissions, reads `stream.vars['data_stream.type'].value` first and falls back to `stream.data_stream.type`. This ensures the correct index prefix (`logs-*` vs `metrics-*`) is granted when the type has been overridden. **Rejection for `dynamic_signal_types` packages** Packages with `dynamic_signal_types: true` (for example, OTel collector) do not get the `data_stream.type` var injected. Because the var is absent from `varsRecord`, `assignVariables` throws a `PackagePolicyValidationError` if a caller attempts to set it, preventing misuse. ### Testing - Unit tests added and updated in `policy_template.test.ts`, `simplified_package_policy_helper.test.ts`, and `package_policies_to_agent_permissions.test.ts`. - Manually validated end-to-end using the test fixture from [elastic/elastic-package#3509](https://github.com/elastic/elastic-package/pull/3509/changes#diff-d01b43d83cadd491f2e69dca24265787b53b9513812615389d30bbb63c7b9664) with the `policy_api_format` workaround removed from the test configuration. The downloaded agent policy correctly showed `data_stream.type: logs` in the stream and `logs-*-*` in the index permissions. --- ### Checklist - [ ] Any text added follows [EUI's writing guidelines](https://elastic.github.io/eui/#/guidelines/writing), uses sentence case text and includes [i18n support](https://github.com/elastic/kibana/blob/main/src/platform/packages/shared/kbn-i18n/README.md) - [ ] [Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html) was added for features that require explanation or tutorials - [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios - [ ] If a plugin configuration key changed, check if it needs to be allowlisted in the cloud and added to the [docker list](https://github.com/elastic/kibana/blob/main/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker) - [ ] This was checked for breaking HTTP API changes, and any breaking changes have been approved by the breaking-change committee. The `release_note:breaking` label should be applied in these situations. - [ ] [Flaky Test Runner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was used on any tests changed - [ ] The PR description includes the appropriate Release Notes section, and the correct `release_note:*` label is applied per the [guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process) - [ ] Review the [backport guidelines](https://docs.google.com/document/d/1VyN5k91e5OVumlc0Gb9RPa3h1ewuPE705nRtioPiTvY/edit?usp=sharing) and apply applicable `backport:*` labels. ### Identify risks - **Immutable type after creation** — like `data_stream.dataset`, the `data_stream.type` var is immutable once the policy is saved. An attempt to update it is rejected with a validation error (existing guard in `package_policy.ts`). Low risk; consistent with existing dataset immutability. - **Input-only packages only** — the synthetic var is only injected for `type: input` packages. Composable integration packages are unaffected. - [ ] [See some risk examples](https://github.com/elastic/kibana/blob/main/RISK_MATRIX.mdx) ### Release note ``` Fleet now supports overriding `data_stream.type` via the simplified package policy API for input-only packages. Setting this variable routes collected data to a different Elasticsearch index prefix (e.g. `logs-*` instead of `metrics-*`) and grants the appropriate index permissions to the agent. ```
1 parent dd46233 commit fd9180a

15 files changed

Lines changed: 565 additions & 10 deletions

.buildkite/pipelines/storybooks_from_pr.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ steps:
1717
diskType: hyperdisk-balanced
1818
preemptible: true
1919
spotZones: us-central1-a,us-central1-b,us-central1-c,us-central1-f,us-east1-b,us-east1-c,us-east1-d,us-west1-a,us-west1-b,us-west1-c
20-
preemptible: true
2120
diskSizeGb: 105
2221
timeout_in_minutes: 50
2322
retry:

x-pack/platform/plugins/shared/fleet/common/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ export {
8787
// Package policy helpers
8888
isValidNamespace,
8989
isValidDataset,
90+
isValidDataStreamType,
9091
INVALID_NAMESPACE_CHARACTERS,
92+
VALID_DATA_STREAM_TYPES,
9193
getFileMetadataIndexName,
9294
getFileDataIndexName,
9395
removeSOAttributes,

x-pack/platform/plugins/shared/fleet/common/services/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@ export { fullAgentPolicyToYaml } from './full_agent_policy_to_yaml';
2727
export { isPackageLimited, doesAgentPolicyAlreadyIncludePackage } from './limited_package';
2828
export {
2929
isValidDataset,
30+
isValidDataStreamType,
3031
isValidNamespace,
3132
INVALID_NAMESPACE_CHARACTERS,
33+
VALID_DATA_STREAM_TYPES,
3234
} from './is_valid_namespace';
3335
export { isDiffPathProtocol } from './is_diff_path_protocol';
3436
export { LicenseService } from './license';
@@ -47,10 +49,14 @@ export {
4749
MINIMUM_PRIVILEGE_LEVEL_CHANGE_AGENT_VERSION,
4850
isAgentEligibleForPrivilegeLevelChange,
4951
} from './agent_privilege_level_change_helpers';
52+
export { syncDataStreamTypeFromVar } from './simplified_package_policy_helper';
5053
export {
5154
addUseAPMVarIfNotPresent,
5255
DATA_STREAM_USE_APM_VAR,
5356
shouldIncludeUseAPMVar,
57+
addDataStreamTypeVarIfNotPresent,
58+
DATA_STREAM_TYPE_VAR,
59+
shouldIncludeDataStreamTypeVar,
5460
isInputOnlyPolicyTemplate,
5561
isIntegrationPolicyTemplate,
5662
getNormalizedInputs,

x-pack/platform/plugins/shared/fleet/common/services/is_valid_namespace.test.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
* 2.0.
66
*/
77

8-
import { isValidNamespace } from './is_valid_namespace';
8+
import { isValidNamespace, isValidDataStreamType } from './is_valid_namespace';
99

1010
describe('Fleet - isValidNamespace', () => {
1111
it('returns true for valid namespaces', () => {
@@ -51,3 +51,28 @@ describe('Fleet - isValidNamespace', () => {
5151
expect(isValidNamespace('qaenv', false, ['prod', 'qa']).valid).toBe(true);
5252
});
5353
});
54+
55+
describe('Fleet - isValidDataStreamType', () => {
56+
it('returns true for each valid type', () => {
57+
expect(isValidDataStreamType('logs').valid).toBe(true);
58+
expect(isValidDataStreamType('metrics').valid).toBe(true);
59+
expect(isValidDataStreamType('traces').valid).toBe(true);
60+
expect(isValidDataStreamType('synthetics').valid).toBe(true);
61+
expect(isValidDataStreamType('profiling').valid).toBe(true);
62+
});
63+
64+
it('returns false for unknown types', () => {
65+
expect(isValidDataStreamType('bogus').valid).toBe(false);
66+
expect(isValidDataStreamType('LOGS').valid).toBe(false);
67+
expect(isValidDataStreamType('').valid).toBe(false);
68+
});
69+
70+
it('returns true for blank when allowBlank is true', () => {
71+
expect(isValidDataStreamType('', true).valid).toBe(true);
72+
expect(isValidDataStreamType(' ', true).valid).toBe(true);
73+
});
74+
75+
it('returns false for blank when allowBlank is false', () => {
76+
expect(isValidDataStreamType('', false).valid).toBe(false);
77+
});
78+
});

x-pack/platform/plugins/shared/fleet/common/services/is_valid_namespace.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
import { i18n } from '@kbn/i18n';
99

10+
import type { PackageDataStreamTypes } from '../types';
11+
1012
// Namespace string eventually becomes part of an index name. This method partially implements index name rules from
1113
// https://github.com/elastic/elasticsearch/blob/master/docs/reference/indices/create-index.asciidoc
1214
// and implements a limit based on https://github.com/elastic/kibana/issues/75846
@@ -118,3 +120,30 @@ function isValidEntity(
118120
}
119121

120122
export const INVALID_NAMESPACE_CHARACTERS = /[\*\\/\?"<>|\s,#:-]+/;
123+
124+
export const VALID_DATA_STREAM_TYPES: readonly PackageDataStreamTypes[] = [
125+
'logs',
126+
'metrics',
127+
'traces',
128+
'synthetics',
129+
'profiling',
130+
];
131+
132+
export function isValidDataStreamType(
133+
type: string,
134+
allowBlank?: boolean
135+
): { valid: boolean; error?: string } {
136+
if (!type.trim() && allowBlank) {
137+
return { valid: true };
138+
}
139+
if (!VALID_DATA_STREAM_TYPES.includes(type as PackageDataStreamTypes)) {
140+
return {
141+
valid: false,
142+
error: i18n.translate('xpack.fleet.dataStreamTypeValidation.invalidValueErrorMessage', {
143+
defaultMessage: 'Data stream type must be one of: {allowedTypes}',
144+
values: { allowedTypes: VALID_DATA_STREAM_TYPES.join(', ') },
145+
}),
146+
};
147+
}
148+
return { valid: true };
149+
}

x-pack/platform/plugins/shared/fleet/common/services/policy_template.test.ts

Lines changed: 132 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ import {
3030
packagePolicyInputAllowsUndefinedDataStreamType,
3131
hasDynamicSignalTypes,
3232
shouldIncludeUseAPMVar,
33+
shouldIncludeDataStreamTypeVar,
34+
addDataStreamTypeVarIfNotPresent,
3335
} from './policy_template';
3436

3537
describe('isInputOnlyPolicyTemplate', () => {
@@ -198,6 +200,54 @@ describe('shouldIncludeUseAPMVar', () => {
198200
});
199201
});
200202

203+
describe('shouldIncludeDataStreamTypeVar', () => {
204+
it('returns true when dynamic_signal_types is false', () => {
205+
expect(shouldIncludeDataStreamTypeVar(false)).toBe(true);
206+
});
207+
208+
it('returns false when dynamic_signal_types is true', () => {
209+
expect(shouldIncludeDataStreamTypeVar(true)).toBe(false);
210+
});
211+
});
212+
213+
describe('addDataStreamTypeVarIfNotPresent', () => {
214+
it('adds the data_stream.type var with the given default when not already present', () => {
215+
const result = addDataStreamTypeVarIfNotPresent([], 'metrics');
216+
expect(result).toHaveLength(1);
217+
expect(result[0].name).toEqual('data_stream.type');
218+
expect(result[0].default).toEqual('metrics');
219+
expect(result[0].show_user).toEqual(false);
220+
});
221+
222+
it('uses empty array when vars is undefined', () => {
223+
const result = addDataStreamTypeVarIfNotPresent(undefined, 'logs');
224+
expect(result).toHaveLength(1);
225+
expect(result[0].name).toEqual('data_stream.type');
226+
});
227+
228+
it('does not set a default when defaultType is not provided', () => {
229+
const result = addDataStreamTypeVarIfNotPresent([]);
230+
expect(result).toHaveLength(1);
231+
expect(result[0].name).toEqual('data_stream.type');
232+
expect(result[0].default).toBeUndefined();
233+
});
234+
235+
it('does not add the var when data_stream.type is already present', () => {
236+
const existing = {
237+
name: 'data_stream.type',
238+
type: 'text' as RegistryVarType,
239+
title: 'existing',
240+
description: 'existing',
241+
multi: false,
242+
required: false,
243+
show_user: true,
244+
};
245+
const result = addDataStreamTypeVarIfNotPresent([existing], 'metrics');
246+
expect(result).toHaveLength(1);
247+
expect(result[0]).toBe(existing);
248+
});
249+
});
250+
201251
describe('getNormalizedDataStreams', () => {
202252
const integrationPkg: PackageInfo = {
203253
name: 'nginx',
@@ -324,7 +374,88 @@ describe('getNormalizedDataStreams', () => {
324374
});
325375
expect(result).toHaveLength(1);
326376
expect(result[0].streams).toHaveLength(1);
327-
expect(result?.[0].streams?.[0]?.vars).toEqual([datasetVar]);
377+
const vars = result?.[0].streams?.[0]?.vars;
378+
// dataset var should not be duplicated
379+
expect(vars?.filter((v) => v.name === 'data_stream.dataset')).toHaveLength(1);
380+
expect(vars?.find((v) => v.name === 'data_stream.dataset')).toMatchObject(datasetVar);
381+
});
382+
383+
it('should add synthetic data_stream.type var with policyTemplate.type as default', () => {
384+
const result = getNormalizedDataStreams({
385+
...integrationPkg,
386+
type: 'input',
387+
policy_templates: [
388+
{
389+
input: 'logfile',
390+
type: 'logs',
391+
name: 'myinput',
392+
template_path: 'some/path.hbl',
393+
title: 'My Input',
394+
description: 'My Input',
395+
vars: [],
396+
},
397+
],
398+
});
399+
expect(result).toHaveLength(1);
400+
const vars = result[0].streams![0].vars;
401+
const typeVar = vars?.find((v) => v.name === 'data_stream.type');
402+
expect(typeVar).toBeDefined();
403+
expect(typeVar?.default).toEqual('logs');
404+
expect(typeVar?.show_user).toEqual(false);
405+
});
406+
407+
it('should not add data_stream.type var when already declared by the package', () => {
408+
const existingTypeVar = {
409+
name: 'data_stream.type',
410+
type: 'text' as RegistryVarType,
411+
title: 'custom type',
412+
description: 'custom',
413+
multi: false,
414+
required: false,
415+
show_user: true,
416+
};
417+
const result = getNormalizedDataStreams({
418+
...integrationPkg,
419+
type: 'input',
420+
policy_templates: [
421+
{
422+
input: 'logfile',
423+
type: 'logs',
424+
name: 'myinput',
425+
template_path: 'some/path.hbl',
426+
title: 'My Input',
427+
description: 'My Input',
428+
vars: [existingTypeVar],
429+
},
430+
],
431+
});
432+
expect(result).toHaveLength(1);
433+
const vars = result[0].streams![0].vars;
434+
const typeVars = vars?.filter((v) => v.name === 'data_stream.type');
435+
expect(typeVars).toHaveLength(1);
436+
expect(typeVars![0]).toMatchObject(existingTypeVar);
437+
});
438+
439+
it('should not add data_stream.type var when dynamic_signal_types is true', () => {
440+
const result = getNormalizedDataStreams({
441+
...integrationPkg,
442+
type: 'input',
443+
policy_templates: [
444+
{
445+
input: 'otelcol',
446+
name: 'otel-dynamic',
447+
template_path: 'some/path.hbl',
448+
title: 'OTel Dynamic',
449+
description: 'OTel with dynamic signal types',
450+
dynamic_signal_types: true,
451+
vars: [],
452+
},
453+
],
454+
} as any);
455+
expect(result).toHaveLength(1);
456+
const vars = result[0].streams![0].vars;
457+
const typeVar = vars?.find((v) => v.name === 'data_stream.type');
458+
expect(typeVar).toBeUndefined();
328459
});
329460

330461
const inputPkg: PackageInfo = {

x-pack/platform/plugins/shared/fleet/common/services/policy_template.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { i18n } from '@kbn/i18n';
99

1010
import {
1111
DATASET_VAR_NAME,
12+
DATA_STREAM_TYPE_VAR_NAME,
1213
dataTypes,
1314
OTEL_COLLECTOR_INPUT_TYPE,
1415
USE_APM_VAR_NAME,
@@ -43,6 +44,21 @@ const DATA_STREAM_DATASET_VAR: RegistryVarsEntry = {
4344
show_user: true,
4445
};
4546

47+
export const DATA_STREAM_TYPE_VAR: RegistryVarsEntry = {
48+
name: DATA_STREAM_TYPE_VAR_NAME,
49+
type: 'text',
50+
title: i18n.translate('xpack.fleet.policyTemplate.dataStreamTypeVar.title', {
51+
defaultMessage: 'Data stream type',
52+
}),
53+
description: i18n.translate('xpack.fleet.policyTemplate.dataStreamTypeVar.description', {
54+
defaultMessage:
55+
'Set the type for your data stream. Valid values are logs, metrics, traces, and synthetics.',
56+
}),
57+
multi: false,
58+
required: false,
59+
show_user: false,
60+
};
61+
4662
export const DATA_STREAM_USE_APM_VAR: RegistryVarsEntry = {
4763
name: USE_APM_VAR_NAME,
4864
type: 'bool',
@@ -202,6 +218,9 @@ export function getNormalizedDataStreams(
202218
const dataset = datasetName || createDefaultDatasetName(packageInfo, policyTemplate);
203219

204220
let vars = addDatasetVarIfNotPresent(policyTemplate.vars, policyTemplate.name);
221+
if (shouldIncludeDataStreamTypeVar(policyTemplate.dynamic_signal_types === true)) {
222+
vars = addDataStreamTypeVarIfNotPresent(vars, policyTemplate.type);
223+
}
205224
if (
206225
shouldIncludeUseAPMVar(
207226
policyTemplate.input,
@@ -288,6 +307,26 @@ export const addUseAPMVarIfNotPresent = (vars?: RegistryVarsEntry[]): RegistryVa
288307
}
289308
};
290309

310+
export const shouldIncludeDataStreamTypeVar = (isDynamicSignalTypes: boolean): boolean =>
311+
!isDynamicSignalTypes;
312+
313+
export const addDataStreamTypeVarIfNotPresent = (
314+
vars?: RegistryVarsEntry[],
315+
defaultType?: string
316+
): RegistryVarsEntry[] => {
317+
const newVars = vars ?? [];
318+
319+
const isDataStreamTypeVarAlreadyAdded = newVars.find(
320+
(varEntry) => varEntry.name === DATA_STREAM_TYPE_VAR_NAME
321+
);
322+
323+
if (isDataStreamTypeVarAlreadyAdded) {
324+
return newVars;
325+
} else {
326+
return [...newVars, { ...DATA_STREAM_TYPE_VAR, ...(defaultType && { default: defaultType }) }];
327+
}
328+
};
329+
291330
const createDefaultDatasetName = (
292331
packageInfo: { name: string },
293332
policyTemplate: { name: string }

0 commit comments

Comments
 (0)