Skip to content

Commit 8c1940a

Browse files
committed
support coordinator epoch2
1 parent 175d422 commit 8c1940a

File tree

2 files changed

+32
-18
lines changed

2 files changed

+32
-18
lines changed

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1675,20 +1675,21 @@ public void createRecursiveWithEpochCheck(
16751675
throw new IllegalArgumentException("Invalid path: " + path);
16761676
} else if (indexOfLastSlash == 0) {
16771677
// root path can be directly create without fence
1678-
zkClient.create()
1679-
.creatingParentsIfNeeded()
1680-
.withMode(CreateMode.PERSISTENT)
1681-
.forPath(path);
1682-
}
1683-
// If indexOfLastSlash is 0, it means the parent is root "/" which should already exist
1684-
// We should not try to create it, just retry creating the current path
1685-
if (indexOfLastSlash > 0) {
1678+
try {
1679+
zkClient.create()
1680+
.creatingParentsIfNeeded()
1681+
.withMode(CreateMode.PERSISTENT)
1682+
.forPath(path);
1683+
} catch (KeeperException.NodeExistsException ignored) {
1684+
}
1685+
} else {
1686+
// indexOfLastSlash > 0
16861687
String parentPath = path.substring(0, indexOfLastSlash);
16871688
createRecursiveWithEpochCheck(
16881689
parentPath, null, expectedZkVersion, throwIfPathExists);
1690+
// After creating parent (or if parent is root), retry creating the original path
1691+
zkClient.transaction().forOperations(ops);
16891692
}
1690-
// After creating parent (or if parent is root), retry creating the original path
1691-
zkClient.transaction().forOperations(ops);
16921693
}
16931694
}
16941695

fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,18 +83,18 @@ void testCoordinatorServerElection() throws Exception {
8383
CoordinatorAddress firstLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get();
8484

8585
// Find the leader and try to restart it.
86-
CoordinatorServer elected = null;
86+
CoordinatorServer firstLeader = null;
8787
for (CoordinatorServer coordinatorServer : coordinatorServerList) {
8888
if (coordinatorServer.getServerId() == firstLeaderAddress.getId()) {
89-
elected = coordinatorServer;
89+
firstLeader = coordinatorServer;
9090
break;
9191
}
9292
}
93-
assertThat(elected).isNotNull();
93+
assertThat(firstLeader).isNotNull();
9494
assertThat(zookeeperClient.getCurrentEpoch().getCoordinatorEpoch())
9595
.isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH);
96-
elected.close();
97-
elected.start();
96+
firstLeader.close();
97+
firstLeader.start();
9898

9999
// Then we should get another Coordinator server leader elected
100100
waitUntilCoordinatorServerReelected(firstLeaderAddress);
@@ -104,12 +104,25 @@ void testCoordinatorServerElection() throws Exception {
104104
assertThat(zookeeperClient.getCurrentEpoch().getCoordinatorEpoch())
105105
.isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH + 1);
106106

107-
// kill other 2 coordinator servers except the first one
107+
CoordinatorServer secondLeader = null;
108+
for (CoordinatorServer coordinatorServer : coordinatorServerList) {
109+
if (coordinatorServer.getServerId() == secondLeaderAddress.getId()) {
110+
secondLeader = coordinatorServer;
111+
break;
112+
}
113+
}
114+
CoordinatorServer nonLeader = null;
108115
for (CoordinatorServer coordinatorServer : coordinatorServerList) {
109-
if (coordinatorServer.getServerId() != firstLeaderAddress.getId()) {
110-
coordinatorServer.close();
116+
if (coordinatorServer.getServerId() != firstLeaderAddress.getId()
117+
&& coordinatorServer.getServerId() != secondLeaderAddress.getId()) {
118+
nonLeader = coordinatorServer;
119+
break;
111120
}
112121
}
122+
// kill other 2 coordinator servers except the first one
123+
nonLeader.close();
124+
secondLeader.close();
125+
113126
// the origin coordinator server should become leader again
114127
waitUntilCoordinatorServerReelected(secondLeaderAddress);
115128
CoordinatorAddress thirdLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get();

0 commit comments

Comments
 (0)