Skip to content

Commit 492bdc9

Browse files
committed
support coordinator epoch
1 parent 5c4482d commit 492bdc9

File tree

11 files changed

+82
-38
lines changed

11 files changed

+82
-38
lines changed

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class CoordinatorContext {
5454
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorContext.class);
5555

5656
public static final int INITIAL_COORDINATOR_EPOCH = 0;
57-
public static final int INITIAL_COORDINATOR_EPOCH_ZKVERSION = 0;
57+
public static final int INITIAL_COORDINATOR_EPOCH_ZK_VERSION = 0;
5858

5959
// for simplicity, we just use retry times; we may consider making it a configurable value
6060
// and using a combination of retry times and retry delay
@@ -106,7 +106,7 @@ public class CoordinatorContext {
106106

107107
private ServerInfo coordinatorServerInfo = null;
108108
private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
109-
private int coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZKVERSION;
109+
private int coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZK_VERSION;
110110

111111
public CoordinatorContext() {}
112112

@@ -127,7 +127,6 @@ public Set<Integer> getLiveCoordinatorServers() {
127127
return liveCoordinatorServers;
128128
}
129129

130-
@VisibleForTesting
131130
public void setLiveCoordinatorServers(Set<Integer> servers) {
132131
liveCoordinatorServers.clear();
133132
liveCoordinatorServers.addAll(servers);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,7 +1012,8 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
10121012
}
10131013

10141014
try {
1015-
zooKeeperClient.batchUpdateLeaderAndIsr(newLeaderAndIsrList);
1015+
zooKeeperClient.batchUpdateLeaderAndIsr(
1016+
newLeaderAndIsrList, coordinatorContext.getCoordinatorEpoch());
10161017
newLeaderAndIsrList.forEach(
10171018
(tableBucket, newLeaderAndIsr) ->
10181019
result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr)));
@@ -1023,7 +1024,8 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
10231024
TableBucket tableBucket = entry.getKey();
10241025
LeaderAndIsr newLeaderAndIsr = entry.getValue();
10251026
try {
1026-
zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
1027+
zooKeeperClient.updateLeaderAndIsr(
1028+
tableBucket, newLeaderAndIsr, coordinatorContext.getCoordinatorEpoch());
10271029
} catch (Exception e) {
10281030
LOG.error("Error when register leader and isr.", e);
10291031
result.add(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -238,9 +238,6 @@ protected void startCoordinatorLeaderService() throws Exception {
238238
rpcServer.start();
239239

240240
registerCoordinatorLeader();
241-
// when init session, register coordinator server again
242-
// ZooKeeperUtils.registerZookeeperClientReInitSessionListener(
243-
// zkClient, this::registerCoordinatorLeader, this);
244241

245242
this.clientMetricGroup = new ClientMetricGroup(metricRegistry, SERVER_NAME);
246243
this.rpcClient = RpcClient.create(conf, clientMetricGroup, true);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ private void updateMetricsViaAccessContext() {
106106
AccessContextEvent<MetricsData> accessContextEvent =
107107
new AccessContextEvent<>(
108108
context -> {
109+
int coordinatorServerCount = context.getLiveCoordinatorServers().size();
109110
int tabletServerCount = context.getLiveTabletServers().size();
110111
int tableCount = context.allTables().size();
111112
int bucketCount = context.bucketLeaderAndIsr().size();
@@ -138,6 +139,7 @@ private void updateMetricsViaAccessContext() {
138139
}
139140

140141
return new MetricsData(
142+
coordinatorServerCount,
141143
tabletServerCount,
142144
tableCount,
143145
bucketCount,
@@ -151,6 +153,7 @@ private void updateMetricsViaAccessContext() {
151153
// Wait for the result and update local metrics
152154
try {
153155
MetricsData metricsData = accessContextEvent.getResultFuture().get();
156+
this.aliveCoordinatorServerCount = metricsData.coordinatorServerCount;
154157
this.tabletServerCount = metricsData.tabletServerCount;
155158
this.tableCount = metricsData.tableCount;
156159
this.bucketCount = metricsData.bucketCount;
@@ -273,6 +276,7 @@ public QueuedEvent(CoordinatorEvent event, long enqueueTimeMs) {
273276
}
274277

275278
private static class MetricsData {
279+
private final int coordinatorServerCount;
276280
private final int tabletServerCount;
277281
private final int tableCount;
278282
private final int bucketCount;
@@ -281,12 +285,14 @@ private static class MetricsData {
281285
private final int replicasToDeleteCount;
282286

283287
public MetricsData(
288+
int coordinatorServerCount,
284289
int tabletServerCount,
285290
int tableCount,
286291
int bucketCount,
287292
int partitionCount,
288293
int offlineBucketCount,
289294
int replicasToDeleteCount) {
295+
this.coordinatorServerCount = coordinatorServerCount;
290296
this.tabletServerCount = tabletServerCount;
291297
this.tableCount = tableCount;
292298
this.bucketCount = bucketCount;

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,8 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
459459
toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);
460460
}
461461
try {
462-
zooKeeperClient.batchUpdateLeaderAndIsr(toUpdateLeaderAndIsrList);
462+
zooKeeperClient.batchUpdateLeaderAndIsr(
463+
toUpdateLeaderAndIsrList, coordinatorContext.getCoordinatorEpoch());
463464
toUpdateLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr);
464465
return adjustedLeaderAndIsr;
465466
} catch (Exception e) {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,10 @@ private Optional<ElectionResult> electNewLeaderForTableBuckets(
487487
}
488488
ElectionResult electionResult = optionalElectionResult.get();
489489
try {
490-
zooKeeperClient.updateLeaderAndIsr(tableBucket, electionResult.leaderAndIsr);
490+
zooKeeperClient.updateLeaderAndIsr(
491+
tableBucket,
492+
electionResult.leaderAndIsr,
493+
coordinatorContext.getCoordinatorEpoch());
491494
} catch (Exception e) {
492495
LOG.error(
493496
"Fail to update bucket LeaderAndIsr for table bucket {}.",

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.config.ConfigOptions;
2222
import org.apache.fluss.config.Configuration;
23+
import org.apache.fluss.exception.InvalidCoordinatorException;
2324
import org.apache.fluss.metadata.PhysicalTablePath;
2425
import org.apache.fluss.metadata.ResolvedPartitionSpec;
2526
import org.apache.fluss.metadata.Schema;
@@ -178,13 +179,9 @@ public Optional<Integer> fenceBecomeCoordinatorLeader(int coordinatorId) throws
178179
ensureEpochZnodeExists();
179180

180181
try {
181-
Stat currentStat = new Stat();
182-
byte[] bytes =
183-
zkClient.getData()
184-
.storingStatIn(currentStat)
185-
.forPath(ZkData.CoordinatorEpochZNode.path());
186-
int currentEpoch = ZkData.CoordinatorEpochZNode.decode(bytes);
187-
int currentVersion = currentStat.getVersion();
182+
Tuple2<Integer, Integer> getEpoch = getCurrentEpoch();
183+
int currentEpoch = getEpoch.f0;
184+
int currentVersion = getEpoch.f1;
188185
int newEpoch = currentEpoch + 1;
189186
LOG.info(
190187
"Coordinator leader {} tries to update epoch. Current epoch={}, Zookeeper version={}, new epoch={}",
@@ -226,7 +223,7 @@ public Optional<CoordinatorAddress> getCoordinatorLeaderAddress() throws Excepti
226223
Optional<byte[]> bytes = getOrEmpty(ZkData.CoordinatorLeaderZNode.path());
227224
return bytes.map(
228225
data ->
229-
// maybe a empty node when a leader is elected but not registered
226+
// maybe an empty node when a leader is elected but not registered
230227
data.length == 0 ? null : ZkData.CoordinatorLeaderZNode.decode(data));
231228
}
232229

@@ -247,12 +244,24 @@ public void ensureEpochZnodeExists() throws Exception {
247244
.forPath(
248245
path,
249246
ZkData.CoordinatorEpochZNode.encode(
250-
CoordinatorContext.INITIAL_COORDINATOR_EPOCH));
247+
CoordinatorContext.INITIAL_COORDINATOR_EPOCH - 1));
251248
} catch (KeeperException.NodeExistsException e) {
252249
}
253250
}
254251
}
255252

253+
/** Gets the coordinator epoch currently stored in ZK (f0) and the epoch znode's version (f1). */
254+
public Tuple2<Integer, Integer> getCurrentEpoch() throws Exception {
255+
Stat currentStat = new Stat();
256+
byte[] bytes =
257+
zkClient.getData()
258+
.storingStatIn(currentStat)
259+
.forPath(ZkData.CoordinatorEpochZNode.path());
260+
int currentEpoch = ZkData.CoordinatorEpochZNode.decode(bytes);
261+
int currentVersion = currentStat.getVersion();
262+
return new Tuple2<>(currentEpoch, currentVersion);
263+
}
264+
256265
// --------------------------------------------------------------------------------------------
257266
// Tablet server
258267
// --------------------------------------------------------------------------------------------
@@ -469,14 +478,25 @@ public Map<TableBucket, LeaderAndIsr> getLeaderAndIsrs(Collection<TableBucket> t
469478
"leader and isr");
470479
}
471480

472-
public void updateLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr leaderAndIsr)
481+
public void updateLeaderAndIsr(
482+
TableBucket tableBucket, LeaderAndIsr leaderAndIsr, int currentCoordinatorEpoch)
473483
throws Exception {
484+
// check coordinator epoch to ensure no other Coordinator leader exists.
485+
if (leaderAndIsr.coordinatorEpoch() != currentCoordinatorEpoch) {
486+
throw new InvalidCoordinatorException(
487+
String.format(
488+
"LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
489+
+ "This coordinator may no longer be the leader.",
490+
leaderAndIsr.coordinatorEpoch(), currentCoordinatorEpoch, tableBucket));
491+
}
492+
474493
String path = LeaderAndIsrZNode.path(tableBucket);
475494
zkClient.setData().forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr));
476495
LOG.info("Updated {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
477496
}
478497

479-
public void batchUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> leaderAndIsrList)
498+
public void batchUpdateLeaderAndIsr(
499+
Map<TableBucket, LeaderAndIsr> leaderAndIsrList, int currentCoordinatorEpoch)
480500
throws Exception {
481501
if (leaderAndIsrList.isEmpty()) {
482502
return;
@@ -487,6 +507,17 @@ public void batchUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> leaderAndIsrL
487507
TableBucket tableBucket = entry.getKey();
488508
LeaderAndIsr leaderAndIsr = entry.getValue();
489509

510+
// check coordinator epoch to ensure no other Coordinator leader exists.
511+
if (leaderAndIsr.coordinatorEpoch() != currentCoordinatorEpoch) {
512+
throw new InvalidCoordinatorException(
513+
String.format(
514+
"LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
515+
+ "This coordinator may no longer be the leader.",
516+
leaderAndIsr.coordinatorEpoch(),
517+
currentCoordinatorEpoch,
518+
tableBucket));
519+
}
520+
490521
LOG.info("Batch Update {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
491522
String path = LeaderAndIsrZNode.path(tableBucket);
492523
byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);

fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ void testCoordinatorServerElection() throws Exception {
8282

8383
CoordinatorAddress firstLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get();
8484

85-
// Find the leader
86-
// and try to close it.
85+
// Find the leader and try to close it.
8786
CoordinatorServer elected = null;
8887
for (CoordinatorServer coordinatorServer : coordinatorServerList) {
8988
if (coordinatorServer.getServerId() == firstLeaderAddress.getId()) {
@@ -92,6 +91,8 @@ void testCoordinatorServerElection() throws Exception {
9291
}
9392
}
9493
assertThat(elected).isNotNull();
94+
assertThat(zookeeperClient.getCurrentEpoch().f0)
95+
.isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH);
9596
elected.close();
9697
elected.start();
9798

@@ -100,6 +101,8 @@ void testCoordinatorServerElection() throws Exception {
100101
CoordinatorAddress secondLeaderAddress =
101102
zookeeperClient.getCoordinatorLeaderAddress().get();
102103
assertThat(secondLeaderAddress).isNotEqualTo(firstLeaderAddress);
104+
assertThat(zookeeperClient.getCurrentEpoch().f0)
105+
.isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH + 1);
103106

104107
// kill other 2 coordinator servers
105108
for (CoordinatorServer coordinatorServer : coordinatorServerList) {
@@ -112,6 +115,8 @@ void testCoordinatorServerElection() throws Exception {
112115
CoordinatorAddress thirdLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get();
113116

114117
assertThat(thirdLeaderAddress.getId()).isEqualTo(firstLeaderAddress.getId());
118+
assertThat(zookeeperClient.getCurrentEpoch().f0)
119+
.isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH + 2);
115120
}
116121

117122
/** Create a configuration with Zookeeper address setting. */

fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,7 @@ protected void checkAfterStartServer() throws Exception {
7777

7878
public void waitUtilCoordinatorServerElected() {
7979
waitUntil(
80-
() -> {
81-
return zookeeperClient.getCoordinatorLeaderAddress().isPresent();
82-
},
80+
() -> zookeeperClient.getCoordinatorLeaderAddress().isPresent(),
8381
Duration.ofSeconds(10),
8482
"Fail to wait coordinator server elected");
8583
}

fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -836,9 +836,7 @@ public CoordinatorServer getCoordinatorServer() {
836836

837837
public void waitUtilCoordinatorServerElected() {
838838
waitUntil(
839-
() -> {
840-
return zooKeeperClient.getCoordinatorLeaderAddress().isPresent();
841-
},
839+
() -> zooKeeperClient.getCoordinatorLeaderAddress().isPresent(),
842840
Duration.ofSeconds(10),
843841
"Fail to wait coordinator server elected");
844842
}

0 commit comments

Comments
 (0)