Skip to content

Commit a2c1935

Browse files
committed
support coordinator epoch
support coordinator epoch
1 parent 5c4482d commit a2c1935

File tree

9 files changed

+80
-32
lines changed

9 files changed

+80
-32
lines changed

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class CoordinatorContext {
5454
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorContext.class);
5555

5656
public static final int INITIAL_COORDINATOR_EPOCH = 0;
57-
public static final int INITIAL_COORDINATOR_EPOCH_ZKVERSION = 0;
57+
public static final int INITIAL_COORDINATOR_EPOCH_ZK_VERSION = 0;
5858

5959
// for simplicity, we just use retry time, may consider make it a configurable value
6060
// and use combine retry times and retry delay
@@ -106,7 +106,7 @@ public class CoordinatorContext {
106106

107107
private ServerInfo coordinatorServerInfo = null;
108108
private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
109-
private int coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZKVERSION;
109+
private int coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZK_VERSION;
110110

111111
public CoordinatorContext() {}
112112

@@ -127,7 +127,6 @@ public Set<Integer> getLiveCoordinatorServers() {
127127
return liveCoordinatorServers;
128128
}
129129

130-
@VisibleForTesting
131130
public void setLiveCoordinatorServers(Set<Integer> servers) {
132131
liveCoordinatorServers.clear();
133132
liveCoordinatorServers.addAll(servers);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,7 +1012,8 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
10121012
}
10131013

10141014
try {
1015-
zooKeeperClient.batchUpdateLeaderAndIsr(newLeaderAndIsrList);
1015+
zooKeeperClient.batchUpdateLeaderAndIsr(
1016+
newLeaderAndIsrList, coordinatorContext.getCoordinatorEpoch());
10161017
newLeaderAndIsrList.forEach(
10171018
(tableBucket, newLeaderAndIsr) ->
10181019
result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr)));
@@ -1023,7 +1024,8 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
10231024
TableBucket tableBucket = entry.getKey();
10241025
LeaderAndIsr newLeaderAndIsr = entry.getValue();
10251026
try {
1026-
zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
1027+
zooKeeperClient.updateLeaderAndIsr(
1028+
tableBucket, newLeaderAndIsr, coordinatorContext.getCoordinatorEpoch());
10271029
} catch (Exception e) {
10281030
LOG.error("Error when register leader and isr.", e);
10291031
result.add(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -238,9 +238,6 @@ protected void startCoordinatorLeaderService() throws Exception {
238238
rpcServer.start();
239239

240240
registerCoordinatorLeader();
241-
// when init session, register coordinator server again
242-
// ZooKeeperUtils.registerZookeeperClientReInitSessionListener(
243-
// zkClient, this::registerCoordinatorLeader, this);
244241

245242
this.clientMetricGroup = new ClientMetricGroup(metricRegistry, SERVER_NAME);
246243
this.rpcClient = RpcClient.create(conf, clientMetricGroup, true);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ private void updateMetricsViaAccessContext() {
106106
AccessContextEvent<MetricsData> accessContextEvent =
107107
new AccessContextEvent<>(
108108
context -> {
109+
int coordinatorServerCount = context.getLiveCoordinatorServers().size();
109110
int tabletServerCount = context.getLiveTabletServers().size();
110111
int tableCount = context.allTables().size();
111112
int bucketCount = context.bucketLeaderAndIsr().size();
@@ -138,6 +139,7 @@ private void updateMetricsViaAccessContext() {
138139
}
139140

140141
return new MetricsData(
142+
coordinatorServerCount,
141143
tabletServerCount,
142144
tableCount,
143145
bucketCount,
@@ -151,6 +153,7 @@ private void updateMetricsViaAccessContext() {
151153
// Wait for the result and update local metrics
152154
try {
153155
MetricsData metricsData = accessContextEvent.getResultFuture().get();
156+
this.aliveCoordinatorServerCount = metricsData.coordinatorServerCount;
154157
this.tabletServerCount = metricsData.tabletServerCount;
155158
this.tableCount = metricsData.tableCount;
156159
this.bucketCount = metricsData.bucketCount;
@@ -273,6 +276,7 @@ public QueuedEvent(CoordinatorEvent event, long enqueueTimeMs) {
273276
}
274277

275278
private static class MetricsData {
279+
private final int coordinatorServerCount;
276280
private final int tabletServerCount;
277281
private final int tableCount;
278282
private final int bucketCount;
@@ -281,12 +285,14 @@ private static class MetricsData {
281285
private final int replicasToDeleteCount;
282286

283287
public MetricsData(
288+
int coordinatorServerCount,
284289
int tabletServerCount,
285290
int tableCount,
286291
int bucketCount,
287292
int partitionCount,
288293
int offlineBucketCount,
289294
int replicasToDeleteCount) {
295+
this.coordinatorServerCount = coordinatorServerCount;
290296
this.tabletServerCount = tabletServerCount;
291297
this.tableCount = tableCount;
292298
this.bucketCount = bucketCount;

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,8 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
459459
toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);
460460
}
461461
try {
462-
zooKeeperClient.batchUpdateLeaderAndIsr(toUpdateLeaderAndIsrList);
462+
zooKeeperClient.batchUpdateLeaderAndIsr(
463+
toUpdateLeaderAndIsrList, coordinatorContext.getCoordinatorEpoch());
463464
toUpdateLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr);
464465
return adjustedLeaderAndIsr;
465466
} catch (Exception e) {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,10 @@ private Optional<ElectionResult> electNewLeaderForTableBuckets(
487487
}
488488
ElectionResult electionResult = optionalElectionResult.get();
489489
try {
490-
zooKeeperClient.updateLeaderAndIsr(tableBucket, electionResult.leaderAndIsr);
490+
zooKeeperClient.updateLeaderAndIsr(
491+
tableBucket,
492+
electionResult.leaderAndIsr,
493+
coordinatorContext.getCoordinatorEpoch());
491494
} catch (Exception e) {
492495
LOG.error(
493496
"Fail to update bucket LeaderAndIsr for table bucket {}.",

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.config.ConfigOptions;
2222
import org.apache.fluss.config.Configuration;
23+
import org.apache.fluss.exception.InvalidCoordinatorException;
2324
import org.apache.fluss.metadata.PhysicalTablePath;
2425
import org.apache.fluss.metadata.ResolvedPartitionSpec;
2526
import org.apache.fluss.metadata.Schema;
@@ -178,13 +179,9 @@ public Optional<Integer> fenceBecomeCoordinatorLeader(int coordinatorId) throws
178179
ensureEpochZnodeExists();
179180

180181
try {
181-
Stat currentStat = new Stat();
182-
byte[] bytes =
183-
zkClient.getData()
184-
.storingStatIn(currentStat)
185-
.forPath(ZkData.CoordinatorEpochZNode.path());
186-
int currentEpoch = ZkData.CoordinatorEpochZNode.decode(bytes);
187-
int currentVersion = currentStat.getVersion();
182+
Tuple2<Integer, Integer> getEpoch = getCurrentEpoch();
183+
int currentEpoch = getEpoch.f0;
184+
int currentVersion = getEpoch.f1;
188185
int newEpoch = currentEpoch + 1;
189186
LOG.info(
190187
"Coordinator leader {} tries to update epoch. Current epoch={}, Zookeeper version={}, new epoch={}",
@@ -226,7 +223,7 @@ public Optional<CoordinatorAddress> getCoordinatorLeaderAddress() throws Excepti
226223
Optional<byte[]> bytes = getOrEmpty(ZkData.CoordinatorLeaderZNode.path());
227224
return bytes.map(
228225
data ->
229-
// maybe a empty node when a leader is elected but not registered
226+
// maybe an empty node when a leader is elected but not registered
230227
data.length == 0 ? null : ZkData.CoordinatorLeaderZNode.decode(data));
231228
}
232229

@@ -247,12 +244,24 @@ public void ensureEpochZnodeExists() throws Exception {
247244
.forPath(
248245
path,
249246
ZkData.CoordinatorEpochZNode.encode(
250-
CoordinatorContext.INITIAL_COORDINATOR_EPOCH));
247+
CoordinatorContext.INITIAL_COORDINATOR_EPOCH - 1));
251248
} catch (KeeperException.NodeExistsException e) {
252249
}
253250
}
254251
}
255252

253+
/** Get epoch now in ZK. */
254+
public Tuple2<Integer, Integer> getCurrentEpoch() throws Exception {
255+
Stat currentStat = new Stat();
256+
byte[] bytes =
257+
zkClient.getData()
258+
.storingStatIn(currentStat)
259+
.forPath(ZkData.CoordinatorEpochZNode.path());
260+
int currentEpoch = ZkData.CoordinatorEpochZNode.decode(bytes);
261+
int currentVersion = currentStat.getVersion();
262+
return new Tuple2<>(currentEpoch, currentVersion);
263+
}
264+
256265
// --------------------------------------------------------------------------------------------
257266
// Tablet server
258267
// --------------------------------------------------------------------------------------------
@@ -469,14 +478,25 @@ public Map<TableBucket, LeaderAndIsr> getLeaderAndIsrs(Collection<TableBucket> t
469478
"leader and isr");
470479
}
471480

472-
public void updateLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr leaderAndIsr)
481+
public void updateLeaderAndIsr(
482+
TableBucket tableBucket, LeaderAndIsr leaderAndIsr, int currentCoordinatorEpoch)
473483
throws Exception {
484+
// check coordinator epoch to ensure no other Coordinator leader exists.
485+
if (leaderAndIsr.coordinatorEpoch() != currentCoordinatorEpoch) {
486+
throw new InvalidCoordinatorException(
487+
String.format(
488+
"LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
489+
+ "This coordinator may no longer be the leader.",
490+
leaderAndIsr.coordinatorEpoch(), currentCoordinatorEpoch, tableBucket));
491+
}
492+
474493
String path = LeaderAndIsrZNode.path(tableBucket);
475494
zkClient.setData().forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr));
476495
LOG.info("Updated {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
477496
}
478497

479-
public void batchUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> leaderAndIsrList)
498+
public void batchUpdateLeaderAndIsr(
499+
Map<TableBucket, LeaderAndIsr> leaderAndIsrList, int currentCoordinatorEpoch)
480500
throws Exception {
481501
if (leaderAndIsrList.isEmpty()) {
482502
return;
@@ -487,6 +507,17 @@ public void batchUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> leaderAndIsrL
487507
TableBucket tableBucket = entry.getKey();
488508
LeaderAndIsr leaderAndIsr = entry.getValue();
489509

510+
// check coordinator epoch to ensure no other Coordinator leader exists.
511+
if (leaderAndIsr.coordinatorEpoch() != currentCoordinatorEpoch) {
512+
throw new InvalidCoordinatorException(
513+
String.format(
514+
"LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
515+
+ "This coordinator may no longer be the leader.",
516+
leaderAndIsr.coordinatorEpoch(),
517+
currentCoordinatorEpoch,
518+
tableBucket));
519+
}
520+
490521
LOG.info("Batch Update {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
491522
String path = LeaderAndIsrZNode.path(tableBucket);
492523
byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);

fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ void testCoordinatorServerElection() throws Exception {
8282

8383
CoordinatorAddress firstLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get();
8484

85-
// Find the leader
86-
// and try to close it.
85+
// Find the leader and try to close it.
8786
CoordinatorServer elected = null;
8887
for (CoordinatorServer coordinatorServer : coordinatorServerList) {
8988
if (coordinatorServer.getServerId() == firstLeaderAddress.getId()) {
@@ -92,6 +91,8 @@ void testCoordinatorServerElection() throws Exception {
9291
}
9392
}
9493
assertThat(elected).isNotNull();
94+
assertThat(zookeeperClient.getCurrentEpoch().f0)
95+
.isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH);
9596
elected.close();
9697
elected.start();
9798

@@ -100,6 +101,8 @@ void testCoordinatorServerElection() throws Exception {
100101
CoordinatorAddress secondLeaderAddress =
101102
zookeeperClient.getCoordinatorLeaderAddress().get();
102103
assertThat(secondLeaderAddress).isNotEqualTo(firstLeaderAddress);
104+
assertThat(zookeeperClient.getCurrentEpoch().f0)
105+
.isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH + 1);
103106

104107
// kill other 2 coordinator servers
105108
for (CoordinatorServer coordinatorServer : coordinatorServerList) {
@@ -112,6 +115,8 @@ void testCoordinatorServerElection() throws Exception {
112115
CoordinatorAddress thirdLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get();
113116

114117
assertThat(thirdLeaderAddress.getId()).isEqualTo(firstLeaderAddress.getId());
118+
assertThat(zookeeperClient.getCurrentEpoch().f0)
119+
.isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH + 2);
115120
}
116121

117122
/** Create a configuration with Zookeeper address setting. */

fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.fluss.metadata.TableDescriptor;
2828
import org.apache.fluss.metadata.TablePartition;
2929
import org.apache.fluss.metadata.TablePath;
30+
import org.apache.fluss.server.coordinator.CoordinatorContext;
3031
import org.apache.fluss.server.entity.RegisterTableBucketLeadAndIsrInfo;
3132
import org.apache.fluss.server.zk.data.BucketAssignment;
3233
import org.apache.fluss.server.zk.data.BucketSnapshot;
@@ -181,8 +182,8 @@ void testLeaderAndIsr() throws Exception {
181182
assertThat(zookeeperClient.getLeaderAndIsr(tableBucket2)).isEmpty();
182183

183184
// try to register bucket leaderAndIsr
184-
LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2, 3), 100, 1000);
185-
LeaderAndIsr leaderAndIsr2 = new LeaderAndIsr(2, 10, Arrays.asList(4, 5, 6), 100, 1000);
185+
LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2, 3), 0, 1000);
186+
LeaderAndIsr leaderAndIsr2 = new LeaderAndIsr(2, 10, Arrays.asList(4, 5, 6), 0, 1000);
186187

187188
zookeeperClient.registerLeaderAndIsr(tableBucket1, leaderAndIsr1);
188189
zookeeperClient.registerLeaderAndIsr(tableBucket2, leaderAndIsr2);
@@ -192,8 +193,9 @@ void testLeaderAndIsr() throws Exception {
192193
.containsValues(leaderAndIsr1, leaderAndIsr2);
193194

194195
// test update
195-
leaderAndIsr1 = new LeaderAndIsr(2, 20, Collections.emptyList(), 200, 2000);
196-
zookeeperClient.updateLeaderAndIsr(tableBucket1, leaderAndIsr1);
196+
leaderAndIsr1 = new LeaderAndIsr(2, 20, Collections.emptyList(), 0, 2000);
197+
zookeeperClient.updateLeaderAndIsr(
198+
tableBucket1, leaderAndIsr1, CoordinatorContext.INITIAL_COORDINATOR_EPOCH);
197199
assertThat(zookeeperClient.getLeaderAndIsr(tableBucket1)).hasValue(leaderAndIsr1);
198200

199201
// test delete
@@ -210,7 +212,7 @@ void testBatchCreateAndUpdateLeaderAndIsr(boolean isPartitionTable) throws Excep
210212
TableBucket tableBucket =
211213
isPartitionTable ? new TableBucket(1, 2L, i) : new TableBucket(1, i);
212214
LeaderAndIsr leaderAndIsr =
213-
new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 100, 1000);
215+
new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 0, 1000);
214216
leaderAndIsrList.add(leaderAndIsr);
215217
RegisterTableBucketLeadAndIsrInfo info =
216218
isPartitionTable
@@ -251,7 +253,8 @@ void testBatchCreateAndUpdateLeaderAndIsr(boolean isPartitionTable) throws Excep
251253
entry.setValue(adjustLeaderAndIsr);
252254
});
253255
// batch update
254-
zookeeperClient.batchUpdateLeaderAndIsr(updateMap);
256+
zookeeperClient.batchUpdateLeaderAndIsr(
257+
updateMap, CoordinatorContext.INITIAL_COORDINATOR_EPOCH);
255258
for (int i = 0; i < 100; i++) {
256259
// each should update successful
257260
Optional<LeaderAndIsr> optionalLeaderAndIsr =
@@ -270,7 +273,7 @@ void testBatchUpdateLeaderAndIsr() throws Exception {
270273
for (int i = 0; i < totalCount; i++) {
271274
TableBucket tableBucket = new TableBucket(1, i);
272275
LeaderAndIsr leaderAndIsr =
273-
new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 100, 1000);
276+
new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 0, 1000);
274277
leaderAndIsrList.put(tableBucket, leaderAndIsr);
275278
zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
276279
}
@@ -287,10 +290,11 @@ void testBatchUpdateLeaderAndIsr() throws Exception {
287290
old.leader() + 1,
288291
old.leaderEpoch() + 1,
289292
old.isr(),
290-
old.coordinatorEpoch() + 1,
293+
old.coordinatorEpoch(),
291294
old.bucketEpoch() + 1);
292295
}));
293-
zookeeperClient.batchUpdateLeaderAndIsr(updateLeaderAndIsrList);
296+
zookeeperClient.batchUpdateLeaderAndIsr(
297+
updateLeaderAndIsrList, CoordinatorContext.INITIAL_COORDINATOR_EPOCH);
294298
for (Map.Entry<TableBucket, LeaderAndIsr> entry : updateLeaderAndIsrList.entrySet()) {
295299
TableBucket tableBucket = entry.getKey();
296300
LeaderAndIsr leaderAndIsr = entry.getValue();

0 commit comments

Comments
 (0)