Skip to content

Commit 4c0b355

Browse files
🌊 Streams: Fix migration on read for stream definitions (elastic#220878)
Fixes elastic#220395 This is mostly meant to address the existing issue - there is a separate issue to track a more long-term solution to the issue. Here, we make sure that an empty description is added when a stream definition is loaded from source - it might not be available because the description property was added after streams was initially released. It also adds an API integration to test for this - this should notify us if the problem ever occurs again. --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
1 parent 10b8f72 commit 4c0b355

5 files changed

Lines changed: 141 additions & 1 deletion

File tree

‎x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import { SecurityError } from './errors/security_error';
2626
import { State } from './state_management/state';
2727
import { StatusError } from './errors/status_error';
2828
import { ASSET_ID, ASSET_TYPE } from './assets/fields';
29+
import { migrateOnRead } from './helpers/migrate_on_read';
2930

3031
interface AcknowledgeResponse<TResult extends Result> {
3132
acknowledged: true;
@@ -339,7 +340,7 @@ export class StreamsClient {
339340
try {
340341
const response = await this.dependencies.storageClient.get({ id: name });
341342

342-
const streamDefinition = response._source!;
343+
const streamDefinition = migrateOnRead(response._source!);
343344

344345
Streams.all.Definition.asserts(streamDefinition);
345346

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
import { Streams } from '@kbn/streams-schema';
9+
10+
export function migrateOnRead(definition: Streams.all.Definition): Streams.all.Definition {
11+
if (typeof definition.description !== 'string') {
12+
return {
13+
...definition,
14+
description: '',
15+
};
16+
}
17+
return definition;
18+
}

‎x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/stream_active_record/stream_from_definition.ts‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@ import type { StreamActiveRecord } from './stream_active_record';
1111
import { UnwiredStream } from '../streams/unwired_stream';
1212
import { WiredStream } from '../streams/wired_stream';
1313
import { GroupStream } from '../streams/group_stream';
14+
import { migrateOnRead } from '../../helpers/migrate_on_read';
1415

1516
// This should be the only thing that knows about the various stream types
1617
export function streamFromDefinition(
1718
definition: Streams.all.Definition,
1819
dependencies: StateDependencies
1920
): StreamActiveRecord {
21+
definition = migrateOnRead(definition);
2022
if (Streams.WiredStream.Definition.is(definition)) {
2123
return new WiredStream(definition, dependencies);
2224
} else if (Streams.UnwiredStream.Definition.is(definition)) {

‎x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext)
2323
loadTestFile(require.resolve('./queries'));
2424
loadTestFile(require.resolve('./discover'));
2525
loadTestFile(require.resolve('./content'));
26+
loadTestFile(require.resolve('./migration_on_read'));
2627
});
2728
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
import expect from '@kbn/expect';
9+
import { Streams } from '@kbn/streams-schema';
10+
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
11+
import { disableStreams, enableStreams, indexDocument } from './helpers/requests';
12+
import {
13+
StreamsSupertestRepositoryClient,
14+
createStreamsRepositoryAdminClient,
15+
} from './helpers/repository_client';
16+
17+
const TEST_STREAM_NAME = 'logs-test-default';
18+
19+
const streamDefinition = {
20+
name: TEST_STREAM_NAME,
21+
ingest: {
22+
lifecycle: {
23+
ilm: {
24+
policy: 'logs-default',
25+
},
26+
},
27+
processing: [
28+
{
29+
grok: {
30+
field: 'message',
31+
patterns: [
32+
'%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}',
33+
],
34+
if: { always: {} },
35+
},
36+
},
37+
],
38+
unwired: {},
39+
},
40+
};
41+
42+
const expectedStreamsResponse: Streams.UnwiredStream.Definition = {
43+
name: TEST_STREAM_NAME,
44+
description: '',
45+
ingest: {
46+
lifecycle: {
47+
ilm: {
48+
policy: 'logs-default',
49+
},
50+
},
51+
processing: [
52+
{
53+
grok: {
54+
field: 'message',
55+
patterns: [
56+
'%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}',
57+
],
58+
if: { always: {} },
59+
},
60+
},
61+
],
62+
unwired: {},
63+
},
64+
};
65+
66+
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
67+
const roleScopedSupertest = getService('roleScopedSupertest');
68+
let apiClient: StreamsSupertestRepositoryClient;
69+
const esClient = getService('es');
70+
71+
// This test verifies that it's still possible to read an existing stream definition without
72+
// error. If it fails, it indicates that the migration logic is not working as expected.
73+
describe('read existing stream definition format', () => {
74+
before(async () => {
75+
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
76+
await enableStreams(apiClient);
77+
});
78+
79+
after(async () => {
80+
await disableStreams(apiClient);
81+
});
82+
83+
it('should read and return existing orphaned classic stream', async () => {
84+
await esClient.index({
85+
index: '.kibana_streams-000001',
86+
id: TEST_STREAM_NAME,
87+
document: streamDefinition,
88+
});
89+
90+
// Refresh the index to make the document searchable
91+
await esClient.indices.refresh({ index: '.kibana_streams-000001' });
92+
const getResponse = await apiClient.fetch('GET /api/streams/{name} 2023-10-31', {
93+
params: {
94+
path: { name: TEST_STREAM_NAME },
95+
},
96+
});
97+
98+
expect(getResponse.status).to.eql(200);
99+
expect(getResponse.body.stream).to.eql(expectedStreamsResponse);
100+
});
101+
102+
it('should read and return existing regular classic stream', async () => {
103+
const doc = {
104+
message: '2023-01-01T00:00:10.000Z error test',
105+
};
106+
const response = await indexDocument(esClient, TEST_STREAM_NAME, doc);
107+
expect(response.result).to.eql('created');
108+
const getResponse = await apiClient.fetch('GET /api/streams/{name} 2023-10-31', {
109+
params: {
110+
path: { name: TEST_STREAM_NAME },
111+
},
112+
});
113+
114+
expect(getResponse.status).to.eql(200);
115+
expect(getResponse.body.stream).to.eql(expectedStreamsResponse);
116+
});
117+
});
118+
}

0 commit comments

Comments
 (0)