Skip to content

Commit b738677

Browse files
committed
support coordinator epoch1
1 parent 492bdc9 commit b738677

File tree

12 files changed

+196
-68
lines changed

12 files changed

+196
-68
lines changed

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,8 +1012,7 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
10121012
}
10131013

10141014
try {
1015-
zooKeeperClient.batchUpdateLeaderAndIsr(
1016-
newLeaderAndIsrList, coordinatorContext.getCoordinatorEpoch());
1015+
zooKeeperClient.batchUpdateLeaderAndIsr(newLeaderAndIsrList);
10171016
newLeaderAndIsrList.forEach(
10181017
(tableBucket, newLeaderAndIsr) ->
10191018
result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr)));
@@ -1024,8 +1023,7 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
10241023
TableBucket tableBucket = entry.getKey();
10251024
LeaderAndIsr newLeaderAndIsr = entry.getValue();
10261025
try {
1027-
zooKeeperClient.updateLeaderAndIsr(
1028-
tableBucket, newLeaderAndIsr, coordinatorContext.getCoordinatorEpoch());
1026+
zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
10291027
} catch (Exception e) {
10301028
LOG.error("Error when register leader and isr.", e);
10311029
result.add(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -459,8 +459,7 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
459459
toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);
460460
}
461461
try {
462-
zooKeeperClient.batchUpdateLeaderAndIsr(
463-
toUpdateLeaderAndIsrList, coordinatorContext.getCoordinatorEpoch());
462+
zooKeeperClient.batchUpdateLeaderAndIsr(toUpdateLeaderAndIsrList);
464463
toUpdateLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr);
465464
return adjustedLeaderAndIsr;
466465
} catch (Exception e) {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,10 @@ private Optional<ElectionResult> initLeaderForTableBuckets(
309309
ElectionResult electionResult = optionalElectionResult.get();
310310
LeaderAndIsr leaderAndIsr = electionResult.leaderAndIsr;
311311
try {
312-
zooKeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
312+
zooKeeperClient.registerLeaderAndIsr(
313+
tableBucket,
314+
leaderAndIsr,
315+
coordinatorContext.getCoordinatorEpochZkVersion());
313316
} catch (Exception e) {
314317
LOG.error(
315318
"Fail to create state node for table bucket {} in zookeeper.",
@@ -445,7 +448,10 @@ private List<RegisterTableBucketLeadAndIsrInfo> tryRegisterLeaderAndIsrOneByOne(
445448
List<RegisterTableBucketLeadAndIsrInfo> registerSuccessList = new ArrayList<>();
446449
for (RegisterTableBucketLeadAndIsrInfo info : registerList) {
447450
try {
448-
zooKeeperClient.registerLeaderAndIsr(info.getTableBucket(), info.getLeaderAndIsr());
451+
zooKeeperClient.registerLeaderAndIsr(
452+
info.getTableBucket(),
453+
info.getLeaderAndIsr(),
454+
coordinatorContext.getCoordinatorEpochZkVersion());
449455
registerSuccessList.add(info);
450456
} catch (Exception e) {
451457
LOG.error(
@@ -487,10 +493,7 @@ private Optional<ElectionResult> electNewLeaderForTableBuckets(
487493
}
488494
ElectionResult electionResult = optionalElectionResult.get();
489495
try {
490-
zooKeeperClient.updateLeaderAndIsr(
491-
tableBucket,
492-
electionResult.leaderAndIsr,
493-
coordinatorContext.getCoordinatorEpoch());
496+
zooKeeperClient.updateLeaderAndIsr(tableBucket, electionResult.leaderAndIsr);
494497
} catch (Exception e) {
495498
LOG.error(
496499
"Fail to update bucket LeaderAndIsr for table bucket {}.",

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java

Lines changed: 56 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.config.ConfigOptions;
2222
import org.apache.fluss.config.Configuration;
23-
import org.apache.fluss.exception.InvalidCoordinatorException;
2423
import org.apache.fluss.metadata.PhysicalTablePath;
2524
import org.apache.fluss.metadata.ResolvedPartitionSpec;
2625
import org.apache.fluss.metadata.Schema;
@@ -75,6 +74,7 @@
7574
import org.apache.fluss.server.zk.data.ZkData.TableZNode;
7675
import org.apache.fluss.server.zk.data.ZkData.TablesZNode;
7776
import org.apache.fluss.server.zk.data.ZkData.WriterIdZNode;
77+
import org.apache.fluss.server.zk.data.ZkVersion;
7878
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework;
7979
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.BackgroundCallback;
8080
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.CuratorEvent;
@@ -112,6 +112,7 @@
112112

113113
import static java.util.stream.Collectors.toMap;
114114
import static org.apache.fluss.metadata.ResolvedPartitionSpec.fromPartitionName;
115+
import static org.apache.fluss.server.zk.ZooKeeperOp.multiRequest;
115116
import static org.apache.fluss.utils.Preconditions.checkNotNull;
116117

117118
/**
@@ -129,6 +130,7 @@ public class ZooKeeperClient implements AutoCloseable {
129130
private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper;
130131

131132
private final CuratorFramework zkClient;
133+
private final ZooKeeperOp zkOp;
132134
private final ZkSequenceIDCounter tableIdCounter;
133135
private final ZkSequenceIDCounter partitionIdCounter;
134136
private final ZkSequenceIDCounter writerIdCounter;
@@ -140,6 +142,7 @@ public ZooKeeperClient(
140142
Configuration configuration) {
141143
this.curatorFrameworkWrapper = curatorFrameworkWrapper;
142144
this.zkClient = curatorFrameworkWrapper.asCuratorFramework();
145+
this.zkOp = new ZooKeeperOp(zkClient);
143146
this.tableIdCounter = new ZkSequenceIDCounter(zkClient, TableSequenceIdZNode.path());
144147
this.partitionIdCounter =
145148
new ZkSequenceIDCounter(zkClient, PartitionSequenceIdZNode.path());
@@ -394,13 +397,14 @@ public void deletePartitionAssignment(long partitionId) throws Exception {
394397
// --------------------------------------------------------------------------------------------
395398

396399
/** Register bucket LeaderAndIsr to ZK. */
397-
public void registerLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr leaderAndIsr)
400+
public void registerLeaderAndIsr(
401+
TableBucket tableBucket, LeaderAndIsr leaderAndIsr, int expectedZkVersion)
398402
throws Exception {
403+
399404
String path = LeaderAndIsrZNode.path(tableBucket);
400-
zkClient.create()
401-
.creatingParentsIfNeeded()
402-
.withMode(CreateMode.PERSISTENT)
403-
.forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr));
405+
byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
406+
407+
createRecursive(path, data, expectedZkVersion, false);
404408
LOG.info("Registered {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
405409
}
406410

@@ -478,25 +482,14 @@ public Map<TableBucket, LeaderAndIsr> getLeaderAndIsrs(Collection<TableBucket> t
478482
"leader and isr");
479483
}
480484

481-
public void updateLeaderAndIsr(
482-
TableBucket tableBucket, LeaderAndIsr leaderAndIsr, int currentCoordinatorEpoch)
485+
public void updateLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr leaderAndIsr)
483486
throws Exception {
484-
// check coordinator epoch to ensure no other Coordinator leader exists.
485-
if (leaderAndIsr.coordinatorEpoch() != currentCoordinatorEpoch) {
486-
throw new InvalidCoordinatorException(
487-
String.format(
488-
"LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
489-
+ "This coordinator may no longer be the leader.",
490-
leaderAndIsr.coordinatorEpoch(), currentCoordinatorEpoch, tableBucket));
491-
}
492-
493487
String path = LeaderAndIsrZNode.path(tableBucket);
494488
zkClient.setData().forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr));
495489
LOG.info("Updated {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
496490
}
497491

498-
public void batchUpdateLeaderAndIsr(
499-
Map<TableBucket, LeaderAndIsr> leaderAndIsrList, int currentCoordinatorEpoch)
492+
public void batchUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> leaderAndIsrList)
500493
throws Exception {
501494
if (leaderAndIsrList.isEmpty()) {
502495
return;
@@ -506,18 +499,6 @@ public void batchUpdateLeaderAndIsr(
506499
for (Map.Entry<TableBucket, LeaderAndIsr> entry : leaderAndIsrList.entrySet()) {
507500
TableBucket tableBucket = entry.getKey();
508501
LeaderAndIsr leaderAndIsr = entry.getValue();
509-
510-
// check coordinator epoch to ensure no other Coordinator leader exists.
511-
if (leaderAndIsr.coordinatorEpoch() != currentCoordinatorEpoch) {
512-
throw new InvalidCoordinatorException(
513-
String.format(
514-
"LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
515-
+ "This coordinator may no longer be the leader.",
516-
leaderAndIsr.coordinatorEpoch(),
517-
currentCoordinatorEpoch,
518-
tableBucket));
519-
}
520-
521502
LOG.info("Batch Update {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
522503
String path = LeaderAndIsrZNode.path(tableBucket);
523504
byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
@@ -1609,4 +1590,48 @@ public static <K> Map<K, List<String>> processGetChildrenResponses(
16091590
}
16101591
return result;
16111592
}
1593+
1594+
public void createRecursive(
1595+
String path, byte[] data, int expectedZkVersion, boolean throwIfPathExists)
1596+
throws Exception {
1597+
List<CuratorOp> ops = wrapCheckAndCreateRequest(path, data, expectedZkVersion);
1598+
1599+
try {
1600+
// try to directly create
1601+
zkClient.transaction().forOperations(ops);
1602+
} catch (KeeperException.NodeExistsException e) {
1603+
// should not exist
1604+
if (throwIfPathExists) {
1605+
throw e;
1606+
}
1607+
} catch (KeeperException.NoNodeException e) {
1608+
// if parent does not exist, create parent first
1609+
int indexOfLastSlash = path.lastIndexOf("/");
1610+
if (indexOfLastSlash == -1) {
1611+
throw new IllegalArgumentException("Invalid path {}" + path);
1612+
}
1613+
String parentPath = path.substring(0, indexOfLastSlash);
1614+
createRecursive(parentPath, null, expectedZkVersion, throwIfPathExists);
1615+
// After creating parent, retry creating the original path
1616+
zkClient.transaction().forOperations(ops);
1617+
}
1618+
}
1619+
1620+
public List<CuratorOp> wrapCheckAndCreateRequest(
1621+
String path, byte[] data, int expectedZkVersion) throws Exception {
1622+
if (ZkVersion.MATCH_ANY_VERSION.getVersion() == expectedZkVersion) {
1623+
return Collections.singletonList(zkOp.createOp(path, data, CreateMode.PERSISTENT));
1624+
} else if (expectedZkVersion >= 0) {
1625+
CuratorOp checkOp =
1626+
zkOp.checkOp(ZkData.CoordinatorEpochZNode.path(), expectedZkVersion);
1627+
CuratorOp createOp = zkOp.createOp(path, data, CreateMode.PERSISTENT);
1628+
return multiRequest(checkOp, createOp);
1629+
} else {
1630+
throw new IllegalArgumentException(
1631+
"Expected coordinator epoch zkVersion "
1632+
+ expectedZkVersion
1633+
+ " should be non-negative or equal to "
1634+
+ ZkVersion.MATCH_ANY_VERSION.getVersion());
1635+
}
1636+
}
16121637
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.server.zk;
19+
20+
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework;
21+
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.transaction.CuratorOp;
22+
import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
23+
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
27+
/** This class contains some utility methods for wrap/unwrap operations for Zookeeper. */
28+
public class ZooKeeperOp {
29+
private final CuratorFramework zkClient;
30+
31+
public ZooKeeperOp(CuratorFramework zkClient) {
32+
this.zkClient = zkClient;
33+
}
34+
35+
public CuratorOp checkOp(String path, int expectZkVersion) throws Exception {
36+
return zkClient.transactionOp().check().withVersion(expectZkVersion).forPath(path);
37+
}
38+
39+
public CuratorOp createOp(String path, byte[] data, CreateMode createMode) throws Exception {
40+
return zkClient.transactionOp().create().withMode(createMode).forPath(path, data);
41+
}
42+
43+
public static List<CuratorOp> multiRequest(CuratorOp op1, CuratorOp op2) {
44+
List<CuratorOp> ops = new ArrayList<>();
45+
ops.add(op1);
46+
ops.add(op2);
47+
return ops;
48+
}
49+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.server.zk.data;
20+
21+
/** Enum to represent the type of special Zookeeper version. */
22+
public enum ZkVersion {
23+
MATCH_ANY_VERSION(-1),
24+
UNKNOWN_VERSION(-2);
25+
26+
private final int version;
27+
28+
ZkVersion(int version) {
29+
this.version = version;
30+
}
31+
32+
public int getVersion() {
33+
return version;
34+
}
35+
}

fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class CoordinatorServerTest extends ServerTestBase {
4040
@BeforeEach
4141
void beforeEach() throws Exception {
4242
coordinatorServer = startCoordinatorServer(createConfiguration());
43-
waitUtilCoordinatorServerElected();
43+
waitUntilCoordinatorServerElected();
4444
}
4545

4646
@AfterEach
@@ -75,7 +75,7 @@ protected void checkAfterStartServer() throws Exception {
7575
coordinatorServer.getRpcServer().getBindEndpoints());
7676
}
7777

78-
public void waitUtilCoordinatorServerElected() {
78+
public void waitUntilCoordinatorServerElected() {
7979
waitUntil(
8080
() -> zookeeperClient.getCoordinatorLeaderAddress().isPresent(),
8181
Duration.ofSeconds(10),

fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.fluss.server.zk.ZooKeeperClient;
3838
import org.apache.fluss.server.zk.ZooKeeperExtension;
3939
import org.apache.fluss.server.zk.data.LeaderAndIsr;
40+
import org.apache.fluss.server.zk.data.ZkVersion;
4041
import org.apache.fluss.testutils.common.AllCallbackWrapper;
4142

4243
import org.junit.jupiter.api.BeforeAll;
@@ -214,7 +215,8 @@ void testOfflineReplicasShouldBeRemovedFromIsr() throws Exception {
214215
}
215216
// put leader and isr
216217
LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 0, Arrays.asList(0, 1, 2), 0, 0);
217-
zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
218+
zookeeperClient.registerLeaderAndIsr(
219+
tableBucket, leaderAndIsr, ZkVersion.MATCH_ANY_VERSION.getVersion());
218220
coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2));
219221
coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr);
220222

@@ -250,7 +252,8 @@ void testOfflineReplicaShouldBeRemovedFromIsr() throws Exception {
250252
}
251253
// put leader and isr
252254
LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 0, Arrays.asList(0, 1, 2), 0, 0);
253-
zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
255+
zookeeperClient.registerLeaderAndIsr(
256+
tableBucket, leaderAndIsr, ZkVersion.MATCH_ANY_VERSION.getVersion());
254257
coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2));
255258
coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr);
256259

fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.fluss.server.zk.ZooKeeperClient;
4242
import org.apache.fluss.server.zk.ZooKeeperExtension;
4343
import org.apache.fluss.server.zk.data.LeaderAndIsr;
44+
import org.apache.fluss.server.zk.data.ZkVersion;
4445
import org.apache.fluss.shaded.guava32.com.google.common.collect.Sets;
4546
import org.apache.fluss.testutils.common.AllCallbackWrapper;
4647
import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
@@ -140,9 +141,13 @@ void testStartup() throws Exception {
140141

141142
// create LeaderAndIsr for t10/t11 info in zk,
142143
zookeeperClient.registerLeaderAndIsr(
143-
new TableBucket(t1Id, 0), new LeaderAndIsr(0, 0, Arrays.asList(0, 1), 0, 0));
144+
new TableBucket(t1Id, 0),
145+
new LeaderAndIsr(0, 0, Arrays.asList(0, 1), 0, 0),
146+
ZkVersion.MATCH_ANY_VERSION.getVersion());
144147
zookeeperClient.registerLeaderAndIsr(
145-
new TableBucket(t1Id, 1), new LeaderAndIsr(2, 0, Arrays.asList(2, 3), 0, 0));
148+
new TableBucket(t1Id, 1),
149+
new LeaderAndIsr(2, 0, Arrays.asList(2, 3), 0, 0),
150+
ZkVersion.MATCH_ANY_VERSION.getVersion());
146151
// update the LeaderAndIsr to context
147152
coordinatorContext.putBucketLeaderAndIsr(
148153
t1b0, zookeeperClient.getLeaderAndIsr(new TableBucket(t1Id, 0)).get());

0 commit comments

Comments
 (0)