Skip to content

Commit 170a35e

Browse files
committed
[server] Support AddServerTag and RemoveServerTag
1 parent 8ef3119 commit 170a35e

File tree

9 files changed

+344
-6
lines changed

9 files changed

+344
-6
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.fluss.rpc.gateway.AdminGateway;
4949
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
5050
import org.apache.fluss.rpc.gateway.TabletServerGateway;
51+
import org.apache.fluss.rpc.messages.AddServerTagRequest;
5152
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
5253
import org.apache.fluss.rpc.messages.AlterTableRequest;
5354
import org.apache.fluss.rpc.messages.CreateAclsRequest;
@@ -76,6 +77,7 @@
7677
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
7778
import org.apache.fluss.rpc.messages.PbPartitionSpec;
7879
import org.apache.fluss.rpc.messages.PbTablePath;
80+
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
7981
import org.apache.fluss.rpc.messages.TableExistsRequest;
8082
import org.apache.fluss.rpc.messages.TableExistsResponse;
8183
import org.apache.fluss.rpc.protocol.ApiError;
@@ -541,13 +543,17 @@ public CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> confi
541543

542544
@Override
543545
public CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag) {
544-
throw new UnsupportedOperationException("Support soon");
546+
AddServerTagRequest request = new AddServerTagRequest().setServerTag(serverTag.value);
547+
tabletServers.forEach(request::addServerId);
548+
return gateway.addServerTag(request).thenApply(r -> null);
545549
}
546550

547551
@Override
548552
public CompletableFuture<Void> removeServerTag(
549553
List<Integer> tabletServers, ServerTag serverTag) {
550-
throw new UnsupportedOperationException("Support soon");
554+
RemoveServerTagRequest request = new RemoveServerTagRequest().setServerTag(serverTag.value);
555+
tabletServers.forEach(request::addServerId);
556+
return gateway.removeServerTag(request).thenApply(r -> null);
551557
}
552558

553559
@Override

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.fluss.client.table.Table;
2525
import org.apache.fluss.client.table.writer.UpsertWriter;
2626
import org.apache.fluss.cluster.ServerNode;
27+
import org.apache.fluss.cluster.rebalance.ServerTag;
2728
import org.apache.fluss.config.AutoPartitionTimeUnit;
2829
import org.apache.fluss.config.ConfigOptions;
2930
import org.apache.fluss.config.Configuration;
@@ -42,6 +43,9 @@
4243
import org.apache.fluss.exception.PartitionAlreadyExistsException;
4344
import org.apache.fluss.exception.PartitionNotExistException;
4445
import org.apache.fluss.exception.SchemaNotExistException;
46+
import org.apache.fluss.exception.ServerNotExistException;
47+
import org.apache.fluss.exception.ServerTagAlreadyExistException;
48+
import org.apache.fluss.exception.ServerTagNotExistException;
4549
import org.apache.fluss.exception.TableNotExistException;
4650
import org.apache.fluss.exception.TableNotPartitionedException;
4751
import org.apache.fluss.exception.TooManyBucketsException;
@@ -64,6 +68,7 @@
6468
import org.apache.fluss.metadata.TablePath;
6569
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
6670
import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle;
71+
import org.apache.fluss.server.zk.ZooKeeperClient;
6772
import org.apache.fluss.types.DataTypes;
6873

6974
import org.junit.jupiter.api.BeforeEach;
@@ -1282,4 +1287,63 @@ public void testSystemsColumns() throws Exception {
12821287
+ "Please use other names for these columns. "
12831288
+ "The reserved system columns are: __offset, __timestamp, __bucket");
12841289
}
1290+
1291+
@Test
1292+
public void testAddAndRemoveServerTags() throws Exception {
1293+
ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
1294+
// 1.add server tag to a none exists server.
1295+
assertThatThrownBy(
1296+
() ->
1297+
admin.addServerTag(
1298+
Collections.singletonList(100),
1299+
ServerTag.PERMANENT_OFFLINE)
1300+
.get())
1301+
.cause()
1302+
.isInstanceOf(ServerNotExistException.class)
1303+
.hasMessageContaining("Server 100 not exists when trying to add server tag.");
1304+
1305+
// 2.add server tag for server 0,1.
1306+
admin.addServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get();
1307+
// TODO use api to get serverTags instead of getting from zk directly
1308+
assertThat(zkClient.getServerTags()).isPresent();
1309+
assertThat(zkClient.getServerTags().get().getServerTags())
1310+
.containsEntry(0, ServerTag.PERMANENT_OFFLINE)
1311+
.containsEntry(1, ServerTag.PERMANENT_OFFLINE);
1312+
1313+
// 3.add server tag for server 0,2. error will be thrown and tag for 2 will not be added.
1314+
assertThatThrownBy(
1315+
() ->
1316+
admin.addServerTag(Arrays.asList(0, 2), ServerTag.PERMANENT_OFFLINE)
1317+
.get())
1318+
.cause()
1319+
.isInstanceOf(ServerTagAlreadyExistException.class)
1320+
.hasMessageContaining("Server tag PERMANENT_OFFLINE already exists for server 0.");
1321+
1322+
// 4.remove server tag for server 100
1323+
assertThatThrownBy(
1324+
() ->
1325+
admin.removeServerTag(
1326+
Collections.singletonList(100),
1327+
ServerTag.PERMANENT_OFFLINE)
1328+
.get())
1329+
.cause()
1330+
.isInstanceOf(ServerNotExistException.class)
1331+
.hasMessageContaining("Server 100 not exists when trying to removing server tag.");
1332+
1333+
// 5.remove server tag for server 0,1.
1334+
admin.removeServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get();
1335+
assertThat(zkClient.getServerTags()).isPresent();
1336+
assertThat(zkClient.getServerTags().get().getServerTags()).isEmpty();
1337+
1338+
// 6.remove server tag for server 2. error will be thrown and tag for 2 will not be removed.
1339+
assertThatThrownBy(
1340+
() ->
1341+
admin.removeServerTag(
1342+
Collections.singletonList(0),
1343+
ServerTag.PERMANENT_OFFLINE)
1344+
.get())
1345+
.cause()
1346+
.isInstanceOf(ServerTagNotExistException.class)
1347+
.hasMessageContaining("Server tag PERMANENT_OFFLINE not exists for server 0.");
1348+
}
12851349
}

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,13 @@
2828
*/
2929
@PublicEvolving
3030
public enum ServerTag {
31+
/**
32+
* The tabletServer is permanently offline. Such as the host where the tabletServer on is
33+
* upcoming decommissioning.
34+
*/
3135
PERMANENT_OFFLINE(0),
36+
37+
/** The tabletServer is temporarily offline. Such as the tabletServer is upcoming upgrading. */
3238
TEMPORARY_OFFLINE(1);
3339

3440
public final int value;

fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.apache.fluss.exception.SecurityTokenException;
7070
import org.apache.fluss.exception.ServerNotExistException;
7171
import org.apache.fluss.exception.ServerTagAlreadyExistException;
72+
import org.apache.fluss.exception.ServerTagNotExistException;
7273
import org.apache.fluss.exception.StorageException;
7374
import org.apache.fluss.exception.TableAlreadyExistException;
7475
import org.apache.fluss.exception.TableNotExistException;
@@ -236,8 +237,7 @@ public enum Errors {
236237
SERVER_NOT_EXIST_EXCEPTION(58, "The server is not exist.", ServerNotExistException::new),
237238
SEVER_TAG_ALREADY_EXIST_EXCEPTION(
238239
59, "The server tag already exist.", ServerTagAlreadyExistException::new),
239-
SEVER_TAG_NOT_EXIST_EXCEPTION(
240-
60, "The server tag not exist.", ServerTagAlreadyExistException::new),
240+
SEVER_TAG_NOT_EXIST_EXCEPTION(60, "The server tag not exist.", ServerTagNotExistException::new),
241241
REBALANCE_FAILURE_EXCEPTION(61, "The rebalance task failure.", RebalanceFailureException::new),
242242
NO_REBALANCE_IN_PROGRESS_EXCEPTION(
243243
62, "No rebalance task in progress.", NoRebalanceInProgressException::new);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.server.coordinator;
1919

2020
import org.apache.fluss.annotation.VisibleForTesting;
21+
import org.apache.fluss.cluster.rebalance.ServerTag;
2122
import org.apache.fluss.metadata.PhysicalTablePath;
2223
import org.apache.fluss.metadata.TableBucket;
2324
import org.apache.fluss.metadata.TableBucketReplica;
@@ -102,6 +103,9 @@ public class CoordinatorContext {
102103
*/
103104
private final Map<Integer, Set<TableBucket>> replicasOnOffline = new HashMap<>();
104105

106+
/** A mapping from tabletServers to server tag. */
107+
private final Map<Integer, ServerTag> serverTags = new HashMap<>();
108+
105109
private ServerInfo coordinatorServerInfo = null;
106110
private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
107111

@@ -635,6 +639,26 @@ public void removePartition(TablePartition tablePartition) {
635639
}
636640
}
637641

642+
public void initSeverTags(Map<Integer, ServerTag> initialServerTags) {
643+
serverTags.putAll(initialServerTags);
644+
}
645+
646+
public void putServerTag(int serverId, ServerTag serverTag) {
647+
serverTags.put(serverId, serverTag);
648+
}
649+
650+
public Map<Integer, ServerTag> getServerTags() {
651+
return new HashMap<>(serverTags);
652+
}
653+
654+
public Optional<ServerTag> getServerTag(int serverId) {
655+
return Optional.ofNullable(serverTags.get(serverId));
656+
}
657+
658+
public void removeServerTag(int serverId) {
659+
serverTags.remove(serverId);
660+
}
661+
638662
private void clearTablesState() {
639663
tableAssignments.clear();
640664
partitionAssignments.clear();
@@ -656,6 +680,7 @@ public void resetContext() {
656680
// clear the live tablet servers
657681
liveTabletServers.clear();
658682
shuttingDownTabletServers.clear();
683+
serverTags.clear();
659684
}
660685

661686
public int getTotalPartitionCount() {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,29 +21,37 @@
2121
import org.apache.fluss.cluster.Endpoint;
2222
import org.apache.fluss.cluster.ServerNode;
2323
import org.apache.fluss.cluster.ServerType;
24+
import org.apache.fluss.cluster.rebalance.ServerTag;
2425
import org.apache.fluss.config.ConfigOptions;
2526
import org.apache.fluss.config.Configuration;
2627
import org.apache.fluss.exception.FencedLeaderEpochException;
2728
import org.apache.fluss.exception.FlussRuntimeException;
2829
import org.apache.fluss.exception.IneligibleReplicaException;
2930
import org.apache.fluss.exception.InvalidCoordinatorException;
3031
import org.apache.fluss.exception.InvalidUpdateVersionException;
32+
import org.apache.fluss.exception.ServerNotExistException;
33+
import org.apache.fluss.exception.ServerTagAlreadyExistException;
34+
import org.apache.fluss.exception.ServerTagNotExistException;
3135
import org.apache.fluss.exception.TabletServerNotAvailableException;
36+
import org.apache.fluss.exception.UnknownServerException;
3237
import org.apache.fluss.exception.UnknownTableOrBucketException;
3338
import org.apache.fluss.metadata.PhysicalTablePath;
3439
import org.apache.fluss.metadata.TableBucket;
3540
import org.apache.fluss.metadata.TableBucketReplica;
3641
import org.apache.fluss.metadata.TableInfo;
3742
import org.apache.fluss.metadata.TablePartition;
3843
import org.apache.fluss.metadata.TablePath;
44+
import org.apache.fluss.rpc.messages.AddServerTagResponse;
3945
import org.apache.fluss.rpc.messages.AdjustIsrResponse;
4046
import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse;
4147
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
4248
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
4349
import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
4450
import org.apache.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable;
51+
import org.apache.fluss.rpc.messages.RemoveServerTagResponse;
4552
import org.apache.fluss.rpc.protocol.ApiError;
4653
import org.apache.fluss.server.coordinator.event.AccessContextEvent;
54+
import org.apache.fluss.server.coordinator.event.AddServerTagEvent;
4755
import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
4856
import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent;
4957
import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
@@ -62,6 +70,7 @@
6270
import org.apache.fluss.server.coordinator.event.NewTabletServerEvent;
6371
import org.apache.fluss.server.coordinator.event.NotifyKvSnapshotOffsetEvent;
6472
import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
73+
import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
6574
import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher;
6675
import org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
6776
import org.apache.fluss.server.coordinator.statemachine.ReplicaStateMachine;
@@ -83,6 +92,7 @@
8392
import org.apache.fluss.server.zk.data.LeaderAndIsr;
8493
import org.apache.fluss.server.zk.data.PartitionAssignment;
8594
import org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
95+
import org.apache.fluss.server.zk.data.ServerTags;
8696
import org.apache.fluss.server.zk.data.TableAssignment;
8797
import org.apache.fluss.server.zk.data.TabletServerRegistration;
8898
import org.apache.fluss.server.zk.data.ZkData.PartitionIdsZNode;
@@ -318,6 +328,11 @@ private void initCoordinatorContext() throws Exception {
318328
// init tablet server channels
319329
coordinatorChannelManager.startup(internalServerNodes);
320330

331+
// load server tags.
332+
zooKeeperClient
333+
.getServerTags()
334+
.ifPresent(tags -> coordinatorContext.initSeverTags(tags.getServerTags()));
335+
321336
// load all tables
322337
long start4loadTables = System.currentTimeMillis();
323338
List<TableInfo> autoPartitionTables = new ArrayList<>();
@@ -541,6 +556,16 @@ public void process(CoordinatorEvent event) {
541556
completeFromCallable(
542557
controlledShutdownEvent.getRespCallback(),
543558
() -> tryProcessControlledShutdown(controlledShutdownEvent));
559+
} else if (event instanceof AddServerTagEvent) {
560+
AddServerTagEvent addServerTagEvent = (AddServerTagEvent) event;
561+
completeFromCallable(
562+
addServerTagEvent.getRespCallback(),
563+
() -> processAddServerTag(addServerTagEvent));
564+
} else if (event instanceof RemoveServerTagEvent) {
565+
RemoveServerTagEvent removeServerTagEvent = (RemoveServerTagEvent) event;
566+
completeFromCallable(
567+
removeServerTagEvent.getRespCallback(),
568+
() -> processRemoveServerTag(removeServerTagEvent));
544569
} else if (event instanceof AccessContextEvent) {
545570
AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) event;
546571
processAccessContext(accessContextEvent);
@@ -923,6 +948,90 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent
923948
updateTabletServerMetadataCache(serverInfos, null, null, bucketsWithOfflineLeader);
924949
}
925950

951+
private AddServerTagResponse processAddServerTag(AddServerTagEvent event) {
952+
AddServerTagResponse addServerTagResponse = new AddServerTagResponse();
953+
List<Integer> serverIds = event.getServerIds();
954+
ServerTag serverTag = event.getServerTag();
955+
956+
// Verify that dose serverTag exist for input serverIds. If any of them exists, throw
957+
// an error and none of them will be written to coordinatorContext and zk.
958+
Map<Integer, ServerInfo> liveTabletServers = coordinatorContext.getLiveTabletServers();
959+
for (Integer serverId : serverIds) {
960+
if (!liveTabletServers.containsKey(serverId)) {
961+
throw new ServerNotExistException(
962+
String.format(
963+
"Server %s not exists when trying to add server tag.", serverId));
964+
}
965+
966+
if (coordinatorContext.getServerTag(serverId).isPresent()) {
967+
throw new ServerTagAlreadyExistException(
968+
String.format(
969+
"Server tag %s already exists for server %s.",
970+
serverTag, serverId));
971+
}
972+
}
973+
974+
// First register to zk, and then update coordinatorContext.
975+
Map<Integer, ServerTag> serverTags = coordinatorContext.getServerTags();
976+
for (Integer serverId : serverIds) {
977+
serverTags.put(serverId, serverTag);
978+
}
979+
980+
try {
981+
zooKeeperClient.registerServerTags(new ServerTags(serverTags));
982+
} catch (Exception e) {
983+
LOG.error("Error when register server tags to zookeeper.", e);
984+
throw new UnknownServerException("Error when register server tags to zookeeper.", e);
985+
}
986+
987+
// Then update coordinatorContext.
988+
serverIds.forEach(serverId -> coordinatorContext.putServerTag(serverId, serverTag));
989+
990+
return addServerTagResponse;
991+
}
992+
993+
private RemoveServerTagResponse processRemoveServerTag(RemoveServerTagEvent event) {
994+
RemoveServerTagResponse removeServerTagResponse = new RemoveServerTagResponse();
995+
List<Integer> serverIds = event.getServerIds();
996+
ServerTag serverTag = event.getServerTag();
997+
998+
// Verify that dose serverTag not exist for input serverIds. If any of them not exists,
999+
// throw an error and none of them will be removed form coordinatorContext and zk.
1000+
Map<Integer, ServerInfo> liveTabletServers = coordinatorContext.getLiveTabletServers();
1001+
for (Integer serverId : serverIds) {
1002+
if (!liveTabletServers.containsKey(serverId)) {
1003+
throw new ServerNotExistException(
1004+
String.format(
1005+
"Server %s not exists when trying to removing server tag.",
1006+
serverId));
1007+
}
1008+
1009+
if (!coordinatorContext.getServerTag(serverId).isPresent()) {
1010+
throw new ServerTagNotExistException(
1011+
String.format(
1012+
"Server tag %s not exists for server %s.", serverTag, serverId));
1013+
}
1014+
}
1015+
1016+
// First register to zk, and then update coordinatorContext.
1017+
Map<Integer, ServerTag> serverTags = coordinatorContext.getServerTags();
1018+
for (Integer serverId : serverIds) {
1019+
serverTags.remove(serverId);
1020+
}
1021+
1022+
try {
1023+
zooKeeperClient.registerServerTags(new ServerTags(serverTags));
1024+
} catch (Exception e) {
1025+
LOG.error("Error when register server tags to zookeeper.", e);
1026+
throw new UnknownServerException("Error when register server tags to zookeeper.", e);
1027+
}
1028+
1029+
// Then update coordinatorContext.
1030+
serverIds.forEach(coordinatorContext::removeServerTag);
1031+
1032+
return removeServerTagResponse;
1033+
}
1034+
9261035
private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
9271036
Map<TableBucket, LeaderAndIsr> leaderAndIsrList) {
9281037
// TODO verify leader epoch.

0 commit comments

Comments
 (0)