@@ -7,19 +7,22 @@ package akka.cluster.sharding.typed
77import scala .concurrent .duration ._
88
99import com .typesafe .config .ConfigFactory
10+ import org .scalatest .concurrent .Eventually
1011import org .scalatest .concurrent .ScalaFutures
12+ import org .scalatest .time .Span
1113
14+ import akka .Done
1215import akka .actor .testkit .typed .scaladsl .TestProbe
1316import akka .actor .typed .ActorRef
17+ import akka .actor .typed .ActorSystem
1418import akka .actor .typed .Behavior
15- import akka .actor .typed .receptionist .Receptionist
16- import akka .actor .typed .receptionist .ServiceKey
1719import akka .actor .typed .scaladsl .Behaviors
18- import akka .actor .typed .scaladsl .Routers
1920import akka .actor .typed .scaladsl .adapter .ClassicActorSystemOps
2021import akka .cluster .MultiNodeClusterSpec
2122import akka .cluster .sharding .typed .scaladsl .ShardedDaemonProcess
23+ import akka .cluster .typed .ClusterSingleton
2224import akka .cluster .typed .MultiNodeTypedClusterSpec
25+ import akka .cluster .typed .SingletonActor
2326import akka .pattern .StatusReply
2427import akka .remote .testkit .MultiNodeConfig
2528import akka .remote .testkit .MultiNodeSpec
@@ -34,28 +37,57 @@ object ShardedDaemonProcessRescaleSpec extends MultiNodeConfig {
3437 val sixth = role(" sixth" )
3538 val seventh = role(" seventh" )
3639
37- val SnitchServiceKey = ServiceKey [AnyRef ](" snitch" )
38-
3940 case class ProcessActorEvent (id : Int , event : Any ) extends CborSerializable
4041
4142 object ProcessActor {
4243 trait Command
4344 case object Stop extends Command
4445
45- def apply (id : Int ): Behavior [Command ] = Behaviors .setup { ctx =>
46+ def apply (id : Int , collector : ActorRef [ Collector . Command ] ): Behavior [Command ] = Behaviors .setup { ctx =>
4647 ctx.log.info(" Started [{}]" , id)
47- val snitchRouter = ctx.spawn(Routers .group(SnitchServiceKey ), " router" )
48- snitchRouter ! ProcessActorEvent (id, " Started" )
48+ collector ! Collector .Started (id)
4949
5050 Behaviors .receiveMessagePartial {
5151 case Stop =>
5252 ctx.log.info(" Stopped [{}]" , id)
53- snitchRouter ! ProcessActorEvent (id, " Stopped " )
53+ collector ! Collector . Stopped (id)
5454 Behaviors .stopped
5555 }
5656 }
5757 }
5858
59+ object Collector {
60+ sealed trait Command extends CborSerializable
61+ final case class Started (id : Int ) extends Command
62+ final case class Stopped (id : Int ) extends Command
63+ final case class Get (replyTo : ActorRef [Counts ]) extends Command
64+ final case class Counts (startedCount : Int , stoppedCount : Int ) extends CborSerializable
65+ final case class Reset (replyTo : ActorRef [Done ]) extends Command
66+
67+ def init (system : ActorSystem [_]): ActorRef [Command ] = {
68+ ClusterSingleton (system).init(SingletonActor (Collector (), " collector" ))
69+ }
70+
71+ def apply (): Behavior [Command ] = {
72+ behavior(Counts (startedCount = 0 , stoppedCount = 0 ))
73+ }
74+
75+ private def behavior (counts : Counts ): Behavior [Command ] = {
76+ Behaviors .receiveMessage {
77+ case Started (_) =>
78+ behavior(counts.copy(startedCount = counts.startedCount + 1 ))
79+ case Stopped (_) =>
80+ behavior(counts.copy(stoppedCount = counts.stoppedCount + 1 ))
81+ case Get (replyTo) =>
82+ replyTo ! counts
83+ Behaviors .same
84+ case Reset (replyTo) =>
85+ replyTo ! Done
86+ behavior(Counts (0 , 0 ))
87+ }
88+ }
89+ }
90+
5991 commonConfig(
6092 ConfigFactory .parseString("""
6193 akka.loglevel = DEBUG
@@ -89,12 +121,19 @@ class ShardedDaemonProcessRescaleMultiJvmNode7 extends ShardedDaemonProcessResca
89121abstract class ShardedDaemonProcessRescaleSpec
90122 extends MultiNodeSpec (ShardedDaemonProcessRescaleSpec )
91123 with MultiNodeTypedClusterSpec
92- with ScalaFutures {
124+ with ScalaFutures
125+ with Eventually {
93126
94127 import ShardedDaemonProcessRescaleSpec ._
95128
96- val topicProbe : TestProbe [AnyRef ] = TestProbe [AnyRef ]()
129+ implicit val patience : PatienceConfig = {
130+ import akka .testkit .TestDuration
131+ PatienceConfig (testKitSettings.DefaultTimeout .duration.dilated * 2 , Span (500 , org.scalatest.time.Millis ))
132+ }
133+
97134 private var sdp : ActorRef [ShardedDaemonProcessCommand ] = _
135+ private var collector : ActorRef [Collector .Command ] = _
136+ private val resetProbe = TestProbe [Done ]()
98137
99138 private def assertNumberOfProcesses (n : Int , revision : Int ): Unit = {
100139 val probe = TestProbe [NumberOfProcesses ]()
@@ -108,35 +147,23 @@ abstract class ShardedDaemonProcessRescaleSpec
108147 " Cluster sharding in multi dc cluster" must {
109148 " form cluster" in {
110149 formCluster(first, second, third, fourth, fifth, sixth, seventh)
111- runOn(first) {
112- typedSystem.receptionist ! Receptionist .Register (SnitchServiceKey , topicProbe.ref, topicProbe.ref)
113- topicProbe.expectMessageType[Receptionist .Registered ]
114- }
115- enterBarrier(" snitch-registered" )
116150
117- topicProbe.awaitAssert({
118- typedSystem.receptionist ! Receptionist .Find (SnitchServiceKey , topicProbe.ref)
119- topicProbe.expectMessageType[Receptionist .Listing ].serviceInstances(SnitchServiceKey ).size should === (1 )
120- }, 5 .seconds)
121- enterBarrier(" snitch-seen" )
151+ collector = Collector .init(system.toTyped)
152+ enterBarrier(" collector-started" )
122153 }
123154
124155 " init actor set" in {
125156 sdp = ShardedDaemonProcess (typedSystem).initWithContext(
126157 " the-fearless" ,
127158 4 ,
128- ctx => ProcessActor (ctx.processNumber),
159+ ctx => ProcessActor (ctx.processNumber, collector ),
129160 ShardedDaemonProcessSettings (system.toTyped),
130161 ProcessActor .Stop )
131162 enterBarrier(" sharded-daemon-process-initialized" )
132- runOn(first) {
133- val startedIds = (0 to 3 ).map { _ =>
134- val event = topicProbe.expectMessageType[ProcessActorEvent ](5 .seconds)
135- event.event should === (" Started" )
136- event.id
137- }.toSet
138- startedIds.size should === (4 )
139- topicProbe.expectNoMessage()
163+ eventually {
164+ val countsReplyProbe = TestProbe [Collector .Counts ]()
165+ collector ! Collector .Get (countsReplyProbe.ref)
166+ countsReplyProbe.expectMessage(500 .millis, Collector .Counts (startedCount = 4 , stoppedCount = 0 ))
140167 }
141168 enterBarrier(" sharded-daemon-process-started-acked" )
142169 runOn(third) {
@@ -147,14 +174,18 @@ abstract class ShardedDaemonProcessRescaleSpec
147174
148175 " rescale to 8 workers" in {
149176 runOn(first) {
177+ collector ! Collector .Reset (resetProbe.ref)
178+ resetProbe.expectMessage(Done )
179+
150180 val probe = TestProbe [AnyRef ]()
151181 sdp ! ChangeNumberOfProcesses (8 , probe.ref)
152182 probe.expectMessage(30 .seconds, StatusReply .Ack )
153- // FIXME snitch router is dropping messages
154- // val events = topicProbe.receiveMessages(4 + 8, 10.seconds).map(_.asInstanceOf[ProcessActorEvent])
155- // events.collect { case evt if evt.event == "Stopped" => evt.id }.toSet.size should ===(4)
156- // events.collect { case evt if evt.event == "Started" => evt.id }.toSet.size should ===(8)
157- // topicProbe.expectNoMessage()
183+ }
184+ enterBarrier(" sharded-daemon-process-rescaled-to-8" )
185+ eventually {
186+ val countsReplyProbe = TestProbe [Collector .Counts ]()
187+ collector ! Collector .Get (countsReplyProbe.ref)
188+ countsReplyProbe.expectMessage(500 .millis, Collector .Counts (startedCount = 8 , stoppedCount = 4 ))
158189 }
159190
160191 enterBarrier(" sharded-daemon-process-rescaled-to-8-acked" )
@@ -166,18 +197,21 @@ abstract class ShardedDaemonProcessRescaleSpec
166197
167198 " rescale to 2 workers" in {
168199 runOn(second) {
200+ collector ! Collector .Reset (resetProbe.ref)
201+ resetProbe.expectMessage(Done )
202+
169203 val probe = TestProbe [AnyRef ]()
170204 sdp ! ChangeNumberOfProcesses (2 , probe.ref)
171205 probe.expectMessage(30 .seconds, StatusReply .Ack )
172206 }
173- enterBarrier(" sharded-daemon-process-rescaled-to-2-acked" )
174- runOn(first) {
175- // FIXME snitch router is dropping messages
176- // val events = topicProbe.receiveMessages(8 + 2, 10.seconds).map(_.asInstanceOf[ProcessActorEvent])
177- // events.collect { case evt if evt.event == "Stopped" => evt.id }.toSet.size should ===(8)
178- // events.collect { case evt if evt.event == "Started" => evt.id }.toSet.size should ===(2)
179- // topicProbe.expectNoMessage()
207+ enterBarrier(" sharded-daemon-process-rescaled-to-2" )
208+ eventually {
209+ val countsReplyProbe = TestProbe [Collector .Counts ]()
210+ collector ! Collector .Get (countsReplyProbe.ref)
211+ countsReplyProbe.expectMessage(500 .millis, Collector .Counts (startedCount = 2 , stoppedCount = 8 ))
180212 }
213+
214+ enterBarrier(" sharded-daemon-process-rescaled-to-2-acked" )
181215 runOn(third) {
182216 assertNumberOfProcesses(n = 2 , revision = 2 )
183217 }
0 commit comments