Skip to content

Commit 5255b80

Browse files
committed
[server] Support AddServerTag and RemoveServerTag
1 parent 749689f commit 5255b80

File tree

8 files changed

+342
-4
lines changed

8 files changed

+342
-4
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.fluss.rpc.gateway.AdminGateway;
4949
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
5050
import org.apache.fluss.rpc.gateway.TabletServerGateway;
51+
import org.apache.fluss.rpc.messages.AddServerTagRequest;
5152
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
5253
import org.apache.fluss.rpc.messages.AlterTableRequest;
5354
import org.apache.fluss.rpc.messages.CreateAclsRequest;
@@ -76,6 +77,7 @@
7677
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
7778
import org.apache.fluss.rpc.messages.PbPartitionSpec;
7879
import org.apache.fluss.rpc.messages.PbTablePath;
80+
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
7981
import org.apache.fluss.rpc.messages.TableExistsRequest;
8082
import org.apache.fluss.rpc.messages.TableExistsResponse;
8183
import org.apache.fluss.rpc.protocol.ApiError;
@@ -537,13 +539,17 @@ public CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> confi
537539

538540
/**
 * Adds {@code serverTag} to the given tablet servers.
 *
 * <p>Diff hunk: the line under the {@code 540-} marker is the removed placeholder; the lines
 * under {@code +} markers are the new implementation. The new code builds an
 * {@code AddServerTagRequest} carrying {@code serverTag.value}, appends every id in
 * {@code tabletServers} to it, and sends it through {@code gateway.addServerTag}.
 *
 * @param tabletServers ids of the tablet servers to tag
 * @param serverTag the tag to add
 * @return a future whose value is always {@code null}; the gateway response is discarded
 */
@Override
539541
public CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag) {
540-
throw new UnsupportedOperationException("Support soon");
542+
AddServerTagRequest request = new AddServerTagRequest().setServerTag(serverTag.value);
543+
tabletServers.forEach(request::addServerId);
544+
// Map the response to null so callers only observe completion or failure.
return gateway.addServerTag(request).thenApply(r -> null);
541545
}
542546

543547
/**
 * Removes {@code serverTag} from the given tablet servers.
 *
 * <p>Diff hunk: the line under the {@code 546-} marker is the removed placeholder; the lines
 * under {@code +} markers are the new implementation. The new code builds a
 * {@code RemoveServerTagRequest} carrying {@code serverTag.value}, appends every id in
 * {@code tabletServers} to it, and sends it through {@code gateway.removeServerTag}.
 *
 * @param tabletServers ids of the tablet servers to untag
 * @param serverTag the tag to remove
 * @return a future whose value is always {@code null}; the gateway response is discarded
 */
@Override
544548
public CompletableFuture<Void> removeServerTag(
545549
List<Integer> tabletServers, ServerTag serverTag) {
546-
throw new UnsupportedOperationException("Support soon");
550+
RemoveServerTagRequest request = new RemoveServerTagRequest().setServerTag(serverTag.value);
551+
tabletServers.forEach(request::addServerId);
552+
// Map the response to null so callers only observe completion or failure.
return gateway.removeServerTag(request).thenApply(r -> null);
547553
}
548554

549555
@Override

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.fluss.client.table.Table;
2525
import org.apache.fluss.client.table.writer.UpsertWriter;
2626
import org.apache.fluss.cluster.ServerNode;
27+
import org.apache.fluss.cluster.rebalance.ServerTag;
2728
import org.apache.fluss.config.AutoPartitionTimeUnit;
2829
import org.apache.fluss.config.ConfigOptions;
2930
import org.apache.fluss.config.Configuration;
@@ -42,6 +43,9 @@
4243
import org.apache.fluss.exception.PartitionAlreadyExistsException;
4344
import org.apache.fluss.exception.PartitionNotExistException;
4445
import org.apache.fluss.exception.SchemaNotExistException;
46+
import org.apache.fluss.exception.ServerNotExistException;
47+
import org.apache.fluss.exception.ServerTagAlreadyExistException;
48+
import org.apache.fluss.exception.ServerTagNotExistException;
4549
import org.apache.fluss.exception.TableNotExistException;
4650
import org.apache.fluss.exception.TableNotPartitionedException;
4751
import org.apache.fluss.exception.TooManyBucketsException;
@@ -64,6 +68,7 @@
6468
import org.apache.fluss.metadata.TablePath;
6569
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
6670
import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle;
71+
import org.apache.fluss.server.zk.ZooKeeperClient;
6772
import org.apache.fluss.types.DataTypes;
6873

6974
import org.junit.jupiter.api.BeforeEach;
@@ -1436,4 +1441,63 @@ public void testSystemsColumns() throws Exception {
14361441
+ "Please use other names for these columns. "
14371442
+ "The reserved system columns are: __offset, __timestamp, __bucket");
14381443
}
1444+
1445+
/**
 * Exercises {@code admin.addServerTag} and {@code admin.removeServerTag} end to end: unknown
 * server ids, a successful add, a duplicate add, a successful remove, and a remove of a tag
 * that is no longer present. Successful operations are verified by reading the tags back
 * from ZooKeeper.
 */
@Test
1446+
public void testAddAndRemoveServerTags() throws Exception {
1447+
ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
1448+
// 1. Adding a server tag to a non-existent server (id 100) must fail.
1449+
assertThatThrownBy(
1450+
() ->
1451+
admin.addServerTag(
1452+
Collections.singletonList(100),
1453+
ServerTag.PERMANENT_OFFLINE)
1454+
.get())
1455+
.cause()
1456+
.isInstanceOf(ServerNotExistException.class)
1457+
.hasMessageContaining("Server 100 not exists when trying to add server tag.");
1458+
1459+
// 2. Add the server tag for servers 0 and 1.
1460+
admin.addServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get();
1461+
// TODO use api to get serverTags instead of getting from zk directly
1462+
assertThat(zkClient.getServerTags()).isPresent();
1463+
assertThat(zkClient.getServerTags().get().getServerTags())
1464+
.containsEntry(0, ServerTag.PERMANENT_OFFLINE)
1465+
.containsEntry(1, ServerTag.PERMANENT_OFFLINE);
1466+
1467+
// 3. Add the server tag for servers 0 and 2. Server 0 is already tagged, so an error is
// thrown. The original note says the tag for server 2 is not added either; this test does
// not assert that.
1468+
assertThatThrownBy(
1469+
() ->
1470+
admin.addServerTag(Arrays.asList(0, 2), ServerTag.PERMANENT_OFFLINE)
1471+
.get())
1472+
.cause()
1473+
.isInstanceOf(ServerTagAlreadyExistException.class)
1474+
.hasMessageContaining("Server tag PERMANENT_OFFLINE already exists for server 0.");
1475+
1476+
// 4. Removing a server tag from non-existent server 100 must fail.
1477+
assertThatThrownBy(
1478+
() ->
1479+
admin.removeServerTag(
1480+
Collections.singletonList(100),
1481+
ServerTag.PERMANENT_OFFLINE)
1482+
.get())
1483+
.cause()
1484+
.isInstanceOf(ServerNotExistException.class)
1485+
.hasMessageContaining("Server 100 not exists when trying to removing server tag.");
1486+
1487+
// 5. Remove the server tag for servers 0 and 1; the tag map in ZooKeeper becomes empty.
1488+
admin.removeServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get();
1489+
assertThat(zkClient.getServerTags()).isPresent();
1490+
assertThat(zkClient.getServerTags().get().getServerTags()).isEmpty();
1491+
1492+
// 6. Remove the server tag for server 0 again. It was already removed in step 5, so an
// error is thrown.
1493+
assertThatThrownBy(
1494+
() ->
1495+
admin.removeServerTag(
1496+
Collections.singletonList(0),
1497+
ServerTag.PERMANENT_OFFLINE)
1498+
.get())
1499+
.cause()
1500+
.isInstanceOf(ServerTagNotExistException.class)
1501+
.hasMessageContaining("Server tag PERMANENT_OFFLINE not exists for server 0.");
1502+
}
14391503
}

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,13 @@
2828
*/
2929
@PublicEvolving
3030
public enum ServerTag {
31+
/**
32+
* The tabletServer is permanently offline, e.g. the host the tabletServer runs on is about
33+
* to be decommissioned.
34+
*/
3135
PERMANENT_OFFLINE(0),
36+
37+
/** The tabletServer is temporarily offline, e.g. the tabletServer is about to be upgraded. */
3238
TEMPORARY_OFFLINE(1);
3339

3440
public final int value;

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.server.coordinator;
1919

2020
import org.apache.fluss.annotation.VisibleForTesting;
21+
import org.apache.fluss.cluster.rebalance.ServerTag;
2122
import org.apache.fluss.metadata.PhysicalTablePath;
2223
import org.apache.fluss.metadata.TableBucket;
2324
import org.apache.fluss.metadata.TableBucketReplica;
@@ -102,6 +103,9 @@ public class CoordinatorContext {
102103
*/
103104
private final Map<Integer, Set<TableBucket>> replicasOnOffline = new HashMap<>();
104105

106+
/** A mapping from tabletServers to server tag. */
107+
private final Map<Integer, ServerTag> serverTags = new HashMap<>();
108+
105109
private ServerInfo coordinatorServerInfo = null;
106110
private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
107111

@@ -635,6 +639,26 @@ public void removePartition(TablePartition tablePartition) {
635639
}
636640
}
637641

642+
/**
 * Copies all entries of {@code initialServerTags} into this context's server-tag map.
 * Existing entries with the same server id are overwritten; other entries are kept.
 *
 * @param initialServerTags mapping from tablet server id to its tag
 */
public void initSeverTags(Map<Integer, ServerTag> initialServerTags) {
643+
serverTags.putAll(initialServerTags);
644+
}
645+
646+
/**
 * Records {@code serverTag} for {@code serverId}, replacing any tag already stored for it.
 *
 * @param serverId id of the tablet server
 * @param serverTag tag to associate with the server
 */
public void putServerTag(int serverId, ServerTag serverTag) {
647+
serverTags.put(serverId, serverTag);
648+
}
649+
650+
/**
 * Returns a snapshot of the server-tag map.
 *
 * @return a new {@code HashMap} copy; modifying it does not affect this context
 */
public Map<Integer, ServerTag> getServerTags() {
651+
return new HashMap<>(serverTags);
652+
}
653+
654+
/**
 * Looks up the tag stored for {@code serverId}.
 *
 * @param serverId id of the tablet server
 * @return the stored tag, or an empty {@code Optional} if none is stored for the server
 */
public Optional<ServerTag> getServerTag(int serverId) {
655+
return Optional.ofNullable(serverTags.get(serverId));
656+
}
657+
658+
/**
 * Removes the tag stored for {@code serverId}; does nothing if the server has no tag.
 *
 * @param serverId id of the tablet server
 */
public void removeServerTag(int serverId) {
659+
serverTags.remove(serverId);
660+
}
661+
638662
private void clearTablesState() {
639663
tableAssignments.clear();
640664
partitionAssignments.clear();
@@ -656,6 +680,7 @@ public void resetContext() {
656680
// clear the live tablet servers
657681
liveTabletServers.clear();
658682
shuttingDownTabletServers.clear();
683+
serverTags.clear();
659684
}
660685

661686
public int getTotalPartitionCount() {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,20 @@
2121
import org.apache.fluss.cluster.Endpoint;
2222
import org.apache.fluss.cluster.ServerNode;
2323
import org.apache.fluss.cluster.ServerType;
24+
import org.apache.fluss.cluster.rebalance.ServerTag;
2425
import org.apache.fluss.config.ConfigOptions;
2526
import org.apache.fluss.config.Configuration;
2627
import org.apache.fluss.exception.FencedLeaderEpochException;
2728
import org.apache.fluss.exception.FlussRuntimeException;
2829
import org.apache.fluss.exception.IneligibleReplicaException;
2930
import org.apache.fluss.exception.InvalidCoordinatorException;
3031
import org.apache.fluss.exception.InvalidUpdateVersionException;
32+
import org.apache.fluss.exception.ServerNotExistException;
33+
import org.apache.fluss.exception.ServerTagAlreadyExistException;
34+
import org.apache.fluss.exception.ServerTagNotExistException;
3135
import org.apache.fluss.exception.TableNotExistException;
3236
import org.apache.fluss.exception.TabletServerNotAvailableException;
37+
import org.apache.fluss.exception.UnknownServerException;
3338
import org.apache.fluss.exception.UnknownTableOrBucketException;
3439
import org.apache.fluss.metadata.PhysicalTablePath;
3540
import org.apache.fluss.metadata.SchemaInfo;
@@ -38,14 +43,17 @@
3843
import org.apache.fluss.metadata.TableInfo;
3944
import org.apache.fluss.metadata.TablePartition;
4045
import org.apache.fluss.metadata.TablePath;
46+
import org.apache.fluss.rpc.messages.AddServerTagResponse;
4147
import org.apache.fluss.rpc.messages.AdjustIsrResponse;
4248
import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse;
4349
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
4450
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
4551
import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
4652
import org.apache.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable;
53+
import org.apache.fluss.rpc.messages.RemoveServerTagResponse;
4754
import org.apache.fluss.rpc.protocol.ApiError;
4855
import org.apache.fluss.server.coordinator.event.AccessContextEvent;
56+
import org.apache.fluss.server.coordinator.event.AddServerTagEvent;
4957
import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
5058
import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent;
5159
import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
@@ -65,6 +73,7 @@
6573
import org.apache.fluss.server.coordinator.event.NotifyKvSnapshotOffsetEvent;
6674
import org.apache.fluss.server.coordinator.event.NotifyLakeTableOffsetEvent;
6775
import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
76+
import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
6877
import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
6978
import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher;
7079
import org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
@@ -86,6 +95,7 @@
8695
import org.apache.fluss.server.zk.data.LeaderAndIsr;
8796
import org.apache.fluss.server.zk.data.PartitionAssignment;
8897
import org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
98+
import org.apache.fluss.server.zk.data.ServerTags;
8999
import org.apache.fluss.server.zk.data.TableAssignment;
90100
import org.apache.fluss.server.zk.data.TabletServerRegistration;
91101
import org.apache.fluss.server.zk.data.ZkData.PartitionIdsZNode;
@@ -326,6 +336,11 @@ private void initCoordinatorContext() throws Exception {
326336
// init tablet server channels
327337
coordinatorChannelManager.startup(internalServerNodes);
328338

339+
// load server tags.
340+
zooKeeperClient
341+
.getServerTags()
342+
.ifPresent(tags -> coordinatorContext.initSeverTags(tags.getServerTags()));
343+
329344
// load all tables
330345
long start4loadTables = System.currentTimeMillis();
331346
List<TableInfo> autoPartitionTables = new ArrayList<>();
@@ -553,6 +568,16 @@ public void process(CoordinatorEvent event) {
553568
completeFromCallable(
554569
controlledShutdownEvent.getRespCallback(),
555570
() -> tryProcessControlledShutdown(controlledShutdownEvent));
571+
} else if (event instanceof AddServerTagEvent) {
572+
AddServerTagEvent addServerTagEvent = (AddServerTagEvent) event;
573+
completeFromCallable(
574+
addServerTagEvent.getRespCallback(),
575+
() -> processAddServerTag(addServerTagEvent));
576+
} else if (event instanceof RemoveServerTagEvent) {
577+
RemoveServerTagEvent removeServerTagEvent = (RemoveServerTagEvent) event;
578+
completeFromCallable(
579+
removeServerTagEvent.getRespCallback(),
580+
() -> processRemoveServerTag(removeServerTagEvent));
556581
} else if (event instanceof AccessContextEvent) {
557582
AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) event;
558583
processAccessContext(accessContextEvent);
@@ -973,6 +998,90 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent
973998
updateTabletServerMetadataCache(serverInfos, null, null, bucketsWithOfflineLeader);
974999
}
9751000

1001+
private AddServerTagResponse processAddServerTag(AddServerTagEvent event) {
1002+
AddServerTagResponse addServerTagResponse = new AddServerTagResponse();
1003+
List<Integer> serverIds = event.getServerIds();
1004+
ServerTag serverTag = event.getServerTag();
1005+
1006+
// Verify that dose serverTag exist for input serverIds. If any of them exists, throw
1007+
// an error and none of them will be written to coordinatorContext and zk.
1008+
Map<Integer, ServerInfo> liveTabletServers = coordinatorContext.getLiveTabletServers();
1009+
for (Integer serverId : serverIds) {
1010+
if (!liveTabletServers.containsKey(serverId)) {
1011+
throw new ServerNotExistException(
1012+
String.format(
1013+
"Server %s not exists when trying to add server tag.", serverId));
1014+
}
1015+
1016+
if (coordinatorContext.getServerTag(serverId).isPresent()) {
1017+
throw new ServerTagAlreadyExistException(
1018+
String.format(
1019+
"Server tag %s already exists for server %s.",
1020+
serverTag, serverId));
1021+
}
1022+
}
1023+
1024+
// First register to zk, and then update coordinatorContext.
1025+
Map<Integer, ServerTag> serverTags = coordinatorContext.getServerTags();
1026+
for (Integer serverId : serverIds) {
1027+
serverTags.put(serverId, serverTag);
1028+
}
1029+
1030+
try {
1031+
zooKeeperClient.registerServerTags(new ServerTags(serverTags));
1032+
} catch (Exception e) {
1033+
LOG.error("Error when register server tags to zookeeper.", e);
1034+
throw new UnknownServerException("Error when register server tags to zookeeper.", e);
1035+
}
1036+
1037+
// Then update coordinatorContext.
1038+
serverIds.forEach(serverId -> coordinatorContext.putServerTag(serverId, serverTag));
1039+
1040+
return addServerTagResponse;
1041+
}
1042+
1043+
private RemoveServerTagResponse processRemoveServerTag(RemoveServerTagEvent event) {
1044+
RemoveServerTagResponse removeServerTagResponse = new RemoveServerTagResponse();
1045+
List<Integer> serverIds = event.getServerIds();
1046+
ServerTag serverTag = event.getServerTag();
1047+
1048+
// Verify that dose serverTag not exist for input serverIds. If any of them not exists,
1049+
// throw an error and none of them will be removed form coordinatorContext and zk.
1050+
Map<Integer, ServerInfo> liveTabletServers = coordinatorContext.getLiveTabletServers();
1051+
for (Integer serverId : serverIds) {
1052+
if (!liveTabletServers.containsKey(serverId)) {
1053+
throw new ServerNotExistException(
1054+
String.format(
1055+
"Server %s not exists when trying to removing server tag.",
1056+
serverId));
1057+
}
1058+
1059+
if (!coordinatorContext.getServerTag(serverId).isPresent()) {
1060+
throw new ServerTagNotExistException(
1061+
String.format(
1062+
"Server tag %s not exists for server %s.", serverTag, serverId));
1063+
}
1064+
}
1065+
1066+
// First register to zk, and then update coordinatorContext.
1067+
Map<Integer, ServerTag> serverTags = coordinatorContext.getServerTags();
1068+
for (Integer serverId : serverIds) {
1069+
serverTags.remove(serverId);
1070+
}
1071+
1072+
try {
1073+
zooKeeperClient.registerServerTags(new ServerTags(serverTags));
1074+
} catch (Exception e) {
1075+
LOG.error("Error when register server tags to zookeeper.", e);
1076+
throw new UnknownServerException("Error when register server tags to zookeeper.", e);
1077+
}
1078+
1079+
// Then update coordinatorContext.
1080+
serverIds.forEach(coordinatorContext::removeServerTag);
1081+
1082+
return removeServerTagResponse;
1083+
}
1084+
9761085
private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
9771086
Map<TableBucket, LeaderAndIsr> leaderAndIsrList) {
9781087
// TODO verify leader epoch.

0 commit comments

Comments
 (0)