Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
"fields" : [ {
"name" : "MODULE$"
} ]
}, {
"name" : "akka.cluster.sharding.internal.ClusterShardingInstrumentationProvider$",
"fields" : [ {
"name" : "MODULE$"
} ]
}, {
"name" : "akka.cluster.sharding.protobuf.ClusterShardingMessageSerializer",
"methods" : [ {
Expand All @@ -27,4 +32,4 @@
"methods" : [ {
"name" : "<init>"
} ]
} ]
} ]
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import akka.cluster.sharding.internal.{
import akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy
import akka.cluster.sharding.internal.ClusterShardAllocationMixin.RegionEntry
import akka.cluster.sharding.internal.ClusterShardAllocationMixin.ShardSuitabilityOrdering
import akka.cluster.sharding.internal.ClusterShardingInstrumentationProvider
import akka.cluster.sharding.internal.EventSourcedRememberEntitiesCoordinatorStore.MigrationMarker
import akka.event.{ BusLogging, Logging }
import akka.pattern.{ pipe, AskTimeoutException }
Expand Down Expand Up @@ -644,6 +645,11 @@ object ShardCoordinator {

import Internal._

private val instrumentation = ClusterShardingInstrumentationProvider.get(context.system).instrumentation
private val cluster = Cluster(context.system)

instrumentation.shardHandoffStarted(cluster.selfAddress, self, typeName, shard)

regions.foreach { region =>
region ! BeginHandOff(shard)
}
Expand Down Expand Up @@ -707,6 +713,7 @@ object ShardCoordinator {
}

def done(ok: Boolean): Unit = {
instrumentation.shardHandoffFinished(cluster.selfAddress, self, typeName, shard, ok)
context.parent ! RebalanceDone(shard, ok)
context.stop(self)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.cluster.sharding.ClusterShardingSettings.PassivationStrategy
import akka.cluster.sharding.Shard.ShardStats
import akka.cluster.sharding.internal.ClusterShardingInstrumentationProvider
import akka.cluster.sharding.internal.RememberEntitiesProvider
import akka.cluster.sharding.internal.RememberEntityStarterManager
import akka.event.Logging
Expand Down Expand Up @@ -639,6 +640,8 @@ private[akka] class ShardRegion(

private val verboseDebug = context.system.settings.config.getBoolean("akka.cluster.sharding.verbose-debug-logging")

private val instrumentation = ClusterShardingInstrumentationProvider.get(context.system).instrumentation

// sort by age, oldest first
val ageOrdering = Member.ageOrdering
// membersByAge is only used for tracking where coordinator is running
Expand Down Expand Up @@ -687,6 +690,7 @@ private[akka] class ShardRegion(
timers.startTimerWithFixedDelay(Retry, Retry, retryInterval)
startRegistration()
logPassivationStrategy()
instrumentation.shardRegionBufferSize(cluster.selfAddress, self, typeName, 0)
}

override def postStop(): Unit = {
Expand All @@ -695,6 +699,7 @@ private[akka] class ShardRegion(
coordinator.foreach(_ ! RegionStopped(context.self))
cluster.unsubscribe(self)
gracefulShutdownProgress.trySuccess(Done)
instrumentation.shardRegionBufferSize(cluster.selfAddress, self, typeName, 0)
}

private def logPassivationStrategy(): Unit = {
Expand Down Expand Up @@ -749,7 +754,7 @@ private[akka] class ShardRegion(
coordinator.map { coordRef =>
val a = coordRef.path.address

if (a.hasLocalScope) cluster.selfMember.address
if (a.hasLocalScope) cluster.selfAddress
else a
}

Expand Down Expand Up @@ -902,6 +907,7 @@ private[akka] class ShardRegion(
}

case ShardHome(shard, shardRegionRef) =>
instrumentation.receivedShardHome(cluster.selfAddress, self, typeName, shard)
receiveShardHome(shard, shardRegionRef)

case ShardHomes(homes) =>
Expand Down Expand Up @@ -942,12 +948,15 @@ private[akka] class ShardRegion(
if (shardBuffers.contains(shard)) {
val dropped = shardBuffers
.drop(shard, "Avoiding reordering of buffered messages at shard handoff", context.system.deadLetters)
if (dropped > 0)
if (dropped > 0) {
log.warning(
"{}: Dropping [{}] buffered messages to shard [{}] during hand off to avoid re-ordering",
typeName,
dropped,
shard)
// better to decrease by "dropped" to avoid calculating the size?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// better to decrease by "dropped" to avoid calculating the size?

instrumentation.shardRegionBufferSize(cluster.selfAddress, self, typeName, shardBuffers.totalSize)
}
loggedFullBufferWarning = false
}

Expand Down Expand Up @@ -1258,7 +1267,7 @@ private[akka] class ShardRegion(
shard,
coord,
buf.size)
coord ! GetShardHome(shard)
requestShardHome(shard)
}

if (retryCount >= 5 && retryCount % 5 == 0 && log.isWarningEnabled) {
Expand Down Expand Up @@ -1290,7 +1299,7 @@ private[akka] class ShardRegion(
context.system.deadLetters ! msg
} else {
shardBuffers.append(shardId, msg, snd)

instrumentation.shardRegionBufferSizeIncremented(cluster.selfAddress, self, typeName)
// log some insight to how buffers are filled up every 10% of the buffer capacity
val tot = totBufSize + 1
if (tot % (bufferSize / 10) == 0) {
Expand Down Expand Up @@ -1323,6 +1332,7 @@ private[akka] class ShardRegion(
}

shardBuffers.remove(shardId)
instrumentation.shardRegionBufferSize(cluster.selfAddress, self, typeName, shardBuffers.totalSize)
}
loggedFullBufferWarning = false
retryCount = 0
Expand Down Expand Up @@ -1356,7 +1366,7 @@ private[akka] class ShardRegion(
case None =>
if (!shardBuffers.contains(shardId)) {
log.debug("{}: Request shard [{}] home. Coordinator [{}]", typeName, shardId, coordinator)
coordinator.foreach(_ ! GetShardHome(shardId))
requestShardHome(shardId)
}
val buf = shardBuffers.getOrEmpty(shardId)
log.debug(
Expand All @@ -1365,6 +1375,7 @@ private[akka] class ShardRegion(
shardId,
buf.size + 1)
shardBuffers.append(shardId, msg, snd)
instrumentation.shardRegionBufferSizeIncremented(cluster.selfAddress, self, typeName)
}

case _ =>
Expand All @@ -1390,7 +1401,7 @@ private[akka] class ShardRegion(
case None =>
if (!shardBuffers.contains(shardId)) {
log.debug("{}: Request shard [{}] home. Coordinator [{}]", typeName, shardId, coordinator)
coordinator.foreach(_ ! GetShardHome(shardId))
requestShardHome(shardId)
}
bufferMessage(shardId, msg, snd)
}
Expand Down Expand Up @@ -1443,4 +1454,9 @@ private[akka] class ShardRegion(
actorSelections.foreach(_ ! GracefulShutdownReq(self))
}
}

private def requestShardHome(shard: ShardId): Unit = {
instrumentation.regionRequestedShardHome(cluster.selfAddress, self, typeName, shard)
coordinator.foreach(_ ! GetShardHome(shard))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
* Copyright (C) 2025 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.cluster.sharding.internal

import akka.actor.ExtendedActorSystem
import akka.annotation.InternalStableApi
import scala.collection.immutable
import scala.jdk.CollectionConverters._

import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.ClassicActorSystemProvider
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.event.Logging
import akka.util.TopologicalSort.topologicalSort

/**
 * INTERNAL API
 *
 * Akka extension entry point for the configured cluster sharding instrumentation.
 * Look it up with `ClusterShardingInstrumentationProvider.get(system)` and read
 * its `instrumentation` member.
 */
@InternalStableApi
object ClusterShardingInstrumentationProvider
extends ExtensionId[ClusterShardingInstrumentationProvider]
with ExtensionIdProvider {
// Both overloads are kept so the extension resolves from classic and provider-based system references.
override def get(system: ActorSystem): ClusterShardingInstrumentationProvider = super.get(system)

override def get(system: ClassicActorSystemProvider): ClusterShardingInstrumentationProvider = super.get(system)

override def lookup = ClusterShardingInstrumentationProvider

override def createExtension(system: ExtendedActorSystem): ClusterShardingInstrumentationProvider =
new ClusterShardingInstrumentationProvider(system)
}

/**
 * INTERNAL API
 *
 * Extension that loads the `ClusterShardingInstrumentation` implementations listed
 * (as fully qualified class names) under the config path
 * `akka.cluster.sharding.telemetry.instrumentations`.
 */
@InternalStableApi
class ClusterShardingInstrumentationProvider(system: ExtendedActorSystem) extends Extension {
// Config path holding the FQCNs of the instrumentation implementations to load.
private val fqcnConfigPath = "akka.cluster.sharding.telemetry.instrumentations"

// Resolved lazily so instrumentation classes are only instantiated on first use.
// Missing path or empty list => no-op; one entry => that instance directly;
// several => an ensemble, ordered so each instrumentation's declared
// `dependencies` come before it (topological sort).
lazy val instrumentation: ClusterShardingInstrumentation = {
if (!system.settings.config.hasPath(fqcnConfigPath)) {
EmptyClusterShardingInstrumentation
} else {
val fqcns = system.settings.config.getStringList(fqcnConfigPath).asScala.toVector
fqcns.size match {
case 0 => EmptyClusterShardingInstrumentation
case 1 => create(fqcns.head)
case _ =>
val instrumentationsByFqcn = fqcns.iterator.map(fqcn => fqcn -> create(fqcn)).toMap
val sortedNames = topologicalSort[String](fqcns, fqcn => instrumentationsByFqcn(fqcn).dependencies.toSet)
val instrumentations = sortedNames.map(instrumentationsByFqcn).toVector
new ClusterShardingTelemetryEnsemble(instrumentations)
}
}
}

// Reflectively creates one instrumentation, passing this ExtendedActorSystem as the
// single constructor argument. Any failure is logged as a warning and replaced with
// the no-op instrumentation so telemetry problems never prevent system startup.
private def create(fqcn: String): ClusterShardingInstrumentation = {
try {
system.dynamicAccess
.createInstanceFor[ClusterShardingInstrumentation](fqcn, immutable.Seq(classOf[ExtendedActorSystem] -> system))
.get
} catch {
case t: Throwable => // Throwable, because instrumentation failure should not cause fatal shutdown
Logging(system.classicSystem, classOf[ClusterShardingInstrumentationProvider])
.warning(t, "Cannot create instrumentation [{}]", fqcn)
EmptyClusterShardingInstrumentation
}
}
}

/**
 * INTERNAL API
 *
 * Fans each instrumentation callback out to every wrapped instrumentation, in the
 * order given by `instrumentations` (dependency-sorted by the provider).
 */
@InternalStableApi
class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterShardingInstrumentation])
    extends ClusterShardingInstrumentation {

  override def shardRegionBufferSize(
      selfAddress: Address,
      shardRegionActor: ActorRef,
      typeName: String,
      size: Int): Unit =
    instrumentations.foreach(_.shardRegionBufferSize(selfAddress, shardRegionActor, typeName, size))

  override def shardRegionBufferSizeIncremented(
      selfAddress: Address,
      shardRegionActor: ActorRef,
      typeName: String): Unit =
    instrumentations.foreach(_.shardRegionBufferSizeIncremented(selfAddress, shardRegionActor, typeName))

  override def regionRequestedShardHome(
      selfAddress: Address,
      shardRegionActor: ActorRef,
      typeName: String,
      shardId: String): Unit =
    instrumentations.foreach(_.regionRequestedShardHome(selfAddress, shardRegionActor, typeName, shardId))

  override def receivedShardHome(
      selfAddress: Address,
      shardRegionActor: ActorRef,
      typeName: String,
      shardId: String): Unit =
    instrumentations.foreach(_.receivedShardHome(selfAddress, shardRegionActor, typeName, shardId))

  override def shardHandoffStarted(
      selfAddress: Address,
      shardCoordinatorActor: ActorRef,
      typeName: String,
      shard: String): Unit =
    instrumentations.foreach(_.shardHandoffStarted(selfAddress, shardCoordinatorActor, typeName, shard))

  override def shardHandoffFinished(
      selfAddress: Address,
      shardCoordinatorActor: ActorRef,
      typeName: String,
      shard: String,
      ok: Boolean): Unit =
    instrumentations.foreach(_.shardHandoffFinished(selfAddress, shardCoordinatorActor, typeName, shard, ok))

  // Concatenation of all wrapped dependencies; may contain duplicates, which is
  // tolerated by the provider's topological sort (it works on the FQCN set).
  override def dependencies: immutable.Seq[String] =
    instrumentations.flatMap(_.dependencies)

}

/**
 * INTERNAL API
 *
 * Shared singleton no-op instrumentation, used when nothing is configured or when
 * creating a configured instrumentation fails.
 */
@InternalStableApi
object EmptyClusterShardingInstrumentation extends EmptyClusterShardingInstrumentation

/**
 * INTERNAL API
 *
 * No-op implementation of `ClusterShardingInstrumentation`: every callback does
 * nothing and there are no dependencies.
 *
 * Parameter names deliberately mirror the trait exactly (the original override
 * used `shardRegionActor`/`self` for the hand-off callbacks where the trait
 * declares `shardCoordinatorActor`; named arguments resolve against the static
 * type's names, so the mismatch was misleading).
 */
@InternalStableApi
class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation {

  override def shardRegionBufferSize(
      selfAddress: Address,
      shardRegionActor: ActorRef,
      typeName: String,
      size: Int): Unit = ()

  override def shardRegionBufferSizeIncremented(
      selfAddress: Address,
      shardRegionActor: ActorRef,
      typeName: String): Unit = ()

  override def regionRequestedShardHome(
      selfAddress: Address,
      shardRegionActor: ActorRef,
      typeName: String,
      shardId: String): Unit = ()

  override def receivedShardHome(
      selfAddress: Address,
      shardRegionActor: ActorRef,
      typeName: String,
      shardId: String): Unit = ()

  override def shardHandoffStarted(
      selfAddress: Address,
      shardCoordinatorActor: ActorRef,
      typeName: String,
      shard: String): Unit = ()

  override def shardHandoffFinished(
      selfAddress: Address,
      shardCoordinatorActor: ActorRef,
      typeName: String,
      shard: String,
      ok: Boolean): Unit = ()

  override def dependencies: immutable.Seq[String] = Nil
}

/**
 * INTERNAL API: Instrumentation SPI for Akka Cluster Sharding.
 *
 * Implementations are listed by FQCN under
 * `akka.cluster.sharding.telemetry.instrumentations` and created reflectively
 * with an `ExtendedActorSystem` constructor argument.
 */
@InternalStableApi
trait ClusterShardingInstrumentation {

/** Reports the current total size of the shard region's message buffers. */
def shardRegionBufferSize(selfAddress: Address, shardRegionActor: ActorRef, typeName: String, size: Int): Unit

/** Called when one message is appended to the shard region's buffers. */
def shardRegionBufferSizeIncremented(selfAddress: Address, shardRegionActor: ActorRef, typeName: String): Unit

/** Called when the shard region asks the coordinator for a shard's home (`GetShardHome`). */
def regionRequestedShardHome(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
shardId: String): Unit

/** Called when the shard region receives a `ShardHome` answer from the coordinator. */
def receivedShardHome(selfAddress: Address, shardRegionActor: ActorRef, typeName: String, shardId: String): Unit

/** Called by the coordinator when hand-off of a shard starts (rebalance `BeginHandOff`). */
def shardHandoffStarted(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String, shard: String): Unit

/** Called when the hand-off completes; `ok` is the same result flag reported in `RebalanceDone`. */
def shardHandoffFinished(
selfAddress: Address,
shardCoordinatorActor: ActorRef,
typeName: String,
shard: String,
ok: Boolean): Unit

/**
 * Optional dependencies for this instrumentation.
 *
 * Dependency instrumentations will always be ordered before this instrumentation.
 *
 * @return list of class names for optional instrumentation dependencies
 */
def dependencies: immutable.Seq[String]
}
Loading