Skip to content

Commit ea8c313

Browse files
author
Liebing
committed
[server] Change the implementation of Admin#getServerNodes()
1 parent 2368e6a commit ea8c313

File tree

14 files changed

+228
-23
lines changed

14 files changed

+228
-23
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.alibaba.fluss.client.metadata.LakeSnapshot;
2323
import com.alibaba.fluss.client.metadata.MetadataUpdater;
2424
import com.alibaba.fluss.client.utils.ClientRpcMessageUtils;
25-
import com.alibaba.fluss.cluster.Cluster;
2625
import com.alibaba.fluss.cluster.ServerNode;
2726
import com.alibaba.fluss.cluster.maintencance.GoalType;
2827
import com.alibaba.fluss.cluster.maintencance.RebalancePlanForBucket;
@@ -91,7 +90,7 @@
9190
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
9291
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
9392
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec;
94-
import static com.alibaba.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
93+
import static com.alibaba.fluss.client.utils.MetadataUtils.sendDescribeClusterRequest;
9594
import static com.alibaba.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
9695
import static com.alibaba.fluss.rpc.util.CommonRpcMessageUtils.toPbAclBindingFilters;
9796
import static com.alibaba.fluss.rpc.util.CommonRpcMessageUtils.toPbAclFilter;
@@ -127,17 +126,8 @@ public CompletableFuture<List<ServerNode>> getServerNodes() {
127126
CompletableFuture.runAsync(
128127
() -> {
129128
try {
130-
List<ServerNode> serverNodeList = new ArrayList<>();
131-
Cluster cluster =
132-
sendMetadataRequestAndRebuildCluster(
133-
readOnlyGateway,
134-
false,
135-
metadataUpdater.getCluster(),
136-
null,
137-
null,
138-
null);
139-
serverNodeList.add(cluster.getCoordinatorServer());
140-
serverNodeList.addAll(cluster.getAliveTabletServerList());
129+
List<ServerNode> serverNodeList =
130+
sendDescribeClusterRequest(readOnlyGateway);
141131
future.complete(serverNodeList);
142132
} catch (Throwable t) {
143133
future.completeExceptionally(t);

fluss-client/src/main/java/com/alibaba/fluss/client/utils/MetadataUtils.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.alibaba.fluss.cluster.Cluster;
2222
import com.alibaba.fluss.cluster.ServerNode;
2323
import com.alibaba.fluss.cluster.ServerType;
24+
import com.alibaba.fluss.cluster.maintencance.ServerTag;
2425
import com.alibaba.fluss.exception.FlussRuntimeException;
2526
import com.alibaba.fluss.exception.StaleMetadataException;
2627
import com.alibaba.fluss.metadata.PhysicalTablePath;
@@ -31,6 +32,8 @@
3132
import com.alibaba.fluss.rpc.GatewayClientProxy;
3233
import com.alibaba.fluss.rpc.RpcClient;
3334
import com.alibaba.fluss.rpc.gateway.AdminReadOnlyGateway;
35+
import com.alibaba.fluss.rpc.messages.DescribeClusterRequest;
36+
import com.alibaba.fluss.rpc.messages.DescribeClusterResponse;
3437
import com.alibaba.fluss.rpc.messages.MetadataRequest;
3538
import com.alibaba.fluss.rpc.messages.MetadataResponse;
3639
import com.alibaba.fluss.rpc.messages.PbBucketMetadata;
@@ -166,6 +169,22 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
166169
// time out here
167170
}
168171

172+
public static List<ServerNode> sendDescribeClusterRequest(AdminReadOnlyGateway gateway)
173+
throws ExecutionException, InterruptedException, TimeoutException {
174+
DescribeClusterRequest describeClusterRequest = new DescribeClusterRequest();
175+
return gateway.describeCluster(describeClusterRequest)
176+
.thenApply(
177+
response -> {
178+
List<ServerNode> serverNodes = new ArrayList<>();
179+
serverNodes.add(getCoordinatorServer(response));
180+
serverNodes.addAll(getAliveTabletServers(response));
181+
return serverNodes;
182+
})
183+
.get(30, TimeUnit.SECONDS); // TODO currently, we don't have timeout logic in
184+
// RpcClient, it will let the get() block forever. So we
185+
// time out here
186+
}
187+
169188
private static NewTableMetadata getTableMetadataToUpdate(
170189
Cluster cluster,
171190
MetadataResponse metadataResponse,
@@ -304,6 +323,38 @@ private static Map<Integer, ServerNode> getAliveTabletServers(MetadataResponse r
304323
return aliveTabletServers;
305324
}
306325

326+
private static ServerNode getCoordinatorServer(DescribeClusterResponse response) {
327+
if (!response.hasCoordinatorServer()) {
328+
return null;
329+
} else {
330+
PbServerNode protoServerNode = response.getCoordinatorServer();
331+
return new ServerNode(
332+
protoServerNode.getNodeId(),
333+
protoServerNode.getHost(),
334+
protoServerNode.getPort(),
335+
ServerType.COORDINATOR);
336+
}
337+
}
338+
339+
public static List<ServerNode> getAliveTabletServers(DescribeClusterResponse response) {
340+
List<ServerNode> aliveTabletServers = new ArrayList<>();
341+
response.getTabletServersList()
342+
.forEach(
343+
serverNode -> {
344+
aliveTabletServers.add(
345+
new ServerNode(
346+
serverNode.getNodeId(),
347+
serverNode.getHost(),
348+
serverNode.getPort(),
349+
ServerType.TABLET_SERVER,
350+
serverNode.hasRack() ? serverNode.getRack() : null,
351+
serverNode.hasServerTag()
352+
? ServerTag.valueOf(serverNode.getServerTag())
353+
: null));
354+
});
355+
return aliveTabletServers;
356+
}
357+
307358
private static List<BucketLocation> toBucketLocations(
308359
TablePath tablePath,
309360
long tableId,

fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1072,11 +1072,10 @@ public void testAddAndRemoveServerTags() throws Exception {
10721072

10731073
// 2.add server tag for server 0,1.
10741074
admin.addServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get();
1075-
// TODO use api to get serverTags instead of getting from zk directly
1076-
assertThat(zkClient.getServerTags()).isPresent();
1077-
assertThat(zkClient.getServerTags().get().getServerTags())
1078-
.containsEntry(0, ServerTag.PERMANENT_OFFLINE)
1079-
.containsEntry(1, ServerTag.PERMANENT_OFFLINE);
1075+
assertThat(admin.getServerNodes().get())
1076+
.filteredOn(serverNode -> serverNode.serverTag() != null)
1077+
.extracting(ServerNode::id)
1078+
.containsExactlyInAnyOrder(0, 1);
10801079

10811080
// 3.add server tag for server 0,2. error will be thrown and tag for 2 will not be added.
10821081
assertThatThrownBy(
@@ -1100,8 +1099,10 @@ public void testAddAndRemoveServerTags() throws Exception {
11001099

11011100
// 5.remove server tag for server 0,1.
11021101
admin.removeServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get();
1103-
assertThat(zkClient.getServerTags()).isPresent();
1104-
assertThat(zkClient.getServerTags().get().getServerTags()).isEmpty();
1102+
assertThat(admin.getServerNodes().get())
1103+
.filteredOn(serverNode -> serverNode.serverTag() != null)
1104+
.extracting(ServerNode::id)
1105+
.isEmpty();
11051106

11061107
// 6.remove server tag for server 2. error will be thrown and tag for 2 will not be removed.
11071108
assertThatThrownBy(

fluss-common/src/main/java/com/alibaba/fluss/cluster/ServerNode.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.alibaba.fluss.cluster;
1919

2020
import com.alibaba.fluss.annotation.PublicEvolving;
21+
import com.alibaba.fluss.cluster.maintencance.ServerTag;
2122

2223
import javax.annotation.Nullable;
2324

@@ -39,6 +40,9 @@ public class ServerNode {
3940
/** rack info for ServerNode. Currently, only tabletServer has rack info. */
4041
private final @Nullable String rack;
4142

43+
/** Sever tag of tabletServer. */
44+
private final @Nullable ServerTag serverTag;
45+
4246
// Cache hashCode as it is called in performance sensitive parts of the code (e.g.
4347
// RecordAccumulator.ready)
4448
private Integer hash;
@@ -48,11 +52,22 @@ public ServerNode(int id, String host, int port, ServerType serverType) {
4852
}
4953

5054
public ServerNode(int id, String host, int port, ServerType serverType, @Nullable String rack) {
55+
this(id, host, port, serverType, rack, null);
56+
}
57+
58+
public ServerNode(
59+
int id,
60+
String host,
61+
int port,
62+
ServerType serverType,
63+
@Nullable String rack,
64+
@Nullable ServerTag serverTag) {
5165
this.id = id;
5266
this.host = host;
5367
this.port = port;
5468
this.serverType = serverType;
5569
this.rack = rack;
70+
this.serverTag = serverTag;
5671
if (serverType == ServerType.COORDINATOR) {
5772
this.uid = "cs-" + id;
5873
} else {
@@ -96,6 +111,11 @@ public ServerType serverType() {
96111
return rack;
97112
}
98113

114+
/** The server tag for this node. */
115+
public @Nullable ServerTag serverTag() {
116+
return serverTag;
117+
}
118+
99119
/**
100120
* Check whether this node is empty, which may be the case if noNode() is used as a placeholder
101121
* in a response payload with an error.
@@ -115,6 +135,7 @@ public int hashCode() {
115135
result = 31 * result + port;
116136
result = 31 * result + serverType.hashCode();
117137
result = 31 * result + ((rack == null) ? 0 : rack.hashCode());
138+
result = 31 * result + ((serverTag == null) ? 0 : serverTag.hashCode());
118139
this.hash = result;
119140
return result;
120141
} else {
@@ -135,11 +156,12 @@ public boolean equals(Object obj) {
135156
&& port == other.port
136157
&& Objects.equals(host, other.host)
137158
&& serverType == other.serverType
138-
&& Objects.equals(rack, other.rack);
159+
&& Objects.equals(rack, other.rack)
160+
&& Objects.equals(serverTag, other.serverTag);
139161
}
140162

141163
@Override
142164
public String toString() {
143-
return host + ":" + port + " (id: " + uid + ", rack: " + rack + ")";
165+
return host + ":" + port + " (id: " + uid + ", rack: " + rack + ", tag: " + serverTag + ")";
144166
}
145167
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminReadOnlyGateway.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.alibaba.fluss.rpc.RpcGateway;
2121
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
2222
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
23+
import com.alibaba.fluss.rpc.messages.DescribeClusterRequest;
24+
import com.alibaba.fluss.rpc.messages.DescribeClusterResponse;
2325
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
2426
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoResponse;
2527
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenRequest;
@@ -187,4 +189,13 @@ CompletableFuture<GetLatestLakeSnapshotResponse> getLatestLakeSnapshot(
187189
*/
188190
@RPC(api = ApiKeys.LIST_ACLS)
189191
CompletableFuture<ListAclsResponse> listAcls(ListAclsRequest request);
192+
193+
/**
194+
* Get the server node information of the current cluster.
195+
*
196+
* @param request Describe cluster request
197+
* @return a future returns server node
198+
*/
199+
@RPC(api = ApiKeys.DESCRIBE_CLUSTER)
200+
CompletableFuture<DescribeClusterResponse> describeCluster(DescribeClusterRequest request);
190201
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ public enum ApiKeys {
7575
REMOVE_SERVER_TAG(1044, 0, 0, PUBLIC),
7676
REBALANCE(1045, 0, 0, PUBLIC),
7777
LIST_REBALANCE_PROCESS(1046, 0, 0, PUBLIC),
78-
CANCEL_REBALANCE(1047, 0, 0, PUBLIC);
78+
CANCEL_REBALANCE(1047, 0, 0, PUBLIC),
79+
DESCRIBE_CLUSTER(1048, 0, 0, PUBLIC);
7980

8081
private static final Map<Integer, ApiKeys> ID_TO_TYPE =
8182
Arrays.stream(ApiKeys.values())

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,15 @@ message UpdateMetadataRequest {
178178
message UpdateMetadataResponse {
179179
}
180180

181+
// describe cluster request and response
182+
message DescribeClusterRequest {
183+
}
184+
185+
message DescribeClusterResponse {
186+
optional PbServerNode coordinator_server = 1;
187+
repeated PbServerNode tablet_servers = 2;
188+
}
189+
181190
// produce log request and response
182191
message ProduceLogRequest {
183192
required int32 acks = 1;
@@ -596,6 +605,7 @@ message PbServerNode {
596605
required int32 port = 3;
597606
optional string listeners = 4;
598607
optional string rack = 5;
608+
optional int32 server_tag = 6;
599609
}
600610

601611
message PbTableMetadata {

fluss-rpc/src/test/java/com/alibaba/fluss/rpc/TestingTabletGatewayService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
2222
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
2323
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
24+
import com.alibaba.fluss.rpc.messages.DescribeClusterRequest;
25+
import com.alibaba.fluss.rpc.messages.DescribeClusterResponse;
2426
import com.alibaba.fluss.rpc.messages.FetchLogRequest;
2527
import com.alibaba.fluss.rpc.messages.FetchLogResponse;
2628
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
@@ -241,4 +243,10 @@ public CompletableFuture<GetLatestLakeSnapshotResponse> getLatestLakeSnapshot(
241243
public CompletableFuture<ListAclsResponse> listAcls(ListAclsRequest request) {
242244
return null;
243245
}
246+
247+
@Override
248+
public CompletableFuture<DescribeClusterResponse> describeCluster(
249+
DescribeClusterRequest request) {
250+
return null;
251+
}
244252
}

fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package com.alibaba.fluss.server;
1919

20+
import com.alibaba.fluss.cluster.ServerNode;
2021
import com.alibaba.fluss.cluster.ServerType;
22+
import com.alibaba.fluss.cluster.maintencance.ServerTag;
2123
import com.alibaba.fluss.exception.FlussRuntimeException;
2224
import com.alibaba.fluss.exception.KvSnapshotNotExistException;
2325
import com.alibaba.fluss.exception.LakeTableSnapshotNotExistException;
@@ -42,6 +44,8 @@
4244
import com.alibaba.fluss.rpc.messages.ApiVersionsResponse;
4345
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
4446
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
47+
import com.alibaba.fluss.rpc.messages.DescribeClusterRequest;
48+
import com.alibaba.fluss.rpc.messages.DescribeClusterResponse;
4549
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
4650
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoResponse;
4751
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenRequest;
@@ -92,6 +96,7 @@
9296
import com.alibaba.fluss.server.zk.data.BucketSnapshot;
9397
import com.alibaba.fluss.server.zk.data.LakeTableSnapshot;
9498
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
99+
import com.alibaba.fluss.server.zk.data.ServerTags;
95100
import com.alibaba.fluss.server.zk.data.TableAssignment;
96101

97102
import org.slf4j.Logger;
@@ -113,6 +118,7 @@
113118
import static com.alibaba.fluss.rpc.util.CommonRpcMessageUtils.toAclFilter;
114119
import static com.alibaba.fluss.rpc.util.CommonRpcMessageUtils.toResolvedPartitionSpec;
115120
import static com.alibaba.fluss.security.acl.Resource.TABLE_SPLITTER;
121+
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.buildDescribeClusterResponse;
116122
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.buildMetadataResponse;
117123
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeGetLatestKvSnapshotsResponse;
118124
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeGetLatestLakeSnapshotResponse;
@@ -466,6 +472,15 @@ public CompletableFuture<ListAclsResponse> listAcls(ListAclsRequest request) {
466472
}
467473
}
468474

475+
@Override
476+
public CompletableFuture<DescribeClusterResponse> describeCluster(
477+
DescribeClusterRequest request) {
478+
return CompletableFuture.completedFuture(
479+
makeDescribeClusterResponse(currentListenerName(), getServerMetadataCache()));
480+
}
481+
482+
protected abstract ServerMetadataCache getServerMetadataCache();
483+
469484
protected MetadataResponse makeMetadataResponse(
470485
MetadataRequest request,
471486
String listenerName,
@@ -533,6 +548,42 @@ protected MetadataResponse makeMetadataResponse(
533548
partitionMetadata);
534549
}
535550

551+
protected DescribeClusterResponse makeDescribeClusterResponse(
552+
String listenerName, ServerMetadataCache metadataCache) {
553+
ServerNode coordinatorServer = metadataCache.getCoordinatorServer(listenerName);
554+
555+
Collection<ServerNode> aliveTabletServers =
556+
metadataCache.getAllAliveTabletServers(listenerName).values();
557+
Set<ServerNode> aliveTabletServersWithTag = new HashSet<>(aliveTabletServers.size());
558+
try {
559+
Optional<ServerTags> serverTagsOp = zkClient.getServerTags();
560+
if (serverTagsOp.isPresent()) {
561+
Map<Integer, ServerTag> tagMap = serverTagsOp.get().getServerTags();
562+
for (ServerNode server : aliveTabletServers) {
563+
if (tagMap.containsKey(server.id())) {
564+
ServerNode serverWithTag =
565+
new ServerNode(
566+
server.id(),
567+
server.host(),
568+
server.port(),
569+
server.serverType(),
570+
server.rack(),
571+
tagMap.get(server.id()));
572+
aliveTabletServersWithTag.add(serverWithTag);
573+
} else {
574+
aliveTabletServersWithTag.add(server);
575+
}
576+
}
577+
} else {
578+
aliveTabletServersWithTag.addAll(aliveTabletServers);
579+
}
580+
} catch (Exception e) {
581+
throw new FlussRuntimeException("Failed to get server tags", e);
582+
}
583+
584+
return buildDescribeClusterResponse(coordinatorServer, aliveTabletServersWithTag);
585+
}
586+
536587
public static List<BucketMetadata> getTableMetadataFromZk(
537588
ZooKeeperClient zkClient, TablePath tablePath, long tableId, boolean isPartitioned) {
538589
try {

0 commit comments

Comments
 (0)