|
21 | 21 | import org.elasticsearch.action.bulk.BulkRequestBuilder; |
22 | 22 | import org.elasticsearch.action.get.GetResponse; |
23 | 23 | import org.elasticsearch.action.index.IndexRequest; |
| 24 | +import org.elasticsearch.action.support.ActionTestUtils; |
24 | 25 | import org.elasticsearch.action.support.ActiveShardCount; |
25 | 26 | import org.elasticsearch.action.support.IndicesOptions; |
26 | 27 | import org.elasticsearch.action.support.PlainActionFuture; |
|
51 | 52 | import org.elasticsearch.snapshots.SnapshotId; |
52 | 53 | import org.elasticsearch.snapshots.SnapshotShardSizeInfo; |
53 | 54 | import org.elasticsearch.snapshots.SnapshotsInfoService; |
| 55 | +import org.elasticsearch.test.ClusterServiceUtils; |
54 | 56 | import org.elasticsearch.test.transport.MockTransportService; |
55 | 57 | import org.elasticsearch.transport.TransportActionProxy; |
56 | 58 | import org.elasticsearch.transport.TransportService; |
@@ -655,39 +657,39 @@ public void testCcrRepositoryFailsToFetchSnapshotShardSizes() throws Exception { |
655 | 657 | try { |
656 | 658 | final SnapshotsInfoService snapshotsInfoService = getFollowerCluster().getCurrentMasterNodeInstance(SnapshotsInfoService.class); |
657 | 659 |
|
| 660 | + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); |
658 | 661 | final PlainActionFuture<Void> waitForAllShardSnapshotSizesFailures = new PlainActionFuture<>(); |
659 | | - final ClusterStateListener listener = event -> { |
660 | | - if (RestoreInProgress.get(event.state()).isEmpty() == false && event.state().routingTable().hasIndex(followerIndex)) { |
661 | | - try { |
662 | | - final IndexRoutingTable indexRoutingTable = event.state().routingTable().index(followerIndex); |
663 | | - // this assertBusy completes because the listener is added after the InternalSnapshotsInfoService |
664 | | - // and ClusterService preserves the order of listeners. |
665 | | - assertBusy(() -> { |
666 | | - List<Long> sizes = indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED) |
667 | | - .stream() |
668 | | - .filter(shard -> shard.unassignedInfo().lastAllocationStatus() == AllocationStatus.FETCHING_SHARD_DATA) |
669 | | - .sorted(Comparator.comparingInt(ShardRouting::getId)) |
670 | | - .map(shard -> snapshotsInfoService.snapshotShardSizes().getShardSize(shard)) |
671 | | - .filter(Objects::nonNull) |
672 | | - .filter(size -> ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE == size) |
673 | | - .collect(Collectors.toList()); |
674 | | - assertThat(sizes, hasSize(numberOfShards)); |
675 | | - }); |
676 | | - waitForAllShardSnapshotSizesFailures.onResponse(null); |
677 | | - } catch (Exception e) { |
678 | | - throw new AssertionError("Failed to retrieve all snapshot shard sizes", e); |
679 | | - } |
| 662 | + ClusterServiceUtils.addTemporaryStateListener( |
| 663 | + clusterService, |
| 664 | + state -> RestoreInProgress.get(state).isEmpty() == false && state.routingTable().hasIndex(followerIndex) |
| 665 | + ).addListener(ActionTestUtils.assertNoFailureListener(ignore -> { |
| 666 | + try { |
| 667 | + // This listener runs synchronously in the same thread so that clusterService.state() returns the same state |
| 668 | + // that satisfied the predicate. |
| 669 | + final IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(followerIndex); |
| 670 | + // this assertBusy completes because the listener is added after the InternalSnapshotsInfoService |
| 671 | + // and ClusterService preserves the order of listeners. |
| 672 | + assertBusy(() -> { |
| 673 | + List<Long> sizes = indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED) |
| 674 | + .stream() |
| 675 | + .filter(shard -> shard.unassignedInfo().lastAllocationStatus() == AllocationStatus.FETCHING_SHARD_DATA) |
| 676 | + .sorted(Comparator.comparingInt(ShardRouting::getId)) |
| 677 | + .map(shard -> snapshotsInfoService.snapshotShardSizes().getShardSize(shard)) |
| 678 | + .filter(Objects::nonNull) |
| 679 | + .filter(size -> ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE == size) |
| 680 | + .collect(Collectors.toList()); |
| 681 | + assertThat(sizes, hasSize(numberOfShards)); |
| 682 | + }); |
| 683 | + waitForAllShardSnapshotSizesFailures.onResponse(null); |
| 684 | + } catch (Exception e) { |
| 685 | + throw new AssertionError("Failed to retrieve all snapshot shard sizes", e); |
680 | 686 | } |
681 | | - }; |
682 | | - |
683 | | - final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); |
684 | | - clusterService.addListener(listener); |
| 687 | + })); |
685 | 688 |
|
686 | 689 | logger.debug("--> creating follower index [{}]", followerIndex); |
687 | 690 | followerClient().execute(PutFollowAction.INSTANCE, putFollow(leaderIndex, followerIndex, ActiveShardCount.NONE)); |
688 | 691 |
|
689 | 692 | waitForAllShardSnapshotSizesFailures.get(30L, TimeUnit.SECONDS); |
690 | | - clusterService.removeListener(listener); |
691 | 693 |
|
692 | 694 | assertThat(simulatedFailures.get(), equalTo(numberOfShards)); |
693 | 695 |
|
|
0 commit comments