From b0e4b347e64c9802ce2a0e9f47d5d17d0a2ff0e7 Mon Sep 17 00:00:00 2001 From: Sebastian Date: Tue, 17 Feb 2026 07:51:16 +0100 Subject: [PATCH 1/4] feat: metric for dropped messages in Shard Region buffer --- .../scala/akka/cluster/sharding/ShardRegion.scala | 3 ++- .../internal/ClusterShardingInstrumentation.scala | 14 ++++++++++++++ .../ClusterShardingInstrumentationSpec.scala | 12 ++++++++---- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index aed929f79e9..3e3a1aa8774 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -1297,6 +1297,7 @@ private[akka] class ShardRegion( loggedFullBufferWarning = true } context.system.deadLetters ! msg + instrumentation.messageDropped(cluster.selfAddress, self, typeName) } else { shardBuffers.append(shardId, msg, snd) instrumentation.shardRegionBufferSizeIncremented(cluster.selfAddress, self, typeName) @@ -1456,7 +1457,7 @@ private[akka] class ShardRegion( } private def requestShardHome(shard: ShardId): Unit = { - instrumentation.regionRequestedShardHome(cluster.selfAddress, self, typeName, shard) + instrumentation.requestedShardHome(cluster.selfAddress, self, typeName, shard) coordinator.foreach(_ ! GetShardHome(shard)) } } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala index 53881871779..1fdee43631b 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala @@ -101,6 +101,9 @@ class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterSharding shardId: String): Unit = instrumentations.foreach(_.regionRequestedShardHome(selfAddress, shardRegionActor, typeName, shardId)) + override def messageDropped(selfAddress: Address, self: ActorRef, typeName: String): Unit = + instrumentations.foreach(_.messageDropped(selfAddress, self, typeName)) + override def receivedShardHome( selfAddress: Address, shardRegionActor: ActorRef, @@ -108,6 +111,10 @@ class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterSharding shardId: String): Unit = instrumentations.foreach(_.receivedShardHome(selfAddress, shardRegionActor, typeName, shardId)) + override def dependencies: immutable.Seq[String] = + instrumentations.flatMap(_.dependencies) + + override def shardHandoffStarted( selfAddress: Address, shardCoordinatorActor: ActorRef, @@ -163,6 +170,8 @@ class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation typeName: String, shardId: String): Unit = () + override def messageDropped(selfAddress: Address, self: ActorRef, typeName: String): Unit = () + override def shardHandoffStarted( selfAddress: Address, shardRegionActor: ActorRef, @@ -197,6 +206,11 @@ trait ClusterShardingInstrumentation { def receivedShardHome(selfAddress: Address, shardRegionActor: ActorRef, typeName: String, shardId: String): Unit + /** + * Drop a message send to a Shard Region if the buffer is full. + */ + def messageDropped(selfAddress: Address, self: ActorRef, typeName: String): Unit + def shardHandoffStarted(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String, shard: String): Unit def shardHandoffFinished( diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala index ffe75742bf1..9403991dadb 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala @@ -7,14 +7,13 @@ package akka.cluster.sharding import scala.concurrent.duration._ import org.scalatest.concurrent.ScalaFutures -import org.scalatest.time.{ Seconds, Span } - +import org.scalatest.time.{Seconds, Span} import org.scalatest.concurrent.Eventually.eventually -import akka.actor.{ Actor, ActorLogging, Address, Props } +import akka.actor.{Actor, ActorLogging, Address, Props} import akka.cluster.Cluster import akka.cluster.MemberStatus -import akka.cluster.sharding.ClusterShardingInstrumentationSpec.GiveMeYourHome.{ Get, Home } +import akka.cluster.sharding.ClusterShardingInstrumentationSpec.GiveMeYourHome.{Get, Home} import akka.cluster.sharding.internal.ClusterShardingInstrumentationProvider import akka.remote.testkit.Direction import akka.testkit.TestProbe @@ -86,6 +85,10 @@ abstract class ClusterShardingInstrumentationSpec .asInstanceOf[ClusterShardingInstrumentationSpecTelemetry] .shardRegionBufferSizeCounter + private val dropMessageCounter = ClusterShardingInstrumentationProvider(system).instrumentation + .asInstanceOf[ClusterShardingInstrumentationSpecTelemetry] + .dropMessageCounter + val shardHomeRequests = ClusterShardingInstrumentationProvider(system).instrumentation .asInstanceOf[ClusterShardingInstrumentationSpecTelemetry] @@ -173,6 +176,7 @@ abstract class ClusterShardingInstrumentationSpec // we have 100 in the buffer, and our cap is 120 (per config in this test) // 10 are dropped. Should we have a metric on this? Or custom events? shardRegionBufferSizeCounter.get() shouldBe 120 + dropMessageCounter.get() shouldBe 10 } } enterBarrier("buffer-overflow") From 5cde605d6b1ca8ca6660f3c21ef5c6b10d5b879c Mon Sep 17 00:00:00 2001 From: Sebastian Date: Tue, 17 Feb 2026 09:14:36 +0100 Subject: [PATCH 2/4] rebasing --- .../src/main/scala/akka/cluster/sharding/ShardRegion.scala | 2 +- .../sharding/internal/ClusterShardingInstrumentation.scala | 4 ---- .../sharding/ClusterShardingInstrumentationSpec.scala | 6 +++--- .../ClusterShardingInstrumentationSpecTelemetry.scala | 4 ++++ 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 3e3a1aa8774..73f715fd7e3 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -1457,7 +1457,7 @@ private[akka] class ShardRegion( } private def requestShardHome(shard: ShardId): Unit = { - instrumentation.requestedShardHome(cluster.selfAddress, self, typeName, shard) + instrumentation.regionRequestedShardHome(cluster.selfAddress, self, typeName, shard) coordinator.foreach(_ ! GetShardHome(shard)) } } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala index 1fdee43631b..99ab20515a0 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala @@ -111,10 +111,6 @@ class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterSharding shardId: String): Unit = instrumentations.foreach(_.receivedShardHome(selfAddress, shardRegionActor, typeName, shardId)) - override def dependencies: immutable.Seq[String] = - instrumentations.flatMap(_.dependencies) - - override def shardHandoffStarted( selfAddress: Address, shardCoordinatorActor: ActorRef, diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala index 9403991dadb..277b1fd7364 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala @@ -7,13 +7,13 @@ package akka.cluster.sharding import scala.concurrent.duration._ import org.scalatest.concurrent.ScalaFutures -import org.scalatest.time.{Seconds, Span} +import org.scalatest.time.{ Seconds, Span } import org.scalatest.concurrent.Eventually.eventually -import akka.actor.{Actor, ActorLogging, Address, Props} +import akka.actor.{ Actor, ActorLogging, Address, Props } import akka.cluster.Cluster import akka.cluster.MemberStatus -import akka.cluster.sharding.ClusterShardingInstrumentationSpec.GiveMeYourHome.{Get, Home} +import akka.cluster.sharding.ClusterShardingInstrumentationSpec.GiveMeYourHome.{ Get, Home } import akka.cluster.sharding.internal.ClusterShardingInstrumentationProvider import akka.remote.testkit.Direction import akka.testkit.TestProbe diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpecTelemetry.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpecTelemetry.scala index 14e1d8120c5..67770404fea 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpecTelemetry.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpecTelemetry.scala @@ -21,6 +21,7 @@ class ClusterShardingInstrumentationSpecTelemetry(@nowarn("msg=never used") syst val finishShardHandoffDurationCounter = new AtomicInteger(0) val shardHomeRequests = new AtomicInteger(0) val shardHomeResponses = new AtomicInteger(0) + val dropMessageCounter = new AtomicInteger(0) override def shardRegionBufferSize( selfAddress: Address, @@ -63,5 +64,8 @@ class ClusterShardingInstrumentationSpecTelemetry(@nowarn("msg=never used") syst typeName: String, shardId: String): Unit = shardHomeResponses.incrementAndGet() + override def messageDropped(selfAddress: Address, self: ActorRef, typeName: String): Unit = + dropMessageCounter.incrementAndGet() + override def dependencies: Seq[String] = Nil } From a1e124c5ee6d1b9c2c5fc3bf8ea4098e5e114327 Mon Sep 17 00:00:00 2001 From: Sebastian Date: Tue, 17 Feb 2026 10:17:32 +0100 Subject: [PATCH 3/4] fix spec --- .../ClusterShardingInstrumentationSpec.scala | 21 +++++++++++++------ ...ShardingInstrumentationSpecTelemetry.scala | 16 +++++++++----- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala index 277b1fd7364..2bf6e389e2f 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala @@ -94,11 +94,6 @@ abstract class ClusterShardingInstrumentationSpec .asInstanceOf[ClusterShardingInstrumentationSpecTelemetry] .shardHomeRequests - val shardHomeResponses = - ClusterShardingInstrumentationProvider(system).instrumentation - .asInstanceOf[ClusterShardingInstrumentationSpecTelemetry] - .shardHomeResponses - override implicit val patienceConfig: PatienceConfig = { import akka.testkit.TestDuration PatienceConfig(testKitSettings.DefaultTimeout.duration.dilated, Span(1000, org.scalatest.time.Millis)) @@ -191,7 +186,21 @@ abstract class ClusterShardingInstrumentationSpec "record latency of requesting ShardHome" in { runOn(second) { - shardHomeRequests.get() shouldBe shardHomeResponses.get() + eventually(timeout(Span(5, Seconds))) { + shardHomeRequests.size > 0 shouldBe true + shardHomeRequests.foreach { + case (key, value) => + if (key.startsWith("id")) { + // The "id-0", "id-1" ... messages were send during the blackhole. + // This means they were requested twice, but only received back once. + value.get() shouldBe 1 + } else { + // The "a", "b", "c" messages were send before the blackhole. + // This means they got a proper send-and-request cycle + value.get() shouldBe 0 + } + } + } } enterBarrier("measure-shard-home-latency") } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpecTelemetry.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpecTelemetry.scala index 67770404fea..42fbe340cd8 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpecTelemetry.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpecTelemetry.scala @@ -7,6 +7,7 @@ package akka.cluster.sharding import java.util.concurrent.atomic.AtomicInteger import scala.annotation.nowarn +import scala.collection.concurrent.TrieMap import akka.actor.ActorRef import akka.actor.Address @@ -19,8 +20,9 @@ class ClusterShardingInstrumentationSpecTelemetry(@nowarn("msg=never used") syst val shardRegionBufferSizeCounter = new AtomicInteger(0) val beginShardHandoffDurationCounter = new AtomicInteger(0) val finishShardHandoffDurationCounter = new AtomicInteger(0) - val shardHomeRequests = new AtomicInteger(0) - val shardHomeResponses = new AtomicInteger(0) + + val shardHomeRequests = new TrieMap[String, AtomicInteger]() + val dropMessageCounter = new AtomicInteger(0) override def shardRegionBufferSize( @@ -55,14 +57,18 @@ class ClusterShardingInstrumentationSpecTelemetry(@nowarn("msg=never used") syst selfAddress: Address, shardRegionActor: ActorRef, typeName: String, - shardId: String): Unit = - shardHomeRequests.incrementAndGet() + shardId: String): Unit = shardHomeRequests.getOrElseUpdate(shardId, new AtomicInteger(0)).incrementAndGet() override def receivedShardHome( selfAddress: Address, shardRegionActor: ActorRef, typeName: String, - shardId: String): Unit = shardHomeResponses.incrementAndGet() + shardId: String): Unit = + shardHomeRequests.get(shardId) match { + case Some(value) => + value.decrementAndGet() + case None => () // should not happen + } override def messageDropped(selfAddress: Address, self: ActorRef, typeName: String): Unit = dropMessageCounter.incrementAndGet() From f255373fdb404e279f11c0ec34a955af9e75b62f Mon Sep 17 00:00:00 2001 From: Sebastian Date: Tue, 17 Feb 2026 10:34:24 +0100 Subject: [PATCH 4/4] remove scaladoc --- .../sharding/internal/ClusterShardingInstrumentation.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala index 99ab20515a0..690c79e61f0 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala @@ -202,9 +202,6 @@ trait ClusterShardingInstrumentation { def receivedShardHome(selfAddress: Address, shardRegionActor: ActorRef, typeName: String, shardId: String): Unit - /** - * Drop a message send to a Shard Region if the buffer is full. - */ def messageDropped(selfAddress: Address, self: ActorRef, typeName: String): Unit def shardHandoffStarted(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String, shard: String): Unit