diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index f489e8528dd..11910efa1ea 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1441,6 +1441,34 @@ object KyuubiConf { .checkValues(KubernetesApplicationStateSource) .createWithDefault(KubernetesApplicationStateSource.POD.toString) + // Configuration for checking and deleting pods stuck in FailedMount loop + // Kubernetes will retry ~every 2 minutes, + // Thus 30 count correspond to (30*2)/60 = 1 hours + val KUBERNETES_POD_FAILED_MOUNT_LOOP_THRESHOLD: ConfigEntry[Int] = + buildConf("kyuubi.kubernetes.pod.failed.mount.loop.threshold") + .serverOnly + .doc("The threshold for the number of failed mount loop to trigger pod deletion") + .version("1.11.0") + .intConf + .createWithDefault(30) + + val KUBERNETES_POD_FAILED_MOUNT_LOOP_CHECK_ENABLED: ConfigEntry[Boolean] = + buildConf("kyuubi.kubernetes.pod.failed.mount.loop.check.enabled") + .serverOnly + .doc("Whether to periodically check Pending pods for FailedMount " + + "loops and delete them when exceeding the threshold.") + .version("1.11.0") + .booleanConf + .createWithDefault(true) + + val KUBERNETES_POD_FAILED_MOUNT_LOOP_CHECK_INTERVAL: ConfigEntry[Long] = + buildConf("kyuubi.kubernetes.pod.failed.mount.loop.check.interval") + .serverOnly + .doc("Interval for periodically checking Pending pods for FailedMount loops.") + .version("1.11.0") + .timeConf + .createWithDefault(60 * 60 * 1000) + object KubernetesApplicationStateSource extends Enumeration { type KubernetesApplicationStateSource = Value val POD, CONTAINER = Value diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index 0160f9077ba..c4f23d21370 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -60,6 +60,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { private def allowedNamespaces: Set[String] = kyuubiConf.get(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST) + private def failedMountLoopThreshold: Int = + kyuubiConf.get(KyuubiConf.KUBERNETES_POD_FAILED_MOUNT_LOOP_THRESHOLD) + private def appStateSource: KubernetesApplicationStateSource = KubernetesApplicationStateSource.withName( kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_SOURCE)) @@ -85,6 +88,10 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { private var kubernetesClientInitializeCleanupTerminatedPodExecutor: ThreadPoolExecutor = _ + private var cleanupFailedMountLoopPodExecutor: ThreadPoolExecutor = _ + + private var failedMountLoopPeriodicChecker: ScheduledExecutorService = _ + private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = { checkKubernetesInfo(kubernetesInfo) kubernetesClients.computeIfAbsent( @@ -213,7 +220,50 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { kubernetesClientInitializeCleanupTerminatedPodExecutor = ThreadUtils.newDaemonCachedThreadPool( "kubernetes-client-initialize-cleanup-terminated-pod-thread") + cleanupFailedMountLoopPodExecutor = ThreadUtils.newDaemonCachedThreadPool( + "cleanup-failed-mount-loop-pod-thread") initializeKubernetesClient(kyuubiConf) + + // start a periodic FailedMount checker for Pending pods + if (kyuubiConf.get(KyuubiConf.KUBERNETES_POD_FAILED_MOUNT_LOOP_CHECK_ENABLED)) { + val interval = kyuubiConf.get(KyuubiConf.KUBERNETES_POD_FAILED_MOUNT_LOOP_CHECK_INTERVAL) + failedMountLoopPeriodicChecker = ThreadUtils.newDaemonSingleThreadScheduledExecutor( + "failed-mount-loop-periodic-checker") + ThreadUtils.scheduleTolerableRunnableWithFixedDelay( + failedMountLoopPeriodicChecker, + new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + try { + // Iterate over known applications and check Pending pods for FailedMount loops + appInfoStore.asScala.foreach { case (_, (kubernetesInfo, appInfo)) => + if (appInfo.state == ApplicationState.PENDING) { + val client = getOrCreateKubernetesClient(kubernetesInfo) + val podNameOpt = appInfo.podName + val podOpt = podNameOpt + .flatMap { name => Option(client.pods().withName(name).get()) } + .orElse { + Option(client.pods() + .withLabel(LABEL_KYUUBI_UNIQUE_KEY, appInfo.name) + .list()) + .flatMap(l => Option(l.getItems)).map(_.asScala.headOption).flatten + } + podOpt.foreach { pod => + // Reuse the existing logic through checkPodFailedMountLoop + checkPodFailedMountLoop( + kubernetesInfo, + pod) + } + } + } + } catch { + case NonFatal(e) => warn("Periodic FailedMount checker encountered an error", e) + } + } + }, + 1 * 60 * 1000, + interval, + TimeUnit.MILLISECONDS) + } } private[kyuubi] def getKubernetesClientInitializeInfo( @@ -378,6 +428,15 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { ThreadUtils.shutdown(kubernetesClientInitializeCleanupTerminatedPodExecutor) kubernetesClientInitializeCleanupTerminatedPodExecutor = null } + if (cleanupFailedMountLoopPodExecutor != null) { + ThreadUtils.shutdown(cleanupFailedMountLoopPodExecutor) + cleanupFailedMountLoopPodExecutor = null + } + + if (failedMountLoopPeriodicChecker != null) { + ThreadUtils.shutdown(failedMountLoopPeriodicChecker) + failedMountLoopPeriodicChecker = null + } } private class SparkEnginePodEventHandler(kubernetesInfo: KubernetesInfo) @@ -593,6 +652,37 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { }) } } + + // Check if pod is stuck at failedMount Loop + private def checkPodFailedMountLoop( + kubernetesInfo: KubernetesInfo, + pod: Pod): Unit = { + cleanupFailedMountLoopPodExecutor.submit(new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + val kyuubiUniqueKey = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY) + val client = getOrCreateKubernetesClient(kubernetesInfo) + val ns = Option(pod.getMetadata.getNamespace).getOrElse(client.getNamespace) + val events = client.v1().events().inNamespace(ns) + .withField("involvedObject.kind", "Pod") + .withField("involvedObject.name", pod.getMetadata.getName) + .list() + .getItems.asScala.toList + + val hasFailedMount = events.exists { event => + val reasonMatches = Option(event.getReason) + .exists(_.contains("FailedMount")) + val count = Option(event.getCount).getOrElse(0).asInstanceOf[Int] + reasonMatches && (count >= failedMountLoopThreshold) + } + if (hasFailedMount) { + warn(s"[$kubernetesInfo] Detected FailedMount for pod " + + s" exceeding the threshold of $failedMountLoopThreshold, " + + s"${pod.getMetadata.getName}, deleting pod") + deletePod(kubernetesInfo, pod.getMetadata.getName, kyuubiUniqueKey) + } + } + }) + } } object KubernetesApplicationOperation extends Logging {