Skip to content

Commit 0d200dc

Browse files
committed
Refactor Admin#getServerNodes to support carrying ServerTag
1 parent 10d42cc commit 0d200dc

File tree

15 files changed

+232
-25
lines changed

15 files changed

+232
-25
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.fluss.client.metadata.LakeSnapshot;
2323
import org.apache.fluss.client.metadata.MetadataUpdater;
2424
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
25-
import org.apache.fluss.cluster.Cluster;
2625
import org.apache.fluss.cluster.ServerNode;
2726
import org.apache.fluss.cluster.rebalance.GoalType;
2827
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
@@ -92,7 +91,7 @@
9291
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
9392
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
9493
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec;
95-
import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
94+
import static org.apache.fluss.client.utils.MetadataUtils.sendDescribeClusterRequest;
9695
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
9796
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toPbAclBindingFilters;
9897
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toPbAclFilter;
@@ -126,17 +125,8 @@ public CompletableFuture<List<ServerNode>> getServerNodes() {
126125
CompletableFuture.runAsync(
127126
() -> {
128127
try {
129-
List<ServerNode> serverNodeList = new ArrayList<>();
130-
Cluster cluster =
131-
sendMetadataRequestAndRebuildCluster(
132-
readOnlyGateway,
133-
false,
134-
metadataUpdater.getCluster(),
135-
null,
136-
null,
137-
null);
138-
serverNodeList.add(cluster.getCoordinatorServer());
139-
serverNodeList.addAll(cluster.getAliveTabletServerList());
128+
List<ServerNode> serverNodeList =
129+
sendDescribeClusterRequest(readOnlyGateway);
140130
future.complete(serverNodeList);
141131
} catch (Throwable t) {
142132
future.completeExceptionally(t);

fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.cluster.Cluster;
2222
import org.apache.fluss.cluster.ServerNode;
2323
import org.apache.fluss.cluster.ServerType;
24+
import org.apache.fluss.cluster.rebalance.ServerTag;
2425
import org.apache.fluss.exception.FlussRuntimeException;
2526
import org.apache.fluss.exception.StaleMetadataException;
2627
import org.apache.fluss.metadata.PhysicalTablePath;
@@ -31,6 +32,8 @@
3132
import org.apache.fluss.rpc.GatewayClientProxy;
3233
import org.apache.fluss.rpc.RpcClient;
3334
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
35+
import org.apache.fluss.rpc.messages.DescribeClusterRequest;
36+
import org.apache.fluss.rpc.messages.DescribeClusterResponse;
3437
import org.apache.fluss.rpc.messages.MetadataRequest;
3538
import org.apache.fluss.rpc.messages.MetadataResponse;
3639
import org.apache.fluss.rpc.messages.PbBucketMetadata;
@@ -165,6 +168,22 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
165168
// time out here
166169
}
167170

171+
public static List<ServerNode> sendDescribeClusterRequest(AdminReadOnlyGateway gateway)
172+
throws ExecutionException, InterruptedException, TimeoutException {
173+
DescribeClusterRequest describeClusterRequest = new DescribeClusterRequest();
174+
return gateway.describeCluster(describeClusterRequest)
175+
.thenApply(
176+
response -> {
177+
List<ServerNode> serverNodes = new ArrayList<>();
178+
serverNodes.add(getCoordinatorServer(response));
179+
serverNodes.addAll(getAliveTabletServers(response));
180+
return serverNodes;
181+
})
182+
.get(30, TimeUnit.SECONDS); // TODO currently, we don't have timeout logic in
183+
// RpcClient, it will let the get() block forever. So we
184+
// time out here
185+
}
186+
168187
private static NewTableMetadata getTableMetadataToUpdate(
169188
Cluster cluster, MetadataResponse metadataResponse) {
170189
Map<TablePath, Long> newTablePathToTableId = new HashMap<>();
@@ -277,6 +296,19 @@ private static ServerNode getCoordinatorServer(MetadataResponse response) {
277296
}
278297
}
279298

299+
private static ServerNode getCoordinatorServer(DescribeClusterResponse response) {
300+
if (!response.hasCoordinatorServer()) {
301+
return null;
302+
} else {
303+
PbServerNode protoServerNode = response.getCoordinatorServer();
304+
return new ServerNode(
305+
protoServerNode.getNodeId(),
306+
protoServerNode.getHost(),
307+
protoServerNode.getPort(),
308+
ServerType.COORDINATOR);
309+
}
310+
}
311+
280312
private static Map<Integer, ServerNode> getAliveTabletServers(MetadataResponse response) {
281313
Map<Integer, ServerNode> aliveTabletServers = new HashMap<>();
282314
response.getTabletServersList()
@@ -295,6 +327,25 @@ private static Map<Integer, ServerNode> getAliveTabletServers(MetadataResponse r
295327
return aliveTabletServers;
296328
}
297329

330+
public static List<ServerNode> getAliveTabletServers(DescribeClusterResponse response) {
331+
List<ServerNode> aliveTabletServers = new ArrayList<>();
332+
response.getTabletServersList()
333+
.forEach(
334+
serverNode -> {
335+
aliveTabletServers.add(
336+
new ServerNode(
337+
serverNode.getNodeId(),
338+
serverNode.getHost(),
339+
serverNode.getPort(),
340+
ServerType.TABLET_SERVER,
341+
serverNode.hasRack() ? serverNode.getRack() : null,
342+
serverNode.hasServerTag()
343+
? ServerTag.valueOf(serverNode.getServerTag())
344+
: null));
345+
});
346+
return aliveTabletServers;
347+
}
348+
298349
private static List<BucketLocation> toBucketLocations(
299350
TablePath tablePath,
300351
long tableId,

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1072,11 +1072,10 @@ public void testAddAndRemoveServerTags() throws Exception {
10721072

10731073
// 2.add server tag for server 0,1.
10741074
admin.addServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get();
1075-
// TODO use api to get serverTags instead of getting from zk directly
1076-
assertThat(zkClient.getServerTags()).isPresent();
1077-
assertThat(zkClient.getServerTags().get().getServerTags())
1078-
.containsEntry(0, ServerTag.PERMANENT_OFFLINE)
1079-
.containsEntry(1, ServerTag.PERMANENT_OFFLINE);
1075+
assertThat(admin.getServerNodes().get())
1076+
.filteredOn(serverNode -> serverNode.serverTag() != null)
1077+
.extracting(ServerNode::id)
1078+
.containsExactlyInAnyOrder(0, 1);
10801079

10811080
// 3.add server tag for server 0,2. error will be thrown and tag for 2 will not be added.
10821081
assertThatThrownBy(
@@ -1100,8 +1099,10 @@ public void testAddAndRemoveServerTags() throws Exception {
11001099

11011100
// 5.remove server tag for server 0,1.
11021101
admin.removeServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get();
1103-
assertThat(zkClient.getServerTags()).isPresent();
1104-
assertThat(zkClient.getServerTags().get().getServerTags()).isEmpty();
1102+
assertThat(admin.getServerNodes().get())
1103+
.filteredOn(serverNode -> serverNode.serverTag() != null)
1104+
.extracting(ServerNode::id)
1105+
.isEmpty();
11051106

11061107
// 6.remove server tag for server 2. error will be thrown and tag for 2 will not be removed.
11071108
assertThatThrownBy(

fluss-common/src/main/java/org/apache/fluss/cluster/ServerNode.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.cluster;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.cluster.rebalance.ServerTag;
2122

2223
import javax.annotation.Nullable;
2324

@@ -39,6 +40,9 @@ public class ServerNode {
3940
/** rack info for ServerNode. Currently, only tabletServer has rack info. */
4041
private final @Nullable String rack;
4142

43+
/** Server tag of tabletServer. */
44+
private final @Nullable ServerTag serverTag;
45+
4246
// Cache hashCode as it is called in performance sensitive parts of the code (e.g.
4347
// RecordAccumulator.ready)
4448
private Integer hash;
@@ -48,11 +52,22 @@ public ServerNode(int id, String host, int port, ServerType serverType) {
4852
}
4953

5054
public ServerNode(int id, String host, int port, ServerType serverType, @Nullable String rack) {
55+
this(id, host, port, serverType, rack, null);
56+
}
57+
58+
public ServerNode(
59+
int id,
60+
String host,
61+
int port,
62+
ServerType serverType,
63+
@Nullable String rack,
64+
@Nullable ServerTag serverTag) {
5165
this.id = id;
5266
this.host = host;
5367
this.port = port;
5468
this.serverType = serverType;
5569
this.rack = rack;
70+
this.serverTag = serverTag;
5671
if (serverType == ServerType.COORDINATOR) {
5772
this.uid = "cs-" + id;
5873
} else {
@@ -96,6 +111,11 @@ public ServerType serverType() {
96111
return rack;
97112
}
98113

114+
/** The server tag for this node. */
115+
public @Nullable ServerTag serverTag() {
116+
return serverTag;
117+
}
118+
99119
/**
100120
* Check whether this node is empty, which may be the case if noNode() is used as a placeholder
101121
* in a response payload with an error.
@@ -115,6 +135,7 @@ public int hashCode() {
115135
result = 31 * result + port;
116136
result = 31 * result + serverType.hashCode();
117137
result = 31 * result + ((rack == null) ? 0 : rack.hashCode());
138+
result = 31 * result + ((serverTag == null) ? 0 : serverTag.hashCode());
118139
this.hash = result;
119140
return result;
120141
} else {
@@ -135,11 +156,12 @@ public boolean equals(Object obj) {
135156
&& port == other.port
136157
&& Objects.equals(host, other.host)
137158
&& serverType == other.serverType
138-
&& Objects.equals(rack, other.rack);
159+
&& Objects.equals(rack, other.rack)
160+
&& Objects.equals(serverTag, other.serverTag);
139161
}
140162

141163
@Override
142164
public String toString() {
143-
return host + ":" + port + " (id: " + uid + ", rack: " + rack + ")";
165+
return host + ":" + port + " (id: " + uid + ", rack: " + rack + ", tag: " + serverTag + ")";
144166
}
145167
}

fluss-common/src/test/java/org/apache/fluss/cluster/ServerNodeTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ void testServerNode() {
4545
assertThat(serverNode.hashCode()).isNotEqualTo(serverNode2.hashCode());
4646
assertThat(serverNode).isEqualTo(new ServerNode(0, "HOST1", 9023, ServerType.COORDINATOR));
4747

48-
assertThat(serverNode.toString()).isEqualTo("HOST1:9023 (id: cs-0, rack: null)");
49-
assertThat(serverNode2.toString()).isEqualTo("HOST2:9123 (id: ts-1, rack: null)");
48+
assertThat(serverNode.toString()).isEqualTo("HOST1:9023 (id: cs-0, rack: null, tag: null)");
49+
assertThat(serverNode2.toString())
50+
.isEqualTo("HOST2:9123 (id: ts-1, rack: null, tag: null)");
5051
}
5152
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.apache.fluss.rpc.RpcGateway;
2121
import org.apache.fluss.rpc.messages.DatabaseExistsRequest;
2222
import org.apache.fluss.rpc.messages.DatabaseExistsResponse;
23+
import org.apache.fluss.rpc.messages.DescribeClusterRequest;
24+
import org.apache.fluss.rpc.messages.DescribeClusterResponse;
2325
import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest;
2426
import org.apache.fluss.rpc.messages.GetDatabaseInfoResponse;
2527
import org.apache.fluss.rpc.messages.GetFileSystemSecurityTokenRequest;
@@ -187,4 +189,13 @@ CompletableFuture<GetLatestLakeSnapshotResponse> getLatestLakeSnapshot(
187189
*/
188190
@RPC(api = ApiKeys.LIST_ACLS)
189191
CompletableFuture<ListAclsResponse> listAcls(ListAclsRequest request);
192+
193+
/**
194+
* Get the server node information of the current cluster.
195+
*
196+
* @param request Describe cluster request
197+
* @return a future that returns the server nodes of the cluster
198+
*/
199+
@RPC(api = ApiKeys.DESCRIBE_CLUSTER)
200+
CompletableFuture<DescribeClusterResponse> describeCluster(DescribeClusterRequest request);
190201
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ public enum ApiKeys {
7575
REMOVE_SERVER_TAG(1044, 0, 0, PUBLIC),
7676
REBALANCE(1045, 0, 0, PUBLIC),
7777
LIST_REBALANCE_PROCESS(1046, 0, 0, PUBLIC),
78-
CANCEL_REBALANCE(1047, 0, 0, PUBLIC);
78+
CANCEL_REBALANCE(1047, 0, 0, PUBLIC),
79+
DESCRIBE_CLUSTER(1048, 0, 0, PUBLIC);
7980

8081
private static final Map<Integer, ApiKeys> ID_TO_TYPE =
8182
Arrays.stream(ApiKeys.values())

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,15 @@ message UpdateMetadataRequest {
178178
message UpdateMetadataResponse {
179179
}
180180

181+
// describe cluster request and response
182+
message DescribeClusterRequest {
183+
}
184+
185+
message DescribeClusterResponse {
186+
optional PbServerNode coordinator_server = 1;
187+
repeated PbServerNode tablet_servers = 2;
188+
}
189+
181190
// produce log request and response
182191
message ProduceLogRequest {
183192
required int32 acks = 1;
@@ -591,12 +600,14 @@ message PbPhysicalTablePath {
591600
// * versions <= 0.6: host and port are used.
592601
// * versions >= 0.7: listeners is used to replace host and port.
593602
// For MetadataResponse and UpdateMetadataRequest: Fluss versions >= 0.7: we add rack for each tabletServer
603+
// For DescribeClusterResponse: Fluss versions >= 0.8: we add server_tag for each tabletServer
594604
message PbServerNode {
595605
required int32 node_id = 1;
596606
required string host = 2;
597607
required int32 port = 3;
598608
optional string listeners = 4;
599609
optional string rack = 5;
610+
optional int32 server_tag = 6;
600611
}
601612

602613
message PbTableMetadata {

fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.fluss.rpc.gateway.TabletServerGateway;
2222
import org.apache.fluss.rpc.messages.DatabaseExistsRequest;
2323
import org.apache.fluss.rpc.messages.DatabaseExistsResponse;
24+
import org.apache.fluss.rpc.messages.DescribeClusterRequest;
25+
import org.apache.fluss.rpc.messages.DescribeClusterResponse;
2426
import org.apache.fluss.rpc.messages.FetchLogRequest;
2527
import org.apache.fluss.rpc.messages.FetchLogResponse;
2628
import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest;
@@ -241,4 +243,10 @@ public CompletableFuture<GetLatestLakeSnapshotResponse> getLatestLakeSnapshot(
241243
public CompletableFuture<ListAclsResponse> listAcls(ListAclsRequest request) {
242244
return null;
243245
}
246+
247+
@Override
248+
public CompletableFuture<DescribeClusterResponse> describeCluster(
249+
DescribeClusterRequest request) {
250+
return null;
251+
}
244252
}

0 commit comments

Comments
 (0)