Skip to content

Commit ec5e33f

Browse files
authored
add non-default config that allows InboundQuarantineCheck to ignore 'harmless' quarantine events (#1555)
* stub test for harmless=true Update OutboundIdleShutdownSpec.scala Update OutboundIdleShutdownSpec.scala Update OutboundIdleShutdownSpec.scala * add quarantinedButHarmless check for tests * new test case * Update OutboundIdleShutdownSpec.scala * try to not shutdown when quarantine is harmless * Update OutboundIdleShutdownSpec.scala * Create quarantine.backwards.excludes * Update quarantine.backwards.excludes * update log message * try to add config * Update ArterySettings.scala * add tests * Update OutboundIdleShutdownSpec.scala * rework test
1 parent dad6b9f commit ec5e33f

File tree

8 files changed

+292
-19
lines changed

8 files changed

+292
-19
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# changes made due to issues with downing during harmless quarantine
19+
# https://github.com/apache/pekko/issues/578
20+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.copy")
21+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.this")
22+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.apply")
23+
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.unapply")
24+
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.artery.AssociationState$QuarantinedTimestamp$")

remote/src/main/resources/reference.conf

+5
Original file line numberDiff line numberDiff line change
@@ -850,6 +850,11 @@ pekko {
850850
# limit there will be extra performance and scalability cost.
851851
log-frame-size-exceeding = off
852852

853+
# If set to "on", InboundQuarantineCheck will propagate harmless quarantine events.
854+
# This is the legacy behavior. Users who find that these harmless quarantine events lead
855+
# to problems can set this to "off" to suppress them (https://github.com/apache/pekko/pull/1555).
856+
propagate-harmless-quarantine-events = on
857+
853858
advanced {
854859

855860
# Maximum serialized message size, including header data.

remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala

+7
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,13 @@ private[pekko] final class ArterySettings private (config: Config) {
105105
*/
106106
val Version: Byte = ArteryTransport.HighestVersion
107107

108+
/**
109+
* If set to true, harmless quarantine events are propagated in InboundQuarantineCheck.
110+
* Background is in https://github.com/apache/pekko/pull/1555
111+
*/
112+
val PropagateHarmlessQuarantineEvents: Boolean =
113+
getBoolean("propagate-harmless-quarantine-events")
114+
108115
object Advanced {
109116
val config: Config = getConfig("advanced")
110117
import config._

remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala

+11-4
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,9 @@ private[remote] object AssociationState {
108108
quarantined = ImmutableLongMap.empty[QuarantinedTimestamp],
109109
new AtomicReference(UniqueRemoteAddressValue(None, Nil)))
110110

111-
final case class QuarantinedTimestamp(nanoTime: Long) {
111+
final case class QuarantinedTimestamp(nanoTime: Long, harmless: Boolean = false) {
112112
override def toString: String =
113-
s"Quarantined ${TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - nanoTime)} seconds ago"
113+
s"Quarantined ${TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - nanoTime)} seconds ago (harmless=$harmless)"
114114
}
115115

116116
private final case class UniqueRemoteAddressValue(
@@ -159,6 +159,13 @@ private[remote] final class AssociationState private (
159159

160160
def isQuarantined(uid: Long): Boolean = quarantined.contains(uid)
161161

162+
def quarantinedButHarmless(uid: Long): Boolean = {
163+
quarantined.get(uid) match {
164+
case OptionVal.Some(qt) => qt.harmless
165+
case _ => false
166+
}
167+
}
168+
162169
@tailrec def completeUniqueRemoteAddress(peer: UniqueAddress): Unit = {
163170
val current = _uniqueRemoteAddress.get()
164171
if (current.uniqueRemoteAddress.isEmpty) {
@@ -196,14 +203,14 @@ private[remote] final class AssociationState private (
196203
quarantined,
197204
new AtomicReference(UniqueRemoteAddressValue(Some(remoteAddress), Nil)))
198205

199-
def newQuarantined(): AssociationState =
206+
def newQuarantined(harmless: Boolean = false): AssociationState =
200207
uniqueRemoteAddress() match {
201208
case Some(a) =>
202209
new AssociationState(
203210
incarnation,
204211
lastUsedTimestamp = new AtomicLong(System.nanoTime()),
205212
controlIdleKillSwitch,
206-
quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime())),
213+
quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime(), harmless)),
207214
_uniqueRemoteAddress)
208215
case None => this
209216
}

remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ private[remote] class Association(
538538
current.uniqueRemoteAddress() match {
539539
case Some(peer) if peer.uid == u =>
540540
if (!current.isQuarantined(u)) {
541-
val newState = current.newQuarantined()
541+
val newState = current.newQuarantined(harmless)
542542
if (swapState(current, newState)) {
543543
// quarantine state change was performed
544544
if (harmless) {

remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala

+18-8
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,27 @@ private[remote] class InboundQuarantineCheck(inboundContext: InboundContext)
4545
env.association match {
4646
case OptionVal.Some(association) =>
4747
if (association.associationState.isQuarantined(env.originUid)) {
48-
if (log.isDebugEnabled)
49-
log.debug(
50-
"Dropping message [{}] from [{}#{}] because the system is quarantined",
48+
if (!inboundContext.settings.PropagateHarmlessQuarantineEvents
49+
&& association.associationState.quarantinedButHarmless(env.originUid)) {
50+
log.info(
51+
"Message [{}] from [{}#{}] was dropped. " +
52+
"The system is quarantined but the UID is known to be harmless.",
5153
Logging.messageClassName(env.message),
5254
association.remoteAddress,
5355
env.originUid)
54-
// avoid starting outbound stream for heartbeats
55-
if (!env.message.isInstanceOf[Quarantined] && !isHeartbeat(env.message))
56-
inboundContext.sendControl(
57-
association.remoteAddress,
58-
Quarantined(inboundContext.localAddress, UniqueAddress(association.remoteAddress, env.originUid)))
56+
} else {
57+
if (log.isDebugEnabled)
58+
log.debug(
59+
"Dropping message [{}] from [{}#{}] because the system is quarantined",
60+
Logging.messageClassName(env.message),
61+
association.remoteAddress,
62+
env.originUid)
63+
// avoid starting outbound stream for heartbeats
64+
if (!env.message.isInstanceOf[Quarantined] && !isHeartbeat(env.message))
65+
inboundContext.sendControl(
66+
association.remoteAddress,
67+
Quarantined(inboundContext.localAddress, UniqueAddress(association.remoteAddress, env.originUid)))
68+
}
5969
pull(in)
6070
} else
6171
push(out, env)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* license agreements; and to You under the Apache License, version 2.0:
4+
*
5+
* https://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* This file is part of the Apache Pekko project, which was derived from Akka.
8+
*/
9+
10+
/*
11+
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
12+
*/
13+
14+
package org.apache.pekko.remote.artery
15+
16+
import scala.concurrent.Future
17+
import scala.concurrent.Promise
18+
19+
import org.scalatest.concurrent.Eventually
20+
import org.scalatest.time.Span
21+
22+
import org.apache.pekko
23+
import pekko.actor.ActorRef
24+
import pekko.actor.ActorSystem
25+
import pekko.actor.Address
26+
import pekko.actor.RootActorPath
27+
import pekko.remote.RARP
28+
import pekko.remote.UniqueAddress
29+
import pekko.testkit.ImplicitSender
30+
import pekko.testkit.TestActors
31+
import pekko.testkit.TestProbe
32+
33+
class HarmlessQuarantineSpec extends ArteryMultiNodeSpec("""
34+
pekko.loglevel=INFO
35+
pekko.remote.artery.propagate-harmless-quarantine-events = off
36+
pekko.remote.artery.advanced {
37+
stop-idle-outbound-after = 1 s
38+
connection-timeout = 2 s
39+
remove-quarantined-association-after = 1 s
40+
compression {
41+
actor-refs.advertisement-interval = 5 seconds
42+
}
43+
}
44+
""") with ImplicitSender with Eventually {
45+
46+
override implicit val patience: PatienceConfig = {
47+
import pekko.testkit.TestDuration
48+
PatienceConfig(testKitSettings.DefaultTimeout.duration.dilated * 2, Span(200, org.scalatest.time.Millis))
49+
}
50+
51+
private def futureUniqueRemoteAddress(association: Association): Future[UniqueAddress] = {
52+
val p = Promise[UniqueAddress]()
53+
association.associationState.addUniqueRemoteAddressListener(a => p.success(a))
54+
p.future
55+
}
56+
57+
"Harmless Quarantine Events" should {
58+
59+
"eliminate quarantined association when not used - echo test" in withAssociation {
60+
(remoteSystem, remoteAddress, _, localArtery, localProbe) =>
61+
// event to watch out for, indicator of the issue
62+
remoteSystem.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent])
63+
64+
val remoteEcho = remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue
65+
66+
val localAddress = RARP(system).provider.getDefaultAddress
67+
68+
val localEchoRef =
69+
remoteSystem.actorSelection(RootActorPath(localAddress) / localProbe.ref.path.elements).resolveOne(
70+
remainingOrDefault).futureValue
71+
remoteEcho.tell("ping", localEchoRef)
72+
localProbe.expectMsg("ping")
73+
74+
val association = localArtery.association(remoteAddress)
75+
val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
76+
localArtery.quarantine(remoteAddress, Some(remoteUid), "Test")
77+
association.associationState.isQuarantined(remoteUid) shouldBe true
78+
association.associationState.quarantinedButHarmless(remoteUid) shouldBe false
79+
80+
remoteEcho.tell("ping", localEchoRef) // trigger sending message from remote to local, which will trigger local to wrongfully notify remote that it is quarantined
81+
eventually {
82+
expectMsgType[ThisActorSystemQuarantinedEvent] // this is what remote emits when it learns it is quarantined by local
83+
}
84+
}
85+
86+
"eliminate quarantined association when not used - echo test (harmless=true)" in withAssociation {
87+
(remoteSystem, remoteAddress, _, localArtery, localProbe) =>
88+
// event to watch out for, indicator of the issue
89+
remoteSystem.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent])
90+
91+
val remoteEcho = remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue
92+
93+
val localAddress = RARP(system).provider.getDefaultAddress
94+
95+
val localEchoRef =
96+
remoteSystem.actorSelection(RootActorPath(localAddress) / localProbe.ref.path.elements).resolveOne(
97+
remainingOrDefault).futureValue
98+
remoteEcho.tell("ping", localEchoRef)
99+
localProbe.expectMsg("ping")
100+
101+
val association = localArtery.association(remoteAddress)
102+
val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
103+
localArtery.quarantine(remoteAddress, Some(remoteUid), "HarmlessTest", harmless = true)
104+
association.associationState.isQuarantined(remoteUid) shouldBe true
105+
association.associationState.quarantinedButHarmless(remoteUid) shouldBe true
106+
107+
remoteEcho.tell("ping", localEchoRef) // trigger sending message from remote to local; with propagate-harmless-quarantine-events = off, local must not notify remote that it is quarantined
108+
eventually {
109+
expectNoMessage()
110+
}
111+
}
112+
113+
/**
114+
* Test setup fixture:
115+
* 1. A 'remote' ActorSystem is created to spawn an Echo actor,
116+
* 2. A TestProbe is spawned locally to initiate communication with the Echo actor
117+
* 3. Details (remoteSystem, remoteAddress, remoteEcho, localArtery, localProbe) are supplied to the test
118+
*/
119+
def withAssociation(test: (ActorSystem, Address, ActorRef, ArteryTransport, TestProbe) => Any): Unit = {
120+
val remoteSystem = newRemoteSystem()
121+
try {
122+
remoteSystem.actorOf(TestActors.echoActorProps, "echo")
123+
val remoteAddress = RARP(remoteSystem).provider.getDefaultAddress
124+
125+
def remoteEcho = system.actorSelection(RootActorPath(remoteAddress) / "user" / "echo")
126+
127+
val echoRef = remoteEcho.resolveOne(remainingOrDefault).futureValue
128+
val localProbe = new TestProbe(localSystem)
129+
130+
echoRef.tell("ping", localProbe.ref)
131+
localProbe.expectMsg("ping")
132+
133+
val artery = RARP(system).provider.transport.asInstanceOf[ArteryTransport]
134+
135+
test(remoteSystem, remoteAddress, echoRef, artery, localProbe)
136+
137+
} finally {
138+
shutdown(remoteSystem)
139+
}
140+
}
141+
}
142+
}

remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala

+84-6
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@ import pekko.testkit.ImplicitSender
3131
import pekko.testkit.TestActors
3232
import pekko.testkit.TestProbe
3333

34-
class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec(s"""
34+
class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec("""
3535
pekko.loglevel=INFO
36-
pekko.remote.artery.advanced.stop-idle-outbound-after = 1 s
37-
pekko.remote.artery.advanced.connection-timeout = 2 s
38-
pekko.remote.artery.advanced.remove-quarantined-association-after = 1 s
39-
pekko.remote.artery.advanced.compression {
40-
actor-refs.advertisement-interval = 5 seconds
36+
pekko.remote.artery.advanced {
37+
stop-idle-outbound-after = 1 s
38+
connection-timeout = 2 s
39+
remove-quarantined-association-after = 1 s
40+
compression {
41+
actor-refs.advertisement-interval = 5 seconds
42+
}
4143
}
4244
""") with ImplicitSender with Eventually {
4345

@@ -116,6 +118,8 @@ class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec(s"""
116118
val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
117119

118120
localArtery.quarantine(remoteAddress, Some(remoteUid), "Test")
121+
association.associationState.isQuarantined(remoteUid) shouldBe true
122+
association.associationState.quarantinedButHarmless(remoteUid) shouldBe false
119123

120124
eventually {
121125
assertStreamActive(association, Association.ControlQueueIndex, expected = false)
@@ -128,6 +132,80 @@ class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec(s"""
128132
}
129133
}
130134

135+
"eliminate quarantined association when not used (harmless=true)" in withAssociation {
136+
(_, remoteAddress, _, localArtery, _) =>
137+
val association = localArtery.association(remoteAddress)
138+
val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
139+
140+
localArtery.quarantine(remoteAddress, Some(remoteUid), "HarmlessTest", harmless = true)
141+
association.associationState.isQuarantined(remoteUid) shouldBe true
142+
association.associationState.quarantinedButHarmless(remoteUid) shouldBe true
143+
144+
eventually {
145+
assertStreamActive(association, Association.ControlQueueIndex, expected = false)
146+
assertStreamActive(association, Association.OrdinaryQueueIndex, expected = false)
147+
}
148+
149+
// the outbound streams are inactive and association quarantined, then it's completely removed
150+
eventually {
151+
localArtery.remoteAddresses should not contain remoteAddress
152+
}
153+
}
154+
155+
"eliminate quarantined association when not used - echo test" in withAssociation {
156+
(remoteSystem, remoteAddress, _, localArtery, localProbe) =>
157+
// event to watch out for, indicator of the issue
158+
remoteSystem.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent])
159+
160+
val remoteEcho = remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue
161+
162+
val localAddress = RARP(system).provider.getDefaultAddress
163+
164+
val localEchoRef =
165+
remoteSystem.actorSelection(RootActorPath(localAddress) / localProbe.ref.path.elements).resolveOne(
166+
remainingOrDefault).futureValue
167+
remoteEcho.tell("ping", localEchoRef)
168+
localProbe.expectMsg("ping")
169+
170+
val association = localArtery.association(remoteAddress)
171+
val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
172+
localArtery.quarantine(remoteAddress, Some(remoteUid), "Test")
173+
association.associationState.isQuarantined(remoteUid) shouldBe true
174+
association.associationState.quarantinedButHarmless(remoteUid) shouldBe false
175+
176+
remoteEcho.tell("ping", localEchoRef) // trigger sending message from remote to local, which will trigger local to wrongfully notify remote that it is quarantined
177+
eventually {
178+
expectMsgType[ThisActorSystemQuarantinedEvent] // this is what remote emits when it learns it is quarantined by local
179+
}
180+
}
181+
182+
"eliminate quarantined association when not used - echo test (harmless=true)" in withAssociation {
183+
(remoteSystem, remoteAddress, _, localArtery, localProbe) =>
184+
// event to watch out for, indicator of the issue
185+
remoteSystem.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent])
186+
187+
val remoteEcho = remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue
188+
189+
val localAddress = RARP(system).provider.getDefaultAddress
190+
191+
val localEchoRef =
192+
remoteSystem.actorSelection(RootActorPath(localAddress) / localProbe.ref.path.elements).resolveOne(
193+
remainingOrDefault).futureValue
194+
remoteEcho.tell("ping", localEchoRef)
195+
localProbe.expectMsg("ping")
196+
197+
val association = localArtery.association(remoteAddress)
198+
val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
199+
localArtery.quarantine(remoteAddress, Some(remoteUid), "HarmlessTest", harmless = true)
200+
association.associationState.isQuarantined(remoteUid) shouldBe true
201+
association.associationState.quarantinedButHarmless(remoteUid) shouldBe true
202+
203+
remoteEcho.tell("ping", localEchoRef) // trigger sending message from remote to local, which will trigger local to wrongfully notify remote that it is quarantined
204+
eventually {
205+
expectMsgType[ThisActorSystemQuarantinedEvent] // this is what remote emits when it learns it is quarantined by local
206+
}
207+
}
208+
131209
"remove inbound compression after quarantine" in withAssociation { (_, remoteAddress, _, localArtery, _) =>
132210
val association = localArtery.association(remoteAddress)
133211
val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid

0 commit comments

Comments
 (0)