Skip to content

Commit 393d01a

Browse files
committed
support coordinator epoch
1 parent 3865de1 commit 393d01a

File tree

10 files changed

+148
-9
lines changed

10 files changed

+148
-9
lines changed

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public class CoordinatorContext {
5454
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorContext.class);
5555

5656
public static final int INITIAL_COORDINATOR_EPOCH = 0;
57+
public static final int INITIAL_COORDINATOR_EPOCH_ZKVERSION = 0;
5758

5859
// for simplicity, we just use retry time, may consider make it a configurable value
5960
// and use combine retry times and retry delay
@@ -105,13 +106,23 @@ public class CoordinatorContext {
105106

106107
private ServerInfo coordinatorServerInfo = null;
107108
private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
109+
private int coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZKVERSION;
108110

109111
public CoordinatorContext() {}
110112

111113
public int getCoordinatorEpoch() {
112114
return coordinatorEpoch;
113115
}
114116

117+
public int getCoordinatorEpochZkVersion() {
118+
return coordinatorEpochZkVersion;
119+
}
120+
121+
public void setCoordinatorEpochAndZkVersion(int newEpoch, int newZkVersion) {
122+
this.coordinatorEpoch = newEpoch;
123+
this.coordinatorEpochZkVersion = newZkVersion;
124+
}
125+
115126
public Set<Integer> getLiveCoordinatorServers() {
116127
return liveCoordinatorServers;
117128
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,44 @@
1818

1919
package org.apache.fluss.server.coordinator;
2020

21+
import org.apache.fluss.exception.CoordinatorEpochFencedException;
22+
import org.apache.fluss.server.zk.ZooKeeperClient;
2123
import org.apache.fluss.server.zk.data.ZkData;
22-
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework;
2324
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch;
2425
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
2526

2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
2829

2930
import java.io.IOException;
31+
import java.util.Optional;
3032
import java.util.concurrent.atomic.AtomicBoolean;
3133

3234
/** Using by coordinator server. Coordinator servers listen ZK node and elect leadership. */
3335
public class CoordinatorLeaderElection implements AutoCloseable {
3436
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLeaderElection.class);
3537

3638
private final int serverId;
39+
private final ZooKeeperClient zkClient;
40+
private final CoordinatorContext coordinatorContext;
3741
private final LeaderLatch leaderLatch;
42+
private final CoordinatorServer server;
3843
private final AtomicBoolean isLeader = new AtomicBoolean(false);
3944

40-
public CoordinatorLeaderElection(CuratorFramework zkClient, int serverId) {
45+
public CoordinatorLeaderElection(
46+
ZooKeeperClient zkClient,
47+
int serverId,
48+
CoordinatorContext coordinatorContext,
49+
CoordinatorServer server) {
4150
this.serverId = serverId;
51+
this.zkClient = zkClient;
52+
this.coordinatorContext = coordinatorContext;
53+
this.server = server;
4254
this.leaderLatch =
4355
new LeaderLatch(
44-
zkClient, ZkData.CoordinatorElectionZNode.path(), String.valueOf(serverId));
56+
zkClient.getCuratorClient(),
57+
ZkData.CoordinatorElectionZNode.path(),
58+
String.valueOf(serverId));
4559
}
4660

4761
public void startElectLeader(Runnable initLeaderServices) {
@@ -51,10 +65,28 @@ public void startElectLeader(Runnable initLeaderServices) {
5165
public void isLeader() {
5266
LOG.info("Coordinator server {} has become the leader.", serverId);
5367
isLeader.set(true);
68+
try {
69+
// to avoid split-brain
70+
Optional<Integer> optionalEpoch =
71+
zkClient.fenceBecomeCoordinatorLeader(serverId);
72+
if (optionalEpoch.isPresent()) {
73+
coordinatorContext.setCoordinatorEpochAndZkVersion(
74+
optionalEpoch.get(),
75+
coordinatorContext.getCoordinatorEpochZkVersion() + 1);
76+
} else {
77+
throw new CoordinatorEpochFencedException(
78+
"Fenced to become coordinator leader.");
79+
}
80+
} catch (Exception e) {
81+
relinquishLeadership();
82+
throw new CoordinatorEpochFencedException(
83+
"Fenced to become coordinator leader.");
84+
}
5485
}
5586

5687
@Override
5788
public void notLeader() {
89+
relinquishLeadership();
5890
LOG.warn("Coordinator server {} has lost the leadership.", serverId);
5991
isLeader.set(false);
6092
}
@@ -87,4 +119,16 @@ public void close() throws IOException {
87119
public boolean isLeader() {
88120
return this.isLeader.get();
89121
}
122+
123+
private void relinquishLeadership() {
124+
isLeader.set(false);
125+
LOG.info("Coordinator server {} has been fenced.", serverId);
126+
127+
try {
128+
leaderLatch.close();
129+
server.closeAsync();
130+
leaderLatch.start();
131+
} catch (Exception e) {
132+
}
133+
}
90134
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,7 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() {
670670
// tablet servers.
671671
return makeUpdateMetadataRequest(
672672
coordinatorContext.getCoordinatorServerInfo(),
673+
coordinatorContext.getCoordinatorEpoch(),
673674
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
674675
tableMetadataList,
675676
partitionMetadataList);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ public static void main(String[] args) {
158158

159159
@Override
160160
protected void startServices() throws Exception {
161+
this.coordinatorContext = new CoordinatorContext();
161162
electCoordinatorLeader();
162163
}
163164

@@ -167,16 +168,13 @@ private void electCoordinatorLeader() throws Exception {
167168
// Coordinator Server supports high availability. If 3 coordinator servers are alive,
168169
// one of them will be elected as leader and the other two will be standby.
169170
// When leader fails, one of standby coordinators will be elected as new leader.
170-
// All of them register to ZK like tablet servers in path
171-
// "/coordinators/ids/1","/coordinators/ids/2","/coordinators/ids/3".
172-
// but the leader will be elected in path "/coordinators/leader" additionally.
173171
registerCoordinatorServer();
174172
ZooKeeperUtils.registerZookeeperClientReInitSessionListener(
175173
zkClient, this::registerCoordinatorServer, this);
176174

177175
// standby
178176
CoordinatorLeaderElection coordinatorLeaderElection =
179-
new CoordinatorLeaderElection(zkClient.getCuratorClient(), serverId);
177+
new CoordinatorLeaderElection(zkClient, serverId, coordinatorContext, this);
180178
coordinatorLeaderElection.startElectLeader(
181179
() -> {
182180
try {
@@ -207,7 +205,6 @@ protected void startCoordinatorLeaderService() throws Exception {
207205
dynamicConfigManager.register(lakeCatalogDynamicLoader);
208206
dynamicConfigManager.startup();
209207

210-
this.coordinatorContext = new CoordinatorContext();
211208
this.metadataCache = new CoordinatorMetadataCache();
212209

213210
this.authorizer = AuthorizerLoader.createAuthorizer(conf, zkClient, pluginManager);

fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ public static MetadataResponse buildMetadataResponse(
312312

313313
public static UpdateMetadataRequest makeUpdateMetadataRequest(
314314
@Nullable ServerInfo coordinatorServer,
315+
@Nullable Integer coordinatorEpoch,
315316
Set<ServerInfo> aliveTableServers,
316317
List<TableMetadata> tableMetadataList,
317318
List<PartitionMetadata> partitionMetadataList) {
@@ -354,6 +355,9 @@ public static UpdateMetadataRequest makeUpdateMetadataRequest(
354355
updateMetadataRequest.addAllTableMetadatas(pbTableMetadataList);
355356
updateMetadataRequest.addAllPartitionMetadatas(pbPartitionMetadataList);
356357

358+
if (coordinatorEpoch != null) {
359+
updateMetadataRequest.setCoordinatorEpoch(coordinatorEpoch);
360+
}
357361
return updateMetadataRequest;
358362
}
359363

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.fluss.security.acl.Resource;
3232
import org.apache.fluss.security.acl.ResourceType;
3333
import org.apache.fluss.server.authorizer.DefaultAuthorizer.VersionedAcls;
34+
import org.apache.fluss.server.coordinator.CoordinatorContext;
3435
import org.apache.fluss.server.entity.RegisterTableBucketLeadAndIsrInfo;
3536
import org.apache.fluss.server.metadata.BucketMetadata;
3637
import org.apache.fluss.server.zk.ZkAsyncRequest.ZkGetChildrenRequest;
@@ -168,6 +169,49 @@ public void registerCoordinatorServer(int coordinatorId) throws Exception {
168169
LOG.info("Registered Coordinator server {} at path {}.", coordinatorId, path);
169170
}
170171

172+
/**
173+
* Become coordinator leader. This method is a step after electCoordinatorLeader() and before
174+
* registerCoordinatorLeader(). This is to ensure the coordinator get and update the coordinator
175+
* epoch and coordinator epoch zk version.
176+
*/
177+
public Optional<Integer> fenceBecomeCoordinatorLeader(int coordinatorId) throws Exception {
178+
try {
179+
ensureEpochZnodeExists();
180+
181+
try {
182+
Stat currentStat = new Stat();
183+
byte[] bytes =
184+
zkClient.getData()
185+
.storingStatIn(currentStat)
186+
.forPath(ZkData.CoordinatorEpochZNode.path());
187+
int currentEpoch = ZkData.CoordinatorEpochZNode.decode(bytes);
188+
int currentVersion = currentStat.getVersion();
189+
int newEpoch = currentEpoch + 1;
190+
LOG.info(
191+
"Coordinator leader {} tries to update epoch. Current epoch={}, Zookeeper version={}, new epoch={}",
192+
coordinatorId,
193+
currentEpoch,
194+
currentVersion,
195+
newEpoch);
196+
197+
// atomically update epoch
198+
zkClient.setData()
199+
.withVersion(currentVersion)
200+
.forPath(
201+
ZkData.CoordinatorEpochZNode.path(),
202+
ZkData.CoordinatorEpochZNode.encode(newEpoch));
203+
204+
return Optional.of(newEpoch);
205+
} catch (KeeperException.BadVersionException e) {
206+
// Other coordinator leader has updated epoch.
207+
// If this happens, it means our fence is in effect.
208+
LOG.info("Coordinator leader {} failed to update epoch.", coordinatorId);
209+
}
210+
} catch (KeeperException.NodeExistsException e) {
211+
}
212+
return Optional.empty();
213+
}
214+
171215
/** Register a coordinator leader to ZK. */
172216
public void registerCoordinatorLeader(CoordinatorAddress coordinatorAddress) throws Exception {
173217
String path = ZkData.CoordinatorLeaderZNode.path();
@@ -181,7 +225,6 @@ public void registerCoordinatorLeader(CoordinatorAddress coordinatorAddress) thr
181225
/** Get the leader address registered in ZK. */
182226
public Optional<CoordinatorAddress> getCoordinatorLeaderAddress() throws Exception {
183227
Optional<byte[]> bytes = getOrEmpty(ZkData.CoordinatorLeaderZNode.path());
184-
// return bytes.map(CoordinatorZNode::decode);
185228
return bytes.map(
186229
data ->
187230
// maybe a empty node when a leader is elected but not registered
@@ -194,6 +237,23 @@ public int[] getCoordinatorServerList() throws Exception {
194237
return coordinatorServers.stream().mapToInt(Integer::parseInt).toArray();
195238
}
196239

240+
/** Ensure epoch znode exists. */
241+
public void ensureEpochZnodeExists() throws Exception {
242+
String path = ZkData.CoordinatorEpochZNode.path();
243+
if (zkClient.checkExists().forPath(path) == null) {
244+
try {
245+
zkClient.create()
246+
.creatingParentsIfNeeded()
247+
.withMode(CreateMode.PERSISTENT)
248+
.forPath(
249+
path,
250+
ZkData.CoordinatorEpochZNode.encode(
251+
CoordinatorContext.INITIAL_COORDINATOR_EPOCH));
252+
} catch (KeeperException.NodeExistsException e) {
253+
}
254+
}
255+
}
256+
197257
// --------------------------------------------------------------------------------------------
198258
// Tablet server
199259
// --------------------------------------------------------------------------------------------

fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,25 @@ public static CoordinatorAddress decode(byte[] json) {
300300
return JsonSerdeUtils.readValue(json, CoordinatorAddressJsonSerde.INSTANCE);
301301
}
302302
}
303+
304+
/**
305+
* The znode for the coordinator epoch. The znode path is:
306+
*
307+
* <p>/coordinators/epoch
308+
*/
309+
public static final class CoordinatorEpochZNode {
310+
public static String path() {
311+
return "/coordinators/epoch";
312+
}
313+
314+
public static byte[] encode(int epoch) {
315+
return String.valueOf(epoch).getBytes();
316+
}
317+
318+
public static int decode(byte[] bytes) {
319+
return Integer.parseInt(new String(bytes));
320+
}
321+
}
303322
// ------------------------------------------------------------------------------------------
304323
// ZNodes under "/tabletservers/"
305324
// ------------------------------------------------------------------------------------------

fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ private void checkSendRequest(
9393
// we use update metadata request to test for simplicity
9494
UpdateMetadataRequest updateMetadataRequest =
9595
makeUpdateMetadataRequest(
96+
null,
9697
null,
9798
Collections.emptySet(),
9899
Collections.emptyList(),

fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,7 @@ void testMetadata(boolean isCoordinatorServer) throws Exception {
495495
.updateMetadata(
496496
makeUpdateMetadataRequest(
497497
coordinatorServerInfo,
498+
null,
498499
new HashSet<>(tabletServerInfos),
499500
Collections.emptyList(),
500501
Collections.emptyList()))

fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -880,6 +880,7 @@ void testBecomeLeaderOrFollowerWithOneTabletServerOffline() throws Exception {
880880
.updateMetadata(
881881
makeUpdateMetadataRequest(
882882
coordinatorServerInfo,
883+
null,
883884
newTabletServerInfos,
884885
Collections.emptyList(),
885886
Collections.emptyList()))

0 commit comments

Comments
 (0)