Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,7 @@ private[akka] class ShardRegion(
}

case ShardHome(shard, shardRegionRef) =>
instrumentation.receivedShardHome(cluster.selfAddress, self, typeName, shard)
receiveShardHome(shard, shardRegionRef)

case ShardHomes(homes) =>
Expand Down Expand Up @@ -1266,7 +1267,7 @@ private[akka] class ShardRegion(
shard,
coord,
buf.size)
coord ! GetShardHome(shard)
requestShardHome(shard)
}

if (retryCount >= 5 && retryCount % 5 == 0 && log.isWarningEnabled) {
Expand Down Expand Up @@ -1365,7 +1366,7 @@ private[akka] class ShardRegion(
case None =>
if (!shardBuffers.contains(shardId)) {
log.debug("{}: Request shard [{}] home. Coordinator [{}]", typeName, shardId, coordinator)
coordinator.foreach(_ ! GetShardHome(shardId))
requestShardHome(shardId)
}
val buf = shardBuffers.getOrEmpty(shardId)
log.debug(
Expand Down Expand Up @@ -1400,7 +1401,7 @@ private[akka] class ShardRegion(
case None =>
if (!shardBuffers.contains(shardId)) {
log.debug("{}: Request shard [{}] home. Coordinator [{}]", typeName, shardId, coordinator)
coordinator.foreach(_ ! GetShardHome(shardId))
requestShardHome(shardId)
}
bufferMessage(shardId, msg, snd)
}
Expand Down Expand Up @@ -1453,4 +1454,9 @@ private[akka] class ShardRegion(
actorSelections.foreach(_ ! GracefulShutdownReq(self))
}
}

private def requestShardHome(shard: ShardId): Unit = {
instrumentation.regionRequestedShardHome(cluster.selfAddress, self, typeName, shard)
coordinator.foreach(_ ! GetShardHome(shard))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,20 @@ class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterSharding

override def dependencies: immutable.Seq[String] =
instrumentations.flatMap(_.dependencies)

override def regionRequestedShardHome(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
shardId: String): Unit =
instrumentations.foreach(_.regionRequestedShardHome(selfAddress, shardRegionActor, typeName, shardId))

override def receivedShardHome(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
shardId: String): Unit =
instrumentations.foreach(_.receivedShardHome(selfAddress, shardRegionActor, typeName, shardId))
}

/**
Expand All @@ -122,6 +136,18 @@ class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation
typeName: String): Unit = ()

override def dependencies: immutable.Seq[String] = Nil

override def regionRequestedShardHome(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
shardId: String): Unit = ()

override def receivedShardHome(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
shardId: String): Unit = ()
}

/**
Expand All @@ -140,6 +166,14 @@ trait ClusterShardingInstrumentation {
*/
def incrementShardRegionBufferSize(selfAddress: Address, shardRegionActor: ActorRef, typeName: String): Unit

def regionRequestedShardHome(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
shardId: String): Unit

def receivedShardHome(selfAddress: Address, shardRegionActor: ActorRef, typeName: String, shardId: String): Unit

/**
* Optional dependencies for this instrumentation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ package akka.cluster.sharding
import java.util.concurrent.atomic.AtomicInteger

import scala.annotation.nowarn
import scala.concurrent.duration._

import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{ Seconds, Span }
import scala.concurrent.duration._

import org.scalatest.concurrent.Eventually.eventually

Expand Down Expand Up @@ -51,6 +51,8 @@ class SpecClusterShardingTelemetry(@nowarn("msg=never used") system: ExtendedAct
extends ClusterShardingInstrumentation {

val counter = new AtomicInteger(0)
val shardHomeRequests = new AtomicInteger(0)
val shardHomeResponses = new AtomicInteger(0)

override def shardRegionBufferSize(
selfAddress: Address,
Expand All @@ -68,6 +70,19 @@ class SpecClusterShardingTelemetry(@nowarn("msg=never used") system: ExtendedAct
}

override def dependencies: Seq[String] = Nil

override def regionRequestedShardHome(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
shardId: String): Unit =
shardHomeRequests.incrementAndGet()

override def receivedShardHome(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
shardId: String): Unit = shardHomeResponses.incrementAndGet()
}

object ClusterShardingInstrumentationSpec {
Expand Down Expand Up @@ -114,6 +129,16 @@ abstract class ClusterShardingInstrumentationSpec
.asInstanceOf[SpecClusterShardingTelemetry]
.counter

val shardHomeRequests =
ClusterShardingInstrumentationProvider(system).instrumentation
.asInstanceOf[SpecClusterShardingTelemetry]
.shardHomeRequests

val shardHomeResponses =
ClusterShardingInstrumentationProvider(system).instrumentation
.asInstanceOf[SpecClusterShardingTelemetry]
.shardHomeResponses

override implicit val patienceConfig: PatienceConfig = {
import akka.testkit.TestDuration
PatienceConfig(testKitSettings.DefaultTimeout.duration.dilated, Span(1000, org.scalatest.time.Millis))
Expand Down Expand Up @@ -203,6 +228,13 @@ abstract class ClusterShardingInstrumentationSpec
enterBarrier("blackhole-removed")
}

"record latency of requesting ShardHome" in {
runOn(second) {
shardHomeRequests.get() shouldBe shardHomeResponses.get()
}
enterBarrier("measure-shard-home-latency")
}

"clear buffered messages" in {
runOn(second) {
eventually(timeout(Span(5, Seconds))) {
Expand Down