|
21 | 21 | import org.apache.fluss.cluster.Endpoint; |
22 | 22 | import org.apache.fluss.cluster.ServerNode; |
23 | 23 | import org.apache.fluss.cluster.ServerType; |
| 24 | +import org.apache.fluss.cluster.rebalance.ServerTag; |
24 | 25 | import org.apache.fluss.config.ConfigOptions; |
25 | 26 | import org.apache.fluss.config.Configuration; |
26 | 27 | import org.apache.fluss.exception.FencedLeaderEpochException; |
27 | 28 | import org.apache.fluss.exception.FlussRuntimeException; |
28 | 29 | import org.apache.fluss.exception.IneligibleReplicaException; |
29 | 30 | import org.apache.fluss.exception.InvalidCoordinatorException; |
30 | 31 | import org.apache.fluss.exception.InvalidUpdateVersionException; |
| 32 | +import org.apache.fluss.exception.ServerNotExistException; |
| 33 | +import org.apache.fluss.exception.ServerTagAlreadyExistException; |
| 34 | +import org.apache.fluss.exception.ServerTagNotExistException; |
31 | 35 | import org.apache.fluss.exception.TabletServerNotAvailableException; |
| 36 | +import org.apache.fluss.exception.UnknownServerException; |
32 | 37 | import org.apache.fluss.exception.UnknownTableOrBucketException; |
33 | 38 | import org.apache.fluss.metadata.PhysicalTablePath; |
34 | 39 | import org.apache.fluss.metadata.TableBucket; |
35 | 40 | import org.apache.fluss.metadata.TableBucketReplica; |
36 | 41 | import org.apache.fluss.metadata.TableInfo; |
37 | 42 | import org.apache.fluss.metadata.TablePartition; |
38 | 43 | import org.apache.fluss.metadata.TablePath; |
| 44 | +import org.apache.fluss.rpc.messages.AddServerTagResponse; |
39 | 45 | import org.apache.fluss.rpc.messages.AdjustIsrResponse; |
40 | 46 | import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse; |
41 | 47 | import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse; |
42 | 48 | import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse; |
43 | 49 | import org.apache.fluss.rpc.messages.ControlledShutdownResponse; |
44 | 50 | import org.apache.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable; |
| 51 | +import org.apache.fluss.rpc.messages.RemoveServerTagResponse; |
45 | 52 | import org.apache.fluss.rpc.protocol.ApiError; |
46 | 53 | import org.apache.fluss.server.coordinator.event.AccessContextEvent; |
| 54 | +import org.apache.fluss.server.coordinator.event.AddServerTagEvent; |
47 | 55 | import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent; |
48 | 56 | import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent; |
49 | 57 | import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent; |
|
62 | 70 | import org.apache.fluss.server.coordinator.event.NewTabletServerEvent; |
63 | 71 | import org.apache.fluss.server.coordinator.event.NotifyKvSnapshotOffsetEvent; |
64 | 72 | import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent; |
| 73 | +import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent; |
65 | 74 | import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher; |
66 | 75 | import org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher; |
67 | 76 | import org.apache.fluss.server.coordinator.statemachine.ReplicaStateMachine; |
|
83 | 92 | import org.apache.fluss.server.zk.data.LeaderAndIsr; |
84 | 93 | import org.apache.fluss.server.zk.data.PartitionAssignment; |
85 | 94 | import org.apache.fluss.server.zk.data.RemoteLogManifestHandle; |
| 95 | +import org.apache.fluss.server.zk.data.ServerTags; |
86 | 96 | import org.apache.fluss.server.zk.data.TableAssignment; |
87 | 97 | import org.apache.fluss.server.zk.data.TabletServerRegistration; |
88 | 98 | import org.apache.fluss.server.zk.data.ZkData.PartitionIdsZNode; |
@@ -318,6 +328,11 @@ private void initCoordinatorContext() throws Exception { |
318 | 328 | // init tablet server channels |
319 | 329 | coordinatorChannelManager.startup(internalServerNodes); |
320 | 330 |
|
| 331 | + // load server tags. |
| 332 | + zooKeeperClient |
| 333 | + .getServerTags() |
| 334 | + .ifPresent(tags -> coordinatorContext.initSeverTags(tags.getServerTags())); |
| 335 | + |
321 | 336 | // load all tables |
322 | 337 | long start4loadTables = System.currentTimeMillis(); |
323 | 338 | List<TableInfo> autoPartitionTables = new ArrayList<>(); |
@@ -541,6 +556,16 @@ public void process(CoordinatorEvent event) { |
541 | 556 | completeFromCallable( |
542 | 557 | controlledShutdownEvent.getRespCallback(), |
543 | 558 | () -> tryProcessControlledShutdown(controlledShutdownEvent)); |
| 559 | + } else if (event instanceof AddServerTagEvent) { |
| 560 | + AddServerTagEvent addServerTagEvent = (AddServerTagEvent) event; |
| 561 | + completeFromCallable( |
| 562 | + addServerTagEvent.getRespCallback(), |
| 563 | + () -> processAddServerTag(addServerTagEvent)); |
| 564 | + } else if (event instanceof RemoveServerTagEvent) { |
| 565 | + RemoveServerTagEvent removeServerTagEvent = (RemoveServerTagEvent) event; |
| 566 | + completeFromCallable( |
| 567 | + removeServerTagEvent.getRespCallback(), |
| 568 | + () -> processRemoveServerTag(removeServerTagEvent)); |
544 | 569 | } else if (event instanceof AccessContextEvent) { |
545 | 570 | AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) event; |
546 | 571 | processAccessContext(accessContextEvent); |
@@ -923,6 +948,90 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent |
923 | 948 | updateTabletServerMetadataCache(serverInfos, null, null, bucketsWithOfflineLeader); |
924 | 949 | } |
925 | 950 |
|
| 951 | + private AddServerTagResponse processAddServerTag(AddServerTagEvent event) { |
| 952 | + AddServerTagResponse addServerTagResponse = new AddServerTagResponse(); |
| 953 | + List<Integer> serverIds = event.getServerIds(); |
| 954 | + ServerTag serverTag = event.getServerTag(); |
| 955 | + |
| 956 | + // Verify that dose serverTag exist for input serverIds. If any of them exists, throw |
| 957 | + // an error and none of them will be written to coordinatorContext and zk. |
| 958 | + Map<Integer, ServerInfo> liveTabletServers = coordinatorContext.getLiveTabletServers(); |
| 959 | + for (Integer serverId : serverIds) { |
| 960 | + if (!liveTabletServers.containsKey(serverId)) { |
| 961 | + throw new ServerNotExistException( |
| 962 | + String.format( |
| 963 | + "Server %s not exists when trying to add server tag.", serverId)); |
| 964 | + } |
| 965 | + |
| 966 | + if (coordinatorContext.getServerTag(serverId).isPresent()) { |
| 967 | + throw new ServerTagAlreadyExistException( |
| 968 | + String.format( |
| 969 | + "Server tag %s already exists for server %s.", |
| 970 | + serverTag, serverId)); |
| 971 | + } |
| 972 | + } |
| 973 | + |
| 974 | + // First register to zk, and then update coordinatorContext. |
| 975 | + Map<Integer, ServerTag> serverTags = coordinatorContext.getServerTags(); |
| 976 | + for (Integer serverId : serverIds) { |
| 977 | + serverTags.put(serverId, serverTag); |
| 978 | + } |
| 979 | + |
| 980 | + try { |
| 981 | + zooKeeperClient.registerServerTags(new ServerTags(serverTags)); |
| 982 | + } catch (Exception e) { |
| 983 | + LOG.error("Error when register server tags to zookeeper.", e); |
| 984 | + throw new UnknownServerException("Error when register server tags to zookeeper.", e); |
| 985 | + } |
| 986 | + |
| 987 | + // Then update coordinatorContext. |
| 988 | + serverIds.forEach(serverId -> coordinatorContext.putServerTag(serverId, serverTag)); |
| 989 | + |
| 990 | + return addServerTagResponse; |
| 991 | + } |
| 992 | + |
| 993 | + private RemoveServerTagResponse processRemoveServerTag(RemoveServerTagEvent event) { |
| 994 | + RemoveServerTagResponse removeServerTagResponse = new RemoveServerTagResponse(); |
| 995 | + List<Integer> serverIds = event.getServerIds(); |
| 996 | + ServerTag serverTag = event.getServerTag(); |
| 997 | + |
| 998 | + // Verify that dose serverTag not exist for input serverIds. If any of them not exists, |
| 999 | + // throw an error and none of them will be removed form coordinatorContext and zk. |
| 1000 | + Map<Integer, ServerInfo> liveTabletServers = coordinatorContext.getLiveTabletServers(); |
| 1001 | + for (Integer serverId : serverIds) { |
| 1002 | + if (!liveTabletServers.containsKey(serverId)) { |
| 1003 | + throw new ServerNotExistException( |
| 1004 | + String.format( |
| 1005 | + "Server %s not exists when trying to removing server tag.", |
| 1006 | + serverId)); |
| 1007 | + } |
| 1008 | + |
| 1009 | + if (!coordinatorContext.getServerTag(serverId).isPresent()) { |
| 1010 | + throw new ServerTagNotExistException( |
| 1011 | + String.format( |
| 1012 | + "Server tag %s not exists for server %s.", serverTag, serverId)); |
| 1013 | + } |
| 1014 | + } |
| 1015 | + |
| 1016 | + // First register to zk, and then update coordinatorContext. |
| 1017 | + Map<Integer, ServerTag> serverTags = coordinatorContext.getServerTags(); |
| 1018 | + for (Integer serverId : serverIds) { |
| 1019 | + serverTags.remove(serverId); |
| 1020 | + } |
| 1021 | + |
| 1022 | + try { |
| 1023 | + zooKeeperClient.registerServerTags(new ServerTags(serverTags)); |
| 1024 | + } catch (Exception e) { |
| 1025 | + LOG.error("Error when register server tags to zookeeper.", e); |
| 1026 | + throw new UnknownServerException("Error when register server tags to zookeeper.", e); |
| 1027 | + } |
| 1028 | + |
| 1029 | + // Then update coordinatorContext. |
| 1030 | + serverIds.forEach(coordinatorContext::removeServerTag); |
| 1031 | + |
| 1032 | + return removeServerTagResponse; |
| 1033 | + } |
| 1034 | + |
926 | 1035 | private List<AdjustIsrResultForBucket> tryProcessAdjustIsr( |
927 | 1036 | Map<TableBucket, LeaderAndIsr> leaderAndIsrList) { |
928 | 1037 | // TODO verify leader epoch. |
|
0 commit comments