From 86005827f6629a6797b75ba9408e40ed3bde5351 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 21 Jun 2021 12:19:47 +0200 Subject: [PATCH] Detect and down suspended process, #30323 --- .../scala/akka/dispatch/SuspendDetector.scala | 78 +++++++++++++ .../scala/akka/cluster/ClusterDaemon.scala | 10 +- .../akka/cluster/sbr/DowningStrategy.scala | 6 + .../akka/cluster/sbr/SplitBrainResolver.scala | 106 +++++++++++------- 4 files changed, 153 insertions(+), 47 deletions(-) create mode 100644 akka-actor/src/main/scala/akka/dispatch/SuspendDetector.scala diff --git a/akka-actor/src/main/scala/akka/dispatch/SuspendDetector.scala b/akka-actor/src/main/scala/akka/dispatch/SuspendDetector.scala new file mode 100644 index 00000000000..38770be1285 --- /dev/null +++ b/akka-actor/src/main/scala/akka/dispatch/SuspendDetector.scala @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +package akka.dispatch + +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.actor.ClassicActorSystemProvider +import akka.actor.ExtendedActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider +import akka.event.Logging + +object SuspendDetector extends ExtensionId[SuspendDetector] with ExtensionIdProvider { + override def get(system: ActorSystem): SuspendDetector = super.get(system) + + override def get(system: ClassicActorSystemProvider): SuspendDetector = super.get(system) + + override def lookup = SuspendDetector + + override def createExtension(system: ExtendedActorSystem): SuspendDetector = new SuspendDetector(system) + + /** + * Published to the ActorSystem's eventStream when the process suspension has been detected. + * Note that this message could be stale when it is received and additional check with + * [[SuspendDetected#wasSuspended]] is recommended to be sure that the suspension occurred recently. + */ + final class SuspendDetected(suspendDetectedNanoTime: Long) { + def wasSuspended(since: FiniteDuration): Boolean = + (System.nanoTime() - suspendDetectedNanoTime <= since.toNanos) + } + +} + +class SuspendDetector(val system: ExtendedActorSystem) extends Extension { + import SuspendDetector.SuspendDetected + + // FIXME config + private val tickInterval = 100.millis + private val tickDeadlineNanos = 5.seconds.toNanos // FIXME default should be > 30 seconds + + private val log = Logging(system, classOf[SuspendDetector]) + + @volatile private var aliveTime = System.nanoTime() + @volatile private var suspendDetectedTime = aliveTime - 1.day.toNanos + + system.scheduler.scheduleWithFixedDelay(tickInterval, tickInterval) { () => + checkTime() + }(system.dispatcher) + + private def checkTime(): Boolean = synchronized { + val now = System.nanoTime() + val suspendDetected = + if (now - aliveTime >= tickDeadlineNanos) { + suspendDetectedTime = now + true + } else { + false + } + + if (suspendDetected) { + log.warning("Process was suspended for [{} seconds]", (now - aliveTime).nanos.toSeconds) + system.eventStream.publish(new SuspendDetected(now)) + } + + aliveTime = now + + suspendDetected + } + + def wasSuspended(since: FiniteDuration): Boolean = { + checkTime() || (System.nanoTime() - suspendDetectedTime <= since.toNanos) + } + +} diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index a3917f57662..adc7a75e45d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -9,8 +9,8 @@ import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.control.NonFatal - import scala.annotation.nowarn + import com.typesafe.config.Config import akka.Done @@ -19,11 +19,12 @@ import akka.actor.SupervisorStrategy.Stop import akka.annotation.InternalApi import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus._ -import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } +import akka.dispatch.SuspendDetector +import akka.dispatch.{RequiresMessageQueue, UnboundedMessageQueueSemantics} import akka.event.ActorWithLogClass import akka.event.Logging import akka.pattern.ask -import akka.remote.{ QuarantinedEvent => ClassicQuarantinedEvent } +import akka.remote.{QuarantinedEvent => ClassicQuarantinedEvent} import akka.remote.artery.QuarantinedEvent import akka.util.Timeout import akka.util.Version @@ -1492,7 +1493,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh * Reaps the unreachable members according to the failure detector's verdict. */ def reapUnreachableMembers(): Unit = { - if (!isSingletonCluster) { + // FIXME how long should the suspend timeout be? + if (!isSingletonCluster && !SuspendDetector(context.system).wasSuspended(3.seconds)) { // only scrutinize if we are a non-singleton cluster val localGossip = latestGossip diff --git a/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala b/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala index cc5f57e7bb5..29055953bfd 100644 --- a/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala +++ b/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala @@ -51,6 +51,9 @@ import akka.coordination.lease.scaladsl.Lease case object DownSelfQuarantinedByRemote extends Decision { override def isIndirectlyConnected: Boolean = false } + case object DownSelfSuspended extends Decision { + override def isIndirectlyConnected: Boolean = false + } } /** @@ -278,6 +281,9 @@ import akka.coordination.lease.scaladsl.Lease case DownSelfQuarantinedByRemote => if (downable.contains(selfUniqueAddress)) Set(selfUniqueAddress) else Set.empty + case DownSelfSuspended => + if (downable.contains(selfUniqueAddress)) Set(selfUniqueAddress) + else Set.empty } } diff --git a/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala b/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala index f4a6fd78f01..edff97aa245 100644 --- a/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala +++ b/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala @@ -27,6 +27,7 @@ import akka.cluster.Member import akka.cluster.Reachability import akka.cluster.UniqueAddress import akka.cluster.sbr.DowningStrategy.Decision +import akka.dispatch.SuspendDetector import akka.event.DiagnosticMarkerBusLoggingAdapter import akka.event.Logging import akka.pattern.pipe @@ -146,6 +147,10 @@ import akka.remote.artery.ThisActorSystemQuarantinedEvent val log: DiagnosticMarkerBusLoggingAdapter = Logging.withMarker(this) + private val suspendTimeout = 1.minute + private val suspendDetector = SuspendDetector(context.system) // make sure it's started + context.system.eventStream.subscribe(self, classOf[SuspendDetector.SuspendDetected]) + @InternalStableApi def strategy: DowningStrategy = _strategy @@ -287,6 +292,7 @@ import akka.remote.artery.ThisActorSystemQuarantinedEvent case Tick => tick() case ThisActorSystemQuarantinedEvent(_, remote) => thisActorSystemWasQuarantined(remote) case _: ClusterDomainEvent => // not interested in other events + case s: SuspendDetector.SuspendDetected => if (s.wasSuspended(suspendTimeout)) suspendDetected() } private def leaderChanged(leaderOption: Option[Address]): Unit = { @@ -296,53 +302,58 @@ import akka.remote.artery.ThisActorSystemQuarantinedEvent } private def tick(): Unit = { - // note the DownAll due to instability is running on all nodes to make that decision as quickly and - // aggressively as possible if time is out - if (reachabilityChangedStats.changeCount > 0) { - val now = System.nanoTime() - val durationSinceLatestChange = (now - reachabilityChangedStats.latestChangeTimestamp).nanos - val durationSinceFirstChange = (now - reachabilityChangedStats.firstChangeTimestamp).nanos - - val downAllWhenUnstableEnabled = downAllWhenUnstable > Duration.Zero - if (downAllWhenUnstableEnabled && durationSinceFirstChange > (stableAfter + downAllWhenUnstable)) { - log.warning( - ClusterLogMarker.sbrInstability, - "SBR detected instability and will down all nodes: {}", - reachabilityChangedStats) - actOnDecision(DownAll) - } else if (!downAllWhenUnstableEnabled && durationSinceLatestChange > (stableAfter * 2)) { - // downAllWhenUnstable is disabled but reset for meaningful logging - log.debug("SBR no reachability changes within {} ms, resetting stats", (stableAfter * 2).toMillis) - resetReachabilityChangedStats() + if (suspendDetector.wasSuspended(suspendTimeout)) { + // note that suspend detection is running on all nodes + suspendDetected() + } else { + // note the DownAll due to instability is running on all nodes to make that decision as quickly and + // aggressively as possible if time is out + if (reachabilityChangedStats.changeCount > 0) { + val now = System.nanoTime() + val durationSinceLatestChange = (now - reachabilityChangedStats.latestChangeTimestamp).nanos + val durationSinceFirstChange = (now - reachabilityChangedStats.firstChangeTimestamp).nanos + + val downAllWhenUnstableEnabled = downAllWhenUnstable > Duration.Zero + if (downAllWhenUnstableEnabled && durationSinceFirstChange > (stableAfter + downAllWhenUnstable)) { + log.warning( + ClusterLogMarker.sbrInstability, + "SBR detected instability and will down all nodes: {}", + reachabilityChangedStats) + actOnDecision(DownAll) + } else if (!downAllWhenUnstableEnabled && durationSinceLatestChange > (stableAfter * 2)) { + // downAllWhenUnstable is disabled but reset for meaningful logging + log.debug("SBR no reachability changes within {} ms, resetting stats", (stableAfter * 2).toMillis) + resetReachabilityChangedStats() + } } - } - if (isResponsible && strategy.unreachable.nonEmpty && stableDeadline.isOverdue()) { - strategy.decide() match { - case decision: AcquireLeaseDecision => - strategy.lease match { - case Some(lease) => - if (lease.checkLease()) { - log.info( - ClusterLogMarker.sbrLeaseAcquired(decision), - "SBR has acquired lease for decision [{}]", - decision) - actOnDecision(decision) - } else { - if (decision.acquireDelay == Duration.Zero) - acquireLease() // reply message is AcquireLeaseResult - else { - log.debug("SBR delayed attempt to acquire lease for [{} ms]", decision.acquireDelay.toMillis) - timers.startSingleTimer(AcquireLease, AcquireLease, decision.acquireDelay) + if (isResponsible && strategy.unreachable.nonEmpty && stableDeadline.isOverdue()) { + strategy.decide() match { + case decision: AcquireLeaseDecision => + strategy.lease match { + case Some(lease) => + if (lease.checkLease()) { + log.info( + ClusterLogMarker.sbrLeaseAcquired(decision), + "SBR has acquired lease for decision [{}]", + decision) + actOnDecision(decision) + } else { + if (decision.acquireDelay == Duration.Zero) + acquireLease() // reply message is AcquireLeaseResult + else { + log.debug("SBR delayed attempt to acquire lease for [{} ms]", decision.acquireDelay.toMillis) + timers.startSingleTimer(AcquireLease, AcquireLease, decision.acquireDelay) + } + context.become(waitingForLease(decision)) } - context.become(waitingForLease(decision)) - } - case None => - throw new IllegalStateException("Unexpected lease decision although lease is not configured") - } + case None => + throw new IllegalStateException("Unexpected lease decision although lease is not configured") + } - case decision => - actOnDecision(decision) + case decision => + actOnDecision(decision) + } } } @@ -364,6 +375,15 @@ import akka.remote.artery.ThisActorSystemQuarantinedEvent } } + private def suspendDetected(): Unit = { + if (strategy.allMembersInDC.size > 1) { + log.warning( + ClusterLogMarker.sbrInstability, + "SBR detected that the process was suspended too long and will down itself.") + actOnDecision(DowningStrategy.DownSelfSuspended) + } + } + private def acquireLease(): Unit = { log.debug("SBR trying to acquire lease") implicit val ec: ExecutionContext = internalDispatcher