Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1441,6 +1441,34 @@ object KyuubiConf {
.checkValues(KubernetesApplicationStateSource)
.createWithDefault(KubernetesApplicationStateSource.POD.toString)

// Configuration for checking and deleting pods stuck in FailedMount loop
// Kubernetes will retry ~every 2 minutes,
// Thus 30 count correspond to (30*2)/60 = 1 hours
val KUBERNETES_POD_FAILED_MOUNT_LOOP_THRESHOLD: ConfigEntry[Int] =
buildConf("kyuubi.kubernetes.pod.failed.mount.loop.threshold")
.serverOnly
.doc("The threshold for the number of failed mount loop to trigger pod deletion")
.version("1.11.0")
.intConf
.createWithDefault(30)

val KUBERNETES_POD_FAILED_MOUNT_LOOP_CHECK_ENABLED: ConfigEntry[Boolean] =
buildConf("kyuubi.kubernetes.pod.failed.mount.loop.check.enabled")
.serverOnly
.doc("Whether to periodically check Pending pods for FailedMount " +
"loops and delete them when exceeding the threshold.")
.version("1.11.0")
.booleanConf
.createWithDefault(true)

val KUBERNETES_POD_FAILED_MOUNT_LOOP_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("kyuubi.kubernetes.pod.failed.mount.loop.check.interval")
.serverOnly
.doc("Interval for periodically checking Pending pods for FailedMount loops.")
.version("1.11.0")
.timeConf
.createWithDefault(60 * 60 * 1000)

object KubernetesApplicationStateSource extends Enumeration {
type KubernetesApplicationStateSource = Value
val POD, CONTAINER = Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
private def allowedNamespaces: Set[String] =
kyuubiConf.get(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST)

private def failedMountLoopThreshold: Int =
kyuubiConf.get(KyuubiConf.KUBERNETES_POD_FAILED_MOUNT_LOOP_THRESHOLD)

private def appStateSource: KubernetesApplicationStateSource =
KubernetesApplicationStateSource.withName(
kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_SOURCE))
Expand All @@ -85,6 +88,10 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {

private var kubernetesClientInitializeCleanupTerminatedPodExecutor: ThreadPoolExecutor = _

private var cleanupFailedMountLoopPodExecutor: ThreadPoolExecutor = _

private var failedMountLoopPeriodicChecker: ScheduledExecutorService = _

private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = {
checkKubernetesInfo(kubernetesInfo)
kubernetesClients.computeIfAbsent(
Expand Down Expand Up @@ -213,7 +220,50 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
kubernetesClientInitializeCleanupTerminatedPodExecutor =
ThreadUtils.newDaemonCachedThreadPool(
"kubernetes-client-initialize-cleanup-terminated-pod-thread")
cleanupFailedMountLoopPodExecutor = ThreadUtils.newDaemonCachedThreadPool(
"cleanup-failed-mount-loop-pod-thread")
initializeKubernetesClient(kyuubiConf)

// start a periodic FailedMount checker for Pending pods
if (kyuubiConf.get(KyuubiConf.KUBERNETES_POD_FAILED_MOUNT_LOOP_CHECK_ENABLED)) {
val interval = kyuubiConf.get(KyuubiConf.KUBERNETES_POD_FAILED_MOUNT_LOOP_CHECK_INTERVAL)
failedMountLoopPeriodicChecker = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"failed-mount-loop-periodic-checker")
ThreadUtils.scheduleTolerableRunnableWithFixedDelay(
failedMountLoopPeriodicChecker,
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
try {
// Iterate over known applications and check Pending pods for FailedMount loops
appInfoStore.asScala.foreach { case (_, (kubernetesInfo, appInfo)) =>
if (appInfo.state == ApplicationState.PENDING) {
val client = getOrCreateKubernetesClient(kubernetesInfo)
val podNameOpt = appInfo.podName
val podOpt = podNameOpt
.flatMap { name => Option(client.pods().withName(name).get()) }
.orElse {
Option(client.pods()
.withLabel(LABEL_KYUUBI_UNIQUE_KEY, appInfo.name)
.list())
.flatMap(l => Option(l.getItems)).map(_.asScala.headOption).flatten
}
podOpt.foreach { pod =>
// Reuse the existing logic through checkPodFailedMountLoop
checkPodFailedMountLoop(
kubernetesInfo,
pod)
}
}
}
} catch {
case NonFatal(e) => warn("Periodic FailedMount checker encountered an error", e)
}
}
},
1 * 60 * 1000,
interval,
TimeUnit.MILLISECONDS)
}
}

private[kyuubi] def getKubernetesClientInitializeInfo(
Expand Down Expand Up @@ -378,6 +428,15 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
ThreadUtils.shutdown(kubernetesClientInitializeCleanupTerminatedPodExecutor)
kubernetesClientInitializeCleanupTerminatedPodExecutor = null
}
if (cleanupFailedMountLoopPodExecutor != null) {
ThreadUtils.shutdown(cleanupFailedMountLoopPodExecutor)
cleanupFailedMountLoopPodExecutor = null
}

if (failedMountLoopPeriodicChecker != null) {
ThreadUtils.shutdown(failedMountLoopPeriodicChecker)
failedMountLoopPeriodicChecker = null
}
}

private class SparkEnginePodEventHandler(kubernetesInfo: KubernetesInfo)
Expand Down Expand Up @@ -593,6 +652,37 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
})
}
}

// Check if pod is stuck at failedMount Loop
private def checkPodFailedMountLoop(
kubernetesInfo: KubernetesInfo,
pod: Pod): Unit = {
cleanupFailedMountLoopPodExecutor.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val kyuubiUniqueKey = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
val client = getOrCreateKubernetesClient(kubernetesInfo)
val ns = Option(pod.getMetadata.getNamespace).getOrElse(client.getNamespace)
val events = client.v1().events().inNamespace(ns)
.withField("involvedObject.kind", "Pod")
.withField("involvedObject.name", pod.getMetadata.getName)
.list()
.getItems.asScala.toList

val hasFailedMount = events.exists { event =>
val reasonMatches = Option(event.getReason)
.exists(_.contains("FailedMount"))
val count = Option(event.getCount).getOrElse(0).asInstanceOf[Int]
reasonMatches && (count >= failedMountLoopThreshold)
}
if (hasFailedMount) {
warn(s"[$kubernetesInfo] Detected FailedMount for pod " +
s" exceeding the threshold of $failedMountLoopThreshold, " +
s"${pod.getMetadata.getName}, deleting pod")
deletePod(kubernetesInfo, pod.getMetadata.getName, kyuubiUniqueKey)
}
}
})
}
}

object KubernetesApplicationOperation extends Logging {
Expand Down
Loading