Skip to content

Commit 78ac5d5

Browse files
authored
[Failure store] Support failure store for system data streams (#126585)
In this PR we add support for the failure store for system data streams. Specifically: - We pass the system descriptor so the failure index can be created based on that. - We extend the tests to ensure it works - We remove a guard we had but I wasn't able to test it because it only gets triggered if the data stream gets created right after a failure in the ingest pipeline, and I didn't see how to add one (yet). - We extend the system data stream migration to ensure this is also working.
1 parent a73f923 commit 78ac5d5

File tree

6 files changed

+200
-36
lines changed

6 files changed

+200
-36
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java

+90
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.elasticsearch.client.internal.Client;
2525
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
2626
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate.DataStreamTemplate;
27+
import org.elasticsearch.cluster.metadata.DataStream;
28+
import org.elasticsearch.cluster.metadata.DataStreamFailureStore;
29+
import org.elasticsearch.cluster.metadata.DataStreamOptions;
2730
import org.elasticsearch.cluster.metadata.Template;
2831
import org.elasticsearch.cluster.project.ProjectResolver;
2932
import org.elasticsearch.cluster.project.TestProjectResolvers;
@@ -33,6 +36,8 @@
3336
import org.elasticsearch.common.network.NetworkModule;
3437
import org.elasticsearch.common.settings.Settings;
3538
import org.elasticsearch.common.xcontent.XContentHelper;
39+
import org.elasticsearch.index.mapper.DateFieldMapper;
40+
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
3641
import org.elasticsearch.indices.ExecutorNames;
3742
import org.elasticsearch.indices.SystemDataStreamDescriptor;
3843
import org.elasticsearch.indices.SystemDataStreamDescriptor.Type;
@@ -48,12 +53,15 @@
4853
import java.util.ArrayList;
4954
import java.util.Collection;
5055
import java.util.List;
56+
import java.util.Locale;
5157
import java.util.Map;
5258
import java.util.stream.Collectors;
5359

60+
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
5461
import static org.hamcrest.Matchers.containsString;
5562
import static org.hamcrest.Matchers.equalTo;
5663
import static org.hamcrest.Matchers.is;
64+
import static org.hamcrest.Matchers.startsWith;
5765

5866
public class SystemDataStreamIT extends ESIntegTestCase {
5967

@@ -62,6 +70,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
6270
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
6371
plugins.add(DataStreamsPlugin.class);
6472
plugins.add(TestSystemDataStreamPlugin.class);
73+
plugins.add(MapperExtrasPlugin.class);
6574
return plugins;
6675
}
6776

@@ -169,6 +178,63 @@ public void testDataStreamStats() throws Exception {
169178
}
170179
}
171180

181+
public void testSystemDataStreamWithFailureStore() throws Exception {
182+
String dataStreamName = ".test-failure-store";
183+
RequestOptions productHeader = RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "product").build();
184+
try (RestClient restClient = createRestClient()) {
185+
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
186+
indexRequest.setOptions(productHeader);
187+
String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
188+
indexRequest.setJsonEntity(
189+
String.format(Locale.ROOT, "{\"%s\":\"%s\",\"count\":\"not-a-number\"}", DEFAULT_TIMESTAMP_FIELD, value)
190+
);
191+
192+
Response indexResponse = restClient.performRequest(indexRequest);
193+
assertThat(indexResponse.getStatusLine().getStatusCode(), is(201));
194+
Map<String, Object> responseMap = XContentHelper.convertToMap(
195+
XContentType.JSON.xContent(),
196+
EntityUtils.toString(indexResponse.getEntity()),
197+
false
198+
);
199+
assertThat(responseMap.get("result"), equalTo("created"));
200+
assertThat((String) responseMap.get("_index"), startsWith(DataStream.FAILURE_STORE_PREFIX));
201+
assertThat(responseMap.get("failure_store"), equalTo("used"));
202+
203+
// Rollover
204+
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "::failures/_rollover");
205+
rolloverRequest.setOptions(productHeader);
206+
Response rolloverResponse = restClient.performRequest(rolloverRequest);
207+
assertThat(rolloverResponse.getStatusLine().getStatusCode(), is(200));
208+
responseMap = XContentHelper.convertToMap(
209+
XContentType.JSON.xContent(),
210+
EntityUtils.toString(rolloverResponse.getEntity()),
211+
false
212+
);
213+
assertThat(responseMap.get("acknowledged"), equalTo(true));
214+
assertThat(responseMap.get("rolled_over"), equalTo(true));
215+
assertThat((String) responseMap.get("new_index"), startsWith(DataStream.FAILURE_STORE_PREFIX));
216+
217+
// Edit data stream options
218+
Request editOptionsRequest = new Request("PUT", "/_data_stream/" + dataStreamName + "/_options");
219+
editOptionsRequest.setJsonEntity("{\"failure_store\":{\"enabled\":\"false\"}}");
220+
editOptionsRequest.setOptions(productHeader);
221+
Response editOptionsResponse = restClient.performRequest(editOptionsRequest);
222+
assertThat(editOptionsResponse.getStatusLine().getStatusCode(), is(200));
223+
responseMap = XContentHelper.convertToMap(
224+
XContentType.JSON.xContent(),
225+
EntityUtils.toString(editOptionsResponse.getEntity()),
226+
false
227+
);
228+
assertThat(responseMap.get("acknowledged"), equalTo(true));
229+
230+
// delete
231+
Request deleteRequest = new Request("DELETE", "/_data_stream/" + dataStreamName);
232+
deleteRequest.setOptions(productHeader);
233+
Response deleteResponse = restClient.performRequest(deleteRequest);
234+
assertThat(deleteResponse.getStatusLine().getStatusCode(), is(200));
235+
}
236+
}
237+
172238
@SuppressWarnings("unchecked")
173239
public void testSystemDataStreamReadWrite() throws Exception {
174240
try (RestClient restClient = createRestClient()) {
@@ -328,6 +394,30 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
328394
List.of("product"),
329395
"product",
330396
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
397+
),
398+
new SystemDataStreamDescriptor(
399+
".test-failure-store",
400+
"system data stream test with failure store",
401+
Type.EXTERNAL,
402+
ComposableIndexTemplate.builder()
403+
.indexPatterns(List.of(".test-failure-store"))
404+
.template(Template.builder().mappings(new CompressedXContent("""
405+
{
406+
"properties": {
407+
"@timestamp" : {
408+
"type": "date"
409+
},
410+
"count": {
411+
"type": "long"
412+
}
413+
}
414+
}""")).dataStreamOptions(new DataStreamOptions.Template(new DataStreamFailureStore.Template(true))))
415+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
416+
.build(),
417+
Map.of(),
418+
List.of("product"),
419+
"product",
420+
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
331421
)
332422
);
333423
} catch (IOException e) {

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

+1
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@ private RolloverResult rolloverDataStream(
361361
now.toEpochMilli(),
362362
dataStreamName,
363363
templateV2,
364+
systemDataStreamDescriptor,
364365
newWriteIndexName,
365366
(builder, indexMetadata) -> builder.put(dataStream.rolloverFailureStore(indexMetadata.getIndex(), newGeneration))
366367
);

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -278,9 +278,6 @@ static ClusterState createDataStream(
278278
// responsibility to check that before setting.
279279
IndexMetadata failureStoreIndex = null;
280280
if (initializeFailureStore) {
281-
if (isSystem) {
282-
throw new IllegalArgumentException("Failure stores are not supported on system data streams");
283-
}
284281
String failureStoreIndexName = DataStream.getDefaultFailureStoreName(dataStreamName, initialGeneration, request.startTime());
285282
currentState = createFailureStoreIndex(
286283
metadataCreateIndexService,
@@ -291,6 +288,7 @@ static ClusterState createDataStream(
291288
request.startTime(),
292289
dataStreamName,
293290
template,
291+
systemDataStreamDescriptor,
294292
failureStoreIndexName,
295293
null
296294
);
@@ -420,6 +418,7 @@ public static ClusterState createFailureStoreIndex(
420418
long nameResolvedInstant,
421419
String dataStreamName,
422420
ComposableIndexTemplate template,
421+
SystemDataStreamDescriptor systemDataStreamDescriptor,
423422
String failureStoreIndexName,
424423
@Nullable BiConsumer<ProjectMetadata.Builder, IndexMetadata> metadataTransformer
425424
) throws Exception {
@@ -439,7 +438,8 @@ public static ClusterState createFailureStoreIndex(
439438
.performReroute(false)
440439
.setMatchingTemplate(template)
441440
.settings(indexSettings)
442-
.isFailureIndex(true);
441+
.isFailureIndex(true)
442+
.systemDataStreamDescriptor(systemDataStreamDescriptor);
443443

444444
try {
445445
currentState = metadataCreateIndexService.applyCreateIndexRequest(

x-pack/plugin/migrate/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ dependencies {
2525

2626
internalClusterTestImplementation project(path: ':modules:lang-painless')
2727
internalClusterTestImplementation project(path: ':modules:lang-painless:spi')
28+
internalClusterTestImplementation project(path: ':modules:mapper-extras')
2829
}
2930

3031
addQaCheckDependencies(project)

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/SystemDataStreamMigrationIT.java

+61-17
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2222
import org.elasticsearch.cluster.metadata.Template;
2323
import org.elasticsearch.common.Strings;
24+
import org.elasticsearch.common.compress.CompressedXContent;
2425
import org.elasticsearch.common.settings.Settings;
2526
import org.elasticsearch.datastreams.DataStreamsPlugin;
2627
import org.elasticsearch.index.Index;
2728
import org.elasticsearch.index.IndexVersion;
29+
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
2830
import org.elasticsearch.indices.ExecutorNames;
2931
import org.elasticsearch.indices.SystemDataStreamDescriptor;
3032
import org.elasticsearch.plugins.ActionPlugin;
@@ -33,6 +35,8 @@
3335
import org.elasticsearch.test.ESIntegTestCase;
3436
import org.junit.After;
3537

38+
import java.io.IOException;
39+
import java.io.UncheckedIOException;
3640
import java.util.ArrayList;
3741
import java.util.Collection;
3842
import java.util.List;
@@ -52,23 +56,38 @@ public class SystemDataStreamMigrationIT extends AbstractFeatureMigrationIntegTe
5256
);
5357

5458
private static SystemDataStreamDescriptor createSystemDataStreamDescriptor(IndexVersion indexVersion) {
55-
return new SystemDataStreamDescriptor(
56-
TEST_DATA_STREAM_NAME,
57-
"system data stream test",
58-
SystemDataStreamDescriptor.Type.EXTERNAL,
59-
ComposableIndexTemplate.builder()
60-
.template(
61-
Template.builder()
62-
.dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true))
63-
.settings(indexSettings(indexVersion, 1, 0))
64-
)
65-
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
66-
.build(),
67-
Map.of(),
68-
List.of("product"),
69-
ORIGIN,
70-
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
71-
);
59+
try {
60+
return new SystemDataStreamDescriptor(
61+
TEST_DATA_STREAM_NAME,
62+
"system data stream test",
63+
SystemDataStreamDescriptor.Type.EXTERNAL,
64+
ComposableIndexTemplate.builder()
65+
.template(
66+
Template.builder()
67+
.mappings(new CompressedXContent("""
68+
{
69+
"properties": {
70+
"@timestamp" : {
71+
"type": "date"
72+
},
73+
"count": {
74+
"type": "long"
75+
}
76+
}
77+
}"""))
78+
.dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true))
79+
.settings(indexSettings(indexVersion, 1, 0))
80+
)
81+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
82+
.build(),
83+
Map.of(),
84+
List.of("product"),
85+
ORIGIN,
86+
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
87+
);
88+
} catch (IOException e) {
89+
throw new UncheckedIOException(e);
90+
}
7291
}
7392

7493
@Override
@@ -87,6 +106,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
87106
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
88107
plugins.add(DataStreamsPlugin.class);
89108
plugins.add(DataStreamTestPlugin.class);
109+
plugins.add(MapperExtrasPlugin.class);
90110
return plugins;
91111
}
92112

@@ -110,6 +130,20 @@ private static void indexDocsToDataStream(String dataStreamName) {
110130

111131
BulkResponse actionGet = bulkBuilder.get();
112132
assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false));
133+
134+
// Index docs to failure store too
135+
bulkBuilder = client().prepareBulk();
136+
for (int i = 0; i < INDEX_DOC_COUNT; i++) {
137+
IndexRequestBuilder requestBuilder = ESIntegTestCase.prepareIndex(dataStreamName)
138+
.setId(Integer.toString(i))
139+
.setRequireDataStream(true)
140+
.setOpType(DocWriteRequest.OpType.CREATE)
141+
.setSource(DataStream.TIMESTAMP_FIELD_NAME, 1741271969000L, "count", "not-a-number");
142+
bulkBuilder.add(requestBuilder);
143+
}
144+
145+
actionGet = bulkBuilder.get();
146+
assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false));
113147
}
114148

115149
public void testMigrateSystemDataStream() throws Exception {
@@ -136,6 +170,16 @@ public void testMigrateSystemDataStream() throws Exception {
136170
assertThat(indexMetadata.isSystem(), is(true));
137171
assertThat(indexMetadata.getCreationVersion(), is(IndexVersion.current()));
138172
}
173+
174+
// Migrate action does not migrate the failure store indices
175+
// here we check that they are preserved.
176+
List<Index> failureIndices = dataStream.getFailureIndices();
177+
assertThat(failureIndices, hasSize(1));
178+
for (Index failureIndex : failureIndices) {
179+
IndexMetadata indexMetadata = finalMetadata.index(failureIndex);
180+
assertThat(indexMetadata.isSystem(), is(true));
181+
assertThat(indexMetadata.getCreationVersion(), is(IndexVersion.current()));
182+
}
139183
}
140184

141185
public void testMigrationRestartAfterFailure() throws Exception {

x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java

+43-15
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import org.elasticsearch.action.index.IndexRequest;
2020
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
2121
import org.elasticsearch.cluster.metadata.DataStream;
22+
import org.elasticsearch.cluster.metadata.DataStreamFailureStore;
2223
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
24+
import org.elasticsearch.cluster.metadata.DataStreamOptions;
2325
import org.elasticsearch.cluster.metadata.Metadata;
2426
import org.elasticsearch.cluster.metadata.Template;
2527
import org.elasticsearch.common.compress.CompressedXContent;
@@ -262,21 +264,47 @@ public static class SystemDataStreamTestPlugin extends Plugin implements SystemI
262264

263265
@Override
264266
public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
265-
return List.of(
266-
new SystemDataStreamDescriptor(
267-
SYSTEM_DATA_STREAM_NAME,
268-
"a system data stream for testing",
269-
SystemDataStreamDescriptor.Type.EXTERNAL,
270-
ComposableIndexTemplate.builder()
271-
.indexPatterns(List.of(SYSTEM_DATA_STREAM_NAME))
272-
.template(Template.builder().lifecycle(DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO)))
273-
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
274-
.build(),
275-
Map.of(),
276-
Collections.singletonList("test"),
277-
"test",
278-
new ExecutorNames(ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SYSTEM_WRITE)
279-
)
267+
try {
268+
return List.of(
269+
new SystemDataStreamDescriptor(
270+
SYSTEM_DATA_STREAM_NAME,
271+
"a system data stream for testing",
272+
SystemDataStreamDescriptor.Type.EXTERNAL,
273+
ComposableIndexTemplate.builder()
274+
.indexPatterns(List.of(SYSTEM_DATA_STREAM_NAME))
275+
.template(
276+
Template.builder()
277+
.mappings(new CompressedXContent("""
278+
{
279+
"properties": {
280+
"@timestamp" : {
281+
"type": "date"
282+
},
283+
"count": {
284+
"type": "long"
285+
}
286+
}
287+
}"""))
288+
.lifecycle(DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO))
289+
.dataStreamOptions(new DataStreamOptions.Template(new DataStreamFailureStore.Template(true)))
290+
)
291+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
292+
.build(),
293+
Map.of(),
294+
Collections.singletonList("test"),
295+
"test",
296+
new ExecutorNames(
297+
ThreadPool.Names.SYSTEM_CRITICAL_READ,
298+
ThreadPool.Names.SYSTEM_READ,
299+
ThreadPool.Names.SYSTEM_WRITE
300+
)
301+
)
302+
);
303+
} catch (IOException e) {
304+
fail(e.getMessage());
305+
}
306+
throw new IllegalStateException(
307+
"Something went wrong, it should have either returned the descriptor or it should have thrown an assertion error"
280308
);
281309
}
282310

0 commit comments

Comments
 (0)