Skip to content

Commit d235a0d

Browse files
swuferhong authored and LiebingYu committed
Address yuxia's comments and fix the error encountered when upgrading
1 parent 933f31c commit d235a0d

File tree

12 files changed

+68
-282
lines changed

12 files changed

+68
-282
lines changed

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -422,12 +422,6 @@ public class ConfigOptions {
422422
.withDescription(
423423
"Defines how long the buffer pool will block when waiting for segments to become available.");
424424

425-
public static final ConfigOption<Boolean> TABLET_SERVER_CONTROLLED_SHUTDOWN_ENABLED =
426-
key("tablet-server.controlled-shutdown.enabled")
427-
.booleanType()
428-
.defaultValue(true)
429-
.withDescription("Whether to enable controlled shutdown for TabletServer.");
430-
431425
// ------------------------------------------------------------------
432426
// ZooKeeper Settings
433427
// ------------------------------------------------------------------

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,17 @@ public int getCoordinatorEpoch() {
111111
return coordinatorEpoch;
112112
}
113113

114-
public Map<Integer, ServerInfo> getLiveTabletServers() {
114+
public Set<ServerInfo> liveTabletServerInfos() {
115+
Set<ServerInfo> liveTabletServers = new HashSet<>();
116+
for (ServerInfo serverInfo : this.liveTabletServers.values()) {
117+
if (!shuttingDownTabletServers.contains(serverInfo.id())) {
118+
liveTabletServers.add(serverInfo);
119+
}
120+
}
115121
return liveTabletServers;
116122
}
117123

118-
public Set<Integer> liveTabletServerSet() {
124+
public Set<Integer> liveTabletServerIds() {
119125
Set<Integer> liveTabletServers = new HashSet<>();
120126
for (Integer brokerId : this.liveTabletServers.keySet()) {
121127
if (!shuttingDownTabletServers.contains(brokerId)) {
@@ -165,7 +171,7 @@ public boolean isReplicaOnline(
165171
if (includeShuttingDownTabletServers) {
166172
serverOnline = liveOrShuttingDownTabletServers().contains(serverId);
167173
} else {
168-
serverOnline = liveTabletServerSet().contains(serverId);
174+
serverOnline = liveTabletServerIds().contains(serverId);
169175
}
170176

171177
return serverOnline
@@ -669,4 +675,9 @@ public void resetContext() {
669675
liveTabletServers.clear();
670676
shuttingDownTabletServers.clear();
671677
}
678+
679+
@VisibleForTesting
680+
public Map<Integer, ServerInfo> getLiveTabletServers() {
681+
return liveTabletServers;
682+
}
672683
}

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 16 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,7 @@ public void startup() {
239239
// can process the LeaderRequests that are generated by replicaStateMachine.startup() and
240240
// partitionStateMachine.startup().
241241
// update coordinator metadata cache when CoordinatorServer start.
242-
HashSet<ServerInfo> tabletServerInfoList =
243-
new HashSet<>(coordinatorContext.getLiveTabletServers().values());
242+
Set<ServerInfo> tabletServerInfoList = coordinatorContext.liveTabletServerInfos();
244243
serverMetadataCache.updateMetadata(
245244
coordinatorContext.getCoordinatorServerInfo(), tabletServerInfoList);
246245
updateTabletServerMetadataCacheWhenStartup(tabletServerInfoList);
@@ -515,7 +514,7 @@ public void process(CoordinatorEvent event) {
515514
}
516515

517516
private void updateMetrics() {
518-
tabletServerCount = coordinatorContext.getLiveTabletServers().size();
517+
tabletServerCount = coordinatorContext.liveOrShuttingDownTabletServers().size();
519518
tableCount = coordinatorContext.allTables().size();
520519
bucketCount = coordinatorContext.bucketLeaderAndIsr().size();
521520
offlineBucketCount = coordinatorContext.getOfflineBucketCount();
@@ -570,13 +569,10 @@ private void processCreateTable(CreateTableEvent createTableEvent) {
570569
.keySet()
571570
.forEach(bucketId -> tableBuckets.add(new TableBucket(tableId, bucketId)));
572571
updateTabletServerMetadataCache(
573-
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
574-
null,
575-
null,
576-
tableBuckets);
572+
coordinatorContext.liveTabletServerInfos(), null, null, tableBuckets);
577573
} else {
578574
updateTabletServerMetadataCache(
579-
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
575+
coordinatorContext.liveTabletServerInfos(),
580576
tableId,
581577
null,
582578
Collections.emptySet());
@@ -609,10 +605,7 @@ private void processCreatePartition(CreatePartitionEvent createPartitionEvent) {
609605
bucketId ->
610606
tableBuckets.add(new TableBucket(tableId, partitionId, bucketId)));
611607
updateTabletServerMetadataCache(
612-
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
613-
null,
614-
null,
615-
tableBuckets);
608+
coordinatorContext.liveTabletServerInfos(), null, null, tableBuckets);
616609
}
617610

618611
private void processDropTable(DropTableEvent dropTableEvent) {
@@ -636,10 +629,7 @@ private void processDropTable(DropTableEvent dropTableEvent) {
636629

637630
// send update metadata request.
638631
updateTabletServerMetadataCache(
639-
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
640-
tableId,
641-
null,
642-
Collections.emptySet());
632+
coordinatorContext.liveTabletServerInfos(), tableId, null, Collections.emptySet());
643633
}
644634

645635
private void processDropPartition(DropPartitionEvent dropPartitionEvent) {
@@ -663,7 +653,7 @@ private void processDropPartition(DropPartitionEvent dropPartitionEvent) {
663653

664654
// send update metadata request.
665655
updateTabletServerMetadataCache(
666-
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
656+
coordinatorContext.liveTabletServerInfos(),
667657
tableId,
668658
tablePartition.getPartitionId(),
669659
Collections.emptySet());
@@ -781,7 +771,7 @@ private void processNewTabletServer(NewTabletServerEvent newTabletServerEvent) {
781771
// when we finish the logic of tablet server
782772
ServerInfo serverInfo = newTabletServerEvent.getServerInfo();
783773
int tabletServerId = serverInfo.id();
784-
if (coordinatorContext.getLiveTabletServers().containsKey(serverInfo.id())) {
774+
if (coordinatorContext.liveTabletServerIds().contains(serverInfo.id())) {
785775
// if the dead server is already in live servers, return directly
786776
// it may happen during coordinator server initiation, the watcher watch a new tablet
787777
// server register event and put it to event manager, but after that, the coordinator
@@ -803,13 +793,10 @@ private void processNewTabletServer(NewTabletServerEvent newTabletServerEvent) {
803793
// update coordinatorServer metadata cache for the new added table server.
804794
serverMetadataCache.updateMetadata(
805795
coordinatorContext.getCoordinatorServerInfo(),
806-
new HashSet<>(coordinatorContext.getLiveTabletServers().values()));
796+
coordinatorContext.liveTabletServerInfos());
807797
// update server info for all tablet servers.
808798
updateTabletServerMetadataCache(
809-
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
810-
null,
811-
null,
812-
Collections.emptySet());
799+
coordinatorContext.liveTabletServerInfos(), null, null, Collections.emptySet());
813800
// update table info for the new added table server.
814801
updateTabletServerMetadataCache(
815802
Collections.singleton(serverInfo),
@@ -838,7 +825,7 @@ private void processNewTabletServer(NewTabletServerEvent newTabletServerEvent) {
838825

839826
private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent) {
840827
int tabletServerId = deadTabletServerEvent.getServerId();
841-
if (!coordinatorContext.getLiveTabletServers().containsKey(tabletServerId)) {
828+
if (!coordinatorContext.liveOrShuttingDownTabletServers().contains(tabletServerId)) {
842829
// if the dead server is already not in live servers, return directly
843830
// it may happen during coordinator server initiation, the watcher watch a new tablet
844831
// server unregister event, but the coordinator server also don't read it from zk and
@@ -856,8 +843,7 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent
856843
// coordinatorServer metadata. The purpose of this approach is to prevent the scenario where
857844
// NotifyLeaderAndIsrRequest gets sent before UpdateMetadataRequest, which could cause the
858845
// leader to incorrectly adjust isr.
859-
Set<ServerInfo> serverInfos =
860-
new HashSet<>(coordinatorContext.getLiveTabletServers().values());
846+
Set<ServerInfo> serverInfos = coordinatorContext.liveTabletServerInfos();
861847
// update coordinatorServer metadata cache.
862848
serverMetadataCache.updateMetadata(
863849
coordinatorContext.getCoordinatorServerInfo(), serverInfos);
@@ -1118,7 +1104,7 @@ private ControlledShutdownResponse tryProcessControlledShutdown(
11181104
ControlledShutdownResponse response = new ControlledShutdownResponse();
11191105

11201106
// TODO here we need to check tabletServerEpoch, avoid to receive controlled shutdown
1121-
// request from and old tabletServer.
1107+
// request from an old tabletServer. Trace by https://github.com/alibaba/fluss/issues/1153
11221108
int tabletServerEpoch = controlledShutdownEvent.getTabletServerEpoch();
11231109

11241110
int tabletServerId = controlledShutdownEvent.getTabletServerId();
@@ -1135,14 +1121,14 @@ private ControlledShutdownResponse tryProcessControlledShutdown(
11351121
LOG.debug(
11361122
"All shutting down tabletServers: {}",
11371123
coordinatorContext.shuttingDownTabletServers());
1138-
LOG.debug("All live tabletServers: {}", coordinatorContext.liveTabletServerSet());
1124+
LOG.debug("All live tabletServers: {}", coordinatorContext.liveTabletServerIds());
11391125

11401126
List<TableBucketReplica> replicasToActOn =
11411127
coordinatorContext.replicasOnTabletServer(tabletServerId).stream()
11421128
.filter(
11431129
replica -> {
11441130
TableBucket tableBucket = replica.getTableBucket();
1145-
return coordinatorContext.getAssignment(tableBucket).size() >= 1
1131+
return !coordinatorContext.getAssignment(tableBucket).isEmpty()
11461132
&& coordinatorContext
11471133
.getBucketLeaderAndIsr(tableBucket)
11481134
.isPresent()
@@ -1165,16 +1151,7 @@ private ControlledShutdownResponse tryProcessControlledShutdown(
11651151
tableBucketStateMachine.handleStateChange(
11661152
bucketsLedByServer, OnlineBucket, CONTROLLED_SHUTDOWN_ELECTION);
11671153

1168-
coordinatorRequestBatch.newBatch();
1169-
replicasFollowedByServer.forEach(
1170-
replica ->
1171-
coordinatorRequestBatch.addStopReplicaRequestForTabletServers(
1172-
Collections.singleton(tabletServerId),
1173-
replica.getTableBucket(),
1174-
false,
1175-
coordinatorContext.getBucketLeaderEpoch(replica.getTableBucket())));
1176-
coordinatorRequestBatch.sendRequestToTabletServers(
1177-
coordinatorContext.getCoordinatorEpoch());
1154+
// TODO need send stop request to the leader?
11781155

11791156
// If the tabletServer is a follower, updates the isr in ZK and notifies the current leader.
11801157
replicaStateMachine.handleStateChanges(replicasFollowedByServer, OfflineReplica);

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorRequestBatch.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public void addNotifyLeaderRequestForTabletServers(
211211
List<Integer> bucketReplicas,
212212
LeaderAndIsr leaderAndIsr) {
213213
tabletServers.stream()
214-
.filter(s -> s >= 0)
214+
.filter(s -> s >= 0 && !coordinatorContext.shuttingDownTabletServers().contains(s))
215215
.forEach(
216216
id -> {
217217
Map<TableBucket, PbNotifyLeaderAndIsrReqForBucket>
@@ -231,7 +231,7 @@ public void addNotifyLeaderRequestForTabletServers(
231231
// TODO for these cases, we can send NotifyLeaderAndIsrRequest instead of another
232232
// updateMetadata request, trace by: https://github.com/alibaba/fluss/issues/983
233233
addUpdateMetadataRequestForTabletServers(
234-
coordinatorContext.getLiveTabletServers().keySet(),
234+
coordinatorContext.liveTabletServerIds(),
235235
null,
236236
null,
237237
Collections.singleton(tableBucket));
@@ -243,7 +243,7 @@ public void addStopReplicaRequestForTabletServers(
243243
boolean isDelete,
244244
int leaderEpoch) {
245245
tabletServers.stream()
246-
.filter(s -> s >= 0)
246+
.filter(s -> s >= 0 && !coordinatorContext.shuttingDownTabletServers().contains(s))
247247
.forEach(
248248
id -> {
249249
Map<TableBucket, PbStopReplicaReqForBucket> stopBucketReplica =
@@ -288,7 +288,7 @@ public void addUpdateMetadataRequestForTabletServers(
288288
Set<TableBucket> tableBuckets) {
289289
// case9:
290290
tabletServers.stream()
291-
.filter(s -> s >= 0)
291+
.filter(s -> s >= 0 && !coordinatorContext.shuttingDownTabletServers().contains(s))
292292
.forEach(updateMetadataRequestTabletServerSet::add);
293293

294294
if (tableId != null) {
@@ -349,7 +349,7 @@ public void addNotifyRemoteLogOffsetsRequestForTabletServers(
349349
long remoteLogStartOffset,
350350
long remoteLogEndOffset) {
351351
tabletServers.stream()
352-
.filter(s -> s >= 0)
352+
.filter(s -> s >= 0 && !coordinatorContext.shuttingDownTabletServers().contains(s))
353353
.forEach(
354354
id ->
355355
notifyRemoteLogOffsetsRequestMap.put(
@@ -363,7 +363,7 @@ public void addNotifyRemoteLogOffsetsRequestForTabletServers(
363363
public void addNotifyKvSnapshotOffsetRequestForTabletServers(
364364
List<Integer> tabletServers, TableBucket tableBucket, long minRetainOffset) {
365365
tabletServers.stream()
366-
.filter(s -> s >= 0)
366+
.filter(s -> s >= 0 && !coordinatorContext.shuttingDownTabletServers().contains(s))
367367
.forEach(
368368
id ->
369369
notifyKvSnapshotOffsetRequestMap.put(
@@ -377,7 +377,7 @@ public void addNotifyLakeTableOffsetRequestForTableServers(
377377
TableBucket tableBucket,
378378
LakeTableSnapshot lakeTableSnapshot) {
379379
tabletServers.stream()
380-
.filter(s -> s >= 0)
380+
.filter(s -> s >= 0 && !coordinatorContext.shuttingDownTabletServers().contains(s))
381381
.forEach(
382382
id -> {
383383
Map<TableBucket, PbNotifyLakeTableOffsetReqForBucket>
@@ -670,7 +670,7 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() {
670670
// tablet servers.
671671
return makeUpdateMetadataRequest(
672672
coordinatorContext.getCoordinatorServerInfo(),
673-
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
673+
coordinatorContext.liveTabletServerInfos(),
674674
tableMetadataList,
675675
partitionMetadataList);
676676
}

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,6 @@ private LakeCatalog createLakeCatalog() {
265265
@Override
266266
protected CompletableFuture<Result> closeAsync(Result result) {
267267
if (isShutDown.compareAndSet(false, true)) {
268-
269268
serverState = ServerState.SHUTTING_DOWN;
270269
LOG.info("Shutting down Coordinator server ({}).", result);
271270
CompletableFuture<Void> serviceShutdownFuture = stopServices();

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachine.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -427,8 +427,9 @@ private Optional<ElectionResult> doInitElectionForBucket(
427427
}
428428
// For the case that the table bucket has been initialized, we use all the live assigned
429429
// servers as inSyncReplica set.
430+
List<Integer> isr = liveServers;
430431
Optional<Integer> leaderOpt =
431-
defaultReplicaLeaderElection(assignedServers, liveServers, liveServers);
432+
defaultReplicaLeaderElection(assignedServers, liveServers, isr);
432433
if (!leaderOpt.isPresent()) {
433434
LOG.error(
434435
"The leader election for table bucket {} is empty.",
@@ -439,8 +440,7 @@ private Optional<ElectionResult> doInitElectionForBucket(
439440

440441
// Register the initial leader and isr.
441442
LeaderAndIsr leaderAndIsr =
442-
new LeaderAndIsr(
443-
leader, 0, liveServers, coordinatorContext.getCoordinatorEpoch(), 0);
443+
new LeaderAndIsr(leader, 0, isr, coordinatorContext.getCoordinatorEpoch(), 0);
444444

445445
return Optional.of(new ElectionResult(liveServers, leaderAndIsr));
446446
}
@@ -582,10 +582,10 @@ private String stringifyBucket(TableBucket tableBucket) {
582582
*
583583
* <p>The elect cases including:
584584
*
585-
* <ul>
586-
* <li>1. new or offline bucket
587-
* <li>2. tabletServer controlled shutdown
588-
* </ul>
585+
* <ol>
586+
* <li>new or offline bucket
587+
* <li>tabletServer controlled shutdown
588+
* </ol>
589589
*/
590590
private Optional<ElectionResult> electLeader(
591591
TableBucket tableBucket,
@@ -633,7 +633,18 @@ private Optional<ElectionResult> electLeader(
633633
new LeaderAndIsr(
634634
leaderOpt.get(),
635635
leaderAndIsr.leaderEpoch() + 1,
636-
leaderAndIsr.isr(),
636+
leaderAndIsr.isr().stream()
637+
.filter(
638+
isr -> {
639+
if (electionStrategy == CONTROLLED_SHUTDOWN_ELECTION) {
640+
return !coordinatorContext
641+
.shuttingDownTabletServers()
642+
.contains(isr);
643+
} else {
644+
return true;
645+
}
646+
})
647+
.collect(Collectors.toList()),
637648
coordinatorContext.getCoordinatorEpoch(),
638649
leaderAndIsr.bucketEpoch() + 1);
639650

fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,6 @@ public class TabletServer extends ServerBase {
104104
*/
105105
private final @Nullable String rack;
106106

107-
private final boolean controlledShutdownEnabled;
108-
109107
/** The lock to guard startup / shutdown / manipulation methods. */
110108
private final Object lock = new Object();
111109

@@ -171,8 +169,6 @@ public TabletServer(Configuration conf, Clock clock) {
171169
this.terminationFuture = new CompletableFuture<>();
172170
this.serverId = conf.getInt(ConfigOptions.TABLET_SERVER_ID);
173171
this.rack = conf.getString(ConfigOptions.TABLET_SERVER_RACK);
174-
this.controlledShutdownEnabled =
175-
conf.getBoolean(ConfigOptions.TABLET_SERVER_CONTROLLED_SHUTDOWN_ENABLED);
176172
this.interListenerName = conf.getString(ConfigOptions.INTERNAL_LISTENER_NAME);
177173
this.clock = clock;
178174
}
@@ -433,10 +429,6 @@ CompletableFuture<Void> stopServices() {
433429
}
434430

435431
private void controlledShutDown() {
436-
if (!controlledShutdownEnabled) {
437-
return;
438-
}
439-
440432
LOG.info("Starting controlled shutdown.");
441433
serverState = ServerState.PENDING_CONTROLLED_SHUTDOWN;
442434

@@ -463,7 +455,7 @@ private void controlledShutDown() {
463455
remainingLeaderBuckets.add(
464456
toTableBucket(pbTableBucket)));
465457
LOG.warn(
466-
"TabletServer {} is still the leader for the following buckets: {} After Controlled Shutdown",
458+
"TabletServer {} is still the leader for the following buckets: {} after Controlled Shutdown",
467459
serverId,
468460
remainingLeaderBuckets);
469461
} else {

0 commit comments

Comments
 (0)