Skip to content

Commit 860538f

Browse files
committed
fix
1 parent e3b108c commit 860538f

4 files changed

Lines changed: 52 additions & 29 deletions

File tree

docs/configuration/ha.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ license: |
2020
| Key | Default | isDynamic | Description | Since | Deprecated |
2121
| --- | ------- | --------- | ----------- | ----- | ---------- |
2222
| celeborn.master.ha.enabled | false | false | When true, master nodes run as Raft cluster mode. | 0.3.0 | celeborn.ha.enabled |
23-
| celeborn.master.ha.gracefulShutdown.enabled | false | false | When true, the master will run a shutdown hook and transfer Raft leadership before shutting down. This reduces chances of client side failures by avoiding the Raft election window where no leader is available. | 0.7.0 | |
23+
| celeborn.master.ha.gracefulShutdown.enabled | false | false | When true, the master transfers Raft leadership before it shuts down gracefully. This reduces the chance of client-side failures by avoiding the Raft election window during which no leader is available. | 0.7.0 | |
2424
| celeborn.master.ha.node.&lt;id&gt;.host | &lt;required&gt; | false | Host that master node &lt;id&gt; binds to in HA mode. | 0.3.0 | celeborn.ha.master.node.&lt;id&gt;.host |
2525
| celeborn.master.ha.node.&lt;id&gt;.internal.port | 8097 | false | Internal port for the workers and other masters to connect to master node &lt;id&gt; in HA mode. | 0.5.0 | |
2626
| celeborn.master.ha.node.&lt;id&gt;.port | 9097 | false | Port that master node &lt;id&gt; binds to in HA mode. | 0.3.0 | celeborn.ha.master.node.&lt;id&gt;.port |

master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.*;
2626
import java.util.concurrent.ScheduledExecutorService;
2727
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicBoolean;
2829
import java.util.concurrent.locks.ReentrantReadWriteLock;
2930

3031
import javax.net.ssl.KeyManager;
@@ -99,6 +100,7 @@ public class HARaftServer {
99100
private Optional<LeaderPeerEndpoints> cachedLeaderPeerRpcEndpoints = Optional.empty();
100101

101102
private final CelebornConf conf;
103+
private final AtomicBoolean stopped = new AtomicBoolean(false);
102104
private long workerTimeoutDeadline;
103105
private long appTimeoutDeadline;
104106

@@ -274,8 +276,16 @@ public void start() throws IOException {
274276
}
275277

276278
public void stop() {
279+
stop(false);
280+
}
281+
282+
public void stop(boolean transferLeadership) {
283+
if (!stopped.compareAndSet(false, true)) {
284+
LOG.info("Raft server {} already stopped.", server.getId());
285+
return;
286+
}
277287
try {
278-
if (isLeader()) {
288+
if (transferLeadership && isLeader()) {
279289
LOG.info(
280290
"This node {} is the Raft leader. Transferring leadership before shutdown.",
281291
server.getId());
@@ -647,16 +657,13 @@ boolean stepDown() {
647657
REQUEST_TIMEOUT_MS);
648658
RaftClientReply reply = server.transferLeadership(request);
649659
if (reply.isSuccess()) {
650-
LOG.info("Successfully step down leader {}.", server.getId());
651660
return true;
652-
} else {
653-
LOG.warn("Step down leader failed!");
654-
return false;
655661
}
662+
LOG.warn("Step down leader {} failed.", server.getId());
656663
} catch (Exception e) {
657-
LOG.warn("Step down leader failed!", e);
658-
return false;
664+
LOG.warn("Step down leader {} failed.", server.getId(), e);
659665
}
666+
return false;
660667
}
661668

662669
public void setDeadlineTime(long increaseWorkerTime, long increaseAppTime) {

master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -378,26 +378,6 @@ private[celeborn] class Master(
378378
}
379379
logInfo("Stopping Celeborn Master.")
380380

381-
// Transfer Raft leadership before shutting down so other masters can
382-
// immediately take over without waiting for heartbeat timeout
383-
if (conf.haMasterGracefulShutdownEnabled) {
384-
statusSystem match {
385-
case ha: HAMasterMetaManager =>
386-
val ratisServer = ha.getRatisServer
387-
if (ratisServer != null) {
388-
logInfo("HA graceful shutdown enabled. Stopping Raft server (will transfer leadership if leader).")
389-
try {
390-
ratisServer.stop()
391-
} catch {
392-
case e: Exception =>
393-
logError("Failed to stop Raft server during Master shutdown.", e)
394-
}
395-
}
396-
case _ =>
397-
logInfo("Single-master mode, skipping Raft shutdown.")
398-
}
399-
}
400-
401381
Option(checkForWorkerTimeoutTask).foreach(_.cancel(true))
402382
Option(checkForUnavailableWorkerTimeOutTask).foreach(_.cancel(true))
403383
Option(checkForApplicationTimeOutTask).foreach(_.cancel(true))
@@ -1607,6 +1587,22 @@ private[celeborn] class Master(
16071587
override def stop(exitKind: Int): Unit = synchronized {
16081588
if (!stopped) {
16091589
logInfo("Stopping Master")
1590+
// Transfer Raft leadership before shutting down so other masters can
1591+
// immediately take over without waiting for heartbeat timeout.
1592+
val transferLeadership = conf.haMasterGracefulShutdownEnabled
1593+
statusSystem match {
1594+
case ha: HAMasterMetaManager =>
1595+
val ratisServer = ha.getRatisServer
1596+
if (ratisServer != null) {
1597+
try {
1598+
ratisServer.stop(transferLeadership)
1599+
} catch {
1600+
case e: Exception =>
1601+
logError("Failed to stop Raft server during Master shutdown.", e)
1602+
}
1603+
}
1604+
case _ => // single-master mode, no Raft server to stop
1605+
}
16101606
rpcEnv.stop(self)
16111607
if (conf.internalPortEnabled) {
16121608
internalRpcEnvInUse.stop(internalRpcEndpointRef)

master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1892,11 +1892,31 @@ public void testGracefulLeaderShutdownStepDown() {
18921892
.getMasterStateMachine()
18931893
.notifyLogFailed(new Exception("test leader graceful step down"), null);
18941894

1895-
Assert.assertFalse("Leader should step down without closing the shared server", leader.isLeader());
1895+
Assert.assertFalse(
1896+
"Leader should step down without closing the shared server", leader.isLeader());
18961897
Assert.assertNotEquals(
18971898
"Raft server should remain available to the rest of the suite",
18981899
"CLOSED",
18991900
leader.getServer().getLifeCycleState().name());
1901+
1902+
// Wait for a new leader to be elected so subsequent tests are not affected.
1903+
boolean newLeaderElected = false;
1904+
for (int i = 0; i < 30; i++) {
1905+
for (HARaftServer server : Arrays.asList(RATISSERVER1, RATISSERVER2, RATISSERVER3)) {
1906+
if (server != leader && server.isLeader()) {
1907+
newLeaderElected = true;
1908+
break;
1909+
}
1910+
}
1911+
if (newLeaderElected) break;
1912+
try {
1913+
Thread.sleep(1000);
1914+
} catch (InterruptedException e) {
1915+
Thread.currentThread().interrupt();
1916+
break;
1917+
}
1918+
}
1919+
Assert.assertTrue("A new leader should be elected after step down", newLeaderElected);
19001920
}
19011921

19021922
@AfterClass

0 commit comments

Comments
 (0)