Skip to content

Commit 6d9e910

Browse files
committed
support coordinator epoch1
1 parent 492bdc9 commit 6d9e910

File tree

12 files changed

+229
-61
lines changed

12 files changed

+229
-61
lines changed

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,7 +1013,7 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
10131013

10141014
try {
10151015
zooKeeperClient.batchUpdateLeaderAndIsr(
1016-
newLeaderAndIsrList, coordinatorContext.getCoordinatorEpoch());
1016+
newLeaderAndIsrList, coordinatorContext.getCoordinatorEpochZkVersion());
10171017
newLeaderAndIsrList.forEach(
10181018
(tableBucket, newLeaderAndIsr) ->
10191019
result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr)));
@@ -1025,7 +1025,9 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
10251025
LeaderAndIsr newLeaderAndIsr = entry.getValue();
10261026
try {
10271027
zooKeeperClient.updateLeaderAndIsr(
1028-
tableBucket, newLeaderAndIsr, coordinatorContext.getCoordinatorEpoch());
1028+
tableBucket,
1029+
newLeaderAndIsr,
1030+
coordinatorContext.getCoordinatorEpochZkVersion());
10291031
} catch (Exception e) {
10301032
LOG.error("Error when register leader and isr.", e);
10311033
result.add(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
460460
}
461461
try {
462462
zooKeeperClient.batchUpdateLeaderAndIsr(
463-
toUpdateLeaderAndIsrList, coordinatorContext.getCoordinatorEpoch());
463+
toUpdateLeaderAndIsrList, coordinatorContext.getCoordinatorEpochZkVersion());
464464
toUpdateLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr);
465465
return adjustedLeaderAndIsr;
466466
} catch (Exception e) {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,10 @@ private Optional<ElectionResult> initLeaderForTableBuckets(
309309
ElectionResult electionResult = optionalElectionResult.get();
310310
LeaderAndIsr leaderAndIsr = electionResult.leaderAndIsr;
311311
try {
312-
zooKeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
312+
zooKeeperClient.registerLeaderAndIsr(
313+
tableBucket,
314+
leaderAndIsr,
315+
coordinatorContext.getCoordinatorEpochZkVersion());
313316
} catch (Exception e) {
314317
LOG.error(
315318
"Fail to create state node for table bucket {} in zookeeper.",
@@ -445,7 +448,10 @@ private List<RegisterTableBucketLeadAndIsrInfo> tryRegisterLeaderAndIsrOneByOne(
445448
List<RegisterTableBucketLeadAndIsrInfo> registerSuccessList = new ArrayList<>();
446449
for (RegisterTableBucketLeadAndIsrInfo info : registerList) {
447450
try {
448-
zooKeeperClient.registerLeaderAndIsr(info.getTableBucket(), info.getLeaderAndIsr());
451+
zooKeeperClient.registerLeaderAndIsr(
452+
info.getTableBucket(),
453+
info.getLeaderAndIsr(),
454+
coordinatorContext.getCoordinatorEpochZkVersion());
449455
registerSuccessList.add(info);
450456
} catch (Exception e) {
451457
LOG.error(
@@ -490,7 +496,7 @@ private Optional<ElectionResult> electNewLeaderForTableBuckets(
490496
zooKeeperClient.updateLeaderAndIsr(
491497
tableBucket,
492498
electionResult.leaderAndIsr,
493-
coordinatorContext.getCoordinatorEpoch());
499+
coordinatorContext.getCoordinatorEpochZkVersion());
494500
} catch (Exception e) {
495501
LOG.error(
496502
"Fail to update bucket LeaderAndIsr for table bucket {}.",

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java

Lines changed: 74 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.config.ConfigOptions;
2222
import org.apache.fluss.config.Configuration;
23-
import org.apache.fluss.exception.InvalidCoordinatorException;
2423
import org.apache.fluss.metadata.PhysicalTablePath;
2524
import org.apache.fluss.metadata.ResolvedPartitionSpec;
2625
import org.apache.fluss.metadata.Schema;
@@ -75,6 +74,7 @@
7574
import org.apache.fluss.server.zk.data.ZkData.TableZNode;
7675
import org.apache.fluss.server.zk.data.ZkData.TablesZNode;
7776
import org.apache.fluss.server.zk.data.ZkData.WriterIdZNode;
77+
import org.apache.fluss.server.zk.data.ZkVersion;
7878
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework;
7979
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.BackgroundCallback;
8080
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.CuratorEvent;
@@ -112,6 +112,7 @@
112112

113113
import static java.util.stream.Collectors.toMap;
114114
import static org.apache.fluss.metadata.ResolvedPartitionSpec.fromPartitionName;
115+
import static org.apache.fluss.server.zk.ZooKeeperOp.multiRequest;
115116
import static org.apache.fluss.utils.Preconditions.checkNotNull;
116117

117118
/**
@@ -129,6 +130,7 @@ public class ZooKeeperClient implements AutoCloseable {
129130
private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper;
130131

131132
private final CuratorFramework zkClient;
133+
private final ZooKeeperOp zkOp;
132134
private final ZkSequenceIDCounter tableIdCounter;
133135
private final ZkSequenceIDCounter partitionIdCounter;
134136
private final ZkSequenceIDCounter writerIdCounter;
@@ -140,6 +142,7 @@ public ZooKeeperClient(
140142
Configuration configuration) {
141143
this.curatorFrameworkWrapper = curatorFrameworkWrapper;
142144
this.zkClient = curatorFrameworkWrapper.asCuratorFramework();
145+
this.zkOp = new ZooKeeperOp(zkClient);
143146
this.tableIdCounter = new ZkSequenceIDCounter(zkClient, TableSequenceIdZNode.path());
144147
this.partitionIdCounter =
145148
new ZkSequenceIDCounter(zkClient, PartitionSequenceIdZNode.path());
@@ -394,13 +397,14 @@ public void deletePartitionAssignment(long partitionId) throws Exception {
394397
// --------------------------------------------------------------------------------------------
395398

396399
/** Register bucket LeaderAndIsr to ZK. */
397-
public void registerLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr leaderAndIsr)
400+
public void registerLeaderAndIsr(
401+
TableBucket tableBucket, LeaderAndIsr leaderAndIsr, int expectedZkVersion)
398402
throws Exception {
403+
399404
String path = LeaderAndIsrZNode.path(tableBucket);
400-
zkClient.create()
401-
.creatingParentsIfNeeded()
402-
.withMode(CreateMode.PERSISTENT)
403-
.forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr));
405+
byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
406+
407+
createRecursive(path, data, expectedZkVersion, false);
404408
LOG.info("Registered {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
405409
}
406410

@@ -479,24 +483,20 @@ public Map<TableBucket, LeaderAndIsr> getLeaderAndIsrs(Collection<TableBucket> t
479483
}
480484

481485
public void updateLeaderAndIsr(
482-
TableBucket tableBucket, LeaderAndIsr leaderAndIsr, int currentCoordinatorEpoch)
486+
TableBucket tableBucket, LeaderAndIsr leaderAndIsr, int expectedZkVersion)
483487
throws Exception {
484-
// check coordinator epoch to ensure no other Coordinator leader exists.
485-
if (leaderAndIsr.coordinatorEpoch() != currentCoordinatorEpoch) {
486-
throw new InvalidCoordinatorException(
487-
String.format(
488-
"LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
489-
+ "This coordinator may no longer be the leader.",
490-
leaderAndIsr.coordinatorEpoch(), currentCoordinatorEpoch, tableBucket));
491-
}
492-
493488
String path = LeaderAndIsrZNode.path(tableBucket);
494-
zkClient.setData().forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr));
489+
byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
490+
491+
CuratorOp updateOp = zkOp.updateOp(path, data);
492+
List<CuratorOp> ops = wrapRequestWithCoordinatorEpochCheck(updateOp, expectedZkVersion);
493+
494+
zkClient.transaction().forOperations(ops);
495495
LOG.info("Updated {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
496496
}
497497

498498
public void batchUpdateLeaderAndIsr(
499-
Map<TableBucket, LeaderAndIsr> leaderAndIsrList, int currentCoordinatorEpoch)
499+
Map<TableBucket, LeaderAndIsr> leaderAndIsrList, int expectedZkVersion)
500500
throws Exception {
501501
if (leaderAndIsrList.isEmpty()) {
502502
return;
@@ -506,30 +506,21 @@ public void batchUpdateLeaderAndIsr(
506506
for (Map.Entry<TableBucket, LeaderAndIsr> entry : leaderAndIsrList.entrySet()) {
507507
TableBucket tableBucket = entry.getKey();
508508
LeaderAndIsr leaderAndIsr = entry.getValue();
509-
510-
// check coordinator epoch to ensure no other Coordinator leader exists.
511-
if (leaderAndIsr.coordinatorEpoch() != currentCoordinatorEpoch) {
512-
throw new InvalidCoordinatorException(
513-
String.format(
514-
"LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
515-
+ "This coordinator may no longer be the leader.",
516-
leaderAndIsr.coordinatorEpoch(),
517-
currentCoordinatorEpoch,
518-
tableBucket));
519-
}
520-
521509
LOG.info("Batch Update {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
522510
String path = LeaderAndIsrZNode.path(tableBucket);
523511
byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
524512
CuratorOp updateOp = zkClient.transactionOp().setData().forPath(path, data);
525513
ops.add(updateOp);
526-
if (ops.size() == MAX_BATCH_SIZE) {
527-
zkClient.transaction().forOperations(ops);
514+
if (ops.size() == MAX_BATCH_SIZE - 1) {
515+
List<CuratorOp> wrapOps =
516+
wrapRequestsWithCoordinatorEpochCheck(ops, expectedZkVersion);
517+
zkClient.transaction().forOperations(wrapOps);
528518
ops.clear();
529519
}
530520
}
531521
if (!ops.isEmpty()) {
532-
zkClient.transaction().forOperations(ops);
522+
List<CuratorOp> wrapOps = wrapRequestsWithCoordinatorEpochCheck(ops, expectedZkVersion);
523+
zkClient.transaction().forOperations(wrapOps);
533524
}
534525
}
535526

@@ -1609,4 +1600,54 @@ public static <K> Map<K, List<String>> processGetChildrenResponses(
16091600
}
16101601
return result;
16111602
}
1603+
1604+
public void createRecursive(
1605+
String path, byte[] data, int expectedZkVersion, boolean throwIfPathExists)
1606+
throws Exception {
1607+
CuratorOp createOp = zkOp.createOp(path, data, CreateMode.PERSISTENT);
1608+
List<CuratorOp> ops = wrapRequestWithCoordinatorEpochCheck(createOp, expectedZkVersion);
1609+
1610+
try {
1611+
// try to directly create
1612+
zkClient.transaction().forOperations(ops);
1613+
} catch (KeeperException.NodeExistsException e) {
1614+
// should not exist
1615+
if (throwIfPathExists) {
1616+
throw e;
1617+
}
1618+
} catch (KeeperException.NoNodeException e) {
1619+
// if parent does not exist, create parent first
1620+
int indexOfLastSlash = path.lastIndexOf("/");
1621+
if (indexOfLastSlash == -1) {
1622+
throw new IllegalArgumentException("Invalid path {}" + path);
1623+
}
1624+
String parentPath = path.substring(0, indexOfLastSlash);
1625+
createRecursive(parentPath, null, expectedZkVersion, throwIfPathExists);
1626+
// After creating parent, retry creating the original path
1627+
zkClient.transaction().forOperations(ops);
1628+
}
1629+
}
1630+
1631+
public List<CuratorOp> wrapRequestWithCoordinatorEpochCheck(
1632+
CuratorOp request, int expectedZkVersion) throws Exception {
1633+
return wrapRequestsWithCoordinatorEpochCheck(
1634+
Collections.singletonList(request), expectedZkVersion);
1635+
}
1636+
1637+
public List<CuratorOp> wrapRequestsWithCoordinatorEpochCheck(
1638+
List<CuratorOp> requestList, int expectedZkVersion) throws Exception {
1639+
if (ZkVersion.MATCH_ANY_VERSION.getVersion() == expectedZkVersion) {
1640+
return requestList;
1641+
} else if (expectedZkVersion >= 0) {
1642+
CuratorOp checkOp =
1643+
zkOp.checkOp(ZkData.CoordinatorEpochZNode.path(), expectedZkVersion);
1644+
return multiRequest(checkOp, requestList);
1645+
} else {
1646+
throw new IllegalArgumentException(
1647+
"Expected coordinator epoch zkVersion "
1648+
+ expectedZkVersion
1649+
+ " should be non-negative or equal to "
1650+
+ ZkVersion.MATCH_ANY_VERSION.getVersion());
1651+
}
1652+
}
16121653
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.server.zk;
19+
20+
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework;
21+
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.transaction.CuratorOp;
22+
import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
23+
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
27+
/** This class contains some utility methods for wrap/unwrap operations for Zookeeper. */
28+
public class ZooKeeperOp {
29+
private final CuratorFramework zkClient;
30+
31+
public ZooKeeperOp(CuratorFramework zkClient) {
32+
this.zkClient = zkClient;
33+
}
34+
35+
public CuratorOp checkOp(String path, int expectZkVersion) throws Exception {
36+
return zkClient.transactionOp().check().withVersion(expectZkVersion).forPath(path);
37+
}
38+
39+
public CuratorOp createOp(String path, byte[] data, CreateMode createMode) throws Exception {
40+
return zkClient.transactionOp().create().withMode(createMode).forPath(path, data);
41+
}
42+
43+
public CuratorOp updateOp(String path, byte[] data) throws Exception {
44+
return zkClient.transactionOp().setData().forPath(path, data);
45+
}
46+
47+
public static List<CuratorOp> multiRequest(CuratorOp op1, CuratorOp op2) {
48+
List<CuratorOp> ops = new ArrayList<>();
49+
ops.add(op1);
50+
ops.add(op2);
51+
return ops;
52+
}
53+
54+
public static List<CuratorOp> multiRequest(CuratorOp op, List<CuratorOp> ops) {
55+
List<CuratorOp> result = new ArrayList<>();
56+
result.add(op);
57+
result.addAll(ops);
58+
return result;
59+
}
60+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.server.zk.data;
20+
21+
/**
 * Special ZooKeeper version values. Each constant carries a negative integer code, obtained
 * through {@link #getVersion()}, for use where a plain {@code int} zkVersion is expected.
 */
public enum ZkVersion {
    /** Code {@code -1}. NOTE(review): presumably means "skip the version comparison". */
    MATCH_ANY_VERSION(-1),

    /** Code {@code -2}. NOTE(review): presumably marks a version that is not known. */
    UNKNOWN_VERSION(-2);

    private final int code;

    ZkVersion(int code) {
        this.code = code;
    }

    /**
     * Returns the integer code of this special version.
     *
     * @return {@code -1} for {@link #MATCH_ANY_VERSION}, {@code -2} for {@link
     *     #UNKNOWN_VERSION}
     */
    public int getVersion() {
        return this.code;
    }
}

fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class CoordinatorServerTest extends ServerTestBase {
4040
@BeforeEach
4141
void beforeEach() throws Exception {
4242
coordinatorServer = startCoordinatorServer(createConfiguration());
43-
waitUtilCoordinatorServerElected();
43+
waitUntilCoordinatorServerElected();
4444
}
4545

4646
@AfterEach
@@ -75,7 +75,7 @@ protected void checkAfterStartServer() throws Exception {
7575
coordinatorServer.getRpcServer().getBindEndpoints());
7676
}
7777

78-
public void waitUtilCoordinatorServerElected() {
78+
public void waitUntilCoordinatorServerElected() {
7979
waitUntil(
8080
() -> zookeeperClient.getCoordinatorLeaderAddress().isPresent(),
8181
Duration.ofSeconds(10),

fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.fluss.server.zk.ZooKeeperClient;
3838
import org.apache.fluss.server.zk.ZooKeeperExtension;
3939
import org.apache.fluss.server.zk.data.LeaderAndIsr;
40+
import org.apache.fluss.server.zk.data.ZkVersion;
4041
import org.apache.fluss.testutils.common.AllCallbackWrapper;
4142

4243
import org.junit.jupiter.api.BeforeAll;
@@ -214,7 +215,8 @@ void testOfflineReplicasShouldBeRemovedFromIsr() throws Exception {
214215
}
215216
// put leader and isr
216217
LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 0, Arrays.asList(0, 1, 2), 0, 0);
217-
zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
218+
zookeeperClient.registerLeaderAndIsr(
219+
tableBucket, leaderAndIsr, ZkVersion.MATCH_ANY_VERSION.getVersion());
218220
coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2));
219221
coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr);
220222

@@ -250,7 +252,8 @@ void testOfflineReplicaShouldBeRemovedFromIsr() throws Exception {
250252
}
251253
// put leader and isr
252254
LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 0, Arrays.asList(0, 1, 2), 0, 0);
253-
zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
255+
zookeeperClient.registerLeaderAndIsr(
256+
tableBucket, leaderAndIsr, ZkVersion.MATCH_ANY_VERSION.getVersion());
254257
coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2));
255258
coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr);
256259

0 commit comments

Comments
 (0)