Skip to content

Commit 98b0dc4

Browse files
committed
[server] Support AddServerTag and RemoveServerTag
1 parent b81415d commit 98b0dc4

File tree

8 files changed

+339
-6
lines changed

8 files changed

+339
-6
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.fluss.rpc.gateway.AdminGateway;
4545
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
4646
import org.apache.fluss.rpc.gateway.TabletServerGateway;
47+
import org.apache.fluss.rpc.messages.AddServerTagRequest;
4748
import org.apache.fluss.rpc.messages.CreateAclsRequest;
4849
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
4950
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -68,6 +69,7 @@
6869
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
6970
import org.apache.fluss.rpc.messages.PbPartitionSpec;
7071
import org.apache.fluss.rpc.messages.PbTablePath;
72+
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
7173
import org.apache.fluss.rpc.messages.TableExistsRequest;
7274
import org.apache.fluss.rpc.messages.TableExistsResponse;
7375
import org.apache.fluss.rpc.protocol.ApiError;
@@ -470,13 +472,17 @@ public DropAclsResult dropAcls(Collection<AclBindingFilter> filters) {
470472

471473
@Override
472474
public CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag) {
473-
throw new UnsupportedOperationException("Support soon");
475+
AddServerTagRequest request = new AddServerTagRequest().setServerTag(serverTag.value);
476+
tabletServers.forEach(request::addServerId);
477+
return gateway.addServerTag(request).thenApply(r -> null);
474478
}
475479

476480
@Override
477481
public CompletableFuture<Void> removeServerTag(
478482
List<Integer> tabletServers, ServerTag serverTag) {
479-
throw new UnsupportedOperationException("Support soon");
483+
RemoveServerTagRequest request = new RemoveServerTagRequest().setServerTag(serverTag.value);
484+
tabletServers.forEach(request::addServerId);
485+
return gateway.removeServerTag(request).thenApply(r -> null);
480486
}
481487

482488
@Override

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.fluss.client.table.Table;
2525
import org.apache.fluss.client.table.writer.UpsertWriter;
2626
import org.apache.fluss.cluster.ServerNode;
27+
import org.apache.fluss.cluster.maintencance.ServerTag;
2728
import org.apache.fluss.config.AutoPartitionTimeUnit;
2829
import org.apache.fluss.config.ConfigOptions;
2930
import org.apache.fluss.config.Configuration;
@@ -39,6 +40,9 @@
3940
import org.apache.fluss.exception.PartitionAlreadyExistsException;
4041
import org.apache.fluss.exception.PartitionNotExistException;
4142
import org.apache.fluss.exception.SchemaNotExistException;
43+
import org.apache.fluss.exception.ServerNotExistException;
44+
import org.apache.fluss.exception.ServerTagAlreadyExistException;
45+
import org.apache.fluss.exception.ServerTagNotExistException;
4246
import org.apache.fluss.exception.TableNotExistException;
4347
import org.apache.fluss.exception.TableNotPartitionedException;
4448
import org.apache.fluss.exception.TooManyBucketsException;
@@ -60,6 +64,7 @@
6064
import org.apache.fluss.metadata.TablePath;
6165
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
6266
import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle;
67+
import org.apache.fluss.server.zk.ZooKeeperClient;
6368
import org.apache.fluss.types.DataTypes;
6469

6570
import org.junit.jupiter.api.BeforeEach;
@@ -1050,4 +1055,63 @@ public void testSystemsColumns() throws Exception {
10501055
+ "Please use other names for these columns. "
10511056
+ "The reserved system columns are: __offset, __timestamp, __bucket");
10521057
}
1058+
1059+
@Test
1060+
public void testAddAndRemoveServerTags() throws Exception {
1061+
ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
1062+
// 1.add server tag to a none exists server.
1063+
assertThatThrownBy(
1064+
() ->
1065+
admin.addServerTag(
1066+
Collections.singletonList(100),
1067+
ServerTag.PERMANENT_OFFLINE)
1068+
.get())
1069+
.cause()
1070+
.isInstanceOf(ServerNotExistException.class)
1071+
.hasMessageContaining("Server 100 not exists when trying to add server tag.");
1072+
1073+
// 2.add server tag for server 0,1.
1074+
admin.addServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get();
1075+
// TODO use api to get serverTags instead of getting from zk directly
1076+
assertThat(zkClient.getServerTags()).isPresent();
1077+
assertThat(zkClient.getServerTags().get().getServerTags())
1078+
.containsEntry(0, ServerTag.PERMANENT_OFFLINE)
1079+
.containsEntry(1, ServerTag.PERMANENT_OFFLINE);
1080+
1081+
// 3.add server tag for server 0,2. error will be thrown and tag for 2 will not be added.
1082+
assertThatThrownBy(
1083+
() ->
1084+
admin.addServerTag(Arrays.asList(0, 2), ServerTag.PERMANENT_OFFLINE)
1085+
.get())
1086+
.cause()
1087+
.isInstanceOf(ServerTagAlreadyExistException.class)
1088+
.hasMessageContaining("Server tag PERMANENT_OFFLINE already exists for server 0.");
1089+
1090+
// 4.remove server tag for server 100
1091+
assertThatThrownBy(
1092+
() ->
1093+
admin.removeServerTag(
1094+
Collections.singletonList(100),
1095+
ServerTag.PERMANENT_OFFLINE)
1096+
.get())
1097+
.cause()
1098+
.isInstanceOf(ServerNotExistException.class)
1099+
.hasMessageContaining("Server 100 not exists when trying to removing server tag.");
1100+
1101+
// 5.remove server tag for server 0,1.
1102+
admin.removeServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get();
1103+
assertThat(zkClient.getServerTags()).isPresent();
1104+
assertThat(zkClient.getServerTags().get().getServerTags()).isEmpty();
1105+
1106+
// 6.remove server tag for server 2. error will be thrown and tag for 2 will not be removed.
1107+
assertThatThrownBy(
1108+
() ->
1109+
admin.removeServerTag(
1110+
Collections.singletonList(0),
1111+
ServerTag.PERMANENT_OFFLINE)
1112+
.get())
1113+
.cause()
1114+
.isInstanceOf(ServerTagNotExistException.class)
1115+
.hasMessageContaining("Server tag PERMANENT_OFFLINE not exists for server 0.");
1116+
}
10531117
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.fluss.exception.SecurityTokenException;
6666
import org.apache.fluss.exception.ServerNotExistException;
6767
import org.apache.fluss.exception.ServerTagAlreadyExistException;
68+
import org.apache.fluss.exception.ServerTagNotExistException;
6869
import org.apache.fluss.exception.StorageException;
6970
import org.apache.fluss.exception.TableAlreadyExistException;
7071
import org.apache.fluss.exception.TableNotExistException;
@@ -222,8 +223,7 @@ public enum Errors {
222223
SERVER_NOT_EXIST_EXCEPTION(54, "The server is not exist.", ServerNotExistException::new),
223224
SEVER_TAG_ALREADY_EXIST_EXCEPTION(
224225
55, "The server tag already exist.", ServerTagAlreadyExistException::new),
225-
SEVER_TAG_NOT_EXIST_EXCEPTION(
226-
56, "The server tag not exist.", ServerTagAlreadyExistException::new),
226+
SEVER_TAG_NOT_EXIST_EXCEPTION(56, "The server tag not exist.", ServerTagNotExistException::new),
227227
REBALANCE_FAILURE_EXCEPTION(57, "The rebalance task failure.", RebalanceFailureException::new),
228228
NO_REBALANCE_IN_PROGRESS_EXCEPTION(
229229
58, "No rebalance task in progress.", NoRebalanceInProgressException::new);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.server.coordinator;
1919

2020
import org.apache.fluss.annotation.VisibleForTesting;
21+
import org.apache.fluss.cluster.maintencance.ServerTag;
2122
import org.apache.fluss.metadata.PhysicalTablePath;
2223
import org.apache.fluss.metadata.TableBucket;
2324
import org.apache.fluss.metadata.TableBucketReplica;
@@ -101,6 +102,9 @@ public class CoordinatorContext {
101102
*/
102103
private final Map<Integer, Set<TableBucket>> replicasOnOffline = new HashMap<>();
103104

105+
/** A mapping from tabletServers to server tag. */
106+
private final Map<Integer, ServerTag> serverTags = new HashMap<>();
107+
104108
private ServerInfo coordinatorServerInfo = null;
105109
private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
106110

@@ -616,6 +620,26 @@ public void removePartition(TablePartition tablePartition) {
616620
}
617621
}
618622

623+
public void initSeverTags(Map<Integer, ServerTag> initialServerTags) {
624+
serverTags.putAll(initialServerTags);
625+
}
626+
627+
public void putServerTag(int serverId, ServerTag serverTag) {
628+
serverTags.put(serverId, serverTag);
629+
}
630+
631+
public Map<Integer, ServerTag> getServerTags() {
632+
return new HashMap<>(serverTags);
633+
}
634+
635+
public Optional<ServerTag> getServerTag(int serverId) {
636+
return Optional.ofNullable(serverTags.get(serverId));
637+
}
638+
639+
public void removeServerTag(int serverId) {
640+
serverTags.remove(serverId);
641+
}
642+
619643
private void clearTablesState() {
620644
tableAssignments.clear();
621645
partitionAssignments.clear();
@@ -636,5 +660,6 @@ public void resetContext() {
636660
clearTablesState();
637661
// clear the live tablet servers
638662
liveTabletServers.clear();
663+
serverTags.clear();
639664
}
640665
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,34 @@
2121
import org.apache.fluss.cluster.Endpoint;
2222
import org.apache.fluss.cluster.ServerNode;
2323
import org.apache.fluss.cluster.ServerType;
24+
import org.apache.fluss.cluster.maintencance.ServerTag;
2425
import org.apache.fluss.config.ConfigOptions;
2526
import org.apache.fluss.config.Configuration;
2627
import org.apache.fluss.exception.FencedLeaderEpochException;
2728
import org.apache.fluss.exception.FlussRuntimeException;
2829
import org.apache.fluss.exception.InvalidCoordinatorException;
2930
import org.apache.fluss.exception.InvalidUpdateVersionException;
31+
import org.apache.fluss.exception.ServerNotExistException;
32+
import org.apache.fluss.exception.ServerTagAlreadyExistException;
33+
import org.apache.fluss.exception.ServerTagNotExistException;
34+
import org.apache.fluss.exception.UnknownServerException;
3035
import org.apache.fluss.exception.UnknownTableOrBucketException;
3136
import org.apache.fluss.metadata.PhysicalTablePath;
3237
import org.apache.fluss.metadata.TableBucket;
3338
import org.apache.fluss.metadata.TableBucketReplica;
3439
import org.apache.fluss.metadata.TableInfo;
3540
import org.apache.fluss.metadata.TablePartition;
3641
import org.apache.fluss.metadata.TablePath;
42+
import org.apache.fluss.rpc.messages.AddServerTagResponse;
3743
import org.apache.fluss.rpc.messages.AdjustIsrResponse;
3844
import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse;
3945
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
4046
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
4147
import org.apache.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable;
48+
import org.apache.fluss.rpc.messages.RemoveServerTagResponse;
4249
import org.apache.fluss.rpc.protocol.ApiError;
4350
import org.apache.fluss.server.coordinator.event.AccessContextEvent;
51+
import org.apache.fluss.server.coordinator.event.AddServerTagEvent;
4452
import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
4553
import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent;
4654
import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
@@ -57,6 +65,7 @@
5765
import org.apache.fluss.server.coordinator.event.FencedCoordinatorEvent;
5866
import org.apache.fluss.server.coordinator.event.NewTabletServerEvent;
5967
import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
68+
import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
6069
import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher;
6170
import org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
6271
import org.apache.fluss.server.coordinator.statemachine.ReplicaStateMachine;
@@ -77,6 +86,7 @@
7786
import org.apache.fluss.server.zk.data.LeaderAndIsr;
7887
import org.apache.fluss.server.zk.data.PartitionAssignment;
7988
import org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
89+
import org.apache.fluss.server.zk.data.ServerTags;
8090
import org.apache.fluss.server.zk.data.TableAssignment;
8191
import org.apache.fluss.server.zk.data.TabletServerRegistration;
8292
import org.apache.fluss.server.zk.data.ZkData.PartitionIdsZNode;
@@ -298,6 +308,11 @@ private void initCoordinatorContext() throws Exception {
298308
// init tablet server channels
299309
coordinatorChannelManager.startup(internalServerNodes);
300310

311+
// load server tags.
312+
zooKeeperClient
313+
.getServerTags()
314+
.ifPresent(tags -> coordinatorContext.initSeverTags(tags.getServerTags()));
315+
301316
// load all tables
302317
List<TableInfo> autoPartitionTables = new ArrayList<>();
303318
List<Tuple2<TableInfo, Long>> lakeTables = new ArrayList<>();
@@ -470,6 +485,16 @@ public void process(CoordinatorEvent event) {
470485
completeFromCallable(
471486
commitLakeTableSnapshotEvent.getRespCallback(),
472487
() -> tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent));
488+
} else if (event instanceof AddServerTagEvent) {
489+
AddServerTagEvent addServerTagEvent = (AddServerTagEvent) event;
490+
completeFromCallable(
491+
addServerTagEvent.getRespCallback(),
492+
() -> processAddServerTag(addServerTagEvent));
493+
} else if (event instanceof RemoveServerTagEvent) {
494+
RemoveServerTagEvent removeServerTagEvent = (RemoveServerTagEvent) event;
495+
completeFromCallable(
496+
removeServerTagEvent.getRespCallback(),
497+
() -> processRemoveServerTag(removeServerTagEvent));
473498
} else if (event instanceof AccessContextEvent) {
474499
AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) event;
475500
processAccessContext(accessContextEvent);
@@ -827,6 +852,90 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent
827852
updateTabletServerMetadataCache(serverInfos, null, null, bucketsWithOfflineLeader);
828853
}
829854

855+
private AddServerTagResponse processAddServerTag(AddServerTagEvent event) {
856+
AddServerTagResponse addServerTagResponse = new AddServerTagResponse();
857+
List<Integer> serverIds = event.getServerIds();
858+
ServerTag serverTag = event.getServerTag();
859+
860+
// Verify that dose serverTag exist for input serverIds. If any of them exists, throw
861+
// an error and none of them will be written to coordinatorContext and zk.
862+
Map<Integer, ServerInfo> liveTabletServers = coordinatorContext.getLiveTabletServers();
863+
for (Integer serverId : serverIds) {
864+
if (!liveTabletServers.containsKey(serverId)) {
865+
throw new ServerNotExistException(
866+
String.format(
867+
"Server %s not exists when trying to add server tag.", serverId));
868+
}
869+
870+
if (coordinatorContext.getServerTag(serverId).isPresent()) {
871+
throw new ServerTagAlreadyExistException(
872+
String.format(
873+
"Server tag %s already exists for server %s.",
874+
serverTag, serverId));
875+
}
876+
}
877+
878+
// First register to zk, and then update coordinatorContext.
879+
Map<Integer, ServerTag> serverTags = coordinatorContext.getServerTags();
880+
for (Integer serverId : serverIds) {
881+
serverTags.put(serverId, serverTag);
882+
}
883+
884+
try {
885+
zooKeeperClient.registerServerTags(new ServerTags(serverTags));
886+
} catch (Exception e) {
887+
LOG.error("Error when register server tags to zookeeper.", e);
888+
throw new UnknownServerException("Error when register server tags to zookeeper.", e);
889+
}
890+
891+
// Then update coordinatorContext.
892+
serverIds.forEach(serverId -> coordinatorContext.putServerTag(serverId, serverTag));
893+
894+
return addServerTagResponse;
895+
}
896+
897+
private RemoveServerTagResponse processRemoveServerTag(RemoveServerTagEvent event) {
898+
RemoveServerTagResponse removeServerTagResponse = new RemoveServerTagResponse();
899+
List<Integer> serverIds = event.getServerIds();
900+
ServerTag serverTag = event.getServerTag();
901+
902+
// Verify that dose serverTag not exist for input serverIds. If any of them not exists,
903+
// throw an error and none of them will be removed form coordinatorContext and zk.
904+
Map<Integer, ServerInfo> liveTabletServers = coordinatorContext.getLiveTabletServers();
905+
for (Integer serverId : serverIds) {
906+
if (!liveTabletServers.containsKey(serverId)) {
907+
throw new ServerNotExistException(
908+
String.format(
909+
"Server %s not exists when trying to removing server tag.",
910+
serverId));
911+
}
912+
913+
if (!coordinatorContext.getServerTag(serverId).isPresent()) {
914+
throw new ServerTagNotExistException(
915+
String.format(
916+
"Server tag %s not exists for server %s.", serverTag, serverId));
917+
}
918+
}
919+
920+
// First register to zk, and then update coordinatorContext.
921+
Map<Integer, ServerTag> serverTags = coordinatorContext.getServerTags();
922+
for (Integer serverId : serverIds) {
923+
serverTags.remove(serverId);
924+
}
925+
926+
try {
927+
zooKeeperClient.registerServerTags(new ServerTags(serverTags));
928+
} catch (Exception e) {
929+
LOG.error("Error when register server tags to zookeeper.", e);
930+
throw new UnknownServerException("Error when register server tags to zookeeper.", e);
931+
}
932+
933+
// Then update coordinatorContext.
934+
serverIds.forEach(coordinatorContext::removeServerTag);
935+
936+
return removeServerTagResponse;
937+
}
938+
830939
private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
831940
Map<TableBucket, LeaderAndIsr> leaderAndIsrList) {
832941
// TODO verify leader epoch.

0 commit comments

Comments
 (0)