Skip to content

Commit 9c2e143

Browse files
feat: instrumenting GetShardHome requests (#32885)
1 parent 0fbb370 commit 9c2e143

File tree

3 files changed

+76
-4
lines changed

3 files changed

+76
-4
lines changed

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -907,6 +907,7 @@ private[akka] class ShardRegion(
907907
}
908908

909909
case ShardHome(shard, shardRegionRef) =>
910+
instrumentation.receivedShardHome(cluster.selfAddress, self, typeName, shard)
910911
receiveShardHome(shard, shardRegionRef)
911912

912913
case ShardHomes(homes) =>
@@ -1266,7 +1267,7 @@ private[akka] class ShardRegion(
12661267
shard,
12671268
coord,
12681269
buf.size)
1269-
coord ! GetShardHome(shard)
1270+
requestShardHome(shard)
12701271
}
12711272

12721273
if (retryCount >= 5 && retryCount % 5 == 0 && log.isWarningEnabled) {
@@ -1365,7 +1366,7 @@ private[akka] class ShardRegion(
13651366
case None =>
13661367
if (!shardBuffers.contains(shardId)) {
13671368
log.debug("{}: Request shard [{}] home. Coordinator [{}]", typeName, shardId, coordinator)
1368-
coordinator.foreach(_ ! GetShardHome(shardId))
1369+
requestShardHome(shardId)
13691370
}
13701371
val buf = shardBuffers.getOrEmpty(shardId)
13711372
log.debug(
@@ -1400,7 +1401,7 @@ private[akka] class ShardRegion(
14001401
case None =>
14011402
if (!shardBuffers.contains(shardId)) {
14021403
log.debug("{}: Request shard [{}] home. Coordinator [{}]", typeName, shardId, coordinator)
1403-
coordinator.foreach(_ ! GetShardHome(shardId))
1404+
requestShardHome(shardId)
14041405
}
14051406
bufferMessage(shardId, msg, snd)
14061407
}
@@ -1453,4 +1454,9 @@ private[akka] class ShardRegion(
14531454
actorSelections.foreach(_ ! GracefulShutdownReq(self))
14541455
}
14551456
}
1457+
1458+
private def requestShardHome(shard: ShardId): Unit = {
1459+
instrumentation.regionRequestedShardHome(cluster.selfAddress, self, typeName, shard)
1460+
coordinator.foreach(_ ! GetShardHome(shard))
1461+
}
14561462
}

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,20 @@ class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterSharding
9696

9797
override def dependencies: immutable.Seq[String] =
9898
instrumentations.flatMap(_.dependencies)
99+
100+
override def regionRequestedShardHome(
101+
selfAddress: Address,
102+
shardRegionActor: ActorRef,
103+
typeName: String,
104+
shardId: String): Unit =
105+
instrumentations.foreach(_.regionRequestedShardHome(selfAddress, shardRegionActor, typeName, shardId))
106+
107+
override def receivedShardHome(
108+
selfAddress: Address,
109+
shardRegionActor: ActorRef,
110+
typeName: String,
111+
shardId: String): Unit =
112+
instrumentations.foreach(_.receivedShardHome(selfAddress, shardRegionActor, typeName, shardId))
99113
}
100114

101115
/**
@@ -122,6 +136,18 @@ class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation
122136
typeName: String): Unit = ()
123137

124138
override def dependencies: immutable.Seq[String] = Nil
139+
140+
override def regionRequestedShardHome(
141+
selfAddress: Address,
142+
shardRegionActor: ActorRef,
143+
typeName: String,
144+
shardId: String): Unit = ()
145+
146+
override def receivedShardHome(
147+
selfAddress: Address,
148+
shardRegionActor: ActorRef,
149+
typeName: String,
150+
shardId: String): Unit = ()
125151
}
126152

127153
/**
@@ -140,6 +166,14 @@ trait ClusterShardingInstrumentation {
140166
*/
141167
def incrementShardRegionBufferSize(selfAddress: Address, shardRegionActor: ActorRef, typeName: String): Unit
142168

169+
def regionRequestedShardHome(
170+
selfAddress: Address,
171+
shardRegionActor: ActorRef,
172+
typeName: String,
173+
shardId: String): Unit
174+
175+
def receivedShardHome(selfAddress: Address, shardRegionActor: ActorRef, typeName: String, shardId: String): Unit
176+
143177
/**
144178
* Optional dependencies for this instrumentation.
145179
*

akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ package akka.cluster.sharding
77
import java.util.concurrent.atomic.AtomicInteger
88

99
import scala.annotation.nowarn
10+
import scala.concurrent.duration._
1011

1112
import org.scalatest.concurrent.ScalaFutures
1213
import org.scalatest.time.{ Seconds, Span }
13-
import scala.concurrent.duration._
1414

1515
import org.scalatest.concurrent.Eventually.eventually
1616

@@ -51,6 +51,8 @@ class SpecClusterShardingTelemetry(@nowarn("msg=never used") system: ExtendedAct
5151
extends ClusterShardingInstrumentation {
5252

5353
val counter = new AtomicInteger(0)
54+
val shardHomeRequests = new AtomicInteger(0)
55+
val shardHomeResponses = new AtomicInteger(0)
5456

5557
override def shardRegionBufferSize(
5658
selfAddress: Address,
@@ -68,6 +70,19 @@ class SpecClusterShardingTelemetry(@nowarn("msg=never used") system: ExtendedAct
6870
}
6971

7072
override def dependencies: Seq[String] = Nil
73+
74+
override def regionRequestedShardHome(
75+
selfAddress: Address,
76+
shardRegionActor: ActorRef,
77+
typeName: String,
78+
shardId: String): Unit =
79+
shardHomeRequests.incrementAndGet()
80+
81+
override def receivedShardHome(
82+
selfAddress: Address,
83+
shardRegionActor: ActorRef,
84+
typeName: String,
85+
shardId: String): Unit = shardHomeResponses.incrementAndGet()
7186
}
7287

7388
object ClusterShardingInstrumentationSpec {
@@ -114,6 +129,16 @@ abstract class ClusterShardingInstrumentationSpec
114129
.asInstanceOf[SpecClusterShardingTelemetry]
115130
.counter
116131

132+
val shardHomeRequests =
133+
ClusterShardingInstrumentationProvider(system).instrumentation
134+
.asInstanceOf[SpecClusterShardingTelemetry]
135+
.shardHomeRequests
136+
137+
val shardHomeResponses =
138+
ClusterShardingInstrumentationProvider(system).instrumentation
139+
.asInstanceOf[SpecClusterShardingTelemetry]
140+
.shardHomeResponses
141+
117142
override implicit val patienceConfig: PatienceConfig = {
118143
import akka.testkit.TestDuration
119144
PatienceConfig(testKitSettings.DefaultTimeout.duration.dilated, Span(1000, org.scalatest.time.Millis))
@@ -203,6 +228,13 @@ abstract class ClusterShardingInstrumentationSpec
203228
enterBarrier("blackhole-removed")
204229
}
205230

231+
"record latency of requesting ShardHome" in {
232+
runOn(second) {
233+
shardHomeRequests.get() shouldBe shardHomeResponses.get()
234+
}
235+
enterBarrier("measure-shard-home-latency")
236+
}
237+
206238
"clear buffered messages" in {
207239
runOn(second) {
208240
eventually(timeout(Span(5, Seconds))) {

0 commit comments

Comments
 (0)