Skip to content

Commit 896e4e2

Browse files
committed
Optimize indexing performance in replica shard
Signed-off-by: kkewwei <[email protected]> Signed-off-by: kkewwei <[email protected]> test
1 parent 56825f6 commit 896e4e2

File tree

19 files changed

+565
-63
lines changed

19 files changed

+565
-63
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2424
### Changed
2525
- Convert transport-reactor-netty4 to use gradle version catalog [#17233](https://github.com/opensearch-project/OpenSearch/pull/17233)
2626
- Increase force merge threads to 1/8th of cores [#17255](https://github.com/opensearch-project/OpenSearch/pull/17255)
27+
- Optimize indexing performance in replica shard [#17371](https://github.com/opensearch-project/OpenSearch/pull/17371)
2728

2829
### Deprecated
2930

qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java

+85-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
import org.opensearch.common.settings.Settings;
4646
import org.opensearch.common.xcontent.json.JsonXContent;
4747
import org.opensearch.common.xcontent.support.XContentMapValues;
48-
import org.opensearch.core.common.Strings;
48+
import org.opensearch.index.IndexSettings;
4949
import org.opensearch.index.seqno.SeqNoStats;
5050
import org.opensearch.indices.replication.common.ReplicationType;
5151
import org.opensearch.core.rest.RestStatus;
@@ -465,6 +465,90 @@ public void testSyncedFlushTransition() throws Exception {
465465
}
466466
}
467467

468+
public void testReplicasUsePrimaryIndexingStrategy() throws Exception {
469+
Nodes nodes = buildNodeAndVersions();
470+
logger.info("cluster discovered:\n {}", nodes.toString());
471+
Settings.Builder settings = Settings.builder()
472+
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
473+
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1m")
474+
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2);
475+
final String index = "test-index";
476+
createIndex(index, settings.build());
477+
ensureNoInitializingShards(); // wait for all other shard activity to finish
478+
ensureGreen(index);
479+
480+
int docCount = 200;
481+
try (RestClient nodeClient = buildClient(restClientSettings(),
482+
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
483+
indexDocs(index, 0, docCount);
484+
485+
Thread[] indexThreads = new Thread[5];
486+
for (int i = 0; i < indexThreads.length; i++) {
487+
indexThreads[i] = new Thread(() -> {
488+
try {
489+
int idStart = randomInt(docCount / 2);
490+
indexDocs(index, idStart, idStart + docCount / 2);
491+
if (randomBoolean()) {
492+
// perform a refresh
493+
assertOK(client().performRequest(new Request("POST", index + "/_flush")));
494+
}
495+
} catch (IOException e) {
496+
throw new AssertionError("failed while indexing [" + e.getMessage() + "]");
497+
}
498+
});
499+
indexThreads[i].start();
500+
}
501+
for (Thread indexThread : indexThreads) {
502+
indexThread.join();
503+
}
504+
if (randomBoolean()) {
505+
// perform a refresh
506+
assertOK(client().performRequest(new Request("POST", index + "/_flush")));
507+
}
508+
// verify replica catch up with primary
509+
assertSeqNoOnShards(index, nodes, docCount, nodeClient);
510+
assertSourceEqualWithPrimary(index, docCount);
511+
}
512+
}
513+
514+
private void assertSourceEqualWithPrimary(final String index, final int expectedCount) throws IOException {
515+
Request primaryRequest = new Request("GET", index + "/_search");
516+
primaryRequest.addParameter("preference", "_primary");
517+
primaryRequest.addParameter("size", String.valueOf(expectedCount+100));
518+
final Response primaryResponse = client().performRequest(primaryRequest);
519+
520+
Map<String, Object> primaryHits = ObjectPath.createFromResponse(primaryResponse).evaluate("hits");
521+
Map<String, Object> totals = ObjectPath.evaluate(primaryHits, "total");
522+
assertEquals(expectedCount, totals.get("values"));
523+
524+
List<Object> primarySources = ObjectPath.evaluate(primaryHits, "hits");
525+
assertEquals(expectedCount, primarySources.size());
526+
527+
Map<String, Object> primarys = new HashMap<>(expectedCount);
528+
for (int i = 0; i < primarySources.size(); i++) {
529+
primarys.put(ObjectPath.evaluate(primarySources.get(i), "_id"), primarySources.get(i));
530+
}
531+
532+
533+
// replicas source
534+
Request replicaRequest = new Request("GET", index + "/_search");
535+
replicaRequest.addParameter("preference", "_replica");
536+
replicaRequest.addParameter("size", String.valueOf(expectedCount+100));
537+
final Response replicaResponse = client().performRequest(replicaRequest);
538+
539+
Map<String, Object> replicaHits = ObjectPath.createFromResponse(replicaResponse).evaluate("hits");
540+
Map<String, Object> replicaTotals = ObjectPath.evaluate(primaryHits, "total");
541+
assertEquals(expectedCount, replicaTotals.get("values"));
542+
543+
List<Object> replicaSources = ObjectPath.evaluate(replicaHits, "hits");
544+
assertEquals(expectedCount, replicaSources.size());
545+
546+
for (Object replicaSource : replicaSources) {
547+
String id = ObjectPath.evaluate(replicaSource, "_id").toString();
548+
assertEquals(primarys.get(id), replicaSource);
549+
}
550+
}
551+
468552
private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
469553
Request request = new Request("GET", index + "/_count");
470554
request.addParameter("preference", preference);

server/src/main/java/org/opensearch/action/DocWriteResponse.java

+5
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.opensearch.core.xcontent.XContentBuilder;
5050
import org.opensearch.core.xcontent.XContentParser;
5151
import org.opensearch.index.IndexSettings;
52+
import org.opensearch.index.engine.InternalEngine;
5253
import org.opensearch.index.mapper.MapperService;
5354
import org.opensearch.index.seqno.SequenceNumbers;
5455

@@ -392,6 +393,10 @@ protected static void parseInnerToXContent(XContentParser parser, Builder contex
392393
}
393394
}
394395

396+
public InternalEngine.WriteStrategy writeStrategy() {
397+
return null;
398+
};
399+
395400
/**
396401
* Base class of all {@link DocWriteResponse} builders. These {@link DocWriteResponse.Builder} are used during
397402
* xcontent parsing to temporarily store the parsed values, then the {@link Builder#build()} method is called to

server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,8 @@ public void markOperationAsExecuted(Engine.Result result) {
276276
result.getSeqNo(),
277277
result.getTerm(),
278278
indexResult.getVersion(),
279-
indexResult.isCreated()
279+
indexResult.isCreated(),
280+
indexResult.indexingStrategy()
280281
);
281282
} else if (result.getOperationType() == Engine.Operation.TYPE.DELETE) {
282283
Engine.DeleteResult deleteResult = (Engine.DeleteResult) result;
@@ -286,7 +287,8 @@ public void markOperationAsExecuted(Engine.Result result) {
286287
deleteResult.getSeqNo(),
287288
result.getTerm(),
288289
deleteResult.getVersion(),
289-
deleteResult.isFound()
290+
deleteResult.isFound(),
291+
deleteResult.deletionStrategy()
290292
);
291293

292294
} else {

server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.opensearch.index.IndexingPressureService;
8585
import org.opensearch.index.SegmentReplicationPressureService;
8686
import org.opensearch.index.engine.Engine;
87+
import org.opensearch.index.engine.InternalEngine;
8788
import org.opensearch.index.engine.VersionConflictEngineException;
8889
import org.opensearch.index.get.GetResult;
8990
import org.opensearch.index.mapper.MapperException;
@@ -751,7 +752,8 @@ static BulkItemResponse processUpdateResponse(
751752
indexResponse.getSeqNo(),
752753
indexResponse.getPrimaryTerm(),
753754
indexResponse.getVersion(),
754-
indexResponse.getResult()
755+
indexResponse.getResult(),
756+
indexResponse.writeStrategy()
755757
);
756758

757759
if (updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) {
@@ -783,7 +785,8 @@ static BulkItemResponse processUpdateResponse(
783785
deleteResponse.getSeqNo(),
784786
deleteResponse.getPrimaryTerm(),
785787
deleteResponse.getVersion(),
786-
deleteResponse.getResult()
788+
deleteResponse.getResult(),
789+
deleteResponse.writeStrategy()
787790
);
788791

789792
final GetResult getResult = UpdateHelper.extractGetResult(
@@ -880,7 +883,8 @@ private static Engine.Result performOpOnReplica(
880883
primaryResponse.getVersion(),
881884
indexRequest.getAutoGeneratedTimestamp(),
882885
indexRequest.isRetry(),
883-
sourceToParse
886+
sourceToParse,
887+
(InternalEngine.IndexingStrategy) primaryResponse.writeStrategy()
884888
);
885889
break;
886890
case DELETE:
@@ -889,7 +893,8 @@ private static Engine.Result performOpOnReplica(
889893
primaryResponse.getSeqNo(),
890894
primaryResponse.getPrimaryTerm(),
891895
primaryResponse.getVersion(),
892-
deleteRequest.id()
896+
deleteRequest.id(),
897+
(InternalEngine.DeletionStrategy) primaryResponse.writeStrategy()
893898
);
894899
break;
895900
default:

server/src/main/java/org/opensearch/action/delete/DeleteResponse.java

+59-3
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,16 @@
3535
import org.opensearch.action.DocWriteResponse;
3636
import org.opensearch.common.annotation.PublicApi;
3737
import org.opensearch.core.common.io.stream.StreamInput;
38+
import org.opensearch.core.common.io.stream.StreamOutput;
3839
import org.opensearch.core.index.shard.ShardId;
3940
import org.opensearch.core.rest.RestStatus;
4041
import org.opensearch.core.xcontent.XContentParser;
4142
import org.opensearch.transport.client.Client;
43+
import org.opensearch.index.engine.InternalEngine;
4244

4345
import java.io.IOException;
4446

47+
import static org.opensearch.Version.V_3_0_0;
4548
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
4649

4750
/**
@@ -54,21 +57,53 @@
5457
*/
5558
@PublicApi(since = "1.0.0")
5659
public class DeleteResponse extends DocWriteResponse {
60+
private final InternalEngine.DeletionStrategy deletionStrategy;
5761

5862
public DeleteResponse(ShardId shardId, StreamInput in) throws IOException {
5963
super(shardId, in);
64+
if (in.getVersion().onOrAfter(V_3_0_0)) {
65+
this.deletionStrategy = new InternalEngine.DeletionStrategy(in);
66+
} else {
67+
this.deletionStrategy = null;
68+
}
6069
}
6170

6271
public DeleteResponse(StreamInput in) throws IOException {
6372
super(in);
73+
if (in.getVersion().onOrAfter(V_3_0_0)) {
74+
this.deletionStrategy = new InternalEngine.DeletionStrategy(in);
75+
} else {
76+
this.deletionStrategy = null;
77+
}
6478
}
6579

6680
public DeleteResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, boolean found) {
67-
this(shardId, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND);
81+
this(shardId, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND, null);
82+
}
83+
84+
public DeleteResponse(
85+
ShardId shardId,
86+
String id,
87+
long seqNo,
88+
long primaryTerm,
89+
long version,
90+
boolean found,
91+
InternalEngine.DeletionStrategy deletionStrategy
92+
) {
93+
this(shardId, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND, deletionStrategy);
6894
}
6995

70-
private DeleteResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result) {
96+
private DeleteResponse(
97+
ShardId shardId,
98+
String id,
99+
long seqNo,
100+
long primaryTerm,
101+
long version,
102+
Result result,
103+
InternalEngine.DeletionStrategy deletionStrategy
104+
) {
71105
super(shardId, id, seqNo, primaryTerm, version, assertDeletedOrNotFound(result));
106+
this.deletionStrategy = deletionStrategy;
72107
}
73108

74109
private static Result assertDeletedOrNotFound(Result result) {
@@ -93,6 +128,27 @@ public String toString() {
93128
return builder.append("]").toString();
94129
}
95130

131+
@Override
132+
public void writeThin(StreamOutput out) throws IOException {
133+
super.writeThin(out);
134+
if (out.getVersion().onOrAfter(V_3_0_0)) {
135+
deletionStrategy.writeTo(out);
136+
}
137+
}
138+
139+
@Override
140+
public void writeTo(StreamOutput out) throws IOException {
141+
super.writeTo(out);
142+
if (out.getVersion().onOrAfter(V_3_0_0)) {
143+
deletionStrategy.writeTo(out);
144+
}
145+
}
146+
147+
@Override
148+
public InternalEngine.DeletionStrategy writeStrategy() {
149+
return deletionStrategy;
150+
}
151+
96152
public static DeleteResponse fromXContent(XContentParser parser) throws IOException {
97153
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
98154

@@ -122,7 +178,7 @@ public static class Builder extends DocWriteResponse.Builder {
122178

123179
@Override
124180
public DeleteResponse build() {
125-
DeleteResponse deleteResponse = new DeleteResponse(shardId, id, seqNo, primaryTerm, version, result);
181+
DeleteResponse deleteResponse = new DeleteResponse(shardId, id, seqNo, primaryTerm, version, result, null);
126182
deleteResponse.setForcedRefresh(forcedRefresh);
127183
if (shardInfo != null) {
128184
deleteResponse.setShardInfo(shardInfo);

0 commit comments

Comments
 (0)