Skip to content

Commit 05458a3

Browse files
authored
[common] Change Cluster#getRandomTabletServer() to really random retrieve tabletServer (#1719)
1 parent 79d9dbb commit 05458a3

File tree

2 files changed

+32
-5
lines changed

2 files changed

+32
-5
lines changed

fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Map;
3535
import java.util.Optional;
3636
import java.util.Set;
37+
import java.util.concurrent.ThreadLocalRandom;
3738

3839
/**
3940
* An immutable representation of a subset of the server nodes, tables, and buckets and schemas in
@@ -209,7 +210,12 @@ public ServerNode getTabletServer(int id) {
209210
public ServerNode getRandomTabletServer() {
210211
// TODO this method need to get one tablet server according to the load.
211212
List<ServerNode> serverNodes = new ArrayList<>(aliveTabletServersById.values());
212-
return !serverNodes.isEmpty() ? serverNodes.get(0) : null;
213+
if (serverNodes.isEmpty()) {
214+
return null;
215+
}
216+
217+
int index = ThreadLocalRandom.current().nextInt(serverNodes.size());
218+
return serverNodes.get(index);
213219
}
214220

215221
/** Get the list of available buckets for this table/partition. */

fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
import java.util.Arrays;
2929
import java.util.Collections;
3030
import java.util.HashMap;
31+
import java.util.HashSet;
3132
import java.util.List;
3233
import java.util.Map;
34+
import java.util.Set;
3335

3436
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
3537
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
@@ -68,7 +70,7 @@ void setup() {
6870

6971
@Test
7072
void testReturnModifiableCollections() {
71-
Cluster cluster = createCluster();
73+
Cluster cluster = createCluster(aliveTabletServersById);
7274
assertThatThrownBy(() -> cluster.getAliveTabletServers().put(1, NODES[3]))
7375
.isInstanceOf(UnsupportedOperationException.class);
7476
assertThatThrownBy(
@@ -87,7 +89,7 @@ void testReturnModifiableCollections() {
8789

8890
@Test
8991
void testGetTable() {
90-
Cluster cluster = createCluster();
92+
Cluster cluster = createCluster(aliveTabletServersById);
9193
assertThat(cluster.getTable(DATA1_TABLE_PATH).get()).isEqualTo(DATA1_TABLE_INFO);
9294
assertThat(cluster.getTable(DATA2_TABLE_PATH).get()).isEqualTo(DATA2_TABLE_INFO);
9395
assertThat(cluster.getSchema(DATA1_TABLE_PATH).get())
@@ -98,7 +100,7 @@ void testGetTable() {
98100

99101
@Test
100102
void testInvalidMetaAndUpdate() {
101-
Cluster cluster = createCluster();
103+
Cluster cluster = createCluster(aliveTabletServersById);
102104
for (int i = 0; i < 10000; i++) {
103105
// mock invalid meta
104106
cluster =
@@ -130,7 +132,26 @@ void testInvalidMetaAndUpdate() {
130132
NODES_IDS)));
131133
}
132134

133-
private Cluster createCluster() {
135+
@Test
136+
void testGetRandomTabletServer() {
137+
Map<Integer, ServerNode> aliveTabletServersById = new HashMap<>();
138+
for (int i = 0; i < 10; i++) {
139+
aliveTabletServersById.put(
140+
i, new ServerNode(i, "localhost", 99 + i, ServerType.TABLET_SERVER));
141+
}
142+
Cluster cluster = createCluster(aliveTabletServersById);
143+
144+
Set<ServerNode> selectedNodes = new HashSet<>();
145+
for (int i = 0; i < 10; i++) {
146+
ServerNode serverNode = cluster.getRandomTabletServer();
147+
assertThat(serverNode).isNotNull();
148+
selectedNodes.add(serverNode);
149+
}
150+
151+
assertThat(selectedNodes).hasSizeGreaterThan(1);
152+
}
153+
154+
private Cluster createCluster(Map<Integer, ServerNode> aliveTabletServersById) {
134155
Map<PhysicalTablePath, List<BucketLocation>> tablePathToBucketLocations = new HashMap<>();
135156
tablePathToBucketLocations.put(
136157
DATA1_PHYSICAL_TABLE_PATH,

0 commit comments

Comments
 (0)