From e788098505e4ad0e46c8da4a8257033e26c73c8d Mon Sep 17 00:00:00 2001 From: "Oh, Hoon" Date: Wed, 29 Oct 2025 14:48:23 +0000 Subject: [PATCH 1/3] Merged PR 4876778: Deleting Kubernetes Pods stuck at FailedMount state. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- **Work Item:** #10502574 --- **Problem** -- Kyuubi driver pods get stuck with a `FailedMount` error. When an application is submitted, `spark-submit` creates a driver pod and a ConfigMap. Sometimes the driver pod is created but the ConfigMap is not. This can happen if Kyuubi dies between those two steps. **Approach** -- We want to clean up driver pods that are stuck at the `FailedMount` stage. We check all pending applications and their pods. If a pod's `FailedMount` event count reaches a configured threshold, we delete the pod. This removes all driver pods that are stuck. --- **Code Changes** -- **Three Configurations Added (`KyuubiConf.scala`)**: - `KUBERNETES_POD_FAILED_MOUNT_LOOP_CHECK_ENABLED`: if true, check for and delete pods stuck in a FailedMount loop. - `KUBERNETES_POD_FAILED_MOUNT_LOOP_CHECK_INTERVAL`: Interval between checks for pods stuck in FailedMount. Defaults to 1 hour. The first check runs one minute after startup, then once per interval. - `KUBERNETES_POD_FAILED_MOUNT_LOOP_THRESHOLD`: If the count of a `FailedMount` event reaches the threshold, the pod is deleted. The default in this patch is 30, which is roughly 1 hour given that Kubernetes retries about every 2 minutes (720 would be roughly 24 hours). **Added Failed Mount Check in `KubernetesApplicationOperation.scala`**: Added `failedMountLoopPeriodicChecker`, a scheduled task that runs every interval (1 hour by default) to look for pods stuck in FailedMount, and `cleanupFailedMountLoopPodExecutor`, the thread pool that runs the per-pod checks. - Goes through all applications - If appInfo.state is PENDING, looks up the application's pod (by pod name, falling back to the Kyuubi unique-key label) - For each pod found, runs `checkPodFailedMountLoop`. Added `checkPodFailedMountLoop` to delete the pod. - Gets all events associated with this pod - Checks whether any event's reason contains "FailedMount" and its count is at least the threshold - If so, deletes the pod. 
**Minor change** - Added `.metals` to `.gitignore`. (Cursor keeps creating `.metals` for Scala projects). **Test** -- Tested in POC; it was able to detect and delete driver pods that had been stuck at FailedMount for a long time. The log shows detection and deletion of the stuck driver pod (newest event first). ![Pasted Graphic.png](https://geico.visualstudio.com/a9381017-9a49-48ba-968b-b91b8c491290/_apis/git/repositories/59e64e8f-fe99-4025-8d42-a666f09c4ed3/pullRequests/4876778/attachments/Pasted%20Graphic.png)  --- **Concern** -- **Performance** FailedMount errors do not happen frequently, so scanning all pending pods may be costly. Also note that it fetches the events for each pod using field selectors (equivalent to `kubectl get events --field-selector involvedObject.kind=Pod,involvedObject.name=kyuubi-spark-def6721d-bff8-4b29-94c6-dd1708cfd596-driver`). If there are a lot of events, this may not be cheap. **False Positive** If a pod is stuck at FailedMount for 24 hours but somehow starts working again, this code will still delete the pod, because the (past) event still exists. A pod should not be stuck for more than 24 hours in the FailedMount stage (unless the ConfigMap was not, is not, and will not be created). **Questions** -- - Is checking every hour good? Should I check less frequently to improve performance? - I... 
--- .../org/apache/kyuubi/config/KyuubiConf.scala | 28 ++++++ .../KubernetesApplicationOperation.scala | 90 +++++++++++++++++++ 2 files changed, 118 insertions(+) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index f489e8528dd..9fbff60aecb 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1441,6 +1441,34 @@ object KyuubiConf { .checkValues(KubernetesApplicationStateSource) .createWithDefault(KubernetesApplicationStateSource.POD.toString) + // GEICO: Configuration for checking and deleting pods stuck in FailedMount loop + // Kubernetes will retry ~every 2 minutes, + // Thus 30 count correspond to (30*2)/60 = 1 hours + val KUBERNETES_POD_FAILED_MOUNT_LOOP_THRESHOLD: ConfigEntry[Int] = + buildConf("kyuubi.kubernetes.pod.failed.mount.loop.threshold") + .serverOnly + .doc("The threshold for the number of failed mount loop to trigger pod deletion") + .version("1.11.0") + .intConf + .createWithDefault(30) + + val KUBERNETES_POD_FAILED_MOUNT_LOOP_CHECK_ENABLED: ConfigEntry[Boolean] = + buildConf("kyuubi.kubernetes.pod.failed.mount.loop.check.enabled") + .serverOnly + .doc("Whether to periodically check Pending pods for FailedMount " + + "loops and delete them when exceeding the threshold.") + .version("1.11.0") + .booleanConf + .createWithDefault(true) + + val KUBERNETES_POD_FAILED_MOUNT_LOOP_CHECK_INTERVAL: ConfigEntry[Long] = + buildConf("kyuubi.kubernetes.pod.failed.mount.loop.check.interval") + .serverOnly + .doc("Interval for periodically checking Pending pods for FailedMount loops.") + .version("1.11.0") + .timeConf + .createWithDefault(60 * 60 * 1000) + object KubernetesApplicationStateSource extends Enumeration { type KubernetesApplicationStateSource = Value val POD, CONTAINER = Value diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index 0160f9077ba..825f821ea8d 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -60,6 +60,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { private def allowedNamespaces: Set[String] = kyuubiConf.get(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST) + private def failedMountLoopThreshold: Int = + kyuubiConf.get(KyuubiConf.KUBERNETES_POD_FAILED_MOUNT_LOOP_THRESHOLD) + private def appStateSource: KubernetesApplicationStateSource = KubernetesApplicationStateSource.withName( kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_SOURCE)) @@ -84,6 +87,10 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { private var cleanupCanceledAppPodExecutor: ThreadPoolExecutor = _ private var kubernetesClientInitializeCleanupTerminatedPodExecutor: ThreadPoolExecutor = _ + + private var cleanupFailedMountLoopPodExecutor: ThreadPoolExecutor = _ + + private var failedMountLoopPeriodicChecker: ScheduledExecutorService = _ private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = { checkKubernetesInfo(kubernetesInfo) @@ -213,7 +220,50 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { kubernetesClientInitializeCleanupTerminatedPodExecutor = ThreadUtils.newDaemonCachedThreadPool( "kubernetes-client-initialize-cleanup-terminated-pod-thread") + cleanupFailedMountLoopPodExecutor = ThreadUtils.newDaemonCachedThreadPool( + "cleanup-failed-mount-loop-pod-thread") initializeKubernetesClient(kyuubiConf) + + // GEICO: start a periodic FailedMount checker for Pending pods + if 
(kyuubiConf.get(KyuubiConf.KUBERNETES_POD_FAILED_MOUNT_LOOP_CHECK_ENABLED)) { + val interval = kyuubiConf.get(KyuubiConf.KUBERNETES_POD_FAILED_MOUNT_LOOP_CHECK_INTERVAL) + failedMountLoopPeriodicChecker = ThreadUtils.newDaemonSingleThreadScheduledExecutor( + "failed-mount-loop-periodic-checker") + ThreadUtils.scheduleTolerableRunnableWithFixedDelay( + failedMountLoopPeriodicChecker, + new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + try { + // Iterate over known applications and check Pending pods for FailedMount loops + appInfoStore.asScala.foreach { case (_, (kubernetesInfo, appInfo)) => + if (appInfo.state == ApplicationState.PENDING) { + val client = getOrCreateKubernetesClient(kubernetesInfo) + val podNameOpt = appInfo.podName + val podOpt = podNameOpt + .flatMap { name => Option(client.pods().withName(name).get()) } + .orElse { + Option(client.pods() + .withLabel(LABEL_KYUUBI_UNIQUE_KEY, appInfo.name) + .list()) + .flatMap(l => Option(l.getItems)).map(_.asScala.headOption).flatten + } + podOpt.foreach { pod => + // Reuse the existing logic through checkPodFailedMountLoop + checkPodFailedMountLoop( + kubernetesInfo, + pod) + } + } + } + } catch { + case NonFatal(e) => warn("Periodic FailedMount checker encountered an error", e) + } + } + }, + 1 * 60 * 1000, + interval, + TimeUnit.MILLISECONDS) + } } private[kyuubi] def getKubernetesClientInitializeInfo( @@ -378,6 +428,15 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { ThreadUtils.shutdown(kubernetesClientInitializeCleanupTerminatedPodExecutor) kubernetesClientInitializeCleanupTerminatedPodExecutor = null } + if (cleanupFailedMountLoopPodExecutor != null) { + ThreadUtils.shutdown(cleanupFailedMountLoopPodExecutor) + cleanupFailedMountLoopPodExecutor = null + } + + if (failedMountLoopPeriodicChecker != null) { + ThreadUtils.shutdown(failedMountLoopPeriodicChecker) + failedMountLoopPeriodicChecker = null + } } private class 
SparkEnginePodEventHandler(kubernetesInfo: KubernetesInfo) @@ -593,6 +652,37 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { }) } } + + // GEICO: Check if pod is stuck at failedMount Loop + private def checkPodFailedMountLoop( + kubernetesInfo: KubernetesInfo, + pod: Pod): Unit = { + cleanupFailedMountLoopPodExecutor.submit(new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + val kyuubiUniqueKey = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY) + val client = getOrCreateKubernetesClient(kubernetesInfo) + val ns = Option(pod.getMetadata.getNamespace).getOrElse(client.getNamespace) + val events = client.v1().events().inNamespace(ns) + .withField("involvedObject.kind", "Pod") + .withField("involvedObject.name", pod.getMetadata.getName) + .list() + .getItems.asScala.toList + + val hasFailedMount = events.exists { event => + val reasonMatches = Option(event.getReason) + .exists(_.contains("FailedMount")) + val count = Option(event.getCount).getOrElse(0).asInstanceOf[Int] + reasonMatches && (count >= failedMountLoopThreshold) + } + if (hasFailedMount) { + warn(s"[$kubernetesInfo] Detected FailedMount for pod " + + s" exceeding the threshold of $failedMountLoopThreshold, " + + s"${pod.getMetadata.getName}, deleting pod") + deletePod(kubernetesInfo, pod.getMetadata.getName, kyuubiUniqueKey) + } + } + }) + } } object KubernetesApplicationOperation extends Logging { From 7fdd99120bc427cd28a0666c6b53838806e48103 Mon Sep 17 00:00:00 2001 From: Hoon Oh Date: Thu, 16 Apr 2026 12:08:43 -0400 Subject: [PATCH 2/3] comment cleanups --- .../src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala | 2 +- .../apache/kyuubi/engine/KubernetesApplicationOperation.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 9fbff60aecb..11910efa1ea 100644 --- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1441,7 +1441,7 @@ object KyuubiConf { .checkValues(KubernetesApplicationStateSource) .createWithDefault(KubernetesApplicationStateSource.POD.toString) - // GEICO: Configuration for checking and deleting pods stuck in FailedMount loop + // Configuration for checking and deleting pods stuck in FailedMount loop // Kubernetes will retry ~every 2 minutes, // Thus 30 count correspond to (30*2)/60 = 1 hours val KUBERNETES_POD_FAILED_MOUNT_LOOP_THRESHOLD: ConfigEntry[Int] = diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index 825f821ea8d..36a7e45ddc8 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -224,7 +224,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { "cleanup-failed-mount-loop-pod-thread") initializeKubernetesClient(kyuubiConf) - // GEICO: start a periodic FailedMount checker for Pending pods + // start a periodic FailedMount checker for Pending pods if (kyuubiConf.get(KyuubiConf.KUBERNETES_POD_FAILED_MOUNT_LOOP_CHECK_ENABLED)) { val interval = kyuubiConf.get(KyuubiConf.KUBERNETES_POD_FAILED_MOUNT_LOOP_CHECK_INTERVAL) failedMountLoopPeriodicChecker = ThreadUtils.newDaemonSingleThreadScheduledExecutor( @@ -653,7 +653,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { } } - // GEICO: Check if pod is stuck at failedMount Loop + // Check if pod is stuck at failedMount Loop private def checkPodFailedMountLoop( kubernetesInfo: KubernetesInfo, pod: Pod): Unit = { From 1aacfd8ad551de2e96bfc7254464a2bb40435ecf Mon Sep 17 00:00:00 2001 From: Hoon Oh 
Date: Tue, 28 Apr 2026 15:58:49 -0400 Subject: [PATCH 3/3] Style fix --- .../apache/kyuubi/engine/KubernetesApplicationOperation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index 36a7e45ddc8..c4f23d21370 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -87,7 +87,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { private var cleanupCanceledAppPodExecutor: ThreadPoolExecutor = _ private var kubernetesClientInitializeCleanupTerminatedPodExecutor: ThreadPoolExecutor = _ - + private var cleanupFailedMountLoopPodExecutor: ThreadPoolExecutor = _ private var failedMountLoopPeriodicChecker: ScheduledExecutorService = _