Skip to content

Commit cd8787d

Browse files
committed
support coordinator epoch
1 parent 5c4482d commit cd8787d

File tree

6 files changed

+68
-22
lines changed

6 files changed

+68
-22
lines changed

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,7 +1012,8 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
10121012
}
10131013

10141014
try {
1015-
zooKeeperClient.batchUpdateLeaderAndIsr(newLeaderAndIsrList);
1015+
zooKeeperClient.batchUpdateLeaderAndIsr(
1016+
newLeaderAndIsrList, coordinatorContext.getCoordinatorEpoch());
10161017
newLeaderAndIsrList.forEach(
10171018
(tableBucket, newLeaderAndIsr) ->
10181019
result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr)));
@@ -1023,7 +1024,8 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
10231024
TableBucket tableBucket = entry.getKey();
10241025
LeaderAndIsr newLeaderAndIsr = entry.getValue();
10251026
try {
1026-
zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
1027+
zooKeeperClient.updateLeaderAndIsr(
1028+
tableBucket, newLeaderAndIsr, coordinatorContext.getCoordinatorEpoch());
10271029
} catch (Exception e) {
10281030
LOG.error("Error when register leader and isr.", e);
10291031
result.add(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,8 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
459459
toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);
460460
}
461461
try {
462-
zooKeeperClient.batchUpdateLeaderAndIsr(toUpdateLeaderAndIsrList);
462+
zooKeeperClient.batchUpdateLeaderAndIsr(
463+
toUpdateLeaderAndIsrList, coordinatorContext.getCoordinatorEpoch());
463464
toUpdateLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr);
464465
return adjustedLeaderAndIsr;
465466
} catch (Exception e) {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,10 @@ private Optional<ElectionResult> electNewLeaderForTableBuckets(
487487
}
488488
ElectionResult electionResult = optionalElectionResult.get();
489489
try {
490-
zooKeeperClient.updateLeaderAndIsr(tableBucket, electionResult.leaderAndIsr);
490+
zooKeeperClient.updateLeaderAndIsr(
491+
tableBucket,
492+
electionResult.leaderAndIsr,
493+
coordinatorContext.getCoordinatorEpoch());
491494
} catch (Exception e) {
492495
LOG.error(
493496
"Fail to update bucket LeaderAndIsr for table bucket {}.",

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.config.ConfigOptions;
2222
import org.apache.fluss.config.Configuration;
23+
import org.apache.fluss.exception.InvalidCoordinatorException;
2324
import org.apache.fluss.metadata.PhysicalTablePath;
2425
import org.apache.fluss.metadata.ResolvedPartitionSpec;
2526
import org.apache.fluss.metadata.Schema;
@@ -178,13 +179,9 @@ public Optional<Integer> fenceBecomeCoordinatorLeader(int coordinatorId) throws
178179
ensureEpochZnodeExists();
179180

180181
try {
181-
Stat currentStat = new Stat();
182-
byte[] bytes =
183-
zkClient.getData()
184-
.storingStatIn(currentStat)
185-
.forPath(ZkData.CoordinatorEpochZNode.path());
186-
int currentEpoch = ZkData.CoordinatorEpochZNode.decode(bytes);
187-
int currentVersion = currentStat.getVersion();
182+
Tuple2<Integer, Integer> getEpoch = getCurrentEpoch();
183+
int currentEpoch = getEpoch.f0;
184+
int currentVersion = getEpoch.f1;
188185
int newEpoch = currentEpoch + 1;
189186
LOG.info(
190187
"Coordinator leader {} tries to update epoch. Current epoch={}, Zookeeper version={}, new epoch={}",
@@ -247,12 +244,24 @@ public void ensureEpochZnodeExists() throws Exception {
247244
.forPath(
248245
path,
249246
ZkData.CoordinatorEpochZNode.encode(
250-
CoordinatorContext.INITIAL_COORDINATOR_EPOCH));
247+
CoordinatorContext.INITIAL_COORDINATOR_EPOCH - 1));
251248
} catch (KeeperException.NodeExistsException e) {
252249
}
253250
}
254251
}
255252

253+
/** Get epoch now in ZK. */
254+
public Tuple2<Integer, Integer> getCurrentEpoch() throws Exception {
255+
Stat currentStat = new Stat();
256+
byte[] bytes =
257+
zkClient.getData()
258+
.storingStatIn(currentStat)
259+
.forPath(ZkData.CoordinatorEpochZNode.path());
260+
int currentEpoch = ZkData.CoordinatorEpochZNode.decode(bytes);
261+
int currentVersion = currentStat.getVersion();
262+
return new Tuple2<>(currentEpoch, currentVersion);
263+
}
264+
256265
// --------------------------------------------------------------------------------------------
257266
// Tablet server
258267
// --------------------------------------------------------------------------------------------
@@ -469,14 +478,25 @@ public Map<TableBucket, LeaderAndIsr> getLeaderAndIsrs(Collection<TableBucket> t
469478
"leader and isr");
470479
}
471480

472-
public void updateLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr leaderAndIsr)
481+
public void updateLeaderAndIsr(
482+
TableBucket tableBucket, LeaderAndIsr leaderAndIsr, int currentCoordinatorEpoch)
473483
throws Exception {
484+
// check coordinator epoch to ensure no other Coordinator leader exists.
485+
if (leaderAndIsr.coordinatorEpoch() != currentCoordinatorEpoch) {
486+
throw new InvalidCoordinatorException(
487+
String.format(
488+
"LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
489+
+ "This coordinator may no longer be the leader.",
490+
leaderAndIsr.coordinatorEpoch(), currentCoordinatorEpoch, tableBucket));
491+
}
492+
474493
String path = LeaderAndIsrZNode.path(tableBucket);
475494
zkClient.setData().forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr));
476495
LOG.info("Updated {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
477496
}
478497

479-
public void batchUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> leaderAndIsrList)
498+
public void batchUpdateLeaderAndIsr(
499+
Map<TableBucket, LeaderAndIsr> leaderAndIsrList, int currentCoordinatorEpoch)
480500
throws Exception {
481501
if (leaderAndIsrList.isEmpty()) {
482502
return;
@@ -487,6 +507,17 @@ public void batchUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> leaderAndIsrL
487507
TableBucket tableBucket = entry.getKey();
488508
LeaderAndIsr leaderAndIsr = entry.getValue();
489509

510+
// check coordinator epoch to ensure no other Coordinator leader exists.
511+
if (leaderAndIsr.coordinatorEpoch() != currentCoordinatorEpoch) {
512+
throw new InvalidCoordinatorException(
513+
String.format(
514+
"LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
515+
+ "This coordinator may no longer be the leader.",
516+
leaderAndIsr.coordinatorEpoch(),
517+
currentCoordinatorEpoch,
518+
tableBucket));
519+
}
520+
490521
LOG.info("Batch Update {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
491522
String path = LeaderAndIsrZNode.path(tableBucket);
492523
byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);

fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ void testCoordinatorServerElection() throws Exception {
8282

8383
CoordinatorAddress firstLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get();
8484

85-
// Find the leader
86-
// and try to close it.
85+
// Find the leader and try to close it.
8786
CoordinatorServer elected = null;
8887
for (CoordinatorServer coordinatorServer : coordinatorServerList) {
8988
if (coordinatorServer.getServerId() == firstLeaderAddress.getId()) {
@@ -92,6 +91,8 @@ void testCoordinatorServerElection() throws Exception {
9291
}
9392
}
9493
assertThat(elected).isNotNull();
94+
assertThat(zookeeperClient.getCurrentEpoch().f0)
95+
.isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH);
9596
elected.close();
9697
elected.start();
9798

@@ -100,6 +101,8 @@ void testCoordinatorServerElection() throws Exception {
100101
CoordinatorAddress secondLeaderAddress =
101102
zookeeperClient.getCoordinatorLeaderAddress().get();
102103
assertThat(secondLeaderAddress).isNotEqualTo(firstLeaderAddress);
104+
assertThat(zookeeperClient.getCurrentEpoch().f0)
105+
.isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH + 1);
103106

104107
// kill other 2 coordinator servers
105108
for (CoordinatorServer coordinatorServer : coordinatorServerList) {
@@ -112,6 +115,8 @@ void testCoordinatorServerElection() throws Exception {
112115
CoordinatorAddress thirdLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get();
113116

114117
assertThat(thirdLeaderAddress.getId()).isEqualTo(firstLeaderAddress.getId());
118+
assertThat(zookeeperClient.getCurrentEpoch().f0)
119+
.isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH + 2);
115120
}
116121

117122
/** Create a configuration with Zookeeper address setting. */

fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.fluss.metadata.TableDescriptor;
2828
import org.apache.fluss.metadata.TablePartition;
2929
import org.apache.fluss.metadata.TablePath;
30+
import org.apache.fluss.server.coordinator.CoordinatorContext;
3031
import org.apache.fluss.server.entity.RegisterTableBucketLeadAndIsrInfo;
3132
import org.apache.fluss.server.zk.data.BucketAssignment;
3233
import org.apache.fluss.server.zk.data.BucketSnapshot;
@@ -193,7 +194,8 @@ void testLeaderAndIsr() throws Exception {
193194

194195
// test update
195196
leaderAndIsr1 = new LeaderAndIsr(2, 20, Collections.emptyList(), 200, 2000);
196-
zookeeperClient.updateLeaderAndIsr(tableBucket1, leaderAndIsr1);
197+
zookeeperClient.updateLeaderAndIsr(
198+
tableBucket1, leaderAndIsr1, CoordinatorContext.INITIAL_COORDINATOR_EPOCH);
197199
assertThat(zookeeperClient.getLeaderAndIsr(tableBucket1)).hasValue(leaderAndIsr1);
198200

199201
// test delete
@@ -210,7 +212,7 @@ void testBatchCreateAndUpdateLeaderAndIsr(boolean isPartitionTable) throws Excep
210212
TableBucket tableBucket =
211213
isPartitionTable ? new TableBucket(1, 2L, i) : new TableBucket(1, i);
212214
LeaderAndIsr leaderAndIsr =
213-
new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 100, 1000);
215+
new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 0, 1000);
214216
leaderAndIsrList.add(leaderAndIsr);
215217
RegisterTableBucketLeadAndIsrInfo info =
216218
isPartitionTable
@@ -251,7 +253,8 @@ void testBatchCreateAndUpdateLeaderAndIsr(boolean isPartitionTable) throws Excep
251253
entry.setValue(adjustLeaderAndIsr);
252254
});
253255
// batch update
254-
zookeeperClient.batchUpdateLeaderAndIsr(updateMap);
256+
zookeeperClient.batchUpdateLeaderAndIsr(
257+
updateMap, CoordinatorContext.INITIAL_COORDINATOR_EPOCH);
255258
for (int i = 0; i < 100; i++) {
256259
// each should update successful
257260
Optional<LeaderAndIsr> optionalLeaderAndIsr =
@@ -270,7 +273,7 @@ void testBatchUpdateLeaderAndIsr() throws Exception {
270273
for (int i = 0; i < totalCount; i++) {
271274
TableBucket tableBucket = new TableBucket(1, i);
272275
LeaderAndIsr leaderAndIsr =
273-
new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 100, 1000);
276+
new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 0, 1000);
274277
leaderAndIsrList.put(tableBucket, leaderAndIsr);
275278
zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
276279
}
@@ -287,10 +290,11 @@ void testBatchUpdateLeaderAndIsr() throws Exception {
287290
old.leader() + 1,
288291
old.leaderEpoch() + 1,
289292
old.isr(),
290-
old.coordinatorEpoch() + 1,
293+
old.coordinatorEpoch(),
291294
old.bucketEpoch() + 1);
292295
}));
293-
zookeeperClient.batchUpdateLeaderAndIsr(updateLeaderAndIsrList);
296+
zookeeperClient.batchUpdateLeaderAndIsr(
297+
updateLeaderAndIsrList, CoordinatorContext.INITIAL_COORDINATOR_EPOCH);
294298
for (Map.Entry<TableBucket, LeaderAndIsr> entry : updateLeaderAndIsrList.entrySet()) {
295299
TableBucket tableBucket = entry.getKey();
296300
LeaderAndIsr leaderAndIsr = entry.getValue();

0 commit comments

Comments
 (0)