Skip to content

Commit fae1453

Browse files
authored
Revert "Avoid invalid retries on multiple replicas when querying (#17370)" (#18116)
This reverts commit eba12fa. Signed-off-by: Andrew Ross <[email protected]>
1 parent d706593 commit fae1453

File tree

5 files changed

+13
-108
lines changed

5 files changed

+13
-108
lines changed

CHANGELOG.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1313
- Add pull-based ingestion error metrics and make internal queue size configurable ([#18088](https://github.com/opensearch-project/OpenSearch/pull/18088))
1414

1515
### Changed
16-
- Avoid invalid retries in multiple replicas when querying [#17370](https://github.com/opensearch-project/OpenSearch/pull/17370)
1716

1817
### Dependencies
1918
- Bump `com.google.code.gson:gson` from 2.12.1 to 2.13.0 ([#17923](https://github.com/opensearch-project/OpenSearch/pull/17923))

libs/core/src/main/java/org/opensearch/OpenSearchException.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -302,12 +302,8 @@ public Map<String, List<String>> getHeaders() {
302302
* Returns the rest status code associated with this exception.
303303
*/
304304
public RestStatus status() {
305-
return status(this);
306-
}
307-
308-
public static RestStatus status(Throwable t) {
309-
Throwable cause = ExceptionsHelper.unwrapCause(t);
310-
if (cause == t) {
305+
Throwable cause = unwrapCause();
306+
if (cause == this) {
311307
return RestStatus.INTERNAL_SERVER_ERROR;
312308
} else {
313309
return ExceptionsHelper.status(cause);

server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -514,19 +514,10 @@ private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget sh
514514
// we do make sure to clean it on a successful response from a shard
515515
setPhaseResourceUsages();
516516
onShardFailure(shardIndex, shard, e);
517+
SearchShardTarget nextShard = FailAwareWeightedRouting.getInstance()
518+
.findNext(shardIt, clusterState, e, () -> totalOps.incrementAndGet());
517519

518-
final SearchShardTarget nextShard;
519-
final boolean lastShard;
520-
final int advanceShardCount;
521-
if (TransportActions.isRetryableSearchException(e)) {
522-
nextShard = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterState, e, () -> totalOps.incrementAndGet());
523-
lastShard = nextShard == null;
524-
advanceShardCount = 1;
525-
} else {
526-
nextShard = null;
527-
lastShard = true;
528-
advanceShardCount = remainingOpsCount(shardIt);
529-
}
520+
final boolean lastShard = nextShard == null;
530521
if (logger.isTraceEnabled()) {
531522
logger.trace(
532523
() -> new ParameterizedMessage(
@@ -551,7 +542,7 @@ private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget sh
551542
if (lastShard) {
552543
onShardGroupFailure(shardIndex, shard, e);
553544
}
554-
final int totalOps = this.totalOps.addAndGet(advanceShardCount);
545+
final int totalOps = this.totalOps.incrementAndGet();
555546
if (totalOps == expectedTotalOps) {
556547
try {
557548
onPhaseDone();
@@ -570,14 +561,6 @@ private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget sh
570561
}
571562
}
572563

573-
private int remainingOpsCount(SearchShardIterator shardsIt) {
574-
if (shardsIt.skip()) {
575-
return shardsIt.remaining();
576-
} else {
577-
return shardsIt.remaining() + 1;
578-
}
579-
}
580-
581564
/**
582565
* Executed once for every {@link ShardId} that failed on all available shard routing.
583566
*
@@ -668,7 +651,12 @@ private void onShardResultConsumed(Result result, SearchShardIterator shardIt) {
668651
}
669652

670653
private void successfulShardExecution(SearchShardIterator shardsIt) {
671-
final int remainingOpsOnIterator = remainingOpsCount(shardsIt);
654+
final int remainingOpsOnIterator;
655+
if (shardsIt.skip()) {
656+
remainingOpsOnIterator = shardsIt.remaining();
657+
} else {
658+
remainingOpsOnIterator = shardsIt.remaining() + 1;
659+
}
672660
final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);
673661
if (xTotalOps == expectedTotalOps) {
674662
try {

server/src/main/java/org/opensearch/action/support/TransportActions.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,8 @@
3434

3535
import org.apache.lucene.store.AlreadyClosedException;
3636
import org.opensearch.ExceptionsHelper;
37-
import org.opensearch.OpenSearchException;
3837
import org.opensearch.action.NoShardAvailableActionException;
3938
import org.opensearch.action.UnavailableShardsException;
40-
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
41-
import org.opensearch.core.tasks.TaskCancelledException;
4239
import org.opensearch.index.IndexNotFoundException;
4340
import org.opensearch.index.shard.IllegalIndexShardStateException;
4441
import org.opensearch.index.shard.ShardNotFoundException;
@@ -67,12 +64,4 @@ public static boolean isReadOverrideException(Exception e) {
6764
return !isShardNotAvailableException(e);
6865
}
6966

70-
public static boolean isRetryableSearchException(final Exception e) {
71-
return (OpenSearchException.status(e).getStatusFamilyCode() != 4) && (e.getCause() instanceof TaskCancelledException == false)
72-
// There exists a scenario where a primary shard (0 replicas) relocates and is in POST_RECOVERY on the
73-
// target node but already deleted on the source node. Search request should still work.
74-
|| (e.getCause() instanceof IndexNotFoundException)
75-
|| (e.getCause() instanceof OpenSearchRejectedExecutionException);
76-
}
77-
7867
}

server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java

Lines changed: 1 addition & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
5050
import org.opensearch.core.index.Index;
5151
import org.opensearch.core.index.shard.ShardId;
52-
import org.opensearch.core.tasks.TaskCancelledException;
5352
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
5453
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
5554
import org.opensearch.index.query.MatchAllQueryBuilder;
@@ -67,7 +66,6 @@
6766
import org.opensearch.threadpool.TestThreadPool;
6867
import org.opensearch.threadpool.ThreadPool;
6968
import org.opensearch.transport.Transport;
70-
import org.opensearch.transport.TransportException;
7169
import org.junit.After;
7270
import org.junit.Before;
7371

@@ -138,7 +136,6 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
138136
controlled,
139137
false,
140138
false,
141-
false,
142139
expected,
143140
resourceUsage,
144141
new SearchShardIterator(null, null, Collections.emptyList(), null)
@@ -151,7 +148,6 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
151148
ActionListener<SearchResponse> listener,
152149
final boolean controlled,
153150
final boolean failExecutePhaseOnShard,
154-
final boolean throw4xxExceptionOnShard,
155151
final boolean catchExceptionWhenExecutePhaseOnShard,
156152
final AtomicLong expected,
157153
final TaskResourceUsage resourceUsage,
@@ -221,11 +217,7 @@ protected void executePhaseOnShard(
221217
final SearchActionListener<SearchPhaseResult> listener
222218
) {
223219
if (failExecutePhaseOnShard) {
224-
if (throw4xxExceptionOnShard) {
225-
listener.onFailure(new TransportException(new TaskCancelledException(shardIt.shardId().toString())));
226-
} else {
227-
listener.onFailure(new ShardNotFoundException(shardIt.shardId()));
228-
}
220+
listener.onFailure(new ShardNotFoundException(shardIt.shardId()));
229221
} else {
230222
if (catchExceptionWhenExecutePhaseOnShard) {
231223
try {
@@ -593,7 +585,6 @@ public void onFailure(Exception e) {
593585
false,
594586
true,
595587
false,
596-
false,
597588
new AtomicLong(),
598589
new TaskResourceUsage(randomLong(), randomLong()),
599590
shards
@@ -610,62 +601,6 @@ public void onFailure(Exception e) {
610601
assertThat(searchResponse.getSuccessfulShards(), equalTo(0));
611602
}
612603

613-
public void testSkipInValidRetryInMultiReplicas() throws InterruptedException {
614-
final Index index = new Index("test", UUID.randomUUID().toString());
615-
final CountDownLatch latch = new CountDownLatch(1);
616-
final AtomicBoolean fail = new AtomicBoolean(true);
617-
618-
List<String> targetNodeIds = List.of("n1", "n2", "n3");
619-
final SearchShardIterator[] shards = IntStream.range(2, 4)
620-
.mapToObj(i -> new SearchShardIterator(null, new ShardId(index, i), targetNodeIds, null, null, null))
621-
.toArray(SearchShardIterator[]::new);
622-
623-
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
624-
searchRequest.setMaxConcurrentShardRequests(1);
625-
626-
final ArraySearchPhaseResults<SearchPhaseResult> queryResult = new ArraySearchPhaseResults<>(shards.length);
627-
AbstractSearchAsyncAction<SearchPhaseResult> action = createAction(
628-
searchRequest,
629-
queryResult,
630-
new ActionListener<SearchResponse>() {
631-
@Override
632-
public void onResponse(SearchResponse response) {
633-
634-
}
635-
636-
@Override
637-
public void onFailure(Exception e) {
638-
if (fail.compareAndExchange(true, false)) {
639-
try {
640-
throw new RuntimeException("Simulated exception");
641-
} finally {
642-
executor.submit(() -> latch.countDown());
643-
}
644-
}
645-
}
646-
},
647-
false,
648-
true,
649-
true,
650-
false,
651-
new AtomicLong(),
652-
new TaskResourceUsage(randomLong(), randomLong()),
653-
shards
654-
);
655-
action.run();
656-
assertTrue(latch.await(1, TimeUnit.SECONDS));
657-
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
658-
SearchResponse searchResponse = action.buildSearchResponse(internalSearchResponse, action.buildShardFailures(), null, null);
659-
assertSame(searchResponse.getAggregations(), internalSearchResponse.aggregations());
660-
assertSame(searchResponse.getSuggest(), internalSearchResponse.suggest());
661-
assertSame(searchResponse.getProfileResults(), internalSearchResponse.profile());
662-
assertSame(searchResponse.getHits(), internalSearchResponse.hits());
663-
assertThat(searchResponse.getSuccessfulShards(), equalTo(0));
664-
for (int i = 0; i < shards.length; i++) {
665-
assertEquals(targetNodeIds.size() - 1, shards[i].remaining());
666-
}
667-
}
668-
669604
public void testOnShardSuccessPhaseDoneFailure() throws InterruptedException {
670605
final Index index = new Index("test", UUID.randomUUID().toString());
671606
final CountDownLatch latch = new CountDownLatch(1);
@@ -698,7 +633,6 @@ public void onFailure(Exception e) {
698633
false,
699634
false,
700635
false,
701-
false,
702636
new AtomicLong(),
703637
new TaskResourceUsage(randomLong(), randomLong()),
704638
shards
@@ -751,7 +685,6 @@ public void onFailure(Exception e) {
751685
},
752686
false,
753687
false,
754-
false,
755688
catchExceptionWhenExecutePhaseOnShard,
756689
new AtomicLong(),
757690
new TaskResourceUsage(randomLong(), randomLong()),

0 commit comments

Comments
 (0)