|
21 | 21 | import org.apache.fluss.cluster.Endpoint; |
22 | 22 | import org.apache.fluss.cluster.ServerNode; |
23 | 23 | import org.apache.fluss.cluster.ServerType; |
| 24 | +import org.apache.fluss.cluster.maintencance.ServerTag; |
24 | 25 | import org.apache.fluss.config.ConfigOptions; |
25 | 26 | import org.apache.fluss.config.Configuration; |
26 | 27 | import org.apache.fluss.exception.FencedLeaderEpochException; |
27 | 28 | import org.apache.fluss.exception.FlussRuntimeException; |
28 | 29 | import org.apache.fluss.exception.InvalidCoordinatorException; |
29 | 30 | import org.apache.fluss.exception.InvalidUpdateVersionException; |
| 31 | +import org.apache.fluss.exception.ServerNotExistException; |
| 32 | +import org.apache.fluss.exception.ServerTagAlreadyExistException; |
| 33 | +import org.apache.fluss.exception.ServerTagNotExistException; |
| 34 | +import org.apache.fluss.exception.UnknownServerException; |
30 | 35 | import org.apache.fluss.exception.UnknownTableOrBucketException; |
31 | 36 | import org.apache.fluss.metadata.PhysicalTablePath; |
32 | 37 | import org.apache.fluss.metadata.TableBucket; |
33 | 38 | import org.apache.fluss.metadata.TableBucketReplica; |
34 | 39 | import org.apache.fluss.metadata.TableInfo; |
35 | 40 | import org.apache.fluss.metadata.TablePartition; |
36 | 41 | import org.apache.fluss.metadata.TablePath; |
| 42 | +import org.apache.fluss.rpc.messages.AddServerTagResponse; |
37 | 43 | import org.apache.fluss.rpc.messages.AdjustIsrResponse; |
38 | 44 | import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse; |
39 | 45 | import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse; |
40 | 46 | import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse; |
41 | 47 | import org.apache.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable; |
| 48 | +import org.apache.fluss.rpc.messages.RemoveServerTagResponse; |
42 | 49 | import org.apache.fluss.rpc.protocol.ApiError; |
43 | 50 | import org.apache.fluss.server.coordinator.event.AccessContextEvent; |
| 51 | +import org.apache.fluss.server.coordinator.event.AddServerTagEvent; |
44 | 52 | import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent; |
45 | 53 | import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent; |
46 | 54 | import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent; |
|
57 | 65 | import org.apache.fluss.server.coordinator.event.FencedCoordinatorEvent; |
58 | 66 | import org.apache.fluss.server.coordinator.event.NewTabletServerEvent; |
59 | 67 | import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent; |
| 68 | +import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent; |
60 | 69 | import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher; |
61 | 70 | import org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher; |
62 | 71 | import org.apache.fluss.server.coordinator.statemachine.ReplicaStateMachine; |
|
77 | 86 | import org.apache.fluss.server.zk.data.LeaderAndIsr; |
78 | 87 | import org.apache.fluss.server.zk.data.PartitionAssignment; |
79 | 88 | import org.apache.fluss.server.zk.data.RemoteLogManifestHandle; |
| 89 | +import org.apache.fluss.server.zk.data.ServerTags; |
80 | 90 | import org.apache.fluss.server.zk.data.TableAssignment; |
81 | 91 | import org.apache.fluss.server.zk.data.TabletServerRegistration; |
82 | 92 | import org.apache.fluss.server.zk.data.ZkData.PartitionIdsZNode; |
@@ -298,6 +308,11 @@ private void initCoordinatorContext() throws Exception { |
298 | 308 | // init tablet server channels |
299 | 309 | coordinatorChannelManager.startup(internalServerNodes); |
300 | 310 |
|
| 311 | + // load server tags. |
| 312 | + zooKeeperClient |
| 313 | + .getServerTags() |
| 314 | + .ifPresent(tags -> coordinatorContext.initSeverTags(tags.getServerTags())); |
| 315 | + |
301 | 316 | // load all tables |
302 | 317 | List<TableInfo> autoPartitionTables = new ArrayList<>(); |
303 | 318 | List<Tuple2<TableInfo, Long>> lakeTables = new ArrayList<>(); |
@@ -470,6 +485,16 @@ public void process(CoordinatorEvent event) { |
470 | 485 | completeFromCallable( |
471 | 486 | commitLakeTableSnapshotEvent.getRespCallback(), |
472 | 487 | () -> tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent)); |
| 488 | + } else if (event instanceof AddServerTagEvent) { |
| 489 | + AddServerTagEvent addServerTagEvent = (AddServerTagEvent) event; |
| 490 | + completeFromCallable( |
| 491 | + addServerTagEvent.getRespCallback(), |
| 492 | + () -> processAddServerTag(addServerTagEvent)); |
| 493 | + } else if (event instanceof RemoveServerTagEvent) { |
| 494 | + RemoveServerTagEvent removeServerTagEvent = (RemoveServerTagEvent) event; |
| 495 | + completeFromCallable( |
| 496 | + removeServerTagEvent.getRespCallback(), |
| 497 | + () -> processRemoveServerTag(removeServerTagEvent)); |
473 | 498 | } else if (event instanceof AccessContextEvent) { |
474 | 499 | AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) event; |
475 | 500 | processAccessContext(accessContextEvent); |
@@ -827,6 +852,90 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent |
827 | 852 | updateTabletServerMetadataCache(serverInfos, null, null, bucketsWithOfflineLeader); |
828 | 853 | } |
829 | 854 |
|
| 855 | + private AddServerTagResponse processAddServerTag(AddServerTagEvent event) { |
| 856 | + AddServerTagResponse addServerTagResponse = new AddServerTagResponse(); |
| 857 | + List<Integer> serverIds = event.getServerIds(); |
| 858 | + ServerTag serverTag = event.getServerTag(); |
| 859 | + |
| 860 | + // Verify that dose serverTag exist for input serverIds. If any of them exists, throw |
| 861 | + // an error and none of them will be written to coordinatorContext and zk. |
| 862 | + Map<Integer, ServerInfo> liveTabletServers = coordinatorContext.getLiveTabletServers(); |
| 863 | + for (Integer serverId : serverIds) { |
| 864 | + if (!liveTabletServers.containsKey(serverId)) { |
| 865 | + throw new ServerNotExistException( |
| 866 | + String.format( |
| 867 | + "Server %s not exists when trying to add server tag.", serverId)); |
| 868 | + } |
| 869 | + |
| 870 | + if (coordinatorContext.getServerTag(serverId).isPresent()) { |
| 871 | + throw new ServerTagAlreadyExistException( |
| 872 | + String.format( |
| 873 | + "Server tag %s already exists for server %s.", |
| 874 | + serverTag, serverId)); |
| 875 | + } |
| 876 | + } |
| 877 | + |
| 878 | + // First register to zk, and then update coordinatorContext. |
| 879 | + Map<Integer, ServerTag> serverTags = coordinatorContext.getServerTags(); |
| 880 | + for (Integer serverId : serverIds) { |
| 881 | + serverTags.put(serverId, serverTag); |
| 882 | + } |
| 883 | + |
| 884 | + try { |
| 885 | + zooKeeperClient.registerServerTags(new ServerTags(serverTags)); |
| 886 | + } catch (Exception e) { |
| 887 | + LOG.error("Error when register server tags to zookeeper.", e); |
| 888 | + throw new UnknownServerException("Error when register server tags to zookeeper.", e); |
| 889 | + } |
| 890 | + |
| 891 | + // Then update coordinatorContext. |
| 892 | + serverIds.forEach(serverId -> coordinatorContext.putServerTag(serverId, serverTag)); |
| 893 | + |
| 894 | + return addServerTagResponse; |
| 895 | + } |
| 896 | + |
| 897 | + private RemoveServerTagResponse processRemoveServerTag(RemoveServerTagEvent event) { |
| 898 | + RemoveServerTagResponse removeServerTagResponse = new RemoveServerTagResponse(); |
| 899 | + List<Integer> serverIds = event.getServerIds(); |
| 900 | + ServerTag serverTag = event.getServerTag(); |
| 901 | + |
| 902 | + // Verify that dose serverTag not exist for input serverIds. If any of them not exists, |
| 903 | + // throw an error and none of them will be removed form coordinatorContext and zk. |
| 904 | + Map<Integer, ServerInfo> liveTabletServers = coordinatorContext.getLiveTabletServers(); |
| 905 | + for (Integer serverId : serverIds) { |
| 906 | + if (!liveTabletServers.containsKey(serverId)) { |
| 907 | + throw new ServerNotExistException( |
| 908 | + String.format( |
| 909 | + "Server %s not exists when trying to removing server tag.", |
| 910 | + serverId)); |
| 911 | + } |
| 912 | + |
| 913 | + if (!coordinatorContext.getServerTag(serverId).isPresent()) { |
| 914 | + throw new ServerTagNotExistException( |
| 915 | + String.format( |
| 916 | + "Server tag %s not exists for server %s.", serverTag, serverId)); |
| 917 | + } |
| 918 | + } |
| 919 | + |
| 920 | + // First register to zk, and then update coordinatorContext. |
| 921 | + Map<Integer, ServerTag> serverTags = coordinatorContext.getServerTags(); |
| 922 | + for (Integer serverId : serverIds) { |
| 923 | + serverTags.remove(serverId); |
| 924 | + } |
| 925 | + |
| 926 | + try { |
| 927 | + zooKeeperClient.registerServerTags(new ServerTags(serverTags)); |
| 928 | + } catch (Exception e) { |
| 929 | + LOG.error("Error when register server tags to zookeeper.", e); |
| 930 | + throw new UnknownServerException("Error when register server tags to zookeeper.", e); |
| 931 | + } |
| 932 | + |
| 933 | + // Then update coordinatorContext. |
| 934 | + serverIds.forEach(coordinatorContext::removeServerTag); |
| 935 | + |
| 936 | + return removeServerTagResponse; |
| 937 | + } |
| 938 | + |
830 | 939 | private List<AdjustIsrResultForBucket> tryProcessAdjustIsr( |
831 | 940 | Map<TableBucket, LeaderAndIsr> leaderAndIsrList) { |
832 | 941 | // TODO verify leader epoch. |
|
0 commit comments