diff --git a/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala b/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala index 4bf48f3c358..3bcadc0bd72 100644 --- a/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala +++ b/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala @@ -247,7 +247,7 @@ import akka.coordination.lease.scaladsl.Lease } private def indirectlyConnectedFromSeenCurrentGossip: Set[UniqueAddress] = { - reachability.records.flatMap { r => + reachability.records.iterator.flatMap { r => if (seenBy(r.subject.address)) r.observer :: r.subject :: Nil else Nil }.toSet @@ -295,6 +295,7 @@ import akka.coordination.lease.scaladsl.Lease try { val intersectionOfObserversAndSubjects = indirectlyConnectedFromIntersectionOfObserversAndSubjects val haveSeenCurrentGossip = indirectlyConnectedFromSeenCurrentGossip + // remove records between the indirectly connected _reachability = reachability.filterRecords { r => // we only retain records for addresses that are still downable @@ -306,13 +307,34 @@ import akka.coordination.lease.scaladsl.Lease _unreachable = reachability.allUnreachableOrTerminated val additionalDecision = decide() - if (additionalDecision.isIndirectlyConnected) - throw new IllegalStateException( - s"SBR double $additionalDecision decision, downing all instead. " + - s"originalReachability: [$originalReachability], filtered reachability [$reachability], " + - s"still indirectlyConnected: [$indirectlyConnected], seenBy: [$seenBy]") - - nodesToDown(additionalDecision) + if (additionalDecision.isIndirectlyConnected) { + val directlyConnectedObservers = + reachability.allObservers.diff(intersectionOfObserversAndSubjects).diff(haveSeenCurrentGossip) + val unreachableByDirectlyConnectedObservers = + reachability.records.iterator.flatMap { r => + if (directlyConnectedObservers(r.observer)) Some(r.subject) else None + }.toSet + + // does not change the set of unreachable nodes, we're just ignoring some observations + _reachability = reachability.filterRecords { r => + // keep observations by the directly connected + directlyConnectedObservers(r.observer) || + // and also keep observations that no directly-connected observed + !(unreachableByDirectlyConnectedObservers(r.subject)) + } + _unreachable = reachability.allUnreachableOrTerminated + + val secondOpinion = decide() // or is that third opinion? + if (secondOpinion.isIndirectlyConnected) + throw new IllegalStateException( + s"SBR double $additionalDecision decision, downing all instead. " + + s"originalReachability: [$originalReachability], filtered reachability [$reachability], " + + s"still indirectlyConnected: [$indirectlyConnected], seenBy: [$seenBy]") + + // nodesToDown(additionalDecision) is a subset of the already-known indirectly connected, + // so will be union'd in by caller + nodesToDown(secondOpinion) + } else nodesToDown(additionalDecision) } finally { _unreachable = originalUnreachable diff --git a/akka-cluster/src/test/scala/akka/cluster/sbr/SplitBrainResolverSpec.scala b/akka-cluster/src/test/scala/akka/cluster/sbr/SplitBrainResolverSpec.scala index 943b7f6a0c9..33f22b9ad71 100644 --- a/akka-cluster/src/test/scala/akka/cluster/sbr/SplitBrainResolverSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/sbr/SplitBrainResolverSpec.scala @@ -569,7 +569,7 @@ class SplitBrainResolverSpec assertDowningSide(side2, Set(memberA, memberB, memberC, memberD, memberE, memberF, memberG)) } - "double DownIndirectlyConnected when indirectly connected happens before clean partition: {A, B, C} | {(D, E), (F, G)} => {}" in new Setup2( + "down all when indirectly connected happens (covering all of majority side) before clean partition: {A, B, C} | {(D, E), (F, G)} => {}" in new Setup2( role = None) { side1 = Set(memberA, memberB, memberC) side2 = Set(memberD, memberE, memberF, memberG) @@ -578,19 +578,70 @@ class SplitBrainResolverSpec // from side1 of the partition, minority // D and G are observers and marked E and F as unreachable - // A has marked D and G as unreachable + // D, E, F, G unreachable due to the partition + // // The records D->E, G->F are not removed in the second decision because they are not detected via seenB - // due to clean partition. That means that the second decision will also be DownIndirectlyConnected. To bail - // out from this situation the strategy will throw IllegalStateException, which is caught and translated to - // DownAll. - intercept[IllegalStateException] { - assertDowningSide(side1, Set(memberA, memberB, memberC)) - } - - // from side2 of the partition, majority + // due to clean partition. + // For the third decision, D->E and G->F are removed because E and F are known to be unreachable by A by + // the clean partition + assertDowningSide(side1, Set(memberA, memberB, memberC, memberD, memberG)) + + // from side2 of the partition, "majority" + // A, B, C are on minority side + // E, F are also unreachable assertDowningSide(side2, Set(memberA, memberB, memberC, memberD, memberE, memberF, memberG)) } + "retain majority when indirectly connected happens (minority side) before clean partition { (A, B), C } | { D, E, F, G } => { D, E, F, G}" in new Setup2( + role = None) { + side1 = Set(memberA, memberB, memberC) + side2 = Set(memberD, memberE, memberF, memberG) + indirectlyConnected = List(memberA -> memberB) + + // from side 1 (minority) + // A observed B unreachable before partition + assertDowningSide(side1, Set(memberA, memberB, memberC)) + + // from side 2 (majority) + // A observed B unreachable before partition + // A, B, C unreachable due to partition + // A is indirectly connected (A observed and is observed unreachable) + // + // A->B not removed for second decision because B is not indirectly connected => A still indirectly connected + // Since B is observed unreachable from a directly connected node, decide as if we hadn't seen the A->B observation + // side2 is a reachable majority, so DownUnreachable + assertDowningSide(side2, Set(memberA, memberB, memberC)) + } + + "double DownIndirectlyConnected when indirectly connected happens (large majority side) before clean partition { A } | { (C, E, G), B, D, F, H } => {}" in new Setup2( + role = None) { + side1 = Set(memberA) + side2 = Set(memberB, memberC, memberD, memberE, memberF, memberG, memberH) + indirectlyConnected = List(memberE -> memberG, memberG -> memberC) + + assertDowningSide(side1, Set(memberA, memberE, memberG)) + + // assertDowningSide assumes that all on the majority side have seen latest gossip, which removes the indirectly + // connnected when doing the additional decision + { + val strategy = { + val s = createStrategy() + (side1 ++ side2).foreach(s.add) + s + } + val unreachability = (indirectlyConnected ++ side1.map(o => side2.head -> o)).toSet.toList + val r = createReachability(unreachability) + strategy.setReachability(r) + + unreachability.foreach { case (_, to) => strategy.addUnreachable(to) } + // let's say that neither C nor G is has seen gossip + strategy.setSeenBy(side2.filterNot(m => (m eq memberC) || (m eq memberG)).map(_.address)) + + an[IllegalStateException] shouldBe thrownBy { + strategy.nodesToDown() should be((side1 ++ side2).map(_.uniqueAddress)) + } + } + } } "KeepOldest" must { diff --git a/akka-cluster/src/test/scala/akka/cluster/sbr/TestAddresses.scala b/akka-cluster/src/test/scala/akka/cluster/sbr/TestAddresses.scala index c234174685b..f83398d01fd 100644 --- a/akka-cluster/src/test/scala/akka/cluster/sbr/TestAddresses.scala +++ b/akka-cluster/src/test/scala/akka/cluster/sbr/TestAddresses.scala @@ -46,6 +46,8 @@ object TestAddresses { new Member(UniqueAddress(addressA.copy(host = Some("f")), 0L), 5, Up, Set(defaultDcRole), Version.Zero) val memberG = new Member(UniqueAddress(addressA.copy(host = Some("g")), 0L), 6, Up, Set(defaultDcRole), Version.Zero) + val memberH = + new Member(UniqueAddress(addressA.copy(host = Some("h")), 0L), 7, Up, Set(defaultDcRole), Version.Zero) val memberAWeaklyUp = new Member(memberA.uniqueAddress, Int.MaxValue, WeaklyUp, memberA.roles, Version.Zero) val memberBWeaklyUp = new Member(memberB.uniqueAddress, Int.MaxValue, WeaklyUp, memberB.roles, Version.Zero)