Skip to content

Commit 79d9dbb

Browse files
authored
[server] TabletServer support controlled shutdown (#1159)
1 parent 67eefc4 commit 79d9dbb

24 files changed

+734
-97
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.exception;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
/**
23+
* Thrown when the tabletServer is not available.
24+
*
25+
* @since 0.8
26+
*/
27+
@PublicEvolving
28+
public class TabletServerNotAvailableException extends ApiException {
29+
public TabletServerNotAvailableException(String message) {
30+
super(message);
31+
}
32+
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
2727
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest;
2828
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
29+
import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
30+
import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
2931
import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest;
3032
import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse;
3133
import org.apache.fluss.rpc.protocol.ApiKeys;
@@ -78,4 +80,9 @@ CompletableFuture<CommitLakeTableSnapshotResponse> commitLakeTableSnapshot(
7880
@RPC(api = ApiKeys.LAKE_TIERING_HEARTBEAT)
7981
CompletableFuture<LakeTieringHeartbeatResponse> lakeTieringHeartbeat(
8082
LakeTieringHeartbeatRequest request);
83+
84+
/** Try a controlled shutdown of the tabletServer with the specified tabletServerId. */
85+
@RPC(api = ApiKeys.CONTROLLED_SHUTDOWN)
86+
CompletableFuture<ControlledShutdownResponse> controlledShutdown(
87+
ControlledShutdownRequest request);
8188
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ public enum ApiKeys {
7070
CREATE_ACLS(1039, 0, 0, PUBLIC),
7171
LIST_ACLS(1040, 0, 0, PUBLIC),
7272
DROP_ACLS(1041, 0, 0, PUBLIC),
73-
LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE);
73+
LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE),
74+
CONTROLLED_SHUTDOWN(1043, 0, 0, PRIVATE);
7475

7576
private static final Map<Integer, ApiKeys> ID_TO_TYPE =
7677
Arrays.stream(ApiKeys.values())

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,14 @@ message LakeTieringHeartbeatResponse {
532532
repeated PbHeartbeatRespForTable failed_table_resp = 5;
533533
}
534534

535+
message ControlledShutdownRequest {
536+
required int32 tablet_server_id = 1;
537+
required int32 tablet_server_epoch = 2;
538+
}
539+
540+
message ControlledShutdownResponse {
541+
repeated PbTableBucket remaining_leader_buckets = 1;
542+
}
535543

536544
// --------------- Inner classes ----------------
537545
message PbApiVersion {

fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ protected static void startServer(ServerBase server) {
115115
public void start() throws Exception {
116116
try {
117117
addShutDownHook();
118+
118119
// at first, we need to initialize the file system
119120
pluginManager = PluginUtils.createPluginManagerFromRootFolder(conf);
120121
FileSystem.initialize(conf, pluginManager);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public class CoordinatorContext {
6767
private final Map<TableBucketReplica, Integer> failDeleteNumbers = new HashMap<>();
6868

6969
private final Map<Integer, ServerInfo> liveTabletServers = new HashMap<>();
70+
private final Set<Integer> shuttingDownTabletServers = new HashSet<>();
7071

7172
// a map from the table bucket to the state of the bucket.
7273
private final Map<TableBucket, BucketState> bucketStates = new HashMap<>();
@@ -114,6 +115,24 @@ public Map<Integer, ServerInfo> getLiveTabletServers() {
114115
return liveTabletServers;
115116
}
116117

118+
public Set<Integer> liveTabletServerSet() {
119+
Set<Integer> liveTabletServers = new HashSet<>();
120+
for (Integer brokerId : this.liveTabletServers.keySet()) {
121+
if (!shuttingDownTabletServers.contains(brokerId)) {
122+
liveTabletServers.add(brokerId);
123+
}
124+
}
125+
return liveTabletServers;
126+
}
127+
128+
public Set<Integer> shuttingDownTabletServers() {
129+
return shuttingDownTabletServers;
130+
}
131+
132+
public Set<Integer> liveOrShuttingDownTabletServers() {
133+
return liveTabletServers.keySet();
134+
}
135+
117136
@VisibleForTesting
118137
public void setLiveTabletServers(List<ServerInfo> servers) {
119138
liveTabletServers.clear();
@@ -136,8 +155,8 @@ public void removeLiveTabletServer(int serverId) {
136155
this.liveTabletServers.remove(serverId);
137156
}
138157

139-
public boolean isReplicaAndServerOnline(int serverId, TableBucket tableBucket) {
140-
return liveTabletServers.containsKey(serverId)
158+
public boolean isReplicaOnline(int serverId, TableBucket tableBucket) {
159+
return liveTabletServerSet().contains(serverId)
141160
&& !replicasOnOffline
142161
.getOrDefault(serverId, Collections.emptySet())
143162
.contains(tableBucket);
@@ -636,5 +655,6 @@ public void resetContext() {
636655
clearTablesState();
637656
// clear the live tablet servers
638657
liveTabletServers.clear();
658+
shuttingDownTabletServers.clear();
639659
}
640660
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.fluss.exception.FlussRuntimeException;
2828
import org.apache.fluss.exception.InvalidCoordinatorException;
2929
import org.apache.fluss.exception.InvalidUpdateVersionException;
30+
import org.apache.fluss.exception.TabletServerNotAvailableException;
3031
import org.apache.fluss.exception.UnknownTableOrBucketException;
3132
import org.apache.fluss.metadata.PhysicalTablePath;
3233
import org.apache.fluss.metadata.TableBucket;
@@ -38,13 +39,15 @@
3839
import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse;
3940
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
4041
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
42+
import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
4143
import org.apache.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable;
4244
import org.apache.fluss.rpc.protocol.ApiError;
4345
import org.apache.fluss.server.coordinator.event.AccessContextEvent;
4446
import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
4547
import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent;
4648
import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
4749
import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
50+
import org.apache.fluss.server.coordinator.event.ControlledShutdownEvent;
4851
import org.apache.fluss.server.coordinator.event.CoordinatorEvent;
4952
import org.apache.fluss.server.coordinator.event.CoordinatorEventManager;
5053
import org.apache.fluss.server.coordinator.event.CreatePartitionEvent;
@@ -72,6 +75,7 @@
7275
import org.apache.fluss.server.metadata.CoordinatorMetadataCache;
7376
import org.apache.fluss.server.metadata.ServerInfo;
7477
import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
78+
import org.apache.fluss.server.utils.ServerRpcMessageUtils;
7579
import org.apache.fluss.server.zk.ZooKeeperClient;
7680
import org.apache.fluss.server.zk.data.BucketAssignment;
7781
import org.apache.fluss.server.zk.data.LakeTableSnapshot;
@@ -104,6 +108,7 @@
104108

105109
import static org.apache.fluss.server.coordinator.statemachine.BucketState.OfflineBucket;
106110
import static org.apache.fluss.server.coordinator.statemachine.BucketState.OnlineBucket;
111+
import static org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionStrategy.CONTROLLED_SHUTDOWN_ELECTION;
107112
import static org.apache.fluss.server.coordinator.statemachine.ReplicaState.OfflineReplica;
108113
import static org.apache.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
109114
import static org.apache.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionStarted;
@@ -529,6 +534,11 @@ public void process(CoordinatorEvent event) {
529534
completeFromCallable(
530535
commitLakeTableSnapshotEvent.getRespCallback(),
531536
() -> tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent));
537+
} else if (event instanceof ControlledShutdownEvent) {
538+
ControlledShutdownEvent controlledShutdownEvent = (ControlledShutdownEvent) event;
539+
completeFromCallable(
540+
controlledShutdownEvent.getRespCallback(),
541+
() -> tryProcessControlledShutdown(controlledShutdownEvent));
532542
} else if (event instanceof AccessContextEvent) {
533543
AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) event;
534544
processAccessContext(accessContextEvent);
@@ -865,6 +875,7 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent
865875
LOG.info("Tablet server failure callback for {}.", tabletServerId);
866876
coordinatorContext.removeOfflineBucketInServer(tabletServerId);
867877
coordinatorContext.removeLiveTabletServer(tabletServerId);
878+
coordinatorContext.shuttingDownTabletServers().remove(tabletServerId);
868879
coordinatorChannelManager.removeTabletServer(tabletServerId);
869880

870881
// Here, we will first update alive tabletServer info for all tabletServers and
@@ -1165,6 +1176,73 @@ private CommitLakeTableSnapshotResponse tryProcessCommitLakeTableSnapshot(
11651176
return response;
11661177
}
11671178

1179+
private ControlledShutdownResponse tryProcessControlledShutdown(
1180+
ControlledShutdownEvent controlledShutdownEvent) {
1181+
ControlledShutdownResponse response = new ControlledShutdownResponse();
1182+
1183+
// TODO here we need to check tabletServerEpoch, avoid to receive controlled shutdown
1184+
// request from an old tabletServer. Trace by https://github.com/alibaba/fluss/issues/1153
1185+
int tabletServerEpoch = controlledShutdownEvent.getTabletServerEpoch();
1186+
1187+
int tabletServerId = controlledShutdownEvent.getTabletServerId();
1188+
LOG.info(
1189+
"Try to process controlled shutdown for tabletServer: {} of tabletServer epoch: {}",
1190+
controlledShutdownEvent.getTabletServerId(),
1191+
tabletServerEpoch);
1192+
1193+
if (!coordinatorContext.liveOrShuttingDownTabletServers().contains(tabletServerId)) {
1194+
throw new TabletServerNotAvailableException(
1195+
"TabletServer" + tabletServerId + " is not available.");
1196+
}
1197+
1198+
coordinatorContext.shuttingDownTabletServers().add(tabletServerId);
1199+
LOG.debug(
1200+
"All shutting down tabletServers: {}",
1201+
coordinatorContext.shuttingDownTabletServers());
1202+
LOG.debug("All live tabletServers: {}", coordinatorContext.liveTabletServerSet());
1203+
1204+
List<TableBucketReplica> replicasToActOn =
1205+
coordinatorContext.replicasOnTabletServer(tabletServerId).stream()
1206+
.filter(
1207+
replica -> {
1208+
TableBucket tableBucket = replica.getTableBucket();
1209+
return !coordinatorContext.getAssignment(tableBucket).isEmpty()
1210+
&& coordinatorContext
1211+
.getBucketLeaderAndIsr(tableBucket)
1212+
.isPresent()
1213+
&& !coordinatorContext.isToBeDeleted(tableBucket);
1214+
})
1215+
.collect(Collectors.toList());
1216+
1217+
Set<TableBucket> bucketsLedByServer = new HashSet<>();
1218+
Set<TableBucketReplica> replicasFollowedByServer = new HashSet<>();
1219+
for (TableBucketReplica replica : replicasToActOn) {
1220+
TableBucket tableBucket = replica.getTableBucket();
1221+
if (replica.getReplica()
1222+
== coordinatorContext.getBucketLeaderAndIsr(tableBucket).get().leader()) {
1223+
bucketsLedByServer.add(tableBucket);
1224+
} else {
1225+
replicasFollowedByServer.add(replica);
1226+
}
1227+
}
1228+
1229+
tableBucketStateMachine.handleStateChange(
1230+
bucketsLedByServer, OnlineBucket, CONTROLLED_SHUTDOWN_ELECTION);
1231+
1232+
// TODO need send stop request to the leader?
1233+
1234+
// If the tabletServer is a follower, updates the isr in ZK and notifies the current leader.
1235+
replicaStateMachine.handleStateChanges(replicasFollowedByServer, OfflineReplica);
1236+
1237+
// Return the list of buckets that are still being managed by the controlled shutdown
1238+
// tabletServer after leader migration.
1239+
response.addAllRemainingLeaderBuckets(
1240+
coordinatorContext.getBucketsWithLeaderIn(tabletServerId).stream()
1241+
.map(ServerRpcMessageUtils::fromTableBucket)
1242+
.collect(Collectors.toList()));
1243+
return response;
1244+
}
1245+
11681246
private void validateFencedEvent(FencedCoordinatorEvent event) {
11691247
TableBucket tb = event.getTableBucket();
11701248
if (coordinatorContext.getTablePathById(tb.getTableId()) == null) {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public void addNotifyLeaderRequestForTabletServers(
211211
List<Integer> bucketReplicas,
212212
LeaderAndIsr leaderAndIsr) {
213213
tabletServers.stream()
214-
.filter(s -> s >= 0)
214+
.filter(s -> s >= 0 && !coordinatorContext.shuttingDownTabletServers().contains(s))
215215
.forEach(
216216
id -> {
217217
Map<TableBucket, PbNotifyLeaderAndIsrReqForBucket>

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
5050
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest;
5151
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
52+
import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
53+
import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
5254
import org.apache.fluss.rpc.messages.CreateAclsRequest;
5355
import org.apache.fluss.rpc.messages.CreateAclsResponse;
5456
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
@@ -86,6 +88,7 @@
8688
import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent;
8789
import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
8890
import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
91+
import org.apache.fluss.server.coordinator.event.ControlledShutdownEvent;
8992
import org.apache.fluss.server.coordinator.event.EventManager;
9093
import org.apache.fluss.server.entity.CommitKvSnapshotData;
9194
import org.apache.fluss.server.entity.LakeTieringTableInfo;
@@ -575,6 +578,20 @@ public CompletableFuture<LakeTieringHeartbeatResponse> lakeTieringHeartbeat(
575578
return CompletableFuture.completedFuture(heartbeatResponse);
576579
}
577580

581+
@Override
582+
public CompletableFuture<ControlledShutdownResponse> controlledShutdown(
583+
ControlledShutdownRequest request) {
584+
CompletableFuture<ControlledShutdownResponse> response = new CompletableFuture<>();
585+
eventManagerSupplier
586+
.get()
587+
.put(
588+
new ControlledShutdownEvent(
589+
request.getTabletServerId(),
590+
request.getTabletServerEpoch(),
591+
response));
592+
return response;
593+
}
594+
578595
private void validateHeartbeatRequest(
579596
PbHeartbeatReqForTable heartbeatReqForTable, int currentEpoch) {
580597
if (heartbeatReqForTable.getCoordinatorEpoch() != currentEpoch) {
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.server.coordinator.event;
19+
20+
import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
21+
22+
import java.util.concurrent.CompletableFuture;
23+
24+
/** An event for controlled shutdown of TabletServer. */
25+
public class ControlledShutdownEvent implements CoordinatorEvent {
26+
private final int tabletServerId;
27+
private final int tabletServerEpoch;
28+
private final CompletableFuture<ControlledShutdownResponse> respCallback;
29+
30+
public ControlledShutdownEvent(
31+
int tabletServerId,
32+
int tabletServerEpoch,
33+
CompletableFuture<ControlledShutdownResponse> respCallback) {
34+
this.tabletServerId = tabletServerId;
35+
this.tabletServerEpoch = tabletServerEpoch;
36+
this.respCallback = respCallback;
37+
}
38+
39+
public int getTabletServerId() {
40+
return tabletServerId;
41+
}
42+
43+
public int getTabletServerEpoch() {
44+
return tabletServerEpoch;
45+
}
46+
47+
public CompletableFuture<ControlledShutdownResponse> getRespCallback() {
48+
return respCallback;
49+
}
50+
}

0 commit comments

Comments
 (0)