/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.index.seqno;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.AllocationId;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.WriteStateException;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.engine.SafeCommitInfo;
import org.opensearch.index.shard.AbstractIndexShardComponent;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ReplicationGroup;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.SegmentReplicationLagTimer;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* This class is responsible for tracking the replication group with its progress and safety markers (local and global checkpoints).
* <p>
* The global checkpoint is the highest sequence number for which all lower (or equal) sequence numbers have been processed
* on all shards that are currently active. Since shards count as "active" once the cluster-manager starts them, which may
* happen before this primary shard has been notified of that fact, we also include shards that have completed recovery. These shards
* have received all old operations via the recovery mechanism and are kept up to date by the various replication actions. The set of
* shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
* <p>
* The global checkpoint is maintained by the primary shard and is replicated to all the replicas (via {@link GlobalCheckpointSyncAction}).
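* <p>
* As a simplified sketch (not the exact implementation, which also accounts for copies that are pending in-sync), the
* primary derives the global checkpoint as the minimum of the local checkpoints of the in-sync copies:
* <pre>{@code
* // inSyncCopies is a hypothetical Collection<CheckpointState> of the in-sync shard copies
* long globalCheckpoint = inSyncCopies.stream()
*     .mapToLong(CheckpointState::getLocalCheckpoint)
*     .min()
*     .orElse(SequenceNumbers.UNASSIGNED_SEQ_NO);
* }</pre>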
*
* @opensearch.internal
*/
public class ReplicationTracker extends AbstractIndexShardComponent implements LongSupplier {
/**
* The allocation ID of the shard of which this tracker is a component.
*/
final String shardAllocationId;
/**
* The global checkpoint tracker can operate in two modes:
* - primary: this shard is in charge of collecting local checkpoint information from all shard copies and computing the global
* checkpoint based on the local checkpoints of all in-sync shard copies.
* - replica: this shard receives global checkpoint information from the primary (see
* {@link #updateGlobalCheckpointOnReplica(long, String)}).
* <p>
* When a shard is initialized (be it a primary or replica), it initially operates in replica mode. The global checkpoint tracker is
* then switched to primary mode in the following three scenarios:
* <p>
* - An initializing primary shard that is not a relocation target is moved to primary mode (using {@link #activatePrimaryMode}) once
* the shard becomes active.
* - An active replica shard is moved to primary mode (using {@link #activatePrimaryMode}) once it is promoted to primary.
* - A primary relocation target is moved to primary mode (using {@link #activateWithPrimaryContext}) during the primary relocation
* handoff. If the target shard is successfully initialized in primary mode, the source shard of a primary relocation is then moved
* to replica mode (using {@link #completeRelocationHandoff}), as the relocation target will be in charge of the global checkpoint
* computation from that point on.
*/
volatile boolean primaryMode;
/**
* The current operation primary term. Management of this value is done through {@link IndexShard} and must only be done when safe. See
* {@link #setOperationPrimaryTerm(long)}.
*/
private volatile long operationPrimaryTerm;
/**
* Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling
* {@link #startRelocationHandoff(String)} and is finished by either calling {@link #completeRelocationHandoff} or
* {@link #abortRelocationHandoff}, depending on whether the handoff was successful or not. During the handoff, which has as main
* objective to transfer the internal state of the global checkpoint tracker from the relocation source to the target, the list of
* in-sync shard copies cannot grow, otherwise the relocation target might miss this information and increase the global checkpoint
* too eagerly. As a consequence, some of the methods in this class are not allowed to be called while a handoff is in progress,
* in particular {@link #markAllocationIdAsInSync}.
* <p>
* A notable exception to this is the method {@link #updateFromClusterManager}, which is still allowed to be called during a relocation handoff.
* The reason for this is that the handoff might fail and can be aborted (using {@link #abortRelocationHandoff}), in which case
* it is important that the global checkpoint tracker does not miss any state updates that might have happened during the handoff attempt.
* This means, however, that the global checkpoint can still advance after the primary relocation handoff has been initiated, but only
* because the cluster-manager could have failed some of the in-sync shard copies and marked them as stale. That is ok though, as this
* information is conveyed through cluster state updates, and the new primary relocation target will also eventually learn about those.
*/
boolean handoffInProgress;
/**
* Boolean flag that indicates whether a relocation handoff completed (see {@link #completeRelocationHandoff}).
*/
volatile boolean relocated;
/**
* The global checkpoint tracker relies on the property that cluster state updates are applied in-order. After transferring a primary
* context from the primary relocation source to the target and initializing the target, it is possible for the target to apply a
* cluster state that is older than the one upon which the primary context was based. If we allowed this old cluster state
* to influence the list of in-sync shard copies here, this could possibly remove such an in-sync copy from the internal structures
* until the newer cluster state were to be applied, which would unsafely advance the global checkpoint. This field thus captures
* the version of the last applied cluster state to ensure in-order updates.
*/
long appliedClusterStateVersion;
IndexShardRoutingTable routingTable;
/**
* Local checkpoint information for all shard copies that are tracked. Has an entry for all shard copies that are initializing
* and / or in-sync, possibly also containing information about unassigned in-sync shard copies. The information that is tracked for
* each shard copy is explained in the docs for the {@link CheckpointState} class.
*/
final Map<String, CheckpointState> checkpoints;
/**
* The current in-memory global checkpoint. In primary mode, this is a cached version of the checkpoint computed from the local
* checkpoints. In replica mode, this is the in-memory global checkpoint that's communicated by the primary.
*/
volatile long globalCheckpoint;
/**
* A callback invoked when the in-memory global checkpoint is updated. For primary mode this occurs if the computed global checkpoint
* advances on the basis of state changes tracked here. For non-primary mode this occurs if the local knowledge of the global checkpoint
* advances due to an update from the primary.
*/
private final LongConsumer onGlobalCheckpointUpdated;
/**
* A supplier of the current time. This supplier is used to add a timestamp to retention leases, and to determine retention lease
* expiration.
*/
private final LongSupplier currentTimeMillisSupplier;
/**
* A callback when a new retention lease is created or an existing retention lease is removed. In practice, this callback invokes the
* retention lease sync action, to sync retention leases to replicas.
*/
private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases;
/**
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
* current global checkpoint.
*/
final Set<String> pendingInSync;
/**
* Cached value for the last replication group that was computed
*/
volatile ReplicationGroup replicationGroup;
/**
* The current retention leases.
*/
private RetentionLeases retentionLeases = RetentionLeases.EMPTY;
/**
* The primary term of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
* leases.
*/
private long persistedRetentionLeasesPrimaryTerm;
/**
* The version of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
* leases.
*/
private long persistedRetentionLeasesVersion;
/**
* Whether there should be a peer recovery retention lease (PRRL) for every tracked shard copy. Always true on indices created from
* {@code LegacyESVersion#V_7_4_0} onwards, because these versions create PRRLs properly. May be false on indices created in an
* earlier version if we recently did a rolling upgrade and
* {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)} has not yet completed. Is only permitted
* to change from false to true; can be removed once support for pre-PRRL indices is no longer needed.
*/
private boolean hasAllPeerRecoveryRetentionLeases;
/**
* Supplies information about the current safe commit which may be used to expire peer-recovery retention leases.
*/
private final Supplier<SafeCommitInfo> safeCommitInfoSupplier;
/**
* Threshold for expiring peer-recovery retention leases and falling back to file-based recovery. See
* {@link IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING}.
*/
private final double fileBasedRecoveryThreshold;
private final Consumer<ReplicationGroup> onReplicationGroupUpdated;
private volatile ReplicationCheckpoint latestReplicationCheckpoint;
private final Function<String, Boolean> isShardOnRemoteEnabledNode;
/**
* Flag to indicate whether {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)}
* has been run successfully
*/
private boolean createdMissingRetentionLeases;
/**
* Get all retention leases tracked on this shard.
*
* @return the retention leases
*/
public RetentionLeases getRetentionLeases() {
return getRetentionLeases(false).v2();
}
/**
* If the expire leases parameter is false, gets all retention leases tracked on this shard; otherwise, first calculates
* expiration of existing retention leases, and then gets all non-expired retention leases tracked on this shard. Note that only the
* primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas. If the
* expire leases parameter is true, this replication tracker must be in primary mode.
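* <p>
* A usage sketch ({@code tracker} is an illustrative variable, not part of this API):
* <pre>{@code
* Tuple<Boolean, RetentionLeases> result = tracker.getRetentionLeases(true); // primary mode only
* if (result.v1()) {
*     // at least one lease expired; result.v2() holds the surviving, non-expired leases
* }
* }</pre>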
*
* @return a tuple indicating whether or not any retention leases were expired, and the non-expired retention leases
*/
public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boolean expireLeases) {
if (expireLeases == false) {
return Tuple.tuple(false, retentionLeases);
}
assert primaryMode;
// the primary calculates the non-expired retention leases and syncs them to replicas
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
final Set<String> leaseIdsForCurrentPeers = routingTable.assignedShards()
.stream()
.map(ReplicationTracker::getPeerRecoveryRetentionLeaseId)
.collect(Collectors.toSet());
final boolean allShardsStarted = routingTable.allShardsStarted();
final long minimumReasonableRetainedSeqNo = allShardsStarted ? 0L : getMinimumReasonableRetainedSeqNo();
final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases.leases()
.stream()
.collect(Collectors.groupingBy(lease -> {
if (lease.source().equals(PEER_RECOVERY_RETENTION_LEASE_SOURCE)) {
if (leaseIdsForCurrentPeers.contains(lease.id())) {
return false;
}
if (allShardsStarted) {
logger.trace("expiring unused [{}]", lease);
return true;
}
if (lease.retainingSequenceNumber() < minimumReasonableRetainedSeqNo) {
logger.trace("expiring unreasonable [{}] retaining history before [{}]", lease, minimumReasonableRetainedSeqNo);
return true;
}
}
return currentTimeMillis - lease.timestamp() > retentionLeaseMillis;
}));
final Collection<RetentionLease> expiredLeases = partitionByExpiration.get(true);
if (expiredLeases == null) {
// early out as no retention leases have expired
logger.debug("no retention leases are expired from current retention leases [{}]", retentionLeases);
return Tuple.tuple(false, retentionLeases);
}
final Collection<RetentionLease> nonExpiredLeases = partitionByExpiration.get(false) != null
? partitionByExpiration.get(false)
: Collections.emptyList();
logger.debug("expiring retention leases [{}] from current retention leases [{}]", expiredLeases, retentionLeases);
retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases);
return Tuple.tuple(true, retentionLeases);
}
private long getMinimumReasonableRetainedSeqNo() {
final SafeCommitInfo safeCommitInfo = safeCommitInfoSupplier.get();
return safeCommitInfo.localCheckpoint + 1 - Math.round(Math.ceil(safeCommitInfo.docCount * fileBasedRecoveryThreshold));
// NB safeCommitInfo.docCount is a very low-level count of the docs in the index, and in particular if this shard contains nested
// docs then safeCommitInfo.docCount counts every child doc separately from the parent doc. However every part of a nested document
// has the same seqno, so we may be overestimating the cost of a file-based recovery when compared to an ops-based recovery and
// therefore preferring ops-based recoveries inappropriately in this case. Correctly accounting for nested docs seems difficult to
// do cheaply, and the circumstances in which this matters should be relatively rare, so we use this naive calculation regardless.
// TODO improve this measure for when nested docs are in use
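// Illustrative arithmetic (assumed values): with localCheckpoint = 999, docCount = 1000 and a threshold of 0.1,
// this returns 999 + 1 - ceil(1000 * 0.1) = 900, i.e. leases retaining history from seqno 900 onwards are reasonable.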
}
/**
* Adds a new retention lease.
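* <p>
* An illustrative usage sketch ({@code tracker} and the argument values are assumptions):
* <pre>{@code
* RetentionLease lease = tracker.addRetentionLease(
*     "my-lease-id",
*     retainingSeqNo,                        // operations at or above this seqno are retained
*     "my-source",
*     ActionListener.wrap(r -> {}, e -> {})  // invoked once the lease has been synced to the replicas
* );
* }</pre>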
*
* @param id the identifier of the retention lease
* @param retainingSequenceNumber the retaining sequence number
* @param source the source of the retention lease
* @param listener the callback when the retention lease is successfully added and synced to replicas
* @return the new retention lease
* @throws RetentionLeaseAlreadyExistsException if the specified retention lease already exists
*/
public RetentionLease addRetentionLease(
final String id,
final long retainingSequenceNumber,
final String source,
final ActionListener<ReplicationResponse> listener
) {
Objects.requireNonNull(listener);
final RetentionLease retentionLease;
final RetentionLeases currentRetentionLeases;
synchronized (this) {
retentionLease = innerAddRetentionLease(id, retainingSequenceNumber, source);
currentRetentionLeases = retentionLeases;
}
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
return retentionLease;
}
/**
* Atomically clones an existing retention lease to a new ID.
*
* @param sourceLeaseId the identifier of the source retention lease
* @param targetLeaseId the identifier of the retention lease to create
* @param listener the callback when the retention lease is successfully added and synced to replicas
* @return the new retention lease
* @throws RetentionLeaseNotFoundException if the specified source retention lease does not exist
* @throws RetentionLeaseAlreadyExistsException if the specified target retention lease already exists
*/
RetentionLease cloneRetentionLease(String sourceLeaseId, String targetLeaseId, ActionListener<ReplicationResponse> listener) {
Objects.requireNonNull(listener);
final RetentionLease retentionLease;
final RetentionLeases currentRetentionLeases;
synchronized (this) {
assert primaryMode;
if (getRetentionLeases().contains(sourceLeaseId) == false) {
throw new RetentionLeaseNotFoundException(sourceLeaseId);
}
final RetentionLease sourceLease = getRetentionLeases().get(sourceLeaseId);
retentionLease = innerAddRetentionLease(targetLeaseId, sourceLease.retainingSequenceNumber(), sourceLease.source());
currentRetentionLeases = retentionLeases;
}
// Syncing here may not be strictly necessary, because this new lease isn't retaining any extra history that wasn't previously
// retained by the source lease; however we prefer to sync anyway since we expect to do so whenever creating a new lease.
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
return retentionLease;
}
/**
* Adds a new retention lease, but does not synchronise it with the rest of the replication group.
*
* @param id the identifier of the retention lease
* @param retainingSequenceNumber the retaining sequence number
* @param source the source of the retention lease
* @return the new retention lease
* @throws RetentionLeaseAlreadyExistsException if the specified retention lease already exists
*/
private RetentionLease innerAddRetentionLease(String id, long retainingSequenceNumber, String source) {
assert Thread.holdsLock(this);
assert primaryMode : id + "/" + retainingSequenceNumber + "/" + source;
if (retentionLeases.contains(id)) {
throw new RetentionLeaseAlreadyExistsException(id);
}
final RetentionLease retentionLease = new RetentionLease(
id,
retainingSequenceNumber,
currentTimeMillisSupplier.getAsLong(),
source
);
logger.debug("adding new retention lease [{}] to current retention leases [{}]", retentionLease, retentionLeases);
retentionLeases = new RetentionLeases(
operationPrimaryTerm,
retentionLeases.version() + 1,
Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList())
);
return retentionLease;
}
/**
* Renews an existing retention lease.
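* <p>
* An illustrative renewal (identifiers are assumptions):
* <pre>{@code
* // refreshes the lease timestamp; the retaining sequence number may only move forward
* RetentionLease renewed = tracker.renewRetentionLease("my-lease-id", newRetainingSeqNo, "my-source");
* }</pre>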
*
* @param id the identifier of the retention lease
* @param retainingSequenceNumber the retaining sequence number
* @param source the source of the retention lease
* @return the renewed retention lease
* @throws RetentionLeaseNotFoundException if the specified retention lease does not exist
* @throws RetentionLeaseInvalidRetainingSeqNoException if the new retaining sequence number is lower than
* the retaining sequence number of the current retention lease.
*/
public synchronized RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
assert primaryMode;
final RetentionLease existingRetentionLease = retentionLeases.get(id);
if (existingRetentionLease == null) {
throw new RetentionLeaseNotFoundException(id);
}
if (retainingSequenceNumber < existingRetentionLease.retainingSequenceNumber()) {
assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(source) == false : "renewing peer recovery retention lease ["
+ existingRetentionLease
+ "]"
+ " with a lower retaining sequence number ["
+ retainingSequenceNumber
+ "]";
throw new RetentionLeaseInvalidRetainingSeqNoException(id, source, retainingSequenceNumber, existingRetentionLease);
}
final RetentionLease retentionLease = new RetentionLease(
id,
retainingSequenceNumber,
currentTimeMillisSupplier.getAsLong(),
source
);
retentionLeases = new RetentionLeases(
operationPrimaryTerm,
retentionLeases.version() + 1,
Stream.concat(retentionLeases.leases().stream().filter(lease -> lease.id().equals(id) == false), Stream.of(retentionLease))
.collect(Collectors.toList())
);
return retentionLease;
}
/**
* Removes an existing retention lease.
*
* @param id the identifier of the retention lease
* @param listener the callback when the retention lease is successfully removed and synced to replicas
*/
public void removeRetentionLease(final String id, final ActionListener<ReplicationResponse> listener) {
Objects.requireNonNull(listener);
final RetentionLeases currentRetentionLeases;
synchronized (this) {
assert primaryMode;
if (retentionLeases.contains(id) == false) {
throw new RetentionLeaseNotFoundException(id);
}
logger.debug("removing retention lease [{}] from current retention leases [{}]", id, retentionLeases);
retentionLeases = new RetentionLeases(
operationPrimaryTerm,
retentionLeases.version() + 1,
retentionLeases.leases().stream().filter(lease -> lease.id().equals(id) == false).collect(Collectors.toList())
);
currentRetentionLeases = retentionLeases;
}
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
}
/**
* Updates retention leases on a replica.
*
* @param retentionLeases the retention leases
*/
public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases retentionLeases) {
assert primaryMode == false;
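// only accept leases that supersede the current collection, i.e. that carry a higher primary term,
// or the same primary term with a higher version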
if (retentionLeases.supersedes(this.retentionLeases)) {
this.retentionLeases = retentionLeases;
}
}
/**
* Loads the latest retention leases from their dedicated state file.
*
* @param path the path to the directory containing the state file
* @return the retention leases
* @throws IOException if an I/O exception occurs reading the retention leases
*/
public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
final RetentionLeases retentionLeases;
synchronized (retentionLeasePersistenceLock) {
retentionLeases = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
}
// TODO after backporting we expect this never to happen in 8.x, so adjust this to throw an exception instead.
assert Version.CURRENT.major <= 8 : "throw an exception instead of returning EMPTY on null";
if (retentionLeases == null) {
return RetentionLeases.EMPTY;
}
return retentionLeases;
}
private final Object retentionLeasePersistenceLock = new Object();
/**
* Persists the current retention leases to their dedicated state file. If this version of the retention leases are already persisted
* then persistence is skipped.
*
* @param path the path to the directory containing the state file
* @throws WriteStateException if an exception occurs writing the state file
*/
public void persistRetentionLeases(final Path path) throws WriteStateException {
synchronized (retentionLeasePersistenceLock) {
final RetentionLeases currentRetentionLeases;
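// snapshot the current leases under the object monitor, then perform the (potentially slow) disk write while
// holding only the persistence lock, so that checkpoint updates are not blocked behind IO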
synchronized (this) {
if (retentionLeases.supersedes(persistedRetentionLeasesPrimaryTerm, persistedRetentionLeasesVersion) == false) {
logger.trace("skipping persisting retention leases [{}], already persisted", retentionLeases);
return;
}
currentRetentionLeases = retentionLeases;
}
logger.trace("persisting retention leases [{}]", currentRetentionLeases);
RetentionLeases.FORMAT.writeAndCleanup(currentRetentionLeases, path);
persistedRetentionLeasesPrimaryTerm = currentRetentionLeases.primaryTerm();
persistedRetentionLeasesVersion = currentRetentionLeases.version();
}
}
public boolean assertRetentionLeasesPersisted(final Path path) throws IOException {
assert RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path) != null;
return true;
}
/**
* Retention leases for peer recovery have source {@link ReplicationTracker#PEER_RECOVERY_RETENTION_LEASE_SOURCE}, a lease ID
* containing the persistent node ID calculated by {@link ReplicationTracker#getPeerRecoveryRetentionLeaseId}, and retain operations
* with sequence numbers strictly greater than the given global checkpoint.
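* For example, a shard copy whose persisted global checkpoint is 41 gets a lease retaining operations from sequence number 42 onwards.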
*/
public RetentionLease addPeerRecoveryRetentionLease(
String nodeId,
long globalCheckpoint,
ActionListener<ReplicationResponse> listener
) {
return addRetentionLease(
getPeerRecoveryRetentionLeaseId(nodeId),
globalCheckpoint + 1,
PEER_RECOVERY_RETENTION_LEASE_SOURCE,
listener
);
}
public RetentionLease cloneLocalPeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
return cloneRetentionLease(
getPeerRecoveryRetentionLeaseId(routingTable.primaryShard()),
getPeerRecoveryRetentionLeaseId(nodeId),
listener
);
}
public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
removeRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), listener);
}
/**
* Source for peer recovery retention leases; see {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
*/
public static final String PEER_RECOVERY_RETENTION_LEASE_SOURCE = "peer recovery";
/**
* Id for a peer recovery retention lease for the given node. See {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
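* For example, {@code getPeerRecoveryRetentionLeaseId("node-1")} returns {@code "peer_recovery/node-1"}.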
*/
public static String getPeerRecoveryRetentionLeaseId(String nodeId) {
return "peer_recovery/" + nodeId;
}
/**
* Id for a peer recovery retention lease for the given {@link ShardRouting}.
* See {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
*/
public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting) {
return getPeerRecoveryRetentionLeaseId(shardRouting.currentNodeId());
}
/**
* Returns a list of peer recovery retention leases installed in this replication group
*/
public List<RetentionLease> getPeerRecoveryRetentionLeases() {
return getRetentionLeases().leases()
.stream()
.filter(lease -> PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(lease.source()))
.collect(Collectors.toList());
}
/**
* Advance the peer-recovery retention leases for all assigned shard copies to discard history below the corresponding global
* checkpoint, and renew any leases that are approaching expiry.
*/
public synchronized void renewPeerRecoveryRetentionLeases() {
assert primaryMode;
assert invariant();
/*
* Peer-recovery retention leases never expire while the associated shard is assigned, but we must still renew them occasionally in
* case the associated shard is temporarily unassigned. However we must not renew them too often, since each renewal must be
* persisted and the resulting IO can be expensive on nodes with large numbers of shards (see #42299). We choose to renew them after
* half the expiry time, so that by default the cluster has at least 6 hours to recover before these leases start to expire.
*/
final long renewalTimeMillis = currentTimeMillisSupplier.getAsLong() - indexSettings.getRetentionLeaseMillis() / 2;
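// e.g. with the default 12h retention lease period, leases with a timestamp older than (now - 6h) are due for renewal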
/*
* If any of the peer-recovery retention leases need renewal, it's a good opportunity to renew them all.
*/
final boolean renewalNeeded = StreamSupport.stream(routingTable.spliterator(), false)
.filter(ShardRouting::assignedToNode)
.filter(r -> r.isSearchOnly() == false)
.anyMatch(shardRouting -> {
final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting));
if (retentionLease == null) {
/*
* If this shard copy is tracked then we got here via a rolling upgrade from an older version that doesn't
* create peer recovery retention leases for every shard copy.
*/
assert (checkpoints.get(shardRouting.allocationId().getId()).tracked
&& checkpoints.get(shardRouting.allocationId().getId()).replicated == false)
|| checkpoints.get(shardRouting.allocationId().getId()).tracked == false
|| hasAllPeerRecoveryRetentionLeases == false;
return false;
}
return retentionLease.timestamp() <= renewalTimeMillis
|| retentionLease.retainingSequenceNumber() <= checkpoints.get(shardRouting.allocationId().getId()).globalCheckpoint;
});
if (renewalNeeded) {
for (ShardRouting shardRouting : routingTable) {
if (shardRouting.assignedToNode()) {
final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting));
if (retentionLease != null) {
final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
final long newRetainedSequenceNumber = Math.max(0L, checkpointState.globalCheckpoint + 1L);
if (retentionLease.retainingSequenceNumber() <= newRetainedSequenceNumber) {
renewRetentionLease(
getPeerRecoveryRetentionLeaseId(shardRouting),
newRetainedSequenceNumber,
PEER_RECOVERY_RETENTION_LEASE_SOURCE
);
} else {
// the retention lease is tied to the node, not the shard copy, so it's possible a copy was removed and now
// we are in the process of recovering it again, or maybe we were just promoted and have not yet received the
// global checkpoints from our peers.
assert checkpointState.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO : "cannot renew "
+ retentionLease
+ " according to "
+ checkpointState
+ " for "
+ shardRouting;
}
}
}
}
}
assert invariant();
}
/**
* The local and global checkpoint state of a single shard copy.
*
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public static class CheckpointState implements Writeable {
/**
* the last local checkpoint information that we have for this shard. All operations up to this point are properly fsynced to disk.
*/
long localCheckpoint;
/**
* the last global checkpoint information that we have for this shard. This is the global checkpoint that's fsynced to disk on the
* respective shard, and all operations up to this point are properly fsynced to disk as well.
*/
long globalCheckpoint;
/**
* When a shard is in-sync, it is capable of being promoted as the primary during a failover. An in-sync shard
* contributes to global checkpoint calculation on the primary iff {@link CheckpointState#replicated} is true.
*/
boolean inSync;
/**
* whether this shard is tracked in the replication group and has localTranslog, i.e., should receive document updates
* from the primary. Tracked shards with localTranslog would have corresponding retention leases on the primary shard's
* {@link ReplicationTracker}.
*/
boolean tracked;
/**
* Whether the replication requests to the primary are replicated to the concerned shard or not.
*/
boolean replicated;
/**
* The currently searchable replication checkpoint.
*/
ReplicationCheckpoint visibleReplicationCheckpoint;
/**
* Map of ReplicationCheckpoints to ReplicationTimers. Timers are added as new checkpoints are published, and removed when
* the replica is caught up.
*/
Map<ReplicationCheckpoint, SegmentReplicationLagTimer> checkpointTimers;
/**
* The time it took to complete the most recent replication event.
*/
long lastCompletedReplicationLag;
public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked, boolean replicated) {
this.localCheckpoint = localCheckpoint;
this.globalCheckpoint = globalCheckpoint;
this.inSync = inSync;
this.tracked = tracked;
this.replicated = replicated;
this.checkpointTimers = ConcurrentCollections.newConcurrentMap();
}
public CheckpointState(StreamInput in) throws IOException {
this.localCheckpoint = in.readZLong();
this.globalCheckpoint = in.readZLong();
this.inSync = in.readBoolean();
this.tracked = in.readBoolean();
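// the replicated flag was introduced in 2.5.0; older streams do not carry it, so default to true for compatibility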
if (in.getVersion().onOrAfter(Version.V_2_5_0)) {
this.replicated = in.readBoolean();
} else {
this.replicated = true;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeZLong(localCheckpoint);
out.writeZLong(globalCheckpoint);
out.writeBoolean(inSync);
out.writeBoolean(tracked);
if (out.getVersion().onOrAfter(Version.V_2_5_0)) {
out.writeBoolean(replicated);
}
}
/**
* Returns a full copy of this object
*/
public CheckpointState copy() {
return new CheckpointState(localCheckpoint, globalCheckpoint, inSync, tracked, replicated);
}
public long getLocalCheckpoint() {
return localCheckpoint;
}
public long getGlobalCheckpoint() {
return globalCheckpoint;
}
@Override
public String toString() {
return "LocalCheckpointState{"
+ "localCheckpoint="
+ localCheckpoint
+ ", globalCheckpoint="
+ globalCheckpoint
+ ", inSync="
+ inSync
+ ", tracked="
+ tracked
+ ", replicated="
+ replicated
+ '}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CheckpointState that = (CheckpointState) o;
if (localCheckpoint != that.localCheckpoint) return false;
if (globalCheckpoint != that.globalCheckpoint) return false;
if (inSync != that.inSync) return false;
if (tracked != that.tracked) return false;
return replicated == that.replicated;
}
@Override
public int hashCode() {
int result = Long.hashCode(localCheckpoint);
result = 31 * result + Long.hashCode(globalCheckpoint);
result = 31 * result + Boolean.hashCode(inSync);
result = 31 * result + Boolean.hashCode(tracked);
result = 31 * result + Boolean.hashCode(replicated);
return result;
}
}
/**
* Get the local knowledge of the persisted global checkpoints for all in-sync allocation IDs.
*
* @return a map from allocation ID to the local knowledge of the persisted global checkpoint for that allocation ID
*/
public synchronized Map<String, Long> getInSyncGlobalCheckpoints() {
assert primaryMode;
assert handoffInProgress == false;
final Map<String, Long> globalCheckpoints = new HashMap<>(checkpoints.size()); // upper bound on the size
checkpoints.entrySet()
.stream()
.filter(e -> e.getValue().inSync && e.getValue().replicated)
.forEach(e -> globalCheckpoints.put(e.getKey(), e.getValue().globalCheckpoint));
return globalCheckpoints;
}
/**
* Returns whether the replication tracker is in primary mode, i.e., whether the current shard is acting as primary from the point of
* view of replication.
*/
public boolean isPrimaryMode() {
return primaryMode;
}
/**
* Returns the current operation primary term.
*
* @return the primary term
*/
public long getOperationPrimaryTerm() {
return operationPrimaryTerm;
}
/**
* Sets the current operation primary term. This method should be invoked only when no other operations are possible on the shard. That
* is, either from the constructor of {@link IndexShard} or while holding all permits on the {@link IndexShard} instance.
*
* @param operationPrimaryTerm the new operation primary term
*/
public void setOperationPrimaryTerm(final long operationPrimaryTerm) {
this.operationPrimaryTerm = operationPrimaryTerm;
}
/**
* Returns whether the replication tracker has relocated away to another shard copy.
*/
public boolean isRelocated() {
return relocated;
}
/**
* Class invariant that should hold before and after every invocation of public methods on this class. As Java lacks implication
* as a logical operator, many of the invariants are written in the form (!A || B); they should be read as (A implies B), however.
*/
private boolean invariant() {
// local checkpoints only set during primary mode
assert primaryMode || checkpoints.values().stream().allMatch(lcps -> lcps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO);
// global checkpoints only set during primary mode
assert primaryMode || checkpoints.values().stream().allMatch(cps -> cps.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO);
// relocation handoff can only occur in primary mode
assert !handoffInProgress || primaryMode;
// a relocated copy is not in primary mode
assert !relocated || !primaryMode;
// the current shard is marked as in-sync when the global checkpoint tracker operates in primary mode
assert !primaryMode || checkpoints.get(shardAllocationId).inSync;
// the current shard is marked as tracked when the global checkpoint tracker operates in primary mode
assert !primaryMode || checkpoints.get(shardAllocationId).tracked;
// the routing table and replication group is set when the global checkpoint tracker operates in primary mode
assert !primaryMode || (routingTable != null && replicationGroup != null) : "primary mode but routing table is "
+ routingTable
+ " and replication group is "
+ replicationGroup;
// when in primary mode, the current allocation ID is the allocation ID of the primary or the relocation allocation ID
assert !primaryMode
|| (routingTable.primaryShard().allocationId().getId().equals(shardAllocationId)
|| routingTable.primaryShard().allocationId().getRelocationId().equals(shardAllocationId));
// during relocation handoff there are no entries blocking global checkpoint advancement
assert !handoffInProgress || pendingInSync.isEmpty() : "entries blocking global checkpoint advancement during relocation handoff: "
+ pendingInSync;
// entries blocking global checkpoint advancement can only exist in primary mode and when not having a relocation handoff
assert pendingInSync.isEmpty() || (primaryMode && !handoffInProgress);
// the computed global checkpoint is always up-to-date
assert !primaryMode || globalCheckpoint == computeGlobalCheckpoint(pendingInSync, checkpoints.values(), globalCheckpoint)
: "global checkpoint is not up-to-date, expected: "
+ computeGlobalCheckpoint(pendingInSync, checkpoints.values(), globalCheckpoint)
+ " but was: "
+ globalCheckpoint;
// when in primary mode, the global checkpoint is at most the minimum local checkpoint on all in-sync shard copies
assert !primaryMode || globalCheckpoint <= inSyncCheckpointStates(checkpoints, CheckpointState::getLocalCheckpoint, LongStream::min)
: "global checkpoint ["
+ globalCheckpoint
+ "] "
+ "for primary mode allocation ID ["
+ shardAllocationId
+ "] "
+ "more than in-sync local checkpoints ["
+ checkpoints
+ "]";
// we have a routing table iff we have a replication group
assert (routingTable == null) == (replicationGroup == null) : "routing table is "
+ routingTable
+ " but replication group is "
+ replicationGroup;
assert replicationGroup == null || replicationGroup.equals(calculateReplicationGroup())
: "cached replication group out of sync: expected: " + calculateReplicationGroup() + " but was: " + replicationGroup;
// all assigned shards from the routing table are tracked
assert routingTable == null || checkpoints.keySet().containsAll(routingTable.getAllAllocationIds()) : "local checkpoints "
+ checkpoints
+ " not in-sync with routing table "
+ routingTable;
for (Map.Entry<String, CheckpointState> entry : checkpoints.entrySet()) {
// blocking global checkpoint advancement only happens for shards that are not in-sync
assert !pendingInSync.contains(entry.getKey()) || !entry.getValue().inSync : "shard copy "
+ entry.getKey()
+ " blocks global checkpoint advancement but is in-sync";
// in-sync shard copies are tracked
assert !entry.getValue().inSync || entry.getValue().tracked : "shard copy " + entry.getKey() + " is in-sync but not tracked";
}
// all pending in sync shards are tracked
for (String aId : pendingInSync) {
assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked";
}
if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases
// Skip assertion if createMissingPeerRecoveryRetentionLeases has not yet run after activating primary context
// This is required since during an ongoing remote store migration,
// remote enabled primary taking over primary context from another remote enabled shard
// might not have retention leases for docrep shard copies
// (since all RetentionLease sync actions are blocked on remote shard copies)
&& createdMissingRetentionLeases) {
// all tracked shard copies have a corresponding peer-recovery retention lease
for (final ShardRouting shardRouting : routingTable.assignedShards()) {
final CheckpointState cps = checkpoints.get(shardRouting.allocationId().getId());
if (cps.tracked && cps.replicated) {
assert retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
: "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases;
assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(
retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source()
) : "incorrect source ["
+ retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source()
+ "] for ["
+ shardRouting
+ "] in "
+ retentionLeases;
}
}
}
return true;
}
private static long inSyncCheckpointStates(
final Map<String, CheckpointState> checkpoints,
ToLongFunction<CheckpointState> function,
Function<LongStream, OptionalLong> reducer
) {
final OptionalLong value = reducer.apply(
checkpoints.values()
.stream()