Skip to content

Commit ecee976

Browse files
feat: telemetry for shard handoff (#32888)
1 parent b0272ea commit ecee976

File tree

6 files changed

+150
-72
lines changed

6 files changed

+150
-72
lines changed

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import akka.cluster.sharding.internal.{
3434
import akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy
3535
import akka.cluster.sharding.internal.ClusterShardAllocationMixin.RegionEntry
3636
import akka.cluster.sharding.internal.ClusterShardAllocationMixin.ShardSuitabilityOrdering
37+
import akka.cluster.sharding.internal.ClusterShardingInstrumentationProvider
3738
import akka.cluster.sharding.internal.EventSourcedRememberEntitiesCoordinatorStore.MigrationMarker
3839
import akka.event.{ BusLogging, Logging }
3940
import akka.pattern.{ pipe, AskTimeoutException }
@@ -644,6 +645,11 @@ object ShardCoordinator {
644645

645646
import Internal._
646647

648+
private val instrumentation = ClusterShardingInstrumentationProvider.get(context.system).instrumentation
649+
private val cluster = Cluster(context.system)
650+
651+
instrumentation.shardHandoffStarted(cluster.selfAddress, self, typeName, shard)
652+
647653
regions.foreach { region =>
648654
region ! BeginHandOff(shard)
649655
}
@@ -707,6 +713,7 @@ object ShardCoordinator {
707713
}
708714

709715
def done(ok: Boolean): Unit = {
716+
instrumentation.shardHandoffFinished(cluster.selfAddress, self, typeName, shard, ok)
710717
context.parent ! RebalanceDone(shard, ok)
711718
context.stop(self)
712719
}

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1299,7 +1299,7 @@ private[akka] class ShardRegion(
12991299
context.system.deadLetters ! msg
13001300
} else {
13011301
shardBuffers.append(shardId, msg, snd)
1302-
instrumentation.incrementShardRegionBufferSize(cluster.selfAddress, self, typeName)
1302+
instrumentation.shardRegionBufferSizeIncremented(cluster.selfAddress, self, typeName)
13031303
// log some insight to how buffers are filled up every 10% of the buffer capacity
13041304
val tot = totBufSize + 1
13051305
if (tot % (bufferSize / 10) == 0) {
@@ -1375,7 +1375,7 @@ private[akka] class ShardRegion(
13751375
shardId,
13761376
buf.size + 1)
13771377
shardBuffers.append(shardId, msg, snd)
1378-
instrumentation.incrementShardRegionBufferSize(cluster.selfAddress, self, typeName)
1378+
instrumentation.shardRegionBufferSizeIncremented(cluster.selfAddress, self, typeName)
13791379
}
13801380

13811381
case _ =>

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,11 @@ class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterSharding
8888
size: Int): Unit =
8989
instrumentations.foreach(_.shardRegionBufferSize(selfAddress, shardRegionActor, typeName, size))
9090

91-
override def incrementShardRegionBufferSize(
91+
override def shardRegionBufferSizeIncremented(
9292
selfAddress: Address,
9393
shardRegionActor: ActorRef,
9494
typeName: String): Unit =
95-
instrumentations.foreach(_.incrementShardRegionBufferSize(selfAddress, shardRegionActor, typeName))
96-
97-
override def dependencies: immutable.Seq[String] =
98-
instrumentations.flatMap(_.dependencies)
95+
instrumentations.foreach(_.shardRegionBufferSizeIncremented(selfAddress, shardRegionActor, typeName))
9996

10097
override def regionRequestedShardHome(
10198
selfAddress: Address,
@@ -110,6 +107,25 @@ class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterSharding
110107
typeName: String,
111108
shardId: String): Unit =
112109
instrumentations.foreach(_.receivedShardHome(selfAddress, shardRegionActor, typeName, shardId))
110+
111+
override def shardHandoffStarted(
112+
selfAddress: Address,
113+
shardCoordinatorActor: ActorRef,
114+
typeName: String,
115+
shard: String): Unit =
116+
instrumentations.foreach(_.shardHandoffStarted(selfAddress, shardCoordinatorActor, typeName, shard))
117+
118+
override def shardHandoffFinished(
119+
selfAddress: Address,
120+
shardCoordinatorActor: ActorRef,
121+
typeName: String,
122+
shard: String,
123+
ok: Boolean): Unit =
124+
instrumentations.foreach(_.shardHandoffFinished(selfAddress, shardCoordinatorActor, typeName, shard, ok))
125+
126+
override def dependencies: immutable.Seq[String] =
127+
instrumentations.flatMap(_.dependencies)
128+
113129
}
114130

115131
/**
@@ -130,13 +146,11 @@ class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation
130146
typeName: String,
131147
size: Int): Unit = ()
132148

133-
override def incrementShardRegionBufferSize(
149+
override def shardRegionBufferSizeIncremented(
134150
selfAddress: Address,
135151
shardRegionActor: ActorRef,
136152
typeName: String): Unit = ()
137153

138-
override def dependencies: immutable.Seq[String] = Nil
139-
140154
override def regionRequestedShardHome(
141155
selfAddress: Address,
142156
shardRegionActor: ActorRef,
@@ -148,6 +162,21 @@ class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation
148162
shardRegionActor: ActorRef,
149163
typeName: String,
150164
shardId: String): Unit = ()
165+
166+
override def shardHandoffStarted(
167+
selfAddress: Address,
168+
shardRegionActor: ActorRef,
169+
typeName: String,
170+
shard: String): Unit = ()
171+
172+
override def shardHandoffFinished(
173+
selfAddress: Address,
174+
self: ActorRef,
175+
typeName: String,
176+
shard: String,
177+
ok: Boolean): Unit = ()
178+
179+
override def dependencies: immutable.Seq[String] = Nil
151180
}
152181

153182
/**
@@ -156,15 +185,9 @@ class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation
156185
@InternalStableApi
157186
trait ClusterShardingInstrumentation {
158187

159-
/**
160-
* @param size set current size of the buffer.
161-
*/
162188
def shardRegionBufferSize(selfAddress: Address, shardRegionActor: ActorRef, typeName: String, size: Int): Unit
163189

164-
/**
165-
* Increase the current size of the buffer by one.
166-
*/
167-
def incrementShardRegionBufferSize(selfAddress: Address, shardRegionActor: ActorRef, typeName: String): Unit
190+
def shardRegionBufferSizeIncremented(selfAddress: Address, shardRegionActor: ActorRef, typeName: String): Unit
168191

169192
def regionRequestedShardHome(
170193
selfAddress: Address,
@@ -174,6 +197,15 @@ trait ClusterShardingInstrumentation {
174197

175198
def receivedShardHome(selfAddress: Address, shardRegionActor: ActorRef, typeName: String, shardId: String): Unit
176199

200+
def shardHandoffStarted(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String, shard: String): Unit
201+
202+
def shardHandoffFinished(
203+
selfAddress: Address,
204+
shardCoordinatorActor: ActorRef,
205+
typeName: String,
206+
shard: String,
207+
ok: Boolean): Unit
208+
177209
/**
178210
* Optional dependencies for this instrumentation.
179211
*

akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala

Lines changed: 10 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,18 @@
44

55
package akka.cluster.sharding
66

7-
import java.util.concurrent.atomic.AtomicInteger
8-
9-
import scala.annotation.nowarn
107
import scala.concurrent.duration._
118

129
import org.scalatest.concurrent.ScalaFutures
1310
import org.scalatest.time.{ Seconds, Span }
1411

1512
import org.scalatest.concurrent.Eventually.eventually
1613

17-
import akka.actor.ActorRef
18-
import akka.actor.ExtendedActorSystem
1914
import akka.actor.{ Actor, ActorLogging, Address, Props }
2015
import akka.cluster.Cluster
2116
import akka.cluster.MemberStatus
2217
import akka.cluster.sharding.ClusterShardingInstrumentationSpec.GiveMeYourHome.{ Get, Home }
23-
import akka.cluster.sharding.internal.{ ClusterShardingInstrumentation, ClusterShardingInstrumentationProvider }
18+
import akka.cluster.sharding.internal.ClusterShardingInstrumentationProvider
2419
import akka.remote.testkit.Direction
2520
import akka.testkit.TestProbe
2621
import akka.serialization.jackson.CborSerializable
@@ -32,7 +27,7 @@ object ClusterShardingInstrumentationSpecConfig
3227
additionalConfig = """
3328
akka.cluster.sharding {
3429
rebalance-interval = 120 s
35-
telemetry.instrumentations += akka.cluster.sharding.SpecClusterShardingTelemetry
30+
telemetry.instrumentations += akka.cluster.sharding.ClusterShardingInstrumentationSpecTelemetry
3631
buffer-size = 120
3732
}
3833
""") {
@@ -47,44 +42,6 @@ class ClusterShardingInstrumentationSpecMultiJvmNode1 extends ClusterShardingIns
4742

4843
class ClusterShardingInstrumentationSpecMultiJvmNode2 extends ClusterShardingInstrumentationSpec
4944

50-
class SpecClusterShardingTelemetry(@nowarn("msg=never used") system: ExtendedActorSystem)
51-
extends ClusterShardingInstrumentation {
52-
53-
val counter = new AtomicInteger(0)
54-
val shardHomeRequests = new AtomicInteger(0)
55-
val shardHomeResponses = new AtomicInteger(0)
56-
57-
override def shardRegionBufferSize(
58-
selfAddress: Address,
59-
shardRegionActor: ActorRef,
60-
typeName: String,
61-
size: Int): Unit = {
62-
counter.set(size)
63-
}
64-
65-
override def incrementShardRegionBufferSize(
66-
selfAddress: Address,
67-
shardRegionActor: ActorRef,
68-
typeName: String): Unit = {
69-
counter.incrementAndGet()
70-
}
71-
72-
override def dependencies: Seq[String] = Nil
73-
74-
override def regionRequestedShardHome(
75-
selfAddress: Address,
76-
shardRegionActor: ActorRef,
77-
typeName: String,
78-
shardId: String): Unit =
79-
shardHomeRequests.incrementAndGet()
80-
81-
override def receivedShardHome(
82-
selfAddress: Address,
83-
shardRegionActor: ActorRef,
84-
typeName: String,
85-
shardId: String): Unit = shardHomeResponses.incrementAndGet()
86-
}
87-
8845
object ClusterShardingInstrumentationSpec {
8946

9047
object GiveMeYourHome {
@@ -125,18 +82,18 @@ abstract class ClusterShardingInstrumentationSpec
12582
import ClusterShardingInstrumentationSpec.GiveMeYourHome._
12683
import ClusterShardingInstrumentationSpecConfig._
12784

128-
private val counter = ClusterShardingInstrumentationProvider(system).instrumentation
129-
.asInstanceOf[SpecClusterShardingTelemetry]
130-
.counter
85+
private val shardRegionBufferSizeCounter = ClusterShardingInstrumentationProvider(system).instrumentation
86+
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
87+
.shardRegionBufferSizeCounter
13188

13289
val shardHomeRequests =
13390
ClusterShardingInstrumentationProvider(system).instrumentation
134-
.asInstanceOf[SpecClusterShardingTelemetry]
91+
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
13592
.shardHomeRequests
13693

13794
val shardHomeResponses =
13895
ClusterShardingInstrumentationProvider(system).instrumentation
139-
.asInstanceOf[SpecClusterShardingTelemetry]
96+
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
14097
.shardHomeResponses
14198

14299
override implicit val patienceConfig: PatienceConfig = {
@@ -200,7 +157,7 @@ abstract class ClusterShardingInstrumentationSpec
200157
shardRegion.tell(Get(s"id-$n"), probe.ref)
201158
}
202159
eventually {
203-
counter.get() shouldBe 100
160+
shardRegionBufferSizeCounter.get() shouldBe 100
204161
}
205162
}
206163
enterBarrier("messages-buffered")
@@ -215,7 +172,7 @@ abstract class ClusterShardingInstrumentationSpec
215172
eventually {
216173
// we have 100 in the buffer, and our cap is 120 (per config in this test)
217174
// 10 are dropped. Should we have a metric on this? Or custom events?
218-
counter.get() shouldBe 120
175+
shardRegionBufferSizeCounter.get() shouldBe 120
219176
}
220177
}
221178
enterBarrier("buffer-overflow")
@@ -238,7 +195,7 @@ abstract class ClusterShardingInstrumentationSpec
238195
"clear buffered messages" in {
239196
runOn(second) {
240197
eventually(timeout(Span(5, Seconds))) {
241-
counter.get() shouldBe 0
198+
shardRegionBufferSizeCounter.get() shouldBe 0
242199
}
243200
}
244201
enterBarrier("messages-buffered-cleared")
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright (C) 2025 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.cluster.sharding
6+
7+
import java.util.concurrent.atomic.AtomicInteger
8+
9+
import scala.annotation.nowarn
10+
11+
import akka.actor.ActorRef
12+
import akka.actor.Address
13+
import akka.actor.ExtendedActorSystem
14+
import akka.cluster.sharding.internal.ClusterShardingInstrumentation
15+
16+
class ClusterShardingInstrumentationSpecTelemetry(@nowarn("msg=never used") system: ExtendedActorSystem)
17+
extends ClusterShardingInstrumentation {
18+
19+
val shardRegionBufferSizeCounter = new AtomicInteger(0)
20+
val beginShardHandoffDurationCounter = new AtomicInteger(0)
21+
val finishShardHandoffDurationCounter = new AtomicInteger(0)
22+
val shardHomeRequests = new AtomicInteger(0)
23+
val shardHomeResponses = new AtomicInteger(0)
24+
25+
override def shardRegionBufferSize(
26+
selfAddress: Address,
27+
shardRegionActor: ActorRef,
28+
typeName: String,
29+
size: Int): Unit = {
30+
shardRegionBufferSizeCounter.set(size)
31+
}
32+
33+
override def shardRegionBufferSizeIncremented(
34+
selfAddress: Address,
35+
shardRegionActor: ActorRef,
36+
typeName: String): Unit = {
37+
shardRegionBufferSizeCounter.incrementAndGet()
38+
}
39+
40+
override def shardHandoffStarted(
41+
selfAddress: Address,
42+
shardCoordinatorActor: ActorRef,
43+
typeName: String,
44+
shard: String): Unit = beginShardHandoffDurationCounter.incrementAndGet()
45+
46+
override def shardHandoffFinished(
47+
selfAddress: Address,
48+
shardCoordinatorActor: ActorRef,
49+
typeName: String,
50+
shard: String,
51+
ok: Boolean): Unit = finishShardHandoffDurationCounter.incrementAndGet()
52+
53+
override def regionRequestedShardHome(
54+
selfAddress: Address,
55+
shardRegionActor: ActorRef,
56+
typeName: String,
57+
shardId: String): Unit =
58+
shardHomeRequests.incrementAndGet()
59+
60+
override def receivedShardHome(
61+
selfAddress: Address,
62+
shardRegionActor: ActorRef,
63+
typeName: String,
64+
shardId: String): Unit = shardHomeResponses.incrementAndGet()
65+
66+
override def dependencies: Seq[String] = Nil
67+
}

akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/RollingUpdateShardAllocationSpec.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import akka.actor.Address
1414
import akka.actor.Props
1515
import akka.cluster.Cluster
1616
import akka.cluster.MemberStatus.Up
17+
import akka.cluster.sharding.internal.ClusterShardingInstrumentationProvider
1718
import akka.serialization.jackson.CborSerializable
1819
import akka.testkit.ImplicitSender
1920

@@ -29,6 +30,8 @@ object RollingUpdateShardAllocationSpecConfig
2930
akka.coordinated-shutdown.terminate-actor-system = off
3031
# use the new LeastShardAllocationStrategy
3132
akka.cluster.sharding.least-shard-allocation-strategy.rebalance-absolute-limit = 1
33+
# to test shard handoff instrumentation
34+
telemetry.instrumentations += akka.cluster.sharding.ClusterShardingInstrumentationSpecTelemetry
3235
}
3336
""") {
3437

@@ -220,7 +223,19 @@ abstract class RollingUpdateShardAllocationSpec
220223
}
221224
enterBarrier("completo")
222225
}
223-
226+
"verify instrumentation" in {
227+
// only run on oldest - Shard Coordinator
228+
runOn(third) {
229+
val beginShardHandoffDurationCounter = ClusterShardingInstrumentationProvider(system).instrumentation
230+
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
231+
.beginShardHandoffDurationCounter
232+
val finishShardHandoffDurationCounter = ClusterShardingInstrumentationProvider(system).instrumentation
233+
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
234+
.finishShardHandoffDurationCounter
235+
236+
beginShardHandoffDurationCounter.get() shouldBe finishShardHandoffDurationCounter.get()
237+
}
238+
enterBarrier("shard-handoff-telemetry")
239+
}
224240
}
225-
226241
}

0 commit comments

Comments
 (0)