Skip to content

Commit e40b47d

Browse files
authored
Introduce system generated ingest pipeline (#17817)
Signed-off-by: Bo Zhang <[email protected]>
1 parent 1ee8822 commit e40b47d

23 files changed

+2059
-392
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1919
- Allow to get the search request from the QueryCoordinatorContext ([#17818](https://github.com/opensearch-project/OpenSearch/pull/17818))
2020
- Improve sort-query performance by retaining the default `totalHitsThreshold` for approximated `match_all` queries ([#18189](https://github.com/opensearch-project/OpenSearch/pull/18189))
2121
- Enable testing for ExtensiblePlugins using classpath plugins ([#16908](https://github.com/opensearch-project/OpenSearch/pull/16908))
22+
- Introduce system generated ingest pipeline ([#17817](https://github.com/opensearch-project/OpenSearch/pull/17817))
2223

2324
### Changed
2425

server/src/internalClusterTest/java/org/opensearch/index/FinalPipelineIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public void testFinalPipelineCantChangeDestination() {
111111
IllegalStateException.class,
112112
() -> client().prepareIndex("index").setId("1").setSource(Collections.singletonMap("field", "value")).get()
113113
);
114-
assertThat(e, hasToString(containsString("final pipeline [final_pipeline] can't change the target index")));
114+
assertThat(e, hasToString(containsString("FINAL pipeline [final_pipeline] can't change the target index")));
115115
}
116116

117117
public void testFinalPipelineOfOldDestinationIsNotInvoked() {

server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
251251
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
252252
if (indexRequest != null) {
253253
// Each index request needs to be evaluated, because this method also modifies the IndexRequest
254-
boolean indexRequestHasPipeline = IngestService.resolvePipelines(actionRequest, indexRequest, metadata);
254+
boolean indexRequestHasPipeline = ingestService.resolvePipelines(actionRequest, indexRequest, metadata);
255255
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
256256
}
257257

server/src/main/java/org/opensearch/action/index/IndexRequest.java

+22
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
123123

124124
private String pipeline;
125125
private String finalPipeline;
126+
private String systemIngestPipeline;
126127

127128
private boolean isPipelineResolved;
128129

@@ -158,6 +159,9 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
158159
versionType = VersionType.fromValue(in.readByte());
159160
pipeline = in.readOptionalString();
160161
finalPipeline = in.readOptionalString();
162+
if (in.getVersion().onOrAfter(Version.V_3_1_0)) {
163+
systemIngestPipeline = in.readOptionalString();
164+
}
161165
isPipelineResolved = in.readBoolean();
162166
isRetry = in.readBoolean();
163167
autoGeneratedTimestamp = in.readLong();
@@ -314,6 +318,21 @@ public String getPipeline() {
314318
return this.pipeline;
315319
}
316320

321+
/**
322+
* Sets the system ingest pipeline to be executed before indexing the document
323+
*/
324+
public IndexRequest setSystemIngestPipeline(final String systemIngestPipeline) {
325+
this.systemIngestPipeline = systemIngestPipeline;
326+
return this;
327+
}
328+
329+
/**
330+
* Returns the system ingest pipeline to be executed before indexing the document
331+
*/
332+
public String getSystemIngestPipeline() {
333+
return this.systemIngestPipeline;
334+
}
335+
317336
/**
318337
* Sets the final ingest pipeline to be executed before indexing the document.
319338
*
@@ -668,6 +687,9 @@ private void writeBody(StreamOutput out) throws IOException {
668687
out.writeByte(versionType.getValue());
669688
out.writeOptionalString(pipeline);
670689
out.writeOptionalString(finalPipeline);
690+
if (out.getVersion().onOrAfter(Version.V_3_1_0)) {
691+
out.writeOptionalString(systemIngestPipeline);
692+
}
671693
out.writeBoolean(isPipelineResolved);
672694
out.writeBoolean(isRetry);
673695
out.writeLong(autoGeneratedTimestamp);

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,7 @@ public void apply(Settings value, Settings current, Settings previous) {
413413
ClusterService.USER_DEFINED_METADATA,
414414
ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
415415
IngestService.MAX_NUMBER_OF_INGEST_PROCESSORS,
416+
IngestService.SYSTEM_INGEST_PIPELINE_ENABLED,
416417
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
417418
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
418419
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ingest;
10+
11+
import java.util.Map;
12+
13+
/**
14+
* Abstract base class for batch system generated processors.
15+
*
16+
* System processors should not be used in the regular ingest pipelines.
17+
*
18+
* @opensearch.internal
19+
*/
20+
public abstract class AbstractBatchingSystemProcessor extends AbstractBatchingProcessor {
21+
protected AbstractBatchingSystemProcessor(String tag, String description, int batchSize) {
22+
super(tag, description, batchSize);
23+
}
24+
25+
@Override
26+
public boolean isSystemGenerated() {
27+
return true;
28+
}
29+
30+
/**
31+
* Factory class for creating {@link AbstractBatchingSystemProcessor} instances systematically.
32+
*
33+
* Since the processor config is generated based on the index config so the batch size info should also be defined
34+
* as part of it. And different processors can have their own logic to decide the batch size so let each
35+
* implementation of the newProcessor to handle it.
36+
*
37+
* @opensearch.internal
38+
*/
39+
public abstract static class Factory implements Processor.Factory {
40+
final String processorType;
41+
42+
protected Factory(String processorType) {
43+
this.processorType = processorType;
44+
}
45+
46+
@Override
47+
public boolean isSystemGenerated() {
48+
return true;
49+
}
50+
51+
/**
52+
* Creates a new processor instance. It will be invoked systematically.
53+
*
54+
* @param processorFactories The processor factories.
55+
* @param tag The processor tag.
56+
* @param description The processor description.
57+
* @param config The processor configuration.
58+
* @return The new AbstractBatchProcessor instance.
59+
* @throws Exception If the processor could not be created.
60+
*/
61+
@Override
62+
public AbstractBatchingSystemProcessor create(
63+
Map<String, Processor.Factory> processorFactories,
64+
String tag,
65+
String description,
66+
Map<String, Object> config
67+
) throws Exception {
68+
return newProcessor(tag, description, config);
69+
}
70+
71+
/**
72+
* Returns a new processor instance. It will be invoked systematically.
73+
*
74+
* @param tag tag of the processor
75+
* @param description description of the processor
76+
* @param config configuration of the processor
77+
* @return a new batch processor instance
78+
*/
79+
protected abstract AbstractBatchingSystemProcessor newProcessor(String tag, String description, Map<String, Object> config);
80+
}
81+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ingest;
10+
11+
import org.opensearch.action.DocWriteRequest;
12+
import org.opensearch.action.index.IndexRequest;
13+
14+
import java.util.List;
15+
16+
/**
17+
* A wrapper for the index request to help execute the ingest pipelines.
18+
*/
19+
public class IndexRequestWrapper {
20+
/**
21+
* slot of the IndexRequestWrapper is the index of the request in the list of the requests.
22+
* It can be used to map the ingested result or exception to right index request.
23+
*/
24+
private final int slot;
25+
private final IndexRequest indexRequest;
26+
private final DocWriteRequest<?> actionRequest;
27+
private final List<IngestPipelineInfo> pipelineInfoList;
28+
29+
public IndexRequestWrapper(
30+
int slot,
31+
IndexRequest indexRequest,
32+
DocWriteRequest<?> actionRequest,
33+
List<IngestPipelineInfo> pipelineInfoList
34+
) {
35+
this.slot = slot;
36+
this.indexRequest = indexRequest;
37+
this.actionRequest = actionRequest;
38+
this.pipelineInfoList = pipelineInfoList;
39+
}
40+
41+
public int getSlot() {
42+
return slot;
43+
}
44+
45+
public IndexRequest getIndexRequest() {
46+
return indexRequest;
47+
}
48+
49+
public DocWriteRequest<?> getActionRequest() {
50+
return actionRequest;
51+
}
52+
53+
public List<IngestPipelineInfo> getIngestPipelineInfoList() {
54+
return pipelineInfoList;
55+
}
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ingest;
10+
11+
import reactor.util.annotation.NonNull;
12+
13+
/**
14+
* Ingest pipeline info help hold the pipeline id and type.
15+
*/
16+
public class IngestPipelineInfo {
17+
private final String pipelineId;
18+
private final IngestPipelineType type;
19+
20+
public IngestPipelineInfo(final @NonNull String pipelineId, final @NonNull IngestPipelineType type) {
21+
this.pipelineId = pipelineId;
22+
this.type = type;
23+
}
24+
25+
public String getPipelineId() {
26+
return pipelineId;
27+
}
28+
29+
public IngestPipelineType getType() {
30+
return type;
31+
}
32+
33+
@Override
34+
public String toString() {
35+
return pipelineId + ":" + type.name();
36+
}
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ingest;
10+
11+
/**
 * Enumerates the kinds of ingest pipeline.
 */
public enum IngestPipelineType {
    /**
     * The pipeline provided through the index request, or the one defined in the index settings
     * as the default pipeline.
     */
    DEFAULT,

    /**
     * The pipeline defined in the index settings as the final pipeline.
     */
    FINAL,

    /**
     * A systematically generated pipeline which will be executed after the user defined
     * final pipeline.
     */
    SYSTEM_FINAL
}

0 commit comments

Comments
 (0)