Skip to content

Commit 4e144a0

Browse files
feat: metric for dropped messages in Shard Region buffer
1 parent f44207d commit 4e144a0

File tree

3 files changed

+20
-1
lines changed

3 files changed

+20
-1
lines changed

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,6 +1296,7 @@ private[akka] class ShardRegion(
12961296
loggedFullBufferWarning = true
12971297
}
12981298
context.system.deadLetters ! msg
1299+
instrumentation.dropMessagesShardRegion(cluster.selfAddress, self, typeName)
12991300
} else {
13001301
shardBuffers.append(shardId, msg, snd)
13011302
instrumentation.incrementShardRegionBufferSize(cluster.selfAddress, self, typeName)

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterSharding
9494
typeName: String): Unit =
9595
instrumentations.foreach(_.incrementShardRegionBufferSize(selfAddress, shardRegionActor, typeName))
9696

97+
override def dropMessagesShardRegion(selfAddress: Address, self: ActorRef, typeName: String): Unit =
98+
instrumentations.foreach(_.dropMessagesShardRegion(selfAddress, self, typeName))
99+
97100
override def dependencies: immutable.Seq[String] =
98101
instrumentations.flatMap(_.dependencies)
99102
}
@@ -121,6 +124,8 @@ class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation
121124
shardRegionActor: ActorRef,
122125
typeName: String): Unit = ()
123126

127+
override def dropMessagesShardRegion(selfAddress: Address, self: ActorRef, typeName: String): Unit = ()
128+
124129
override def dependencies: immutable.Seq[String] = Nil
125130
}
126131

@@ -140,6 +145,11 @@ trait ClusterShardingInstrumentation {
140145
*/
141146
def incrementShardRegionBufferSize(selfAddress: Address, shardRegionActor: ActorRef, typeName: String): Unit
142147

148+
/**
149+
* Drop a mesage send to a Shard Region as the buffer is full
150+
*/
151+
def dropMessagesShardRegion(selfAddress: Address, self: ActorRef, typeName: String): Unit
152+
143153
/**
144154
* Optional dependencies for this instrumentation.
145155
*

akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class SpecClusterShardingTelemetry(@nowarn("msg=never used") system: ExtendedAct
5151
extends ClusterShardingInstrumentation {
5252

5353
val counter = new AtomicInteger(0)
54+
val dropMessageCounter = new AtomicInteger(0)
5455

5556
override def shardRegionBufferSize(
5657
selfAddress: Address,
@@ -67,6 +68,9 @@ class SpecClusterShardingTelemetry(@nowarn("msg=never used") system: ExtendedAct
6768
counter.incrementAndGet()
6869
}
6970

71+
override def dropMessagesShardRegion(selfAddress: Address, self: ActorRef, typeName: String): Unit =
72+
dropMessageCounter.incrementAndGet()
73+
7074
override def dependencies: Seq[String] = Nil
7175
}
7276

@@ -113,6 +117,9 @@ abstract class ClusterShardingInstrumentationSpec
113117
private val counter = ClusterShardingInstrumentationProvider(system).instrumentation
114118
.asInstanceOf[SpecClusterShardingTelemetry]
115119
.counter
120+
private val dropMessageCounter = ClusterShardingInstrumentationProvider(system).instrumentation
121+
.asInstanceOf[SpecClusterShardingTelemetry]
122+
.dropMessageCounter
116123

117124
override implicit val patienceConfig: PatienceConfig = {
118125
import akka.testkit.TestDuration
@@ -189,8 +196,9 @@ abstract class ClusterShardingInstrumentationSpec
189196
}
190197
eventually {
191198
// we have 100 in the buffer, and our cap is 120 (per config in this test)
192-
// 10 are dropped. Should we have a metric on this? Or custom events?
199+
// 10 are dropped.
193200
counter.get() shouldBe 120
201+
dropMessageCounter.get() shouldBe 10
194202
}
195203
}
196204
enterBarrier("buffer-overflow")

0 commit comments

Comments
 (0)