Skip to content

Commit 933f31c

Browse files
swuferhong authored and LiebingYu committed
[Server] TabletServer support controlled shutdown
1 parent 56062d8 commit 933f31c

23 files changed

+892
-43
lines changed

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,12 @@ public class ConfigOptions {
422422
.withDescription(
423423
"Defines how long the buffer pool will block when waiting for segments to become available.");
424424

425+
public static final ConfigOption<Boolean> TABLET_SERVER_CONTROLLED_SHUTDOWN_ENABLED =
426+
key("tablet-server.controlled-shutdown.enabled")
427+
.booleanType()
428+
.defaultValue(true)
429+
.withDescription("Whether to enable controlled shutdown for TabletServer.");
430+
425431
// ------------------------------------------------------------------
426432
// ZooKeeper Settings
427433
// ------------------------------------------------------------------
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.exception;
19+
20+
import com.alibaba.fluss.annotation.PublicEvolving;
21+
22+
/**
23+
* Thrown when the tabletServer is not available.
*
* <p>For example, the coordinator raises it for a controlled shutdown request whose
* tabletServer id is neither live nor shutting down.
24+
*
25+
* @since 0.8
26+
*/
27+
@PublicEvolving
28+
public class TabletServerNotAvailableException extends ApiException {
/**
* Creates the exception with the given detail message.
*
* @param message the detail message, passed on to the {@link ApiException} constructor
*/
29+
public TabletServerNotAvailableException(String message) {
30+
super(message);
31+
}
32+
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/CoordinatorGateway.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
2727
import com.alibaba.fluss.rpc.messages.CommitRemoteLogManifestRequest;
2828
import com.alibaba.fluss.rpc.messages.CommitRemoteLogManifestResponse;
29+
import com.alibaba.fluss.rpc.messages.ControlledShutdownRequest;
30+
import com.alibaba.fluss.rpc.messages.ControlledShutdownResponse;
2931
import com.alibaba.fluss.rpc.messages.LakeTieringHeartbeatRequest;
3032
import com.alibaba.fluss.rpc.messages.LakeTieringHeartbeatResponse;
3133
import com.alibaba.fluss.rpc.protocol.ApiKeys;
@@ -78,4 +80,9 @@ CompletableFuture<CommitLakeTableSnapshotResponse> commitLakeTableSnapshot(
7880
@RPC(api = ApiKeys.LAKE_TIERING_HEARTBEAT)
7981
CompletableFuture<LakeTieringHeartbeatResponse> lakeTieringHeartbeat(
8082
LakeTieringHeartbeatRequest request);
83+
84+
/**
* Requests a controlled shutdown of the tabletServer identified by the tabletServerId carried
* in the request.
*
* @param request the controlled shutdown request
* @return a future that completes with the controlled shutdown response
*/
85+
@RPC(api = ApiKeys.CONTROLLED_SHUTDOWN)
86+
CompletableFuture<ControlledShutdownResponse> controlledShutdown(
87+
ControlledShutdownRequest request);
8188
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ public enum ApiKeys {
7070
CREATE_ACLS(1039, 0, 0, PUBLIC),
7171
LIST_ACLS(1040, 0, 0, PUBLIC),
7272
DROP_ACLS(1041, 0, 0, PUBLIC),
73-
LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE);
73+
LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE),
74+
CONTROLLED_SHUTDOWN(1043, 0, 0, PRIVATE);
7475

7576
private static final Map<Integer, ApiKeys> ID_TO_TYPE =
7677
Arrays.stream(ApiKeys.values())

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,14 @@ message LakeTieringHeartbeatResponse {
529529
repeated PbHeartbeatRespForTable failed_table_resp = 5;
530530
}
531531

532+
// Request sent to the coordinator to start a controlled shutdown of one tabletServer.
message ControlledShutdownRequest {
533+
// Id of the tabletServer that wants to shut down.
required int32 tablet_server_id = 1;
534+
// Epoch of the requesting tabletServer.
// NOTE(review): the coordinator-side handler in this commit does not validate it yet (see its TODO).
required int32 tablet_server_epoch = 2;
535+
}
536+
537+
// Response to a ControlledShutdownRequest.
message ControlledShutdownResponse {
538+
// Buckets whose leader is still on the requesting tabletServer after the coordinator
// processed the request.
repeated PbTableBucket remaining_leader_buckets = 1;
539+
}
532540

533541
// --------------- Inner classes ----------------
534542
message PbApiVersion {

fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public abstract class ServerBase implements AutoCloseableAsync, FatalErrorHandle
6565

6666
protected FileSystem remoteFileSystem;
6767
protected PluginManager pluginManager;
68+
protected volatile ServerState serverState = ServerState.NOT_RUNNING;
6869

6970
protected ServerBase(Configuration conf) {
7071
this.conf = conf;
@@ -108,6 +109,9 @@ protected static void startServer(ServerBase server) {
108109
public void start() throws Exception {
109110
try {
110111
addShutDownHook();
112+
113+
serverState = ServerState.STARTING;
114+
111115
// at first, we need to initialize the file system
112116
pluginManager = PluginUtils.createPluginManagerFromRootFolder(conf);
113117
FileSystem.initialize(conf, pluginManager);
@@ -117,6 +121,7 @@ public void start() throws Exception {
117121
remoteFileSystem = new FsPath(remoteDir).getFileSystem();
118122

119123
startServices();
124+
serverState = ServerState.RUNNING;
120125
} catch (Throwable t) {
121126
final Throwable strippedThrowable =
122127
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.server;
19+
20+
import com.alibaba.fluss.server.coordinator.CoordinatorServer;
21+
import com.alibaba.fluss.server.tablet.TabletServer;
22+
23+
/**
24+
* The server state.
25+
*
26+
* <p>For {@link CoordinatorServer}, the expected state transitions are:
27+
*
28+
* <p>NOT_RUNNING -> STARTING -> RUNNING -> SHUTTING_DOWN
29+
*
30+
* <p>For {@link TabletServer}, the expected state transitions are:
31+
*
32+
* <p>NOT_RUNNING -> STARTING -> RECOVERY -> RUNNING -> PENDING_CONTROLLED_SHUTDOWN -> SHUTTING_DOWN
33+
*/
34+
public enum ServerState {
35+
/** The state the server is in when it first starts up. */
36+
NOT_RUNNING,
37+
38+
/** The state the server is in when it is catching up with cluster metadata. */
39+
STARTING,
40+
41+
/**
42+
* The state the TabletServer is in when it is catching up with cluster metadata (like reload
43+
* log).
44+
*/
45+
RECOVERY,
46+
47+
/** The state the server is in when it has registered, and is accepting client requests. */
48+
RUNNING,
49+
50+
/** The state the TabletServer is in when it is attempting to perform a controlled shutdown. */
51+
PENDING_CONTROLLED_SHUTDOWN,
52+
53+
/** The state the server is in when it is shutting down. */
54+
SHUTTING_DOWN,
55+
56+
/** The state the server is in when it is unknown. */
57+
UNKNOWN
58+
}

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public class CoordinatorContext {
6767
private final Map<TableBucketReplica, Integer> failDeleteNumbers = new HashMap<>();
6868

6969
private final Map<Integer, ServerInfo> liveTabletServers = new HashMap<>();
70+
private final Set<Integer> shuttingDownTabletServers = new HashSet<>();
7071

7172
// a map from the table bucket to the state of the bucket.
7273
private final Map<TableBucket, BucketState> bucketStates = new HashMap<>();
@@ -114,6 +115,24 @@ public Map<Integer, ServerInfo> getLiveTabletServers() {
114115
return liveTabletServers;
115116
}
116117

118+
public Set<Integer> liveTabletServerSet() {
119+
Set<Integer> liveTabletServers = new HashSet<>();
120+
for (Integer brokerId : this.liveTabletServers.keySet()) {
121+
if (!shuttingDownTabletServers.contains(brokerId)) {
122+
liveTabletServers.add(brokerId);
123+
}
124+
}
125+
return liveTabletServers;
126+
}
127+
128+
/**
* Returns the ids of the tablet servers currently marked as shutting down.
*
* <p>This is the internal set itself, not a copy: callers add to and remove from it directly
* (the coordinator event processor does so when handling controlled shutdown and dead
* tablet server events).
*/
public Set<Integer> shuttingDownTabletServers() {
129+
return shuttingDownTabletServers;
130+
}
131+
132+
/**
* Returns the ids of all registered tablet servers, whether or not they are marked as shutting
* down.
*
* <p>The result is the key set of the live tablet server map, not a copy.
*/
public Set<Integer> liveOrShuttingDownTabletServers() {
133+
return liveTabletServers.keySet();
134+
}
135+
117136
@VisibleForTesting
118137
public void setLiveTabletServers(List<ServerInfo> servers) {
119138
liveTabletServers.clear();
@@ -136,8 +155,20 @@ public void removeLiveTabletServer(int serverId) {
136155
this.liveTabletServers.remove(serverId);
137156
}
138157

139-
public boolean isReplicaAndServerOnline(int serverId, TableBucket tableBucket) {
140-
return liveTabletServers.containsKey(serverId)
158+
/**
* Checks whether the replica of the given bucket on the given server is online, treating
* tablet servers that are shutting down as not online.
*
* <p>Delegates to the three-argument overload with {@code includeShuttingDownTabletServers}
* set to {@code false}.
*/
public boolean isReplicaOnline(int serverId, TableBucket tableBucket) {
159+
return isReplicaOnline(serverId, tableBucket, false);
160+
}
161+
162+
public boolean isReplicaOnline(
163+
int serverId, TableBucket tableBucket, boolean includeShuttingDownTabletServers) {
164+
boolean serverOnline;
165+
if (includeShuttingDownTabletServers) {
166+
serverOnline = liveOrShuttingDownTabletServers().contains(serverId);
167+
} else {
168+
serverOnline = liveTabletServerSet().contains(serverId);
169+
}
170+
171+
return serverOnline
141172
&& !replicasOnOffline
142173
.getOrDefault(serverId, Collections.emptySet())
143174
.contains(tableBucket);
@@ -636,5 +667,6 @@ public void resetContext() {
636667
clearTablesState();
637668
// clear the live tablet servers
638669
liveTabletServers.clear();
670+
shuttingDownTabletServers.clear();
639671
}
640672
}

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.alibaba.fluss.exception.FlussRuntimeException;
2828
import com.alibaba.fluss.exception.InvalidCoordinatorException;
2929
import com.alibaba.fluss.exception.InvalidUpdateVersionException;
30+
import com.alibaba.fluss.exception.TabletServerNotAvailableException;
3031
import com.alibaba.fluss.exception.UnknownTableOrBucketException;
3132
import com.alibaba.fluss.metadata.PhysicalTablePath;
3233
import com.alibaba.fluss.metadata.TableBucket;
@@ -39,13 +40,15 @@
3940
import com.alibaba.fluss.rpc.messages.CommitKvSnapshotResponse;
4041
import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
4142
import com.alibaba.fluss.rpc.messages.CommitRemoteLogManifestResponse;
43+
import com.alibaba.fluss.rpc.messages.ControlledShutdownResponse;
4244
import com.alibaba.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable;
4345
import com.alibaba.fluss.rpc.protocol.ApiError;
4446
import com.alibaba.fluss.server.coordinator.event.AccessContextEvent;
4547
import com.alibaba.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
4648
import com.alibaba.fluss.server.coordinator.event.CommitKvSnapshotEvent;
4749
import com.alibaba.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
4850
import com.alibaba.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
51+
import com.alibaba.fluss.server.coordinator.event.ControlledShutdownEvent;
4952
import com.alibaba.fluss.server.coordinator.event.CoordinatorEvent;
5053
import com.alibaba.fluss.server.coordinator.event.CoordinatorEventManager;
5154
import com.alibaba.fluss.server.coordinator.event.CreatePartitionEvent;
@@ -73,6 +76,7 @@
7376
import com.alibaba.fluss.server.metadata.CoordinatorMetadataCache;
7477
import com.alibaba.fluss.server.metadata.ServerInfo;
7578
import com.alibaba.fluss.server.metrics.group.CoordinatorMetricGroup;
79+
import com.alibaba.fluss.server.utils.ServerRpcMessageUtils;
7680
import com.alibaba.fluss.server.zk.ZooKeeperClient;
7781
import com.alibaba.fluss.server.zk.data.BucketAssignment;
7882
import com.alibaba.fluss.server.zk.data.LakeTableSnapshot;
@@ -104,6 +108,7 @@
104108

105109
import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.OfflineBucket;
106110
import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.OnlineBucket;
111+
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaLeaderElectionStrategy.CONTROLLED_SHUTDOWN_ELECTION;
107112
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.OfflineReplica;
108113
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
109114
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionStarted;
@@ -493,6 +498,11 @@ public void process(CoordinatorEvent event) {
493498
completeFromCallable(
494499
commitLakeTableSnapshotEvent.getRespCallback(),
495500
() -> tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent));
501+
} else if (event instanceof ControlledShutdownEvent) {
502+
ControlledShutdownEvent controlledShutdownEvent = (ControlledShutdownEvent) event;
503+
completeFromCallable(
504+
controlledShutdownEvent.getRespCallback(),
505+
() -> tryProcessControlledShutdown(controlledShutdownEvent));
496506
} else if (event instanceof AccessContextEvent) {
497507
AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) event;
498508
processAccessContext(accessContextEvent);
@@ -839,6 +849,7 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent
839849
LOG.info("Tablet server failure callback for {}.", tabletServerId);
840850
coordinatorContext.removeOfflineBucketInServer(tabletServerId);
841851
coordinatorContext.removeLiveTabletServer(tabletServerId);
852+
coordinatorContext.shuttingDownTabletServers().remove(tabletServerId);
842853
coordinatorChannelManager.removeTabletServer(tabletServerId);
843854

844855
// Here, we will first update alive tabletServer info for all tabletServers and
@@ -1102,6 +1113,79 @@ private CommitLakeTableSnapshotResponse tryProcessCommitLakeTableSnapshot(
11021113
return response;
11031114
}
11041115

1116+
private ControlledShutdownResponse tryProcessControlledShutdown(
1117+
ControlledShutdownEvent controlledShutdownEvent) {
1118+
ControlledShutdownResponse response = new ControlledShutdownResponse();
1119+
1120+
// TODO here we need to check tabletServerEpoch, avoid to receive controlled shutdown
1121+
// request from and old tabletServer.
1122+
int tabletServerEpoch = controlledShutdownEvent.getTabletServerEpoch();
1123+
1124+
int tabletServerId = controlledShutdownEvent.getTabletServerId();
1125+
LOG.info(
1126+
"Try to process controlled shutdown for tabletServer: {}",
1127+
controlledShutdownEvent.getTabletServerId());
1128+
1129+
if (!coordinatorContext.liveOrShuttingDownTabletServers().contains(tabletServerId)) {
1130+
throw new TabletServerNotAvailableException(
1131+
"TabletServer" + tabletServerId + " is not available.");
1132+
}
1133+
1134+
coordinatorContext.shuttingDownTabletServers().add(tabletServerId);
1135+
LOG.debug(
1136+
"All shutting down tabletServers: {}",
1137+
coordinatorContext.shuttingDownTabletServers());
1138+
LOG.debug("All live tabletServers: {}", coordinatorContext.liveTabletServerSet());
1139+
1140+
List<TableBucketReplica> replicasToActOn =
1141+
coordinatorContext.replicasOnTabletServer(tabletServerId).stream()
1142+
.filter(
1143+
replica -> {
1144+
TableBucket tableBucket = replica.getTableBucket();
1145+
return coordinatorContext.getAssignment(tableBucket).size() >= 1
1146+
&& coordinatorContext
1147+
.getBucketLeaderAndIsr(tableBucket)
1148+
.isPresent()
1149+
&& !coordinatorContext.isToBeDeleted(tableBucket);
1150+
})
1151+
.collect(Collectors.toList());
1152+
1153+
Set<TableBucket> bucketsLedByServer = new HashSet<>();
1154+
Set<TableBucketReplica> replicasFollowedByServer = new HashSet<>();
1155+
for (TableBucketReplica replica : replicasToActOn) {
1156+
TableBucket tableBucket = replica.getTableBucket();
1157+
if (replica.getReplica()
1158+
== coordinatorContext.getBucketLeaderAndIsr(tableBucket).get().leader()) {
1159+
bucketsLedByServer.add(tableBucket);
1160+
} else {
1161+
replicasFollowedByServer.add(replica);
1162+
}
1163+
}
1164+
1165+
tableBucketStateMachine.handleStateChange(
1166+
bucketsLedByServer, OnlineBucket, CONTROLLED_SHUTDOWN_ELECTION);
1167+
1168+
coordinatorRequestBatch.newBatch();
1169+
replicasFollowedByServer.forEach(
1170+
replica ->
1171+
coordinatorRequestBatch.addStopReplicaRequestForTabletServers(
1172+
Collections.singleton(tabletServerId),
1173+
replica.getTableBucket(),
1174+
false,
1175+
coordinatorContext.getBucketLeaderEpoch(replica.getTableBucket())));
1176+
coordinatorRequestBatch.sendRequestToTabletServers(
1177+
coordinatorContext.getCoordinatorEpoch());
1178+
1179+
// If the tabletServer is a follower, updates the isr in ZK and notifies the current leader.
1180+
replicaStateMachine.handleStateChanges(replicasFollowedByServer, OfflineReplica);
1181+
1182+
response.addAllRemainingLeaderBuckets(
1183+
coordinatorContext.getBucketsWithLeaderIn(tabletServerId).stream()
1184+
.map(ServerRpcMessageUtils::fromTableBucket)
1185+
.collect(Collectors.toList()));
1186+
return response;
1187+
}
1188+
11051189
private void validateFencedEvent(FencedCoordinatorEvent event) {
11061190
TableBucket tb = event.getTableBucket();
11071191
if (coordinatorContext.getTablePathById(tb.getTableId()) == null) {

0 commit comments

Comments
 (0)