Skip to content

Commit 2368e6a

Browse files
committed
[server] Support AddServerTag and RemoveServerTag
1 parent 8b604e4 commit 2368e6a

File tree

8 files changed

+339
-6
lines changed

8 files changed

+339
-6
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.alibaba.fluss.rpc.gateway.AdminGateway;
4444
import com.alibaba.fluss.rpc.gateway.AdminReadOnlyGateway;
4545
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
46+
import com.alibaba.fluss.rpc.messages.AddServerTagRequest;
4647
import com.alibaba.fluss.rpc.messages.CreateAclsRequest;
4748
import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest;
4849
import com.alibaba.fluss.rpc.messages.CreateTableRequest;
@@ -67,6 +68,7 @@
6768
import com.alibaba.fluss.rpc.messages.PbListOffsetsRespForBucket;
6869
import com.alibaba.fluss.rpc.messages.PbPartitionSpec;
6970
import com.alibaba.fluss.rpc.messages.PbTablePath;
71+
import com.alibaba.fluss.rpc.messages.RemoveServerTagRequest;
7072
import com.alibaba.fluss.rpc.messages.TableExistsRequest;
7173
import com.alibaba.fluss.rpc.messages.TableExistsResponse;
7274
import com.alibaba.fluss.rpc.protocol.ApiError;
@@ -471,13 +473,17 @@ public DropAclsResult dropAcls(Collection<AclBindingFilter> filters) {
471473

472474
@Override
473475
public CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag) {
474-
throw new UnsupportedOperationException("Support soon");
476+
AddServerTagRequest request = new AddServerTagRequest().setServerTag(serverTag.value);
477+
tabletServers.forEach(request::addServerId);
478+
return gateway.addServerTag(request).thenApply(r -> null);
475479
}
476480

477481
@Override
478482
public CompletableFuture<Void> removeServerTag(
479483
List<Integer> tabletServers, ServerTag serverTag) {
480-
throw new UnsupportedOperationException("Support soon");
484+
RemoveServerTagRequest request = new RemoveServerTagRequest().setServerTag(serverTag.value);
485+
tabletServers.forEach(request::addServerId);
486+
return gateway.removeServerTag(request).thenApply(r -> null);
481487
}
482488

483489
@Override

fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.alibaba.fluss.client.table.Table;
2525
import com.alibaba.fluss.client.table.writer.UpsertWriter;
2626
import com.alibaba.fluss.cluster.ServerNode;
27+
import com.alibaba.fluss.cluster.maintencance.ServerTag;
2728
import com.alibaba.fluss.config.AutoPartitionTimeUnit;
2829
import com.alibaba.fluss.config.ConfigOptions;
2930
import com.alibaba.fluss.config.Configuration;
@@ -39,6 +40,9 @@
3940
import com.alibaba.fluss.exception.PartitionAlreadyExistsException;
4041
import com.alibaba.fluss.exception.PartitionNotExistException;
4142
import com.alibaba.fluss.exception.SchemaNotExistException;
43+
import com.alibaba.fluss.exception.ServerNotExistException;
44+
import com.alibaba.fluss.exception.ServerTagAlreadyExistException;
45+
import com.alibaba.fluss.exception.ServerTagNotExistException;
4246
import com.alibaba.fluss.exception.TableNotExistException;
4347
import com.alibaba.fluss.exception.TableNotPartitionedException;
4448
import com.alibaba.fluss.exception.TooManyBucketsException;
@@ -60,6 +64,7 @@
6064
import com.alibaba.fluss.metadata.TablePath;
6165
import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot;
6266
import com.alibaba.fluss.server.kv.snapshot.KvSnapshotHandle;
67+
import com.alibaba.fluss.server.zk.ZooKeeperClient;
6368
import com.alibaba.fluss.types.DataTypes;
6469

6570
import org.junit.jupiter.api.BeforeEach;
@@ -1050,4 +1055,63 @@ public void testSystemsColumns() throws Exception {
10501055
+ "Please use other names for these columns. "
10511056
+ "The reserved system columns are: __offset, __timestamp, __bucket");
10521057
}
1058+
1059+
@Test
1060+
public void testAddAndRemoveServerTags() throws Exception {
1061+
ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
1062+
// 1.add server tag to a none exists server.
1063+
assertThatThrownBy(
1064+
() ->
1065+
admin.addServerTag(
1066+
Collections.singletonList(100),
1067+
ServerTag.PERMANENT_OFFLINE)
1068+
.get())
1069+
.cause()
1070+
.isInstanceOf(ServerNotExistException.class)
1071+
.hasMessageContaining("Server 100 not exists when trying to add server tag.");
1072+
1073+
// 2.add server tag for server 0,1.
1074+
admin.addServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get();
1075+
// TODO use api to get serverTags instead of getting from zk directly
1076+
assertThat(zkClient.getServerTags()).isPresent();
1077+
assertThat(zkClient.getServerTags().get().getServerTags())
1078+
.containsEntry(0, ServerTag.PERMANENT_OFFLINE)
1079+
.containsEntry(1, ServerTag.PERMANENT_OFFLINE);
1080+
1081+
// 3.add server tag for server 0,2. error will be thrown and tag for 2 will not be added.
1082+
assertThatThrownBy(
1083+
() ->
1084+
admin.addServerTag(Arrays.asList(0, 2), ServerTag.PERMANENT_OFFLINE)
1085+
.get())
1086+
.cause()
1087+
.isInstanceOf(ServerTagAlreadyExistException.class)
1088+
.hasMessageContaining("Server tag PERMANENT_OFFLINE already exists for server 0.");
1089+
1090+
// 4.remove server tag for server 100
1091+
assertThatThrownBy(
1092+
() ->
1093+
admin.removeServerTag(
1094+
Collections.singletonList(100),
1095+
ServerTag.PERMANENT_OFFLINE)
1096+
.get())
1097+
.cause()
1098+
.isInstanceOf(ServerNotExistException.class)
1099+
.hasMessageContaining("Server 100 not exists when trying to removing server tag.");
1100+
1101+
// 5.remove server tag for server 0,1.
1102+
admin.removeServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get();
1103+
assertThat(zkClient.getServerTags()).isPresent();
1104+
assertThat(zkClient.getServerTags().get().getServerTags()).isEmpty();
1105+
1106+
// 6.remove server tag for server 2. error will be thrown and tag for 2 will not be removed.
1107+
assertThatThrownBy(
1108+
() ->
1109+
admin.removeServerTag(
1110+
Collections.singletonList(0),
1111+
ServerTag.PERMANENT_OFFLINE)
1112+
.get())
1113+
.cause()
1114+
.isInstanceOf(ServerTagNotExistException.class)
1115+
.hasMessageContaining("Server tag PERMANENT_OFFLINE not exists for server 0.");
1116+
}
10531117
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import com.alibaba.fluss.exception.SecurityTokenException;
6666
import com.alibaba.fluss.exception.ServerNotExistException;
6767
import com.alibaba.fluss.exception.ServerTagAlreadyExistException;
68+
import com.alibaba.fluss.exception.ServerTagNotExistException;
6869
import com.alibaba.fluss.exception.StorageException;
6970
import com.alibaba.fluss.exception.TableAlreadyExistException;
7071
import com.alibaba.fluss.exception.TableNotExistException;
@@ -222,8 +223,7 @@ public enum Errors {
222223
SERVER_NOT_EXIST_EXCEPTION(54, "The server is not exist.", ServerNotExistException::new),
223224
SEVER_TAG_ALREADY_EXIST_EXCEPTION(
224225
55, "The server tag already exist.", ServerTagAlreadyExistException::new),
225-
SEVER_TAG_NOT_EXIST_EXCEPTION(
226-
56, "The server tag not exist.", ServerTagAlreadyExistException::new),
226+
SEVER_TAG_NOT_EXIST_EXCEPTION(56, "The server tag not exist.", ServerTagNotExistException::new),
227227
REBALANCE_FAILURE_EXCEPTION(57, "The rebalance task failure.", RebalanceFailureException::new),
228228
NO_REBALANCE_IN_PROGRESS_EXCEPTION(
229229
58, "No rebalance task in progress.", NoRebalanceInProgressException::new);

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.alibaba.fluss.server.coordinator;
1919

2020
import com.alibaba.fluss.annotation.VisibleForTesting;
21+
import com.alibaba.fluss.cluster.maintencance.ServerTag;
2122
import com.alibaba.fluss.metadata.PhysicalTablePath;
2223
import com.alibaba.fluss.metadata.TableBucket;
2324
import com.alibaba.fluss.metadata.TableBucketReplica;
@@ -101,6 +102,9 @@ public class CoordinatorContext {
101102
*/
102103
private final Map<Integer, Set<TableBucket>> replicasOnOffline = new HashMap<>();
103104

105+
/** A mapping from tabletServers to server tag. */
106+
private final Map<Integer, ServerTag> serverTags = new HashMap<>();
107+
104108
private ServerInfo coordinatorServerInfo = null;
105109
private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
106110

@@ -616,6 +620,26 @@ public void removePartition(TablePartition tablePartition) {
616620
}
617621
}
618622

623+
public void initSeverTags(Map<Integer, ServerTag> initialServerTags) {
624+
serverTags.putAll(initialServerTags);
625+
}
626+
627+
public void putServerTag(int serverId, ServerTag serverTag) {
628+
serverTags.put(serverId, serverTag);
629+
}
630+
631+
public Map<Integer, ServerTag> getServerTags() {
632+
return new HashMap<>(serverTags);
633+
}
634+
635+
public Optional<ServerTag> getServerTag(int serverId) {
636+
return Optional.ofNullable(serverTags.get(serverId));
637+
}
638+
639+
public void removeServerTag(int serverId) {
640+
serverTags.remove(serverId);
641+
}
642+
619643
private void clearTablesState() {
620644
tableAssignments.clear();
621645
partitionAssignments.clear();
@@ -636,5 +660,6 @@ public void resetContext() {
636660
clearTablesState();
637661
// clear the live tablet servers
638662
liveTabletServers.clear();
663+
serverTags.clear();
639664
}
640665
}

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,17 @@
2121
import com.alibaba.fluss.cluster.Endpoint;
2222
import com.alibaba.fluss.cluster.ServerNode;
2323
import com.alibaba.fluss.cluster.ServerType;
24+
import com.alibaba.fluss.cluster.maintencance.ServerTag;
2425
import com.alibaba.fluss.config.ConfigOptions;
2526
import com.alibaba.fluss.config.Configuration;
2627
import com.alibaba.fluss.exception.FencedLeaderEpochException;
2728
import com.alibaba.fluss.exception.FlussRuntimeException;
2829
import com.alibaba.fluss.exception.InvalidCoordinatorException;
2930
import com.alibaba.fluss.exception.InvalidUpdateVersionException;
31+
import com.alibaba.fluss.exception.ServerNotExistException;
32+
import com.alibaba.fluss.exception.ServerTagAlreadyExistException;
33+
import com.alibaba.fluss.exception.ServerTagNotExistException;
34+
import com.alibaba.fluss.exception.UnknownServerException;
3035
import com.alibaba.fluss.exception.UnknownTableOrBucketException;
3136
import com.alibaba.fluss.metadata.PhysicalTablePath;
3237
import com.alibaba.fluss.metadata.TableBucket;
@@ -35,13 +40,16 @@
3540
import com.alibaba.fluss.metadata.TablePartition;
3641
import com.alibaba.fluss.metadata.TablePath;
3742
import com.alibaba.fluss.metrics.MetricNames;
43+
import com.alibaba.fluss.rpc.messages.AddServerTagResponse;
3844
import com.alibaba.fluss.rpc.messages.AdjustIsrResponse;
3945
import com.alibaba.fluss.rpc.messages.CommitKvSnapshotResponse;
4046
import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
4147
import com.alibaba.fluss.rpc.messages.CommitRemoteLogManifestResponse;
4248
import com.alibaba.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable;
49+
import com.alibaba.fluss.rpc.messages.RemoveServerTagResponse;
4350
import com.alibaba.fluss.rpc.protocol.ApiError;
4451
import com.alibaba.fluss.server.coordinator.event.AccessContextEvent;
52+
import com.alibaba.fluss.server.coordinator.event.AddServerTagEvent;
4553
import com.alibaba.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
4654
import com.alibaba.fluss.server.coordinator.event.CommitKvSnapshotEvent;
4755
import com.alibaba.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
@@ -58,6 +66,7 @@
5866
import com.alibaba.fluss.server.coordinator.event.FencedCoordinatorEvent;
5967
import com.alibaba.fluss.server.coordinator.event.NewTabletServerEvent;
6068
import com.alibaba.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
69+
import com.alibaba.fluss.server.coordinator.event.RemoveServerTagEvent;
6170
import com.alibaba.fluss.server.coordinator.event.watcher.TableChangeWatcher;
6271
import com.alibaba.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
6372
import com.alibaba.fluss.server.coordinator.statemachine.ReplicaState;
@@ -79,6 +88,7 @@
7988
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
8089
import com.alibaba.fluss.server.zk.data.PartitionAssignment;
8190
import com.alibaba.fluss.server.zk.data.RemoteLogManifestHandle;
91+
import com.alibaba.fluss.server.zk.data.ServerTags;
8292
import com.alibaba.fluss.server.zk.data.TableAssignment;
8393
import com.alibaba.fluss.server.zk.data.TabletServerRegistration;
8494
import com.alibaba.fluss.server.zk.data.ZkData.PartitionIdsZNode;
@@ -319,6 +329,11 @@ private void initCoordinatorContext() throws Exception {
319329
// init tablet server channels
320330
coordinatorChannelManager.startup(internalServerNodes);
321331

332+
// load server tags.
333+
zooKeeperClient
334+
.getServerTags()
335+
.ifPresent(tags -> coordinatorContext.initSeverTags(tags.getServerTags()));
336+
322337
// load all tables
323338
List<TableInfo> autoPartitionTables = new ArrayList<>();
324339
List<Tuple2<TableInfo, Long>> lakeTables = new ArrayList<>();
@@ -493,6 +508,16 @@ public void process(CoordinatorEvent event) {
493508
completeFromCallable(
494509
commitLakeTableSnapshotEvent.getRespCallback(),
495510
() -> tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent));
511+
} else if (event instanceof AddServerTagEvent) {
512+
AddServerTagEvent addServerTagEvent = (AddServerTagEvent) event;
513+
completeFromCallable(
514+
addServerTagEvent.getRespCallback(),
515+
() -> processAddServerTag(addServerTagEvent));
516+
} else if (event instanceof RemoveServerTagEvent) {
517+
RemoveServerTagEvent removeServerTagEvent = (RemoveServerTagEvent) event;
518+
completeFromCallable(
519+
removeServerTagEvent.getRespCallback(),
520+
() -> processRemoveServerTag(removeServerTagEvent));
496521
} else if (event instanceof AccessContextEvent) {
497522
AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) event;
498523
processAccessContext(accessContextEvent);
@@ -884,6 +909,90 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent
884909
updateTabletServerMetadataCache(serverInfos, null, null, bucketsWithOfflineLeader);
885910
}
886911

912+
private AddServerTagResponse processAddServerTag(AddServerTagEvent event) {
913+
AddServerTagResponse addServerTagResponse = new AddServerTagResponse();
914+
List<Integer> serverIds = event.getServerIds();
915+
ServerTag serverTag = event.getServerTag();
916+
917+
// Verify that dose serverTag exist for input serverIds. If any of them exists, throw
918+
// an error and none of them will be written to coordinatorContext and zk.
919+
Map<Integer, ServerInfo> liveTabletServers = coordinatorContext.getLiveTabletServers();
920+
for (Integer serverId : serverIds) {
921+
if (!liveTabletServers.containsKey(serverId)) {
922+
throw new ServerNotExistException(
923+
String.format(
924+
"Server %s not exists when trying to add server tag.", serverId));
925+
}
926+
927+
if (coordinatorContext.getServerTag(serverId).isPresent()) {
928+
throw new ServerTagAlreadyExistException(
929+
String.format(
930+
"Server tag %s already exists for server %s.",
931+
serverTag, serverId));
932+
}
933+
}
934+
935+
// First register to zk, and then update coordinatorContext.
936+
Map<Integer, ServerTag> serverTags = coordinatorContext.getServerTags();
937+
for (Integer serverId : serverIds) {
938+
serverTags.put(serverId, serverTag);
939+
}
940+
941+
try {
942+
zooKeeperClient.registerServerTags(new ServerTags(serverTags));
943+
} catch (Exception e) {
944+
LOG.error("Error when register server tags to zookeeper.", e);
945+
throw new UnknownServerException("Error when register server tags to zookeeper.", e);
946+
}
947+
948+
// Then update coordinatorContext.
949+
serverIds.forEach(serverId -> coordinatorContext.putServerTag(serverId, serverTag));
950+
951+
return addServerTagResponse;
952+
}
953+
954+
private RemoveServerTagResponse processRemoveServerTag(RemoveServerTagEvent event) {
955+
RemoveServerTagResponse removeServerTagResponse = new RemoveServerTagResponse();
956+
List<Integer> serverIds = event.getServerIds();
957+
ServerTag serverTag = event.getServerTag();
958+
959+
// Verify that dose serverTag not exist for input serverIds. If any of them not exists,
960+
// throw an error and none of them will be removed form coordinatorContext and zk.
961+
Map<Integer, ServerInfo> liveTabletServers = coordinatorContext.getLiveTabletServers();
962+
for (Integer serverId : serverIds) {
963+
if (!liveTabletServers.containsKey(serverId)) {
964+
throw new ServerNotExistException(
965+
String.format(
966+
"Server %s not exists when trying to removing server tag.",
967+
serverId));
968+
}
969+
970+
if (!coordinatorContext.getServerTag(serverId).isPresent()) {
971+
throw new ServerTagNotExistException(
972+
String.format(
973+
"Server tag %s not exists for server %s.", serverTag, serverId));
974+
}
975+
}
976+
977+
// First register to zk, and then update coordinatorContext.
978+
Map<Integer, ServerTag> serverTags = coordinatorContext.getServerTags();
979+
for (Integer serverId : serverIds) {
980+
serverTags.remove(serverId);
981+
}
982+
983+
try {
984+
zooKeeperClient.registerServerTags(new ServerTags(serverTags));
985+
} catch (Exception e) {
986+
LOG.error("Error when register server tags to zookeeper.", e);
987+
throw new UnknownServerException("Error when register server tags to zookeeper.", e);
988+
}
989+
990+
// Then update coordinatorContext.
991+
serverIds.forEach(coordinatorContext::removeServerTag);
992+
993+
return removeServerTagResponse;
994+
}
995+
887996
private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
888997
Map<TableBucket, LeaderAndIsr> leaderAndIsrList) {
889998
// TODO verify leader epoch.

0 commit comments

Comments
 (0)