diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index aed929f79e9..73f715fd7e3 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -1297,6 +1297,7 @@ private[akka] class ShardRegion( loggedFullBufferWarning = true } context.system.deadLetters ! msg + instrumentation.messageDropped(cluster.selfAddress, self, typeName) } else { shardBuffers.append(shardId, msg, snd) instrumentation.shardRegionBufferSizeIncremented(cluster.selfAddress, self, typeName) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala index 53881871779..690c79e61f0 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala @@ -101,6 +101,9 @@ class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterSharding shardId: String): Unit = instrumentations.foreach(_.regionRequestedShardHome(selfAddress, shardRegionActor, typeName, shardId)) + override def messageDropped(selfAddress: Address, self: ActorRef, typeName: String): Unit = + instrumentations.foreach(_.messageDropped(selfAddress, self, typeName)) + override def receivedShardHome( selfAddress: Address, shardRegionActor: ActorRef, @@ -163,6 +166,8 @@ class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation typeName: String, shardId: String): Unit = () + override def messageDropped(selfAddress: Address, self: ActorRef, typeName: String): Unit = () + override def shardHandoffStarted( selfAddress: Address, shardRegionActor: ActorRef, @@ -197,6 +202,8 @@ trait ClusterShardingInstrumentation { def receivedShardHome(selfAddress: Address, shardRegionActor: ActorRef, typeName: String, shardId: String): Unit + def messageDropped(selfAddress: Address, self: ActorRef, typeName: String): Unit + def shardHandoffStarted(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String, shard: String): Unit def shardHandoffFinished( diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala index ffe75742bf1..2bf6e389e2f 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala @@ -8,7 +8,6 @@ import scala.concurrent.duration._ import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.{ Seconds, Span } - import org.scalatest.concurrent.Eventually.eventually import akka.actor.{ Actor, ActorLogging, Address, Props } @@ -86,16 +85,15 @@ abstract class ClusterShardingInstrumentationSpec .asInstanceOf[ClusterShardingInstrumentationSpecTelemetry] .shardRegionBufferSizeCounter + private val dropMessageCounter = ClusterShardingInstrumentationProvider(system).instrumentation + .asInstanceOf[ClusterShardingInstrumentationSpecTelemetry] + .dropMessageCounter + val shardHomeRequests = ClusterShardingInstrumentationProvider(system).instrumentation .asInstanceOf[ClusterShardingInstrumentationSpecTelemetry] .shardHomeRequests - val shardHomeResponses = - ClusterShardingInstrumentationProvider(system).instrumentation - .asInstanceOf[ClusterShardingInstrumentationSpecTelemetry] - .shardHomeResponses - override implicit val patienceConfig: PatienceConfig = { import akka.testkit.TestDuration PatienceConfig(testKitSettings.DefaultTimeout.duration.dilated, Span(1000, org.scalatest.time.Millis)) @@ -173,6 +171,7 @@ abstract class ClusterShardingInstrumentationSpec // we have 100 in the buffer, and our cap is 120 (per config in this test) // 10 are dropped. Should we have a metric on this? Or custom events? shardRegionBufferSizeCounter.get() shouldBe 120 + dropMessageCounter.get() shouldBe 10 } } enterBarrier("buffer-overflow") @@ -187,7 +186,21 @@ abstract class ClusterShardingInstrumentationSpec "record latency of requesting ShardHome" in { runOn(second) { - shardHomeRequests.get() shouldBe shardHomeResponses.get() + eventually(timeout(Span(5, Seconds))) { + shardHomeRequests.size > 0 shouldBe true + shardHomeRequests.foreach { + case (key, value) => + if (key.startsWith("id")) { + // The "id-0", "id-1" ... messages were send during the blackhole. + // This means they were requested twice, but only received back once. + value.get() shouldBe 1 + } else { + // The "a", "b", "c" messages were send before the blackhole. + // This means they got a proper send-and-request cycle + value.get() shouldBe 0 + } + } + } } enterBarrier("measure-shard-home-latency") } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpecTelemetry.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpecTelemetry.scala index 14e1d8120c5..42fbe340cd8 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpecTelemetry.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpecTelemetry.scala @@ -7,6 +7,7 @@ package akka.cluster.sharding import java.util.concurrent.atomic.AtomicInteger import scala.annotation.nowarn +import scala.collection.concurrent.TrieMap import akka.actor.ActorRef import akka.actor.Address @@ -19,8 +20,10 @@ class ClusterShardingInstrumentationSpecTelemetry(@nowarn("msg=never used") syst val shardRegionBufferSizeCounter = new AtomicInteger(0) val beginShardHandoffDurationCounter = new AtomicInteger(0) val finishShardHandoffDurationCounter = new AtomicInteger(0) - val shardHomeRequests = new AtomicInteger(0) - val shardHomeResponses = new AtomicInteger(0) + + val shardHomeRequests = new TrieMap[String, AtomicInteger]() + + val dropMessageCounter = new AtomicInteger(0) override def shardRegionBufferSize( selfAddress: Address, @@ -54,14 +57,21 @@ class ClusterShardingInstrumentationSpecTelemetry(@nowarn("msg=never used") syst selfAddress: Address, shardRegionActor: ActorRef, typeName: String, - shardId: String): Unit = - shardHomeRequests.incrementAndGet() + shardId: String): Unit = shardHomeRequests.getOrElseUpdate(shardId, new AtomicInteger(0)).incrementAndGet() override def receivedShardHome( selfAddress: Address, shardRegionActor: ActorRef, typeName: String, - shardId: String): Unit = shardHomeResponses.incrementAndGet() + shardId: String): Unit = + shardHomeRequests.get(shardId) match { + case Some(value) => + value.decrementAndGet() + case None => () // should not happen + } + + override def messageDropped(selfAddress: Address, self: ActorRef, typeName: String): Unit = + dropMessageCounter.incrementAndGet() override def dependencies: Seq[String] = Nil }