Skip to content

Commit f44207d

Browse files
add self ref
1 parent fbd72e1 commit f44207d

File tree

3 files changed

+43
-17
lines changed

3 files changed

+43
-17
lines changed

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,7 @@ private[akka] class ShardRegion(
690690
timers.startTimerWithFixedDelay(Retry, Retry, retryInterval)
691691
startRegistration()
692692
logPassivationStrategy()
693-
instrumentation.shardRegionBufferSize(cluster.selfAddress, typeName, 0)
693+
instrumentation.shardRegionBufferSize(cluster.selfAddress, self, typeName, 0)
694694
}
695695

696696
override def postStop(): Unit = {
@@ -699,7 +699,7 @@ private[akka] class ShardRegion(
699699
coordinator.foreach(_ ! RegionStopped(context.self))
700700
cluster.unsubscribe(self)
701701
gracefulShutdownProgress.trySuccess(Done)
702-
instrumentation.shardRegionBufferSize(cluster.selfAddress, typeName, 0)
702+
instrumentation.shardRegionBufferSize(cluster.selfAddress, self, typeName, 0)
703703
}
704704

705705
private def logPassivationStrategy(): Unit = {
@@ -954,7 +954,7 @@ private[akka] class ShardRegion(
954954
dropped,
955955
shard)
956956
// better to decrease by "dropped" to avoid calculating the size?
957-
instrumentation.shardRegionBufferSize(cluster.selfAddress, typeName, shardBuffers.totalSize)
957+
instrumentation.shardRegionBufferSize(cluster.selfAddress, self, typeName, shardBuffers.totalSize)
958958
}
959959
loggedFullBufferWarning = false
960960
}
@@ -1298,7 +1298,7 @@ private[akka] class ShardRegion(
12981298
context.system.deadLetters ! msg
12991299
} else {
13001300
shardBuffers.append(shardId, msg, snd)
1301-
instrumentation.incrementShardRegionBufferSize(cluster.selfAddress, typeName)
1301+
instrumentation.incrementShardRegionBufferSize(cluster.selfAddress, self, typeName)
13021302
// log some insight to how buffers are filled up every 10% of the buffer capacity
13031303
val tot = totBufSize + 1
13041304
if (tot % (bufferSize / 10) == 0) {
@@ -1331,7 +1331,7 @@ private[akka] class ShardRegion(
13311331
}
13321332

13331333
shardBuffers.remove(shardId)
1334-
instrumentation.shardRegionBufferSize(cluster.selfAddress, typeName, shardBuffers.totalSize)
1334+
instrumentation.shardRegionBufferSize(cluster.selfAddress, self, typeName, shardBuffers.totalSize)
13351335
}
13361336
loggedFullBufferWarning = false
13371337
retryCount = 0
@@ -1374,7 +1374,7 @@ private[akka] class ShardRegion(
13741374
shardId,
13751375
buf.size + 1)
13761376
shardBuffers.append(shardId, msg, snd)
1377-
instrumentation.incrementShardRegionBufferSize(cluster.selfAddress, typeName)
1377+
instrumentation.incrementShardRegionBufferSize(cluster.selfAddress, self, typeName)
13781378
}
13791379

13801380
case _ =>

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import akka.annotation.InternalStableApi
99
import scala.collection.immutable
1010
import scala.jdk.CollectionConverters._
1111

12+
import akka.actor.ActorRef
1213
import akka.actor.ActorSystem
1314
import akka.actor.Address
1415
import akka.actor.ClassicActorSystemProvider
@@ -80,11 +81,18 @@ class ClusterShardingInstrumentationProvider(system: ExtendedActorSystem) extend
8081
class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterShardingInstrumentation])
8182
extends ClusterShardingInstrumentation {
8283

83-
override def shardRegionBufferSize(selfAddress: Address, typeName: String, size: Int): Unit =
84-
instrumentations.foreach(_.shardRegionBufferSize(selfAddress, typeName, size))
84+
override def shardRegionBufferSize(
85+
selfAddress: Address,
86+
shardRegionActor: ActorRef,
87+
typeName: String,
88+
size: Int): Unit =
89+
instrumentations.foreach(_.shardRegionBufferSize(selfAddress, shardRegionActor, typeName, size))
8590

86-
override def incrementShardRegionBufferSize(selfAddress: Address, typeName: String): Unit =
87-
instrumentations.foreach(_.incrementShardRegionBufferSize(selfAddress, typeName))
91+
override def incrementShardRegionBufferSize(
92+
selfAddress: Address,
93+
shardRegionActor: ActorRef,
94+
typeName: String): Unit =
95+
instrumentations.foreach(_.incrementShardRegionBufferSize(selfAddress, shardRegionActor, typeName))
8896

8997
override def dependencies: immutable.Seq[String] =
9098
instrumentations.flatMap(_.dependencies)
@@ -102,9 +110,16 @@ object EmptyClusterShardingInstrumentation extends EmptyClusterShardingInstrumen
102110
@InternalStableApi
103111
class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation {
104112

105-
override def shardRegionBufferSize(selfAddress: Address, typeName: String, size: Int): Unit = ()
113+
override def shardRegionBufferSize(
114+
selfAddress: Address,
115+
shardRegionActor: ActorRef,
116+
typeName: String,
117+
size: Int): Unit = ()
106118

107-
override def incrementShardRegionBufferSize(selfAddress: Address, typeName: String): Unit = ()
119+
override def incrementShardRegionBufferSize(
120+
selfAddress: Address,
121+
shardRegionActor: ActorRef,
122+
typeName: String): Unit = ()
108123

109124
override def dependencies: immutable.Seq[String] = Nil
110125
}
@@ -118,12 +133,12 @@ trait ClusterShardingInstrumentation {
118133
/**
119134
* @param size set current size of the buffer.
120135
*/
121-
def shardRegionBufferSize(selfAddress: Address, typeName: String, size: Int): Unit
136+
def shardRegionBufferSize(selfAddress: Address, shardRegionActor: ActorRef, typeName: String, size: Int): Unit
122137

123138
/**
124139
* Increase the current size of the buffer by one.
125140
*/
126-
def incrementShardRegionBufferSize(selfAddress: Address, typeName: String): Unit
141+
def incrementShardRegionBufferSize(selfAddress: Address, shardRegionActor: ActorRef, typeName: String): Unit
127142

128143
/**
129144
* Optional dependencies for this instrumentation.

akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,16 @@
55
package akka.cluster.sharding
66

77
import java.util.concurrent.atomic.AtomicInteger
8+
89
import scala.annotation.nowarn
10+
911
import org.scalatest.concurrent.ScalaFutures
1012
import org.scalatest.time.{ Seconds, Span }
11-
1213
import scala.concurrent.duration._
14+
1315
import org.scalatest.concurrent.Eventually.eventually
16+
17+
import akka.actor.ActorRef
1418
import akka.actor.ExtendedActorSystem
1519
import akka.actor.{ Actor, ActorLogging, Address, Props }
1620
import akka.cluster.Cluster
@@ -48,11 +52,18 @@ class SpecClusterShardingTelemetry(@nowarn("msg=never used") system: ExtendedAct
4852

4953
val counter = new AtomicInteger(0)
5054

51-
override def shardRegionBufferSize(selfAddress: Address, typeName: String, size: Int): Unit = {
55+
override def shardRegionBufferSize(
56+
selfAddress: Address,
57+
shardRegionActor: ActorRef,
58+
typeName: String,
59+
size: Int): Unit = {
5260
counter.set(size)
5361
}
5462

55-
override def incrementShardRegionBufferSize(selfAddress: Address, typeName: String): Unit = {
63+
override def incrementShardRegionBufferSize(
64+
selfAddress: Address,
65+
shardRegionActor: ActorRef,
66+
typeName: String): Unit = {
5667
counter.incrementAndGet()
5768
}
5869

0 commit comments

Comments
 (0)