Skip to content

Commit 641614f

Browse files
committed
support coordinator epoch2
1 parent 00814b9 commit 641614f

File tree

8 files changed

+168
-57
lines changed

8 files changed

+168
-57
lines changed

fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.fluss.server.zk.data.PartitionAssignment;
5252
import org.apache.fluss.server.zk.data.TableAssignment;
5353
import org.apache.fluss.server.zk.data.TableRegistration;
54+
import org.apache.fluss.server.zk.data.ZkVersion;
5455
import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
5556
import org.apache.fluss.utils.function.RunnableWithException;
5657
import org.apache.fluss.utils.function.ThrowingRunnable;
@@ -239,7 +240,9 @@ public void completeDeleteTable(long tableId) {
239240
// delete bucket assignments node, which will also delete the bucket state node,
240241
// so that all the zk nodes related to this table are deleted.
241242
rethrowIfIsNotNoNodeException(
242-
() -> zookeeperClient.deleteTableAssignment(tableId),
243+
() ->
244+
zookeeperClient.deleteTableAssignment(
245+
tableId, ZkVersion.MATCH_ANY_VERSION.getVersion()),
243246
String.format("Delete tablet assignment meta fail for table %s.", tableId));
244247
}
245248

@@ -248,7 +251,9 @@ public void completeDeletePartition(long partitionId) {
248251
// delete partition assignments node, which will also delete the bucket state node,
249252
// so that all the zk nodes related to this partition are deleted.
250253
rethrowIfIsNotNoNodeException(
251-
() -> zookeeperClient.deletePartitionAssignment(partitionId),
254+
() ->
255+
zookeeperClient.deletePartitionAssignment(
256+
partitionId, ZkVersion.MATCH_ANY_VERSION.getVersion()),
252257
String.format("Delete tablet assignment meta fail for partition %s.", partitionId));
253258
}
254259

@@ -301,7 +306,8 @@ public long createTable(
301306
long tableId = zookeeperClient.getTableIdAndIncrement();
302307
if (tableAssignment != null) {
303308
// register table assignment
304-
zookeeperClient.registerTableAssignment(tableId, tableAssignment);
309+
zookeeperClient.registerTableAssignment(
310+
tableId, tableAssignment, ZkVersion.MATCH_ANY_VERSION.getVersion());
305311
}
306312
// register the table
307313
zookeeperClient.registerTable(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,8 @@ public void batchHandleOnlineChangeAndInitLeader(Set<TableBucket> tableBuckets)
373373
if (!tableBucketLeadAndIsrInfos.isEmpty()) {
374374
try {
375375
zooKeeperClient.batchRegisterLeaderAndIsrForTablePartition(
376-
tableBucketLeadAndIsrInfos);
376+
tableBucketLeadAndIsrInfos,
377+
coordinatorContext.getCoordinatorEpochZkVersion());
377378
registerSuccessList.addAll(tableBucketLeadAndIsrInfos);
378379
} catch (Exception e) {
379380
LOG.error(
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.server.zk;
20+
21+
/** Class for coordinator epoch and coordinator epoch zk version. */
22+
public class ZkEpoch {
23+
private final int coordinatorEpoch;
24+
private final int coordinatorEpochZkVersion;
25+
26+
public ZkEpoch(int coordinatorEpoch, int coordinatorEpochZkVersion) {
27+
this.coordinatorEpoch = coordinatorEpoch;
28+
this.coordinatorEpochZkVersion = coordinatorEpochZkVersion;
29+
}
30+
31+
public int getCoordinatorEpoch() {
32+
return coordinatorEpoch;
33+
}
34+
35+
public int getCoordinatorEpochZkVersion() {
36+
return coordinatorEpochZkVersion;
37+
}
38+
}

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java

Lines changed: 89 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,13 @@
118118
/**
119119
* This class includes methods for write/read various metadata (leader address, tablet server
120120
* registration, table assignment, table, schema) in Zookeeper.
121+
*
122+
* <p>In some methods, 'expectedZkVersion' is used to perform a coordinator epoch Zookeeper
123+
* version check. The following principles decide whether the check is necessary: it must be
124+
* performed only if all of these conditions are met. 1. The method
125+
* creates/modifies/deletes a Zk node. 2. It is executed by the coordinator server. 3. It is about
126+
* metadata (table/partition/leaderAndIsr) rather than server info or ACL info. 4. The Zk node is
127+
* persistent rather than ephemeral.
121128
*/
122129
@Internal
123130
public class ZooKeeperClient implements AutoCloseable {
@@ -182,9 +189,9 @@ public Optional<Integer> fenceBecomeCoordinatorLeader(int coordinatorId) throws
182189
ensureEpochZnodeExists();
183190

184191
try {
185-
Tuple2<Integer, Integer> getEpoch = getCurrentEpoch();
186-
int currentEpoch = getEpoch.f0;
187-
int currentVersion = getEpoch.f1;
192+
ZkEpoch getEpoch = getCurrentEpoch();
193+
int currentEpoch = getEpoch.getCoordinatorEpoch();
194+
int currentVersion = getEpoch.getCoordinatorEpochZkVersion();
188195
int newEpoch = currentEpoch + 1;
189196
LOG.info(
190197
"Coordinator leader {} tries to update epoch. Current epoch={}, Zookeeper version={}, new epoch={}",
@@ -254,15 +261,15 @@ public void ensureEpochZnodeExists() throws Exception {
254261
}
255262

256263
/** Get the current coordinator epoch stored in ZK, together with the epoch znode's version. */
257-
public Tuple2<Integer, Integer> getCurrentEpoch() throws Exception {
264+
public ZkEpoch getCurrentEpoch() throws Exception {
258265
Stat currentStat = new Stat();
259266
byte[] bytes =
260267
zkClient.getData()
261268
.storingStatIn(currentStat)
262269
.forPath(ZkData.CoordinatorEpochZNode.path());
263270
int currentEpoch = ZkData.CoordinatorEpochZNode.decode(bytes);
264271
int currentVersion = currentStat.getVersion();
265-
return new Tuple2<>(currentEpoch, currentVersion);
272+
return new ZkEpoch(currentEpoch, currentVersion);
266273
}
267274

268275
// --------------------------------------------------------------------------------------------
@@ -318,13 +325,13 @@ public int[] getSortedTabletServerList() throws Exception {
318325
// --------------------------------------------------------------------------------------------
319326

320327
/** Register table assignment to ZK. */
321-
public void registerTableAssignment(long tableId, TableAssignment tableAssignment)
322-
throws Exception {
328+
public void registerTableAssignment(
329+
long tableId, TableAssignment tableAssignment, int expectedZkVersion) throws Exception {
323330
String path = TableIdZNode.path(tableId);
324-
zkClient.create()
325-
.creatingParentsIfNeeded()
326-
.withMode(CreateMode.PERSISTENT)
327-
.forPath(path, TableIdZNode.encode(tableAssignment));
331+
byte[] data = TableIdZNode.encode(tableAssignment);
332+
333+
createRecursiveWithEpochCheck(path, data, expectedZkVersion, false);
334+
328335
LOG.info("Registered table assignment {} for table id {}.", tableAssignment, tableId);
329336
}
330337

@@ -373,22 +380,27 @@ public Map<Long, PartitionAssignment> getPartitionsAssignments(Collection<Long>
373380
"partition assignment");
374381
}
375382

376-
public void updateTableAssignment(long tableId, TableAssignment tableAssignment)
377-
throws Exception {
383+
public void updateTableAssignment(
384+
long tableId, TableAssignment tableAssignment, int expectedZkVersion) throws Exception {
378385
String path = TableIdZNode.path(tableId);
379-
zkClient.setData().forPath(path, TableIdZNode.encode(tableAssignment));
386+
byte[] data = TableIdZNode.encode(tableAssignment);
387+
CuratorOp updateOp = zkOp.updateOp(path, data);
388+
List<CuratorOp> ops = wrapRequestWithEpochCheck(updateOp, expectedZkVersion);
389+
390+
zkClient.transaction().forOperations(ops);
380391
LOG.info("Updated table assignment {} for table id {}.", tableAssignment, tableId);
381392
}
382393

383-
public void deleteTableAssignment(long tableId) throws Exception {
394+
public void deleteTableAssignment(long tableId, int expectedZkVersion) throws Exception {
384395
String path = TableIdZNode.path(tableId);
385-
zkClient.delete().deletingChildrenIfNeeded().forPath(path);
396+
deleteRecursiveWithEpochCheck(path, expectedZkVersion, false);
386397
LOG.info("Deleted table assignment for table id {}.", tableId);
387398
}
388399

389-
public void deletePartitionAssignment(long partitionId) throws Exception {
400+
public void deletePartitionAssignment(long partitionId, int expectedZkVersion)
401+
throws Exception {
390402
String path = PartitionIdZNode.path(partitionId);
391-
zkClient.delete().deletingChildrenIfNeeded().forPath(path);
403+
deleteRecursiveWithEpochCheck(path, expectedZkVersion, false);
392404
LOG.info("Deleted table assignment for partition id {}.", partitionId);
393405
}
394406

@@ -404,12 +416,13 @@ public void registerLeaderAndIsr(
404416
String path = LeaderAndIsrZNode.path(tableBucket);
405417
byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
406418

407-
createRecursive(path, data, expectedZkVersion, false);
419+
createRecursiveWithEpochCheck(path, data, expectedZkVersion, false);
408420
LOG.info("Registered {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
409421
}
410422

411423
public void batchRegisterLeaderAndIsrForTablePartition(
412-
List<RegisterTableBucketLeadAndIsrInfo> registerList) throws Exception {
424+
List<RegisterTableBucketLeadAndIsrInfo> registerList, int expectedZkVersion)
425+
throws Exception {
413426
if (registerList.isEmpty()) {
414427
return;
415428
}
@@ -445,12 +458,14 @@ public void batchRegisterLeaderAndIsrForTablePartition(
445458
ops.add(parentNodeCreate);
446459
ops.add(currentNodeCreate);
447460
if (ops.size() == MAX_BATCH_SIZE) {
448-
zkClient.transaction().forOperations(ops);
461+
List<CuratorOp> wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion);
462+
zkClient.transaction().forOperations(wrapOps);
449463
ops.clear();
450464
}
451465
}
452466
if (!ops.isEmpty()) {
453-
zkClient.transaction().forOperations(ops);
467+
List<CuratorOp> wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion);
468+
zkClient.transaction().forOperations(wrapOps);
454469
}
455470
LOG.info(
456471
"Batch registered leadAndIsr for tableId: {}, partitionId: {}, partitionName: {} in Zookeeper.",
@@ -489,7 +504,7 @@ public void updateLeaderAndIsr(
489504
byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
490505

491506
CuratorOp updateOp = zkOp.updateOp(path, data);
492-
List<CuratorOp> ops = wrapRequestWithCoordinatorEpochCheck(updateOp, expectedZkVersion);
507+
List<CuratorOp> ops = wrapRequestWithEpochCheck(updateOp, expectedZkVersion);
493508

494509
zkClient.transaction().forOperations(ops);
495510
LOG.info("Updated {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
@@ -511,22 +526,22 @@ public void batchUpdateLeaderAndIsr(
511526
byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
512527
CuratorOp updateOp = zkClient.transactionOp().setData().forPath(path, data);
513528
ops.add(updateOp);
514-
if (ops.size() == MAX_BATCH_SIZE - 1) {
515-
List<CuratorOp> wrapOps =
516-
wrapRequestsWithCoordinatorEpochCheck(ops, expectedZkVersion);
529+
if (ops.size() == MAX_BATCH_SIZE) {
530+
List<CuratorOp> wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion);
517531
zkClient.transaction().forOperations(wrapOps);
518532
ops.clear();
519533
}
520534
}
521535
if (!ops.isEmpty()) {
522-
List<CuratorOp> wrapOps = wrapRequestsWithCoordinatorEpochCheck(ops, expectedZkVersion);
536+
List<CuratorOp> wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion);
523537
zkClient.transaction().forOperations(wrapOps);
524538
}
525539
}
526540

527-
public void deleteLeaderAndIsr(TableBucket tableBucket) throws Exception {
541+
public void deleteLeaderAndIsr(TableBucket tableBucket, int expectedZkVersion)
542+
throws Exception {
528543
String path = LeaderAndIsrZNode.path(tableBucket);
529-
zkClient.delete().forPath(path);
544+
deleteRecursiveWithEpochCheck(path, expectedZkVersion, false);
530545
LOG.info("Deleted LeaderAndIsr for bucket {} in Zookeeper.", tableBucket);
531546
}
532547

@@ -1601,11 +1616,19 @@ public static <K> Map<K, List<String>> processGetChildrenResponses(
16011616
return result;
16021617
}
16031618

1604-
public void createRecursive(
1619+
/**
1620+
* Create a node (recursively creating its parent path if it does not exist) with a Zk epoch version check.
1621+
*
1622+
* @param path the path to create
1623+
* @param data the data to write
1624+
* @param throwIfPathExists whether to throw an exception if the path already exists
1625+
* @throws Exception if any error occurs
1626+
*/
1627+
public void createRecursiveWithEpochCheck(
16051628
String path, byte[] data, int expectedZkVersion, boolean throwIfPathExists)
16061629
throws Exception {
16071630
CuratorOp createOp = zkOp.createOp(path, data, CreateMode.PERSISTENT);
1608-
List<CuratorOp> ops = wrapRequestWithCoordinatorEpochCheck(createOp, expectedZkVersion);
1631+
List<CuratorOp> ops = wrapRequestWithEpochCheck(createOp, expectedZkVersion);
16091632

16101633
try {
16111634
// try to directly create
@@ -1622,19 +1645,48 @@ public void createRecursive(
16221645
throw new IllegalArgumentException("Invalid path {}" + path);
16231646
}
16241647
String parentPath = path.substring(0, indexOfLastSlash);
1625-
createRecursive(parentPath, null, expectedZkVersion, throwIfPathExists);
1648+
createRecursiveWithEpochCheck(parentPath, null, expectedZkVersion, throwIfPathExists);
16261649
// After creating parent, retry creating the original path
16271650
zkClient.transaction().forOperations(ops);
16281651
}
16291652
}
16301653

1631-
public List<CuratorOp> wrapRequestWithCoordinatorEpochCheck(
1632-
CuratorOp request, int expectedZkVersion) throws Exception {
1633-
return wrapRequestsWithCoordinatorEpochCheck(
1634-
Collections.singletonList(request), expectedZkVersion);
1654+
/**
1655+
* Delete a node (and recursively delete children) with Zk epoch version check.
1656+
*
1657+
* @param path the path to delete
1658+
* @param expectedZkVersion the expected coordinator epoch zk version
1659+
* @param throwIfPathNotExists whether to throw exception if path does not exist
1660+
* @throws Exception if any error occurs
1661+
*/
1662+
public void deleteRecursiveWithEpochCheck(
1663+
String path, int expectedZkVersion, boolean throwIfPathNotExists) throws Exception {
1664+
// delete children recursively
1665+
List<String> children = getChildren(path);
1666+
for (String child : children) {
1667+
deleteRecursiveWithEpochCheck(path + "/" + child, expectedZkVersion, false);
1668+
}
1669+
1670+
CuratorOp deleteOp = zkOp.deleteOp(path);
1671+
List<CuratorOp> ops = wrapRequestWithEpochCheck(deleteOp, expectedZkVersion);
1672+
1673+
try {
1674+
// delete itself
1675+
zkClient.transaction().forOperations(ops);
1676+
} catch (KeeperException.NoNodeException e) {
1677+
// should exist
1678+
if (throwIfPathNotExists) {
1679+
throw e;
1680+
}
1681+
}
1682+
}
1683+
1684+
public List<CuratorOp> wrapRequestWithEpochCheck(CuratorOp request, int expectedZkVersion)
1685+
throws Exception {
1686+
return wrapRequestsWithEpochCheck(Collections.singletonList(request), expectedZkVersion);
16351687
}
16361688

1637-
public List<CuratorOp> wrapRequestsWithCoordinatorEpochCheck(
1689+
public List<CuratorOp> wrapRequestsWithEpochCheck(
16381690
List<CuratorOp> requestList, int expectedZkVersion) throws Exception {
16391691
if (ZkVersion.MATCH_ANY_VERSION.getVersion() == expectedZkVersion) {
16401692
return requestList;

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperOp.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ public CuratorOp updateOp(String path, byte[] data) throws Exception {
4444
return zkClient.transactionOp().setData().forPath(path, data);
4545
}
4646

47+
public CuratorOp deleteOp(String path) throws Exception {
48+
return zkClient.transactionOp().delete().forPath(path);
49+
}
50+
4751
public static List<CuratorOp> multiRequest(CuratorOp op1, CuratorOp op2) {
4852
List<CuratorOp> ops = new ArrayList<>();
4953
ops.add(op1);

fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ void testCoordinatorServerElection() throws Exception {
9191
}
9292
}
9393
assertThat(elected).isNotNull();
94-
assertThat(zookeeperClient.getCurrentEpoch().f0)
94+
assertThat(zookeeperClient.getCurrentEpoch().getCoordinatorEpoch())
9595
.isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH);
9696
elected.close();
9797
elected.start();
@@ -101,7 +101,7 @@ void testCoordinatorServerElection() throws Exception {
101101
CoordinatorAddress secondLeaderAddress =
102102
zookeeperClient.getCoordinatorLeaderAddress().get();
103103
assertThat(secondLeaderAddress).isNotEqualTo(firstLeaderAddress);
104-
assertThat(zookeeperClient.getCurrentEpoch().f0)
104+
assertThat(zookeeperClient.getCurrentEpoch().getCoordinatorEpoch())
105105
.isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH + 1);
106106

107107
// kill other 2 coordinator servers except the first one
@@ -115,7 +115,7 @@ void testCoordinatorServerElection() throws Exception {
115115
CoordinatorAddress thirdLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get();
116116

117117
assertThat(thirdLeaderAddress.getId()).isEqualTo(firstLeaderAddress.getId());
118-
assertThat(zookeeperClient.getCurrentEpoch().f0)
118+
assertThat(zookeeperClient.getCurrentEpoch().getCoordinatorEpoch())
119119
.isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH + 2);
120120
}
121121

0 commit comments

Comments
 (0)