Skip to content

Commit 5ea5a34

Browse files
feat: Add shutdown watchdog configuration and tests
- Introduced a new configuration property `kyuubi.session.engine.shutdown.watchdog.timeout` to manage the maximum wait time for engine shutdown. - Updated the `SparkSQLEngine` to utilize the new watchdog feature. - Minor adjustments to existing configurations and documentation to reflect the new feature.
1 parent 5f4b1f0 commit 5ea5a34

5 files changed

Lines changed: 630 additions & 5 deletions

File tree

docs/configuration/settings.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
494494
| kyuubi.session.engine.open.onFailure | RETRY | The behavior when opening engine failed: <ul> <li>RETRY: retry to open engine for kyuubi.session.engine.open.max.attempts times.</li> <li>DEREGISTER_IMMEDIATELY: deregister the engine immediately.</li> <li>DEREGISTER_AFTER_RETRY: deregister the engine after retry to open engine for kyuubi.session.engine.open.max.attempts times.</li></ul> | string | 1.8.1 |
495495
| kyuubi.session.engine.open.retry.wait | PT10S | How long to wait before retrying to open the engine after failure. | duration | 1.7.0 |
496496
| kyuubi.session.engine.share.level | USER | (deprecated) - Using kyuubi.engine.share.level instead | string | 1.0.0 |
497+
| kyuubi.session.engine.shutdown.watchdog.timeout | PT1M | The maximum time to wait for the engine to shutdown gracefully before forcing termination. When an engine shutdown is initiated, this watchdog timer starts counting down. If the engine doesn't complete shutdown within this timeout period, it will be forcefully terminated to prevent hanging. Set to 0 or a negative value to disable the forced shutdown mechanism. | duration | 1.11.0 |
497498
| kyuubi.session.engine.spark.initialize.sql || The initialize sql for Spark session. It fallback to `kyuubi.engine.session.initialize.sql` | seq | 1.8.1 |
498499
| kyuubi.session.engine.spark.main.resource | &lt;undefined&gt; | The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default | string | 1.0.0 |
499500
| kyuubi.session.engine.spark.max.initial.wait | PT1M | Max wait time for the initial connection to Spark engine. The engine will self-terminate no new incoming connection is established within this time. This setting only applies at the CONNECTION share level. 0 or negative means not to self-terminate. | duration | 1.8.0 |

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark
2020
import java.time.Instant
2121
import java.util.{Locale, UUID}
2222
import java.util.concurrent.{CountDownLatch, ScheduledExecutorService, ThreadPoolExecutor, TimeUnit}
23-
import java.util.concurrent.atomic.AtomicBoolean
23+
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
2424

2525
import scala.concurrent.duration.Duration
2626
import scala.util.control.NonFatal
@@ -45,17 +45,17 @@ import org.apache.kyuubi.ha.HighAvailabilityConf._
4545
import org.apache.kyuubi.ha.client.RetryPolicies
4646
import org.apache.kyuubi.service.Serverable
4747
import org.apache.kyuubi.session.SessionHandle
48-
import org.apache.kyuubi.util.{JavaUtils, SignalRegister, ThreadUtils}
48+
import org.apache.kyuubi.util.{JavaUtils, SignalRegister, ThreadDumpUtils, ThreadUtils}
4949
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
50-
5150
case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") {
52-
5351
override val backendService = new SparkSQLBackendService(spark)
5452
override val frontendServices = Seq(new SparkTBinaryFrontendService(this))
5553

5654
private val shutdown = new AtomicBoolean(false)
5755
private val gracefulStopDeregistered = new AtomicBoolean(false)
58-
56+
@volatile private var watchdogThreadRef: AtomicReference[Thread] = new AtomicReference[Thread]()
57+
private val EMERGENCY_SHUTDOWN_EXIT_CODE = 99
58+
private val WATCHDOG_ERROR_EXIT_CODE = 98
5959
@volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
6060
@volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
6161
private lazy val engineSavePath =
@@ -98,6 +98,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
9898
}
9999

100100
override def stop(): Unit = if (shutdown.compareAndSet(false, true)) {
101+
startShutdownWatchdog()
101102
super.stop()
102103
lifetimeTerminatingChecker.foreach(checker => {
103104
val shutdownTimeout = conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT)
@@ -121,6 +122,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
121122
}
122123

123124
def gracefulStop(): Unit = if (gracefulStopDeregistered.compareAndSet(false, true)) {
125+
startShutdownWatchdog()
124126
val stopTask: Runnable = () => {
125127
if (!shutdown.get) {
126128
info(s"Spark engine is de-registering from engine discovery space.")
@@ -212,6 +214,76 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
212214
TimeUnit.MILLISECONDS)
213215
}
214216
}
217+
218+
/**
219+
* Starts a shutdown watchdog thread as a failsafe mechanism.
220+
*
221+
* This thread monitors the shutdown process and will forcefully terminate
222+
* the JVM if graceful shutdown takes too long. This prevents zombie processes
223+
* caused by non-daemon threads that refuse to terminate.
224+
*/
225+
private def startShutdownWatchdog(): Unit = {
226+
if (org.apache.kyuubi.Utils.isTesting) {
227+
info("Shutdown Watchdog is disabled in test mode.")
228+
return
229+
}
230+
231+
val shutdownWatchdogTimeout = conf.get(ENGINE_SHUTDOWN_WATCHDOG_TIMEOUT)
232+
if (shutdownWatchdogTimeout <= 0) {
233+
info("Shutdown Watchdog is disabled (timeout <= 0).")
234+
return
235+
}
236+
237+
// Prevent multiple watchdog threads
238+
watchdogThreadRef.synchronized {
239+
if (watchdogThreadRef.get() != null) {
240+
warn("Shutdown Watchdog is already running, ignoring duplicate start request")
241+
return
242+
}
243+
}
244+
245+
info(s"Shutdown Watchdog activated. Engine will be forcefully terminated if graceful " +
246+
s"shutdown exceeds ${shutdownWatchdogTimeout} ms.")
247+
248+
val watchdogThread = new Thread("shutdown-watchdog") {
249+
override def run(): Unit = {
250+
debug("Shutdown Watchdog thread started, monitoring graceful shutdown process")
251+
try {
252+
TimeUnit.MILLISECONDS.sleep(shutdownWatchdogTimeout)
253+
254+
error(s"EMERGENCY SHUTDOWN TRIGGERED")
255+
error(s"Graceful shutdown exceeded ${shutdownWatchdogTimeout} ms timeout")
256+
error(s"Non-daemon threads are preventing JVM exit")
257+
error(s"Initiating forced termination...")
258+
259+
// Thread dump for diagnostics
260+
error(s"=== THREAD DUMP FOR DIAGNOSTIC ===")
261+
ThreadDumpUtils.dumpToLogger(logger)
262+
error(s"=== END OF THREAD DUMP ===")
263+
264+
error(s"Forcefully terminating JVM now...")
265+
System.exit(EMERGENCY_SHUTDOWN_EXIT_CODE)
266+
267+
} catch {
268+
case _: InterruptedException =>
269+
warn("Shutdown Watchdog: Normal shutdown detected, watchdog exiting.")
270+
case t: Throwable =>
271+
error(
272+
s"Shutdown Watchdog error: ${t.getClass.getSimpleName}: ${t.getMessage}")
273+
t.printStackTrace(System.err)
274+
error("Proceeding with emergency termination...")
275+
System.exit(WATCHDOG_ERROR_EXIT_CODE) // Watchdog error
276+
}
277+
}
278+
}
279+
280+
watchdogThread.setDaemon(true)
281+
watchdogThread.start()
282+
watchdogThreadRef.set(watchdogThread)
283+
284+
debug(s"Shutdown Watchdog thread started: ${watchdogThread.getName}")
285+
}
286+
215287
}
216288

217289
object SparkSQLEngine extends Logging {
@@ -407,6 +479,7 @@ object SparkSQLEngine extends Logging {
407479
startEngine(spark)
408480
// blocking main thread
409481
countDownLatch.await()
482+
currentEngine.foreach(_.startShutdownWatchdog())
410483
} catch {
411484
case e: KyuubiException =>
412485
currentEngine match {

kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1784,6 +1784,17 @@ object KyuubiConf {
17841784
.timeConf
17851785
.createWithDefault(Duration.ofMinutes(30L).toMillis)
17861786

1787+
val ENGINE_SHUTDOWN_WATCHDOG_TIMEOUT: ConfigEntry[Long] =
1788+
buildConf("kyuubi.session.engine.shutdown.watchdog.timeout")
1789+
.doc("The maximum time to wait for the engine to shutdown gracefully before " +
1790+
"forcing termination. When an engine shutdown is initiated, this watchdog " +
1791+
"timer starts counting down. If the engine doesn't complete shutdown within " +
1792+
"this timeout period, it will be forcefully terminated to prevent hanging. " +
1793+
"Set to 0 or a negative value to disable the forced shutdown mechanism.")
1794+
.version("1.11.0")
1795+
.timeConf
1796+
.createWithDefault(Duration.ofMinutes(1L).toMillis)
1797+
17871798
val SESSION_CONF_IGNORE_LIST: ConfigEntry[Set[String]] =
17881799
buildConf("kyuubi.session.conf.ignore.list")
17891800
.doc("A comma-separated list of ignored keys. If the client connection contains any of" +

0 commit comments

Comments
 (0)