Skip to content

Commit b954655

Browse files
authored
chore: Harden ShardedDaemonProcessRescaleSpec (#32855)
* Couldn't use the receptionist group router because it sometimes dropped messages, probably due to an initialization race condition. * Using a cluster singleton instead.
1 parent faaa7cb commit b954655

File tree

1 file changed

+76
-42
lines changed

1 file changed

+76
-42
lines changed

akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessRescaleSpec.scala

Lines changed: 76 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,22 @@ package akka.cluster.sharding.typed
77
import scala.concurrent.duration._
88

99
import com.typesafe.config.ConfigFactory
10+
import org.scalatest.concurrent.Eventually
1011
import org.scalatest.concurrent.ScalaFutures
12+
import org.scalatest.time.Span
1113

14+
import akka.Done
1215
import akka.actor.testkit.typed.scaladsl.TestProbe
1316
import akka.actor.typed.ActorRef
17+
import akka.actor.typed.ActorSystem
1418
import akka.actor.typed.Behavior
15-
import akka.actor.typed.receptionist.Receptionist
16-
import akka.actor.typed.receptionist.ServiceKey
1719
import akka.actor.typed.scaladsl.Behaviors
18-
import akka.actor.typed.scaladsl.Routers
1920
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
2021
import akka.cluster.MultiNodeClusterSpec
2122
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
23+
import akka.cluster.typed.ClusterSingleton
2224
import akka.cluster.typed.MultiNodeTypedClusterSpec
25+
import akka.cluster.typed.SingletonActor
2326
import akka.pattern.StatusReply
2427
import akka.remote.testkit.MultiNodeConfig
2528
import akka.remote.testkit.MultiNodeSpec
@@ -34,28 +37,57 @@ object ShardedDaemonProcessRescaleSpec extends MultiNodeConfig {
3437
val sixth = role("sixth")
3538
val seventh = role("seventh")
3639

37-
val SnitchServiceKey = ServiceKey[AnyRef]("snitch")
38-
3940
case class ProcessActorEvent(id: Int, event: Any) extends CborSerializable
4041

4142
/**
 * Daemon process actor used by this spec.
 *
 * In the version added by this commit (gutter lines marked `+`), the actor reports
 * `Collector.Started(id)` to the given `collector` when it is set up, and on `Stop` it
 * reports `Collector.Stopped(id)` and then stops itself. The gutter lines marked `-`
 * show the previous implementation, which sent `ProcessActorEvent`s through a
 * receptionist group router instead.
 */
object ProcessActor {
4243
trait Command
4344
case object Stop extends Command
4445

45-
def apply(id: Int): Behavior[Command] = Behaviors.setup { ctx =>
46+
// `id` is the process number; `collector` receives the Started/Stopped notifications.
def apply(id: Int, collector: ActorRef[Collector.Command]): Behavior[Command] = Behaviors.setup { ctx =>
4647
ctx.log.info("Started [{}]", id)
47-
val snitchRouter = ctx.spawn(Routers.group(SnitchServiceKey), "router")
48-
snitchRouter ! ProcessActorEvent(id, "Started")
48+
// Report the start as soon as the behavior is set up.
collector ! Collector.Started(id)
4949

5050
// Only `Stop` is handled here (partial receive).
Behaviors.receiveMessagePartial {
5151
case Stop =>
5252
ctx.log.info("Stopped [{}]", id)
53-
snitchRouter ! ProcessActorEvent(id, "Stopped")
53+
// Report the stop before terminating the actor.
collector ! Collector.Stopped(id)
5454
Behaviors.stopped
5555
}
5656
}
5757
}
5858

59+
/**
 * Counter actor that tallies how many `Started` and `Stopped` notifications it has
 * received, so the spec can query the totals with `Get` and clear them with `Reset`.
 * It is started through `ClusterSingleton` (see `init`).
 */
object Collector {
60+
// Protocol: all commands and the `Counts` reply are marked `CborSerializable`.
sealed trait Command extends CborSerializable
61+
// Notification that the process with the given id has started.
final case class Started(id: Int) extends Command
62+
// Notification that the process with the given id has stopped.
final case class Stopped(id: Int) extends Command
63+
// Request for the current totals; the `Counts` snapshot is sent to `replyTo`.
final case class Get(replyTo: ActorRef[Counts]) extends Command
64+
// Totals of `Started` and `Stopped` messages received since creation or the last `Reset`.
final case class Counts(startedCount: Int, stoppedCount: Int) extends CborSerializable
65+
// Request to zero both totals; `Done` is sent to `replyTo` as acknowledgement.
final case class Reset(replyTo: ActorRef[Done]) extends Command
66+
67+
/**
 * Initializes the collector via `ClusterSingleton` under the name "collector".
 *
 * @param system the typed actor system to initialize the singleton on
 * @return the `ActorRef` returned by `ClusterSingleton(system).init`
 */
def init(system: ActorSystem[_]): ActorRef[Command] = {
68+
ClusterSingleton(system).init(SingletonActor(Collector(), "collector"))
69+
}
70+
71+
/** Initial behavior, starting with both totals at zero. */
def apply(): Behavior[Command] = {
72+
behavior(Counts(startedCount = 0, stoppedCount = 0))
73+
}
74+
75+
// State is carried immutably: every change returns a new behavior closed over updated `counts`.
private def behavior(counts: Counts): Behavior[Command] = {
76+
Behaviors.receiveMessage {
77+
// The id is ignored; only the number of notifications is tracked.
case Started(_) =>
78+
behavior(counts.copy(startedCount = counts.startedCount + 1))
79+
case Stopped(_) =>
80+
behavior(counts.copy(stoppedCount = counts.stoppedCount + 1))
81+
case Get(replyTo) =>
82+
replyTo ! counts
83+
Behaviors.same
84+
case Reset(replyTo) =>
85+
// Acknowledge, then continue with zeroed totals for subsequent messages.
replyTo ! Done
86+
behavior(Counts(0, 0))
87+
}
88+
}
89+
}
90+
5991
commonConfig(
6092
ConfigFactory.parseString("""
6193
akka.loglevel = DEBUG
@@ -89,12 +121,19 @@ class ShardedDaemonProcessRescaleMultiJvmNode7 extends ShardedDaemonProcessResca
89121
abstract class ShardedDaemonProcessRescaleSpec
90122
extends MultiNodeSpec(ShardedDaemonProcessRescaleSpec)
91123
with MultiNodeTypedClusterSpec
92-
with ScalaFutures {
124+
with ScalaFutures
125+
with Eventually {
93126

94127
import ShardedDaemonProcessRescaleSpec._
95128

96-
val topicProbe: TestProbe[AnyRef] = TestProbe[AnyRef]()
129+
implicit val patience: PatienceConfig = {
130+
import akka.testkit.TestDuration
131+
PatienceConfig(testKitSettings.DefaultTimeout.duration.dilated * 2, Span(500, org.scalatest.time.Millis))
132+
}
133+
97134
private var sdp: ActorRef[ShardedDaemonProcessCommand] = _
135+
private var collector: ActorRef[Collector.Command] = _
136+
private val resetProbe = TestProbe[Done]()
98137

99138
private def assertNumberOfProcesses(n: Int, revision: Int): Unit = {
100139
val probe = TestProbe[NumberOfProcesses]()
@@ -108,35 +147,23 @@ abstract class ShardedDaemonProcessRescaleSpec
108147
"Cluster sharding in multi dc cluster" must {
109148
"form cluster" in {
110149
formCluster(first, second, third, fourth, fifth, sixth, seventh)
111-
runOn(first) {
112-
typedSystem.receptionist ! Receptionist.Register(SnitchServiceKey, topicProbe.ref, topicProbe.ref)
113-
topicProbe.expectMessageType[Receptionist.Registered]
114-
}
115-
enterBarrier("snitch-registered")
116150

117-
topicProbe.awaitAssert({
118-
typedSystem.receptionist ! Receptionist.Find(SnitchServiceKey, topicProbe.ref)
119-
topicProbe.expectMessageType[Receptionist.Listing].serviceInstances(SnitchServiceKey).size should ===(1)
120-
}, 5.seconds)
121-
enterBarrier("snitch-seen")
151+
collector = Collector.init(system.toTyped)
152+
enterBarrier("collector-started")
122153
}
123154

124155
"init actor set" in {
125156
sdp = ShardedDaemonProcess(typedSystem).initWithContext(
126157
"the-fearless",
127158
4,
128-
ctx => ProcessActor(ctx.processNumber),
159+
ctx => ProcessActor(ctx.processNumber, collector),
129160
ShardedDaemonProcessSettings(system.toTyped),
130161
ProcessActor.Stop)
131162
enterBarrier("sharded-daemon-process-initialized")
132-
runOn(first) {
133-
val startedIds = (0 to 3).map { _ =>
134-
val event = topicProbe.expectMessageType[ProcessActorEvent](5.seconds)
135-
event.event should ===("Started")
136-
event.id
137-
}.toSet
138-
startedIds.size should ===(4)
139-
topicProbe.expectNoMessage()
163+
eventually {
164+
val countsReplyProbe = TestProbe[Collector.Counts]()
165+
collector ! Collector.Get(countsReplyProbe.ref)
166+
countsReplyProbe.expectMessage(500.millis, Collector.Counts(startedCount = 4, stoppedCount = 0))
140167
}
141168
enterBarrier("sharded-daemon-process-started-acked")
142169
runOn(third) {
@@ -147,14 +174,18 @@ abstract class ShardedDaemonProcessRescaleSpec
147174

148175
"rescale to 8 workers" in {
149176
runOn(first) {
177+
collector ! Collector.Reset(resetProbe.ref)
178+
resetProbe.expectMessage(Done)
179+
150180
val probe = TestProbe[AnyRef]()
151181
sdp ! ChangeNumberOfProcesses(8, probe.ref)
152182
probe.expectMessage(30.seconds, StatusReply.Ack)
153-
// FIXME snitch router is dropping messages
154-
// val events = topicProbe.receiveMessages(4 + 8, 10.seconds).map(_.asInstanceOf[ProcessActorEvent])
155-
// events.collect { case evt if evt.event == "Stopped" => evt.id }.toSet.size should ===(4)
156-
// events.collect { case evt if evt.event == "Started" => evt.id }.toSet.size should ===(8)
157-
// topicProbe.expectNoMessage()
183+
}
184+
enterBarrier("sharded-daemon-process-rescaled-to-8")
185+
eventually {
186+
val countsReplyProbe = TestProbe[Collector.Counts]()
187+
collector ! Collector.Get(countsReplyProbe.ref)
188+
countsReplyProbe.expectMessage(500.millis, Collector.Counts(startedCount = 8, stoppedCount = 4))
158189
}
159190

160191
enterBarrier("sharded-daemon-process-rescaled-to-8-acked")
@@ -166,18 +197,21 @@ abstract class ShardedDaemonProcessRescaleSpec
166197

167198
"rescale to 2 workers" in {
168199
runOn(second) {
200+
collector ! Collector.Reset(resetProbe.ref)
201+
resetProbe.expectMessage(Done)
202+
169203
val probe = TestProbe[AnyRef]()
170204
sdp ! ChangeNumberOfProcesses(2, probe.ref)
171205
probe.expectMessage(30.seconds, StatusReply.Ack)
172206
}
173-
enterBarrier("sharded-daemon-process-rescaled-to-2-acked")
174-
runOn(first) {
175-
// FIXME snitch router is dropping messages
176-
// val events = topicProbe.receiveMessages(8 + 2, 10.seconds).map(_.asInstanceOf[ProcessActorEvent])
177-
// events.collect { case evt if evt.event == "Stopped" => evt.id }.toSet.size should ===(8)
178-
// events.collect { case evt if evt.event == "Started" => evt.id }.toSet.size should ===(2)
179-
// topicProbe.expectNoMessage()
207+
enterBarrier("sharded-daemon-process-rescaled-to-2")
208+
eventually {
209+
val countsReplyProbe = TestProbe[Collector.Counts]()
210+
collector ! Collector.Get(countsReplyProbe.ref)
211+
countsReplyProbe.expectMessage(500.millis, Collector.Counts(startedCount = 2, stoppedCount = 8))
180212
}
213+
214+
enterBarrier("sharded-daemon-process-rescaled-to-2-acked")
181215
runOn(third) {
182216
assertNumberOfProcesses(n = 2, revision = 2)
183217
}

0 commit comments

Comments
 (0)