Skip to content

Commit beda092

Browse files
rename spec, add local address
1 parent 4bbcb57 commit beda092

File tree

3 files changed

+28
-29
lines changed

3 files changed

+28
-29
lines changed

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,7 @@ private[akka] class ShardRegion(
690690
timers.startTimerWithFixedDelay(Retry, Retry, retryInterval)
691691
startRegistration()
692692
logPassivationStrategy()
693-
instrumentation.shardRegionBufferSize(typeName, 0)
693+
instrumentation.shardRegionBufferSize(cluster.selfAddress, typeName, 0)
694694
}
695695

696696
override def postStop(): Unit = {
@@ -699,7 +699,7 @@ private[akka] class ShardRegion(
699699
coordinator.foreach(_ ! RegionStopped(context.self))
700700
cluster.unsubscribe(self)
701701
gracefulShutdownProgress.trySuccess(Done)
702-
instrumentation.shardRegionBufferSize(typeName, 0)
702+
instrumentation.shardRegionBufferSize(cluster.selfAddress, typeName, 0)
703703
}
704704

705705
private def logPassivationStrategy(): Unit = {
@@ -754,7 +754,7 @@ private[akka] class ShardRegion(
754754
coordinator.map { coordRef =>
755755
val a = coordRef.path.address
756756

757-
if (a.hasLocalScope) cluster.selfMember.address
757+
if (a.hasLocalScope) cluster.selfAddress
758758
else a
759759
}
760760

@@ -954,7 +954,7 @@ private[akka] class ShardRegion(
954954
dropped,
955955
shard)
956956
// better to decrease by "dropped" to avoid calculating the size?
957-
instrumentation.shardRegionBufferSize(typeName, shardBuffers.totalSize)
957+
instrumentation.shardRegionBufferSize(cluster.selfAddress, typeName, shardBuffers.totalSize)
958958
}
959959
loggedFullBufferWarning = false
960960
}
@@ -1298,7 +1298,7 @@ private[akka] class ShardRegion(
12981298
context.system.deadLetters ! msg
12991299
} else {
13001300
shardBuffers.append(shardId, msg, snd)
1301-
instrumentation.incrementShardRegionBufferSize(typeName)
1301+
instrumentation.incrementShardRegionBufferSize(cluster.selfAddress, typeName)
13021302
// log some insight to how buffers are filled up every 10% of the buffer capacity
13031303
val tot = totBufSize + 1
13041304
if (tot % (bufferSize / 10) == 0) {
@@ -1331,7 +1331,7 @@ private[akka] class ShardRegion(
13311331
}
13321332

13331333
shardBuffers.remove(shardId)
1334-
instrumentation.shardRegionBufferSize(typeName, shardBuffers.totalSize)
1334+
instrumentation.shardRegionBufferSize(cluster.selfAddress, typeName, shardBuffers.totalSize)
13351335
}
13361336
loggedFullBufferWarning = false
13371337
retryCount = 0
@@ -1374,7 +1374,7 @@ private[akka] class ShardRegion(
13741374
shardId,
13751375
buf.size + 1)
13761376
shardBuffers.append(shardId, msg, snd)
1377-
instrumentation.incrementShardRegionBufferSize(typeName)
1377+
instrumentation.incrementShardRegionBufferSize(cluster.selfAddress, typeName)
13781378
}
13791379

13801380
case _ =>

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,16 @@ package akka.cluster.sharding.internal
66

77
import akka.actor.ExtendedActorSystem
88
import akka.annotation.InternalStableApi
9-
109
import scala.collection.immutable
1110
import scala.jdk.CollectionConverters._
1211

1312
import akka.actor.ActorSystem
13+
import akka.actor.Address
1414
import akka.actor.ClassicActorSystemProvider
1515
import akka.actor.Extension
1616
import akka.actor.ExtensionId
1717
import akka.actor.ExtensionIdProvider
1818
import akka.event.Logging
19-
2019
import akka.util.TopologicalSort.topologicalSort
2120

2221
/**
@@ -81,11 +80,11 @@ class ClusterShardingInstrumentationProvider(system: ExtendedActorSystem) extend
8180
class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterShardingInstrumentation])
8281
extends ClusterShardingInstrumentation {
8382

84-
override def shardRegionBufferSize(typeName: String, size: Int): Unit =
85-
instrumentations.foreach(_.shardRegionBufferSize(typeName, size))
83+
override def shardRegionBufferSize(selfAddress: Address, typeName: String, size: Int): Unit =
84+
instrumentations.foreach(_.shardRegionBufferSize(selfAddress, typeName, size))
8685

87-
override def incrementShardRegionBufferSize(typeName: String): Unit =
88-
instrumentations.foreach(_.incrementShardRegionBufferSize(typeName))
86+
override def incrementShardRegionBufferSize(selfAddress: Address, typeName: String): Unit =
87+
instrumentations.foreach(_.incrementShardRegionBufferSize(selfAddress, typeName))
8988

9089
override def dependencies: immutable.Seq[String] =
9190
instrumentations.flatMap(_.dependencies)
@@ -103,9 +102,9 @@ object EmptyClusterShardingInstrumentation extends EmptyClusterShardingInstrumen
103102
@InternalStableApi
104103
class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation {
105104

106-
override def shardRegionBufferSize(typeName: String, size: Int): Unit = ()
105+
override def shardRegionBufferSize(selfAddress: Address, typeName: String, size: Int): Unit = ()
107106

108-
override def incrementShardRegionBufferSize(typeName: String): Unit = ()
107+
override def incrementShardRegionBufferSize(selfAddress: Address, typeName: String): Unit = ()
109108

110109
override def dependencies: immutable.Seq[String] = Nil
111110
}
@@ -119,12 +118,12 @@ trait ClusterShardingInstrumentation {
119118
/**
120119
* @param size set current size of the buffer.
121120
*/
122-
def shardRegionBufferSize(typeName: String, size: Int): Unit
121+
def shardRegionBufferSize(selfAddress: Address, typeName: String, size: Int): Unit
123122

124123
/**
125124
* Increase the current size of the buffer by one.
126125
*/
127-
def incrementShardRegionBufferSize(typeName: String): Unit
126+
def incrementShardRegionBufferSize(selfAddress: Address, typeName: String): Unit
128127

129128
/**
130129
* Optional dependencies for this instrumentation.

akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ import akka.actor.ExtendedActorSystem
1515
import akka.actor.{ Actor, ActorLogging, Address, Props }
1616
import akka.cluster.Cluster
1717
import akka.cluster.MemberStatus
18-
import akka.cluster.sharding.ClusterShardInstrumentatioSpec.GiveMeYourHome.{ Get, Home }
18+
import akka.cluster.sharding.ClusterShardingInstrumentationSpec.GiveMeYourHome.{ Get, Home }
1919
import akka.cluster.sharding.internal.{ ClusterShardingInstrumentation, ClusterShardingInstrumentationProvider }
2020
import akka.remote.testkit.Direction
2121
import akka.testkit.TestProbe
2222
import akka.serialization.jackson.CborSerializable
2323
import akka.testkit.ImplicitSender
2424

25-
object ClusterShardInstrumentatioSpecConfig
25+
object ClusterShardingInstrumentationSpecConfig
2626
extends MultiNodeClusterShardingConfig(
2727
//loglevel = "DEBUG",
2828
additionalConfig = """
@@ -39,27 +39,27 @@ object ClusterShardInstrumentatioSpecConfig
3939

4040
}
4141

42-
class ClusterShardInstrumentatioSpecMultiJvmNode1 extends ClusterShardInstrumentatioSpec
42+
class ClusterShardingInstrumentationSpecMultiJvmNode1 extends ClusterShardingInstrumentationSpec
4343

44-
class ClusterShardInstrumentatioSpecMultiJvmNode2 extends ClusterShardInstrumentatioSpec
44+
class ClusterShardingInstrumentationSpecMultiJvmNode2 extends ClusterShardingInstrumentationSpec
4545

4646
class SpecClusterShardingTelemetry(@nowarn("msg=never used") system: ExtendedActorSystem)
4747
extends ClusterShardingInstrumentation {
4848

4949
val counter = new AtomicInteger(0)
5050

51-
override def shardRegionBufferSize(typeName: String, size: Int): Unit = {
51+
override def shardRegionBufferSize(selfAddress: Address, typeName: String, size: Int): Unit = {
5252
counter.set(size)
5353
}
5454

55-
override def incrementShardRegionBufferSize(typeName: String): Unit = {
55+
override def incrementShardRegionBufferSize(selfAddress: Address, typeName: String): Unit = {
5656
counter.incrementAndGet()
5757
}
5858

5959
override def dependencies: Seq[String] = Nil
6060
}
6161

62-
object ClusterShardInstrumentatioSpec {
62+
object ClusterShardingInstrumentationSpec {
6363

6464
object GiveMeYourHome {
6565
case class Get(id: String) extends CborSerializable
@@ -90,14 +90,14 @@ object ClusterShardInstrumentatioSpec {
9090
}
9191
}
9292

93-
abstract class ClusterShardInstrumentatioSpec
94-
extends MultiNodeClusterShardingSpec(ClusterShardInstrumentatioSpecConfig)
93+
abstract class ClusterShardingInstrumentationSpec
94+
extends MultiNodeClusterShardingSpec(ClusterShardingInstrumentationSpecConfig)
9595
with ImplicitSender
9696
with ScalaFutures {
9797

98-
import ClusterShardInstrumentatioSpec._
99-
import ClusterShardInstrumentatioSpec.GiveMeYourHome._
100-
import ClusterShardInstrumentatioSpecConfig._
98+
import ClusterShardingInstrumentationSpec._
99+
import ClusterShardingInstrumentationSpec.GiveMeYourHome._
100+
import ClusterShardingInstrumentationSpecConfig._
101101

102102
private val counter = ClusterShardingInstrumentationProvider(system).instrumentation
103103
.asInstanceOf[SpecClusterShardingTelemetry]

0 commit comments

Comments
 (0)