Skip to content

Commit 8cacbca

Browse files
authored
[server] Support generating rack aware bucket assignment when creating table (#786)
1 parent 5492564 commit 8cacbca

File tree

48 files changed

+1213
-142
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1213
-142
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/utils/MetadataUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,8 @@ private static Map<Integer, ServerNode> getAliveTabletServers(MetadataResponse r
297297
nodeId,
298298
serverNode.getHost(),
299299
serverNode.getPort(),
300-
ServerType.TABLET_SERVER));
300+
ServerType.TABLET_SERVER,
301+
serverNode.getRack()));
301302
});
302303
return aliveTabletServers;
303304
}

fluss-client/src/test/java/com/alibaba/fluss/client/metadata/TestingMetadataUpdater.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ public class TestingMetadataUpdater extends MetadataUpdater {
4444
private static final ServerNode COORDINATOR =
4545
new ServerNode(0, "localhost", 90, ServerType.COORDINATOR);
4646
private static final ServerNode NODE1 =
47-
new ServerNode(1, "localhost", 90, ServerType.TABLET_SERVER);
47+
new ServerNode(1, "localhost", 90, ServerType.TABLET_SERVER, "rack1");
4848
private static final ServerNode NODE2 =
49-
new ServerNode(2, "localhost", 91, ServerType.TABLET_SERVER);
49+
new ServerNode(2, "localhost", 91, ServerType.TABLET_SERVER, "rack2");
5050
private static final ServerNode NODE3 =
51-
new ServerNode(3, "localhost", 92, ServerType.TABLET_SERVER);
51+
new ServerNode(3, "localhost", 92, ServerType.TABLET_SERVER, "rack3");
5252

5353
private final TestCoordinatorGateway coordinatorGateway;
5454
private final Map<Integer, TestTabletServerGateway> tabletServerGatewayMap;

fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@ class RecordAccumulatorTest {
9595
System.currentTimeMillis(),
9696
System.currentTimeMillis());
9797

98-
ServerNode node1 = new ServerNode(1, "localhost", 90, ServerType.TABLET_SERVER);
99-
ServerNode node2 = new ServerNode(2, "localhost", 91, ServerType.TABLET_SERVER);
100-
ServerNode node3 = new ServerNode(3, "localhost", 92, ServerType.TABLET_SERVER);
98+
ServerNode node1 = new ServerNode(1, "localhost", 90, ServerType.TABLET_SERVER, "rack1");
99+
ServerNode node2 = new ServerNode(2, "localhost", 91, ServerType.TABLET_SERVER, "rack2");
100+
ServerNode node3 = new ServerNode(3, "localhost", 92, ServerType.TABLET_SERVER, "rack3");
101101
private final ServerNode[] serverNodes = new ServerNode[] {node1, node2, node3};
102102
private final TableBucket tb1 = new TableBucket(DATA1_TABLE_ID, 0);
103103
private final TableBucket tb2 = new TableBucket(DATA1_TABLE_ID, 1);

fluss-client/src/test/java/com/alibaba/fluss/client/write/StickyStaticBucketAssignerTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@
4646

4747
/** Test for {@link StickyBucketAssigner}. */
4848
class StickyStaticBucketAssignerTest {
49-
ServerNode node1 = new ServerNode(1, "localhost", 90, ServerType.TABLET_SERVER);
50-
ServerNode node2 = new ServerNode(2, "localhost", 91, ServerType.TABLET_SERVER);
51-
ServerNode node3 = new ServerNode(3, "localhost", 92, ServerType.TABLET_SERVER);
49+
ServerNode node1 = new ServerNode(1, "localhost", 90, ServerType.TABLET_SERVER, "rack1");
50+
ServerNode node2 = new ServerNode(2, "localhost", 91, ServerType.TABLET_SERVER, "rack2");
51+
ServerNode node3 = new ServerNode(3, "localhost", 92, ServerType.TABLET_SERVER, "rack3");
5252
private final ServerNode[] serverNodes = new ServerNode[] {node1, node2, node3};
5353
private final BucketLocation bucket1 =
5454
new BucketLocation(DATA1_PHYSICAL_TABLE_PATH, DATA1_TABLE_ID, 0, node1, serverNodes);

fluss-common/src/main/java/com/alibaba/fluss/cluster/MetadataCache.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,17 +64,17 @@ public interface MetadataCache {
6464
*/
6565
Map<Integer, ServerNode> getAllAliveTabletServers(String listenerName);
6666

67-
Set<Integer> getAliveTabletServerIds();
67+
Set<TabletServerInfo> getAliveTabletServerInfos();
6868

6969
@Nullable
7070
PhysicalTablePath getTablePath(long tableId);
7171

7272
/** Get the {@link TabletServerInfo} (id and rack) of all alive tablet server nodes. */
73-
default int[] getLiveServerIds() {
74-
Set<Integer> aliveTabletServerIds = getAliveTabletServerIds();
75-
int[] server = new int[aliveTabletServerIds.size()];
76-
Iterator<Integer> iterator = aliveTabletServerIds.iterator();
77-
for (int i = 0; i < aliveTabletServerIds.size(); i++) {
73+
default TabletServerInfo[] getLiveServers() {
74+
Set<TabletServerInfo> aliveTabletServerInfos = getAliveTabletServerInfos();
75+
TabletServerInfo[] server = new TabletServerInfo[aliveTabletServerInfos.size()];
76+
Iterator<TabletServerInfo> iterator = aliveTabletServerInfos.iterator();
77+
for (int i = 0; i < aliveTabletServerInfos.size(); i++) {
7878
server[i] = iterator.next();
7979
}
8080
return server;

fluss-common/src/main/java/com/alibaba/fluss/cluster/ServerNode.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import com.alibaba.fluss.annotation.PublicEvolving;
2020

21+
import javax.annotation.Nullable;
22+
2123
import java.util.Objects;
2224

2325
/**
@@ -33,15 +35,23 @@ public class ServerNode {
3335
private final int port;
3436
private final ServerType serverType;
3537

38+
/** Rack info for this ServerNode, or null if none was given. Only tablet servers have rack info. */
39+
private final @Nullable String rack;
40+
3641
// Cache hashCode as it is called in performance sensitive parts of the code (e.g.
3742
// RecordAccumulator.ready)
3843
private Integer hash;
3944

4045
public ServerNode(int id, String host, int port, ServerType serverType) {
46+
this(id, host, port, serverType, null);
47+
}
48+
49+
public ServerNode(int id, String host, int port, ServerType serverType, @Nullable String rack) {
4150
this.id = id;
4251
this.host = host;
4352
this.port = port;
4453
this.serverType = serverType;
54+
this.rack = rack;
4555
if (serverType == ServerType.COORDINATOR) {
4656
this.uid = "cs-" + id;
4757
} else {
@@ -80,6 +90,11 @@ public ServerType serverType() {
8090
return serverType;
8191
}
8292

93+
/** The rack for this node. */
94+
public @Nullable String rack() {
95+
return rack;
96+
}
97+
8398
/**
8499
* Check whether this node is empty, which may be the case if noNode() is used as a placeholder
85100
* in a response payload with an error.
@@ -98,6 +113,7 @@ public int hashCode() {
98113
result = 31 * result + id;
99114
result = 31 * result + port;
100115
result = 31 * result + serverType.hashCode();
116+
result = 31 * result + ((rack == null) ? 0 : rack.hashCode());
101117
this.hash = result;
102118
return result;
103119
} else {
@@ -117,11 +133,12 @@ public boolean equals(Object obj) {
117133
return id == other.id
118134
&& port == other.port
119135
&& Objects.equals(host, other.host)
120-
&& serverType == other.serverType;
136+
&& serverType == other.serverType
137+
&& Objects.equals(rack, other.rack);
121138
}
122139

123140
@Override
124141
public String toString() {
125-
return host + ":" + port + " (id: " + uid + ")";
142+
return host + ":" + port + " (id: " + uid + ", rack: " + rack + ")";
126143
}
127144
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.cluster;
18+
19+
import javax.annotation.Nullable;
20+
21+
import java.util.Objects;
22+
23+
/** Information about a tablet server: its id and its (nullable) rack. */
24+
public class TabletServerInfo {
25+
private final int id;
26+
27+
private @Nullable final String rack;
28+
29+
public TabletServerInfo(int id, @Nullable String rack) {
30+
this.id = id;
31+
this.rack = rack;
32+
}
33+
34+
public int getId() {
35+
return id;
36+
}
37+
38+
public @Nullable String getRack() {
39+
return rack;
40+
}
41+
42+
@Override
43+
public boolean equals(Object o) {
44+
if (this == o) {
45+
return true;
46+
}
47+
if (o == null || getClass() != o.getClass()) {
48+
return false;
49+
}
50+
TabletServerInfo that = (TabletServerInfo) o;
51+
return id == that.id && Objects.equals(rack, that.rack);
52+
}
53+
54+
@Override
55+
public int hashCode() {
56+
return Objects.hash(id, rack);
57+
}
58+
}

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,14 @@ public class ConfigOptions {
343343
.noDefaultValue()
344344
.withDescription("The id for the tablet server.");
345345

346+
public static final ConfigOption<String> TABLET_SERVER_RACK =
347+
key("tablet-server.rack")
348+
.stringType()
349+
.noDefaultValue()
350+
.withDescription(
351+
"The rack for the tabletServer. This will be used in rack aware bucket assignment "
352+
+ "for fault tolerance. Examples: `RACK1`, `cn-hangzhou-server10`");
353+
346354
public static final ConfigOption<String> DATA_DIR =
347355
key("data.dir")
348356
.stringType()
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.exception;
18+
19+
/**
20+
* Exception for invalid server rack info. For example, not all tablet servers have been assigned
21+
* rack info.
22+
*/
23+
public class InvalidServerRackInfoException extends ApiException {
24+
public InvalidServerRackInfoException(String message) {
25+
super(message);
26+
}
27+
}

fluss-common/src/test/java/com/alibaba/fluss/cluster/BucketLocationTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@ void testToString() {
3131
TablePath tablePath = new TablePath("test_db", "test_table");
3232
int bucketId = 0;
3333
long tableId = 150001L;
34-
ServerNode leader = new ServerNode(0, "localhost", 9092, ServerType.TABLET_SERVER);
35-
ServerNode replica1 = new ServerNode(1, "localhost", 9093, ServerType.TABLET_SERVER);
36-
ServerNode replica2 = new ServerNode(2, "localhost", 9094, ServerType.TABLET_SERVER);
34+
ServerNode leader = new ServerNode(0, "localhost", 9092, ServerType.TABLET_SERVER, "rack0");
35+
ServerNode replica1 =
36+
new ServerNode(1, "localhost", 9093, ServerType.TABLET_SERVER, "rack1");
37+
ServerNode replica2 =
38+
new ServerNode(2, "localhost", 9094, ServerType.TABLET_SERVER, "rack2");
3739
ServerNode[] replicas = {leader, replica1, replica2};
3840
// TODO add isr and offline.
3941
BucketLocation bucketLocation =

0 commit comments

Comments
 (0)