Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import akka.cluster.sharding.internal.{
import akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy
import akka.cluster.sharding.internal.ClusterShardAllocationMixin.RegionEntry
import akka.cluster.sharding.internal.ClusterShardAllocationMixin.ShardSuitabilityOrdering
import akka.cluster.sharding.internal.ClusterShardingInstrumentationProvider
import akka.cluster.sharding.internal.EventSourcedRememberEntitiesCoordinatorStore.MigrationMarker
import akka.event.{ BusLogging, Logging }
import akka.pattern.{ pipe, AskTimeoutException }
Expand Down Expand Up @@ -644,6 +645,11 @@ object ShardCoordinator {

import Internal._

private val instrumentation = ClusterShardingInstrumentationProvider.get(context.system).instrumentation
private val cluster = Cluster(context.system)

instrumentation.shardHandoffStarted(cluster.selfAddress, self, typeName, shard)

regions.foreach { region =>
region ! BeginHandOff(shard)
}
Expand Down Expand Up @@ -707,6 +713,7 @@ object ShardCoordinator {
}

def done(ok: Boolean): Unit = {
instrumentation.shardHandoffFinished(cluster.selfAddress, self, typeName, shard, ok)
context.parent ! RebalanceDone(shard, ok)
context.stop(self)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1299,7 +1299,7 @@ private[akka] class ShardRegion(
context.system.deadLetters ! msg
} else {
shardBuffers.append(shardId, msg, snd)
instrumentation.incrementShardRegionBufferSize(cluster.selfAddress, self, typeName)
instrumentation.shardRegionBufferSizeIncremented(cluster.selfAddress, self, typeName)
// log some insight to how buffers are filled up every 10% of the buffer capacity
val tot = totBufSize + 1
if (tot % (bufferSize / 10) == 0) {
Expand Down Expand Up @@ -1375,7 +1375,7 @@ private[akka] class ShardRegion(
shardId,
buf.size + 1)
shardBuffers.append(shardId, msg, snd)
instrumentation.incrementShardRegionBufferSize(cluster.selfAddress, self, typeName)
instrumentation.shardRegionBufferSizeIncremented(cluster.selfAddress, self, typeName)
}

case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,11 @@ class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterSharding
size: Int): Unit =
instrumentations.foreach(_.shardRegionBufferSize(selfAddress, shardRegionActor, typeName, size))

override def incrementShardRegionBufferSize(
override def shardRegionBufferSizeIncremented(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String): Unit =
instrumentations.foreach(_.incrementShardRegionBufferSize(selfAddress, shardRegionActor, typeName))

override def dependencies: immutable.Seq[String] =
instrumentations.flatMap(_.dependencies)
instrumentations.foreach(_.shardRegionBufferSizeIncremented(selfAddress, shardRegionActor, typeName))

override def regionRequestedShardHome(
selfAddress: Address,
Expand All @@ -110,6 +107,25 @@ class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterSharding
typeName: String,
shardId: String): Unit =
instrumentations.foreach(_.receivedShardHome(selfAddress, shardRegionActor, typeName, shardId))

override def shardHandoffStarted(
selfAddress: Address,
shardCoordinatorActor: ActorRef,
typeName: String,
shard: String): Unit =
instrumentations.foreach(_.shardHandoffStarted(selfAddress, shardCoordinatorActor, typeName, shard))

override def shardHandoffFinished(
selfAddress: Address,
shardCoordinatorActor: ActorRef,
typeName: String,
shard: String,
ok: Boolean): Unit =
instrumentations.foreach(_.shardHandoffFinished(selfAddress, shardCoordinatorActor, typeName, shard, ok))

override def dependencies: immutable.Seq[String] =
instrumentations.flatMap(_.dependencies)

}

/**
Expand All @@ -130,13 +146,11 @@ class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation
typeName: String,
size: Int): Unit = ()

override def incrementShardRegionBufferSize(
override def shardRegionBufferSizeIncremented(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String): Unit = ()

override def dependencies: immutable.Seq[String] = Nil

override def regionRequestedShardHome(
selfAddress: Address,
shardRegionActor: ActorRef,
Expand All @@ -148,6 +162,21 @@ class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation
shardRegionActor: ActorRef,
typeName: String,
shardId: String): Unit = ()

override def shardHandoffStarted(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
shard: String): Unit = ()

override def shardHandoffFinished(
selfAddress: Address,
self: ActorRef,
typeName: String,
shard: String,
ok: Boolean): Unit = ()

override def dependencies: immutable.Seq[String] = Nil
}

/**
Expand All @@ -156,15 +185,9 @@ class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation
@InternalStableApi
trait ClusterShardingInstrumentation {

/**
* @param size set current size of the buffer.
*/
def shardRegionBufferSize(selfAddress: Address, shardRegionActor: ActorRef, typeName: String, size: Int): Unit

/**
* Increase the current size of the buffer by one.
*/
def incrementShardRegionBufferSize(selfAddress: Address, shardRegionActor: ActorRef, typeName: String): Unit
def shardRegionBufferSizeIncremented(selfAddress: Address, shardRegionActor: ActorRef, typeName: String): Unit

def regionRequestedShardHome(
selfAddress: Address,
Expand All @@ -174,6 +197,15 @@ trait ClusterShardingInstrumentation {

def receivedShardHome(selfAddress: Address, shardRegionActor: ActorRef, typeName: String, shardId: String): Unit

def shardHandoffStarted(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String, shard: String): Unit

def shardHandoffFinished(
selfAddress: Address,
shardCoordinatorActor: ActorRef,
typeName: String,
shard: String,
ok: Boolean): Unit

/**
* Optional dependencies for this instrumentation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,18 @@

package akka.cluster.sharding

import java.util.concurrent.atomic.AtomicInteger

import scala.annotation.nowarn
import scala.concurrent.duration._

import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{ Seconds, Span }

import org.scalatest.concurrent.Eventually.eventually

import akka.actor.ActorRef
import akka.actor.ExtendedActorSystem
import akka.actor.{ Actor, ActorLogging, Address, Props }
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.cluster.sharding.ClusterShardingInstrumentationSpec.GiveMeYourHome.{ Get, Home }
import akka.cluster.sharding.internal.{ ClusterShardingInstrumentation, ClusterShardingInstrumentationProvider }
import akka.cluster.sharding.internal.ClusterShardingInstrumentationProvider
import akka.remote.testkit.Direction
import akka.testkit.TestProbe
import akka.serialization.jackson.CborSerializable
Expand All @@ -32,7 +27,7 @@ object ClusterShardingInstrumentationSpecConfig
additionalConfig = """
akka.cluster.sharding {
rebalance-interval = 120 s
telemetry.instrumentations += akka.cluster.sharding.SpecClusterShardingTelemetry
telemetry.instrumentations += akka.cluster.sharding.ClusterShardingInstrumentationSpecTelemetry
buffer-size = 120
}
""") {
Expand All @@ -47,44 +42,6 @@ class ClusterShardingInstrumentationSpecMultiJvmNode1 extends ClusterShardingIns

class ClusterShardingInstrumentationSpecMultiJvmNode2 extends ClusterShardingInstrumentationSpec

class SpecClusterShardingTelemetry(@nowarn("msg=never used") system: ExtendedActorSystem)
extends ClusterShardingInstrumentation {

val counter = new AtomicInteger(0)
val shardHomeRequests = new AtomicInteger(0)
val shardHomeResponses = new AtomicInteger(0)

override def shardRegionBufferSize(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
size: Int): Unit = {
counter.set(size)
}

override def incrementShardRegionBufferSize(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String): Unit = {
counter.incrementAndGet()
}

override def dependencies: Seq[String] = Nil

override def regionRequestedShardHome(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
shardId: String): Unit =
shardHomeRequests.incrementAndGet()

override def receivedShardHome(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
shardId: String): Unit = shardHomeResponses.incrementAndGet()
}

object ClusterShardingInstrumentationSpec {

object GiveMeYourHome {
Expand Down Expand Up @@ -125,18 +82,18 @@ abstract class ClusterShardingInstrumentationSpec
import ClusterShardingInstrumentationSpec.GiveMeYourHome._
import ClusterShardingInstrumentationSpecConfig._

private val counter = ClusterShardingInstrumentationProvider(system).instrumentation
.asInstanceOf[SpecClusterShardingTelemetry]
.counter
private val shardRegionBufferSizeCounter = ClusterShardingInstrumentationProvider(system).instrumentation
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
.shardRegionBufferSizeCounter

val shardHomeRequests =
ClusterShardingInstrumentationProvider(system).instrumentation
.asInstanceOf[SpecClusterShardingTelemetry]
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
.shardHomeRequests

val shardHomeResponses =
ClusterShardingInstrumentationProvider(system).instrumentation
.asInstanceOf[SpecClusterShardingTelemetry]
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
.shardHomeResponses

override implicit val patienceConfig: PatienceConfig = {
Expand Down Expand Up @@ -200,7 +157,7 @@ abstract class ClusterShardingInstrumentationSpec
shardRegion.tell(Get(s"id-$n"), probe.ref)
}
eventually {
counter.get() shouldBe 100
shardRegionBufferSizeCounter.get() shouldBe 100
}
}
enterBarrier("messages-buffered")
Expand All @@ -215,7 +172,7 @@ abstract class ClusterShardingInstrumentationSpec
eventually {
// we have 100 in the buffer, and our cap is 120 (per config in this test)
// 10 are dropped. Should we have a metric on this? Or custom events?
counter.get() shouldBe 120
shardRegionBufferSizeCounter.get() shouldBe 120
}
}
enterBarrier("buffer-overflow")
Expand All @@ -238,7 +195,7 @@ abstract class ClusterShardingInstrumentationSpec
"clear buffered messages" in {
runOn(second) {
eventually(timeout(Span(5, Seconds))) {
counter.get() shouldBe 0
shardRegionBufferSizeCounter.get() shouldBe 0
}
}
enterBarrier("messages-buffered-cleared")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (C) 2025 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.cluster.sharding

import java.util.concurrent.atomic.AtomicInteger

import scala.annotation.nowarn

import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.cluster.sharding.internal.ClusterShardingInstrumentation

class ClusterShardingInstrumentationSpecTelemetry(@nowarn("msg=never used") system: ExtendedActorSystem)
extends ClusterShardingInstrumentation {

val shardRegionBufferSizeCounter = new AtomicInteger(0)
val beginShardHandoffDurationCounter = new AtomicInteger(0)
val finishShardHandoffDurationCounter = new AtomicInteger(0)
val shardHomeRequests = new AtomicInteger(0)
val shardHomeResponses = new AtomicInteger(0)

override def shardRegionBufferSize(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
size: Int): Unit = {
shardRegionBufferSizeCounter.set(size)
}

override def shardRegionBufferSizeIncremented(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String): Unit = {
shardRegionBufferSizeCounter.incrementAndGet()
}

override def shardHandoffStarted(
selfAddress: Address,
shardCoordinatorActor: ActorRef,
typeName: String,
shard: String): Unit = beginShardHandoffDurationCounter.incrementAndGet()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not latency though, it is a counter. Latency would be a timestamp at begin to and then one at finish to calculate the time it took.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, it is just the test counting NVM!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, thanks for pointing out. I now avoid the term "latency" at all, as you are correct here!


override def shardHandoffFinished(
selfAddress: Address,
shardCoordinatorActor: ActorRef,
typeName: String,
shard: String,
ok: Boolean): Unit = finishShardHandoffDurationCounter.incrementAndGet()

override def regionRequestedShardHome(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
shardId: String): Unit =
shardHomeRequests.incrementAndGet()

override def receivedShardHome(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
shardId: String): Unit = shardHomeResponses.incrementAndGet()

override def dependencies: Seq[String] = Nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import akka.actor.Address
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.MemberStatus.Up
import akka.cluster.sharding.internal.ClusterShardingInstrumentationProvider
import akka.serialization.jackson.CborSerializable
import akka.testkit.ImplicitSender

Expand All @@ -29,6 +30,8 @@ object RollingUpdateShardAllocationSpecConfig
akka.coordinated-shutdown.terminate-actor-system = off
# use the new LeastShardAllocationStrategy
akka.cluster.sharding.least-shard-allocation-strategy.rebalance-absolute-limit = 1
# to test shard handoff instrumentation
telemetry.instrumentations += akka.cluster.sharding.ClusterShardingInstrumentationSpecTelemetry
}
""") {

Expand Down Expand Up @@ -220,7 +223,19 @@ abstract class RollingUpdateShardAllocationSpec
}
enterBarrier("completo")
}

"verify instrumentation" in {
// only run on oldest - Shard Coordinator
runOn(third) {
val beginShardHandoffDurationCounter = ClusterShardingInstrumentationProvider(system).instrumentation
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
.beginShardHandoffDurationCounter
val finishShardHandoffDurationCounter = ClusterShardingInstrumentationProvider(system).instrumentation
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
.finishShardHandoffDurationCounter

beginShardHandoffDurationCounter.get() shouldBe finishShardHandoffDurationCounter.get()
}
enterBarrier("shard-handoff-telemetry")
}
}

}