Skip to content

Commit 3930318

Browse files
feat: latency for shard handoff
1 parent f44207d commit 3930318

File tree

5 files changed

+110
-33
lines changed

5 files changed

+110
-33
lines changed

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import akka.cluster.sharding.internal.{
3434
import akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy
3535
import akka.cluster.sharding.internal.ClusterShardAllocationMixin.RegionEntry
3636
import akka.cluster.sharding.internal.ClusterShardAllocationMixin.ShardSuitabilityOrdering
37+
import akka.cluster.sharding.internal.ClusterShardingInstrumentationProvider
3738
import akka.cluster.sharding.internal.EventSourcedRememberEntitiesCoordinatorStore.MigrationMarker
3839
import akka.event.{ BusLogging, Logging }
3940
import akka.pattern.{ pipe, AskTimeoutException }
@@ -644,6 +645,11 @@ object ShardCoordinator {
644645

645646
import Internal._
646647

648+
private val instrumentation = ClusterShardingInstrumentationProvider.get(context.system).instrumentation
649+
val cluster = Cluster(context.system)
650+
651+
instrumentation.beginnShardHandoff(cluster.selfAddress, self, typeName)
652+
647653
regions.foreach { region =>
648654
region ! BeginHandOff(shard)
649655
}
@@ -707,6 +713,11 @@ object ShardCoordinator {
707713
}
708714

709715
def done(ok: Boolean): Unit = {
716+
if (ok) {
717+
instrumentation.finishedShardHandoffOk(cluster.selfAddress, self, typeName)
718+
} else {
719+
instrumentation.finishedShardHandoffNotOk(cluster.selfAddress, self, typeName)
720+
}
710721
context.parent ! RebalanceDone(shard, ok)
711722
context.stop(self)
712723
}

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,18 @@ class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterSharding
9494
typeName: String): Unit =
9595
instrumentations.foreach(_.incrementShardRegionBufferSize(selfAddress, shardRegionActor, typeName))
9696

97+
override def beginnShardHandoff(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit =
98+
instrumentations.foreach(_.beginnShardHandoff(selfAddress, shardCoordinatorActor, typeName))
99+
100+
override def finishedShardHandoffOk(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit =
101+
instrumentations.foreach(_.finishedShardHandoffOk(selfAddress, shardCoordinatorActor, typeName))
102+
103+
override def finishedShardHandoffNotOk(
104+
selfAddress: Address,
105+
shardCoordinatorActor: ActorRef,
106+
typeName: String): Unit =
107+
instrumentations.foreach(_.finishedShardHandoffNotOk(selfAddress, shardCoordinatorActor, typeName))
108+
97109
override def dependencies: immutable.Seq[String] =
98110
instrumentations.flatMap(_.dependencies)
99111
}
@@ -121,6 +133,12 @@ class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation
121133
shardRegionActor: ActorRef,
122134
typeName: String): Unit = ()
123135

136+
override def beginnShardHandoff(selfAddress: Address, shardRegionActor: ActorRef, typeName: String): Unit = ()
137+
138+
override def finishedShardHandoffOk(selfAddress: Address, self: ActorRef, typeName: String): Unit = ()
139+
140+
override def finishedShardHandoffNotOk(selfAddress: Address, self: ActorRef, typeName: String): Unit = ()
141+
124142
override def dependencies: immutable.Seq[String] = Nil
125143
}
126144

@@ -140,6 +158,12 @@ trait ClusterShardingInstrumentation {
140158
*/
141159
def incrementShardRegionBufferSize(selfAddress: Address, shardRegionActor: ActorRef, typeName: String): Unit
142160

161+
def beginnShardHandoff(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit
162+
163+
def finishedShardHandoffOk(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit
164+
165+
def finishedShardHandoffNotOk(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit
166+
143167
/**
144168
* Optional dependencies for this instrumentation.
145169
*

akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala

Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,17 @@
44

55
package akka.cluster.sharding
66

7-
import java.util.concurrent.atomic.AtomicInteger
8-
9-
import scala.annotation.nowarn
10-
117
import org.scalatest.concurrent.ScalaFutures
128
import org.scalatest.time.{ Seconds, Span }
139
import scala.concurrent.duration._
1410

1511
import org.scalatest.concurrent.Eventually.eventually
1612

17-
import akka.actor.ActorRef
18-
import akka.actor.ExtendedActorSystem
1913
import akka.actor.{ Actor, ActorLogging, Address, Props }
2014
import akka.cluster.Cluster
2115
import akka.cluster.MemberStatus
2216
import akka.cluster.sharding.ClusterShardingInstrumentationSpec.GiveMeYourHome.{ Get, Home }
23-
import akka.cluster.sharding.internal.{ ClusterShardingInstrumentation, ClusterShardingInstrumentationProvider }
17+
import akka.cluster.sharding.internal.ClusterShardingInstrumentationProvider
2418
import akka.remote.testkit.Direction
2519
import akka.testkit.TestProbe
2620
import akka.serialization.jackson.CborSerializable
@@ -32,7 +26,7 @@ object ClusterShardingInstrumentationSpecConfig
3226
additionalConfig = """
3327
akka.cluster.sharding {
3428
rebalance-interval = 120 s
35-
telemetry.instrumentations += akka.cluster.sharding.SpecClusterShardingTelemetry
29+
telemetry.instrumentations += akka.cluster.sharding.ClusterShardingInstrumentationSpecTelemetry
3630
buffer-size = 120
3731
}
3832
""") {
@@ -47,29 +41,6 @@ class ClusterShardingInstrumentationSpecMultiJvmNode1 extends ClusterShardingIns
4741

4842
class ClusterShardingInstrumentationSpecMultiJvmNode2 extends ClusterShardingInstrumentationSpec
4943

50-
class SpecClusterShardingTelemetry(@nowarn("msg=never used") system: ExtendedActorSystem)
51-
extends ClusterShardingInstrumentation {
52-
53-
val counter = new AtomicInteger(0)
54-
55-
override def shardRegionBufferSize(
56-
selfAddress: Address,
57-
shardRegionActor: ActorRef,
58-
typeName: String,
59-
size: Int): Unit = {
60-
counter.set(size)
61-
}
62-
63-
override def incrementShardRegionBufferSize(
64-
selfAddress: Address,
65-
shardRegionActor: ActorRef,
66-
typeName: String): Unit = {
67-
counter.incrementAndGet()
68-
}
69-
70-
override def dependencies: Seq[String] = Nil
71-
}
72-
7344
object ClusterShardingInstrumentationSpec {
7445

7546
object GiveMeYourHome {
@@ -111,7 +82,7 @@ abstract class ClusterShardingInstrumentationSpec
11182
import ClusterShardingInstrumentationSpecConfig._
11283

11384
private val counter = ClusterShardingInstrumentationProvider(system).instrumentation
114-
.asInstanceOf[SpecClusterShardingTelemetry]
85+
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
11586
.counter
11687

11788
override implicit val patienceConfig: PatienceConfig = {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (C) 2025 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.cluster.sharding
6+
7+
import java.util.concurrent.atomic.AtomicInteger
8+
import java.util.concurrent.atomic.AtomicLong
9+
10+
import scala.annotation.nowarn
11+
12+
import akka.actor.ActorRef
13+
import akka.actor.Address
14+
import akka.actor.ExtendedActorSystem
15+
import akka.cluster.sharding.internal.ClusterShardingInstrumentation
16+
17+
class ClusterShardingInstrumentationSpecTelemetry(@nowarn("msg=never used") system: ExtendedActorSystem)
18+
extends ClusterShardingInstrumentation {
19+
20+
val counter = new AtomicInteger(0)
21+
val shardHandoffDuration = new AtomicLong(0)
22+
23+
override def shardRegionBufferSize(
24+
selfAddress: Address,
25+
shardRegionActor: ActorRef,
26+
typeName: String,
27+
size: Int): Unit = {
28+
counter.set(size)
29+
}
30+
31+
override def incrementShardRegionBufferSize(
32+
selfAddress: Address,
33+
shardRegionActor: ActorRef,
34+
typeName: String): Unit = {
35+
counter.incrementAndGet()
36+
}
37+
38+
override def beginnShardHandoff(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit = {
39+
println("ok jea >?>>")
40+
shardHandoffDuration.set(System.currentTimeMillis())
41+
println(s"ok jea >?>> ${shardHandoffDuration.get()}")
42+
}
43+
44+
override def finishedShardHandoffOk(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit = {
45+
println("ok jea >?>> FINNNNNNN")
46+
val currentValue = shardHandoffDuration.get()
47+
shardHandoffDuration.set(System.currentTimeMillis() - currentValue)
48+
println(s"ok jea >?>> FINNNNNNN ${shardHandoffDuration.get()}")
49+
}
50+
51+
override def finishedShardHandoffNotOk(
52+
selfAddress: Address,
53+
shardCoordinatorActor: ActorRef,
54+
typeName: String): Unit = {
55+
println("ok jea >?>> NOT FINNNNNNN")
56+
}
57+
58+
override def dependencies: Seq[String] = Nil
59+
}

akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/RollingUpdateShardAllocationSpec.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import akka.actor.Address
1414
import akka.actor.Props
1515
import akka.cluster.Cluster
1616
import akka.cluster.MemberStatus.Up
17+
import akka.cluster.sharding.internal.ClusterShardingInstrumentationProvider
1718
import akka.serialization.jackson.CborSerializable
1819
import akka.testkit.ImplicitSender
1920

@@ -29,6 +30,8 @@ object RollingUpdateShardAllocationSpecConfig
2930
akka.coordinated-shutdown.terminate-actor-system = off
3031
# use the new LeastShardAllocationStrategy
3132
akka.cluster.sharding.least-shard-allocation-strategy.rebalance-absolute-limit = 1
33+
# to test shard handoff latency telemetry
34+
telemetry.instrumentations += akka.cluster.sharding.ClusterShardingInstrumentationSpecTelemetry
3235
}
3336
""") {
3437

@@ -220,7 +223,16 @@ abstract class RollingUpdateShardAllocationSpec
220223
}
221224
enterBarrier("completo")
222225
}
226+
"verify instrumentation" in {
227+
// only run on oldest - Shard Coordinator
228+
runOn(third) {
229+
val shardHandoffDuration = ClusterShardingInstrumentationProvider(system).instrumentation
230+
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
231+
.shardHandoffDuration
223232

233+
shardHandoffDuration.get() > 0 shouldBe true
234+
}
235+
enterBarrier("shard-handoff-latency")
236+
}
224237
}
225-
226238
}

0 commit comments

Comments
 (0)