Skip to content

Commit a1e124c

Browse files
fix spec
1 parent 5cde605 commit a1e124c

File tree

2 files changed

+26
-11
lines changed

2 files changed

+26
-11
lines changed

akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,6 @@ abstract class ClusterShardingInstrumentationSpec
9494
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
9595
.shardHomeRequests
9696

97-
val shardHomeResponses =
98-
ClusterShardingInstrumentationProvider(system).instrumentation
99-
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
100-
.shardHomeResponses
101-
10297
override implicit val patienceConfig: PatienceConfig = {
10398
import akka.testkit.TestDuration
10499
PatienceConfig(testKitSettings.DefaultTimeout.duration.dilated, Span(1000, org.scalatest.time.Millis))
@@ -191,7 +186,21 @@ abstract class ClusterShardingInstrumentationSpec
191186

192187
"record latency of requesting ShardHome" in {
193188
runOn(second) {
194-
shardHomeRequests.get() shouldBe shardHomeResponses.get()
189+
eventually(timeout(Span(5, Seconds))) {
190+
shardHomeRequests.size > 0 shouldBe true
191+
shardHomeRequests.foreach {
192+
case (key, value) =>
193+
if (key.startsWith("id")) {
194+
// The "id-0", "id-1" ... messages were send during the blackhole.
195+
// This means they were requested twice, but only received back once.
196+
value.get() shouldBe 1
197+
} else {
198+
// The "a", "b", "c" messages were send before the blackhole.
199+
// This means they got a proper send-and-request cycle
200+
value.get() shouldBe 0
201+
}
202+
}
203+
}
195204
}
196205
enterBarrier("measure-shard-home-latency")
197206
}

akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpecTelemetry.scala

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package akka.cluster.sharding
77
import java.util.concurrent.atomic.AtomicInteger
88

99
import scala.annotation.nowarn
10+
import scala.collection.concurrent.TrieMap
1011

1112
import akka.actor.ActorRef
1213
import akka.actor.Address
@@ -19,8 +20,9 @@ class ClusterShardingInstrumentationSpecTelemetry(@nowarn("msg=never used") syst
1920
val shardRegionBufferSizeCounter = new AtomicInteger(0)
2021
val beginShardHandoffDurationCounter = new AtomicInteger(0)
2122
val finishShardHandoffDurationCounter = new AtomicInteger(0)
22-
val shardHomeRequests = new AtomicInteger(0)
23-
val shardHomeResponses = new AtomicInteger(0)
23+
24+
val shardHomeRequests = new TrieMap[String, AtomicInteger]()
25+
2426
val dropMessageCounter = new AtomicInteger(0)
2527

2628
override def shardRegionBufferSize(
@@ -55,14 +57,18 @@ class ClusterShardingInstrumentationSpecTelemetry(@nowarn("msg=never used") syst
5557
selfAddress: Address,
5658
shardRegionActor: ActorRef,
5759
typeName: String,
58-
shardId: String): Unit =
59-
shardHomeRequests.incrementAndGet()
60+
shardId: String): Unit = shardHomeRequests.getOrElseUpdate(shardId, new AtomicInteger(0)).incrementAndGet()
6061

6162
override def receivedShardHome(
6263
selfAddress: Address,
6364
shardRegionActor: ActorRef,
6465
typeName: String,
65-
shardId: String): Unit = shardHomeResponses.incrementAndGet()
66+
shardId: String): Unit =
67+
shardHomeRequests.get(shardId) match {
68+
case Some(value) =>
69+
value.decrementAndGet()
70+
case None => () // should not happen
71+
}
6672

6773
override def messageDropped(selfAddress: Address, self: ActorRef, typeName: String): Unit =
6874
dropMessageCounter.incrementAndGet()

0 commit comments

Comments
 (0)