Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1297,6 +1297,7 @@ private[akka] class ShardRegion(
loggedFullBufferWarning = true
}
context.system.deadLetters ! msg
instrumentation.messageDropped(cluster.selfAddress, self, typeName)
} else {
shardBuffers.append(shardId, msg, snd)
instrumentation.shardRegionBufferSizeIncremented(cluster.selfAddress, self, typeName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterSharding
shardId: String): Unit =
instrumentations.foreach(_.regionRequestedShardHome(selfAddress, shardRegionActor, typeName, shardId))

override def messageDropped(selfAddress: Address, self: ActorRef, typeName: String): Unit =
instrumentations.foreach(_.messageDropped(selfAddress, self, typeName))

override def receivedShardHome(
selfAddress: Address,
shardRegionActor: ActorRef,
Expand Down Expand Up @@ -163,6 +166,8 @@ class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation
typeName: String,
shardId: String): Unit = ()

override def messageDropped(selfAddress: Address, self: ActorRef, typeName: String): Unit = ()

override def shardHandoffStarted(
selfAddress: Address,
shardRegionActor: ActorRef,
Expand Down Expand Up @@ -197,6 +202,8 @@ trait ClusterShardingInstrumentation {

def receivedShardHome(selfAddress: Address, shardRegionActor: ActorRef, typeName: String, shardId: String): Unit

def messageDropped(selfAddress: Address, self: ActorRef, typeName: String): Unit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another scaladoc to drop


def shardHandoffStarted(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String, shard: String): Unit

def shardHandoffFinished(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import scala.concurrent.duration._

import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{ Seconds, Span }

import org.scalatest.concurrent.Eventually.eventually

import akka.actor.{ Actor, ActorLogging, Address, Props }
Expand Down Expand Up @@ -86,16 +85,15 @@ abstract class ClusterShardingInstrumentationSpec
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
.shardRegionBufferSizeCounter

private val dropMessageCounter = ClusterShardingInstrumentationProvider(system).instrumentation
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
.dropMessageCounter

val shardHomeRequests =
ClusterShardingInstrumentationProvider(system).instrumentation
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
.shardHomeRequests

val shardHomeResponses =
ClusterShardingInstrumentationProvider(system).instrumentation
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
.shardHomeResponses

override implicit val patienceConfig: PatienceConfig = {
import akka.testkit.TestDuration
PatienceConfig(testKitSettings.DefaultTimeout.duration.dilated, Span(1000, org.scalatest.time.Millis))
Expand Down Expand Up @@ -173,6 +171,7 @@ abstract class ClusterShardingInstrumentationSpec
// we have 100 in the buffer, and our cap is 120 (per config in this test)
// 10 are dropped. Should we have a metric on this? Or custom events?
shardRegionBufferSizeCounter.get() shouldBe 120
dropMessageCounter.get() shouldBe 10
}
}
enterBarrier("buffer-overflow")
Expand All @@ -187,7 +186,21 @@ abstract class ClusterShardingInstrumentationSpec

"record latency of requesting ShardHome" in {
runOn(second) {
shardHomeRequests.get() shouldBe shardHomeResponses.get()
eventually(timeout(Span(5, Seconds))) {
shardHomeRequests.size > 0 shouldBe true
shardHomeRequests.foreach {
case (key, value) =>
if (key.startsWith("id")) {
// The "id-0", "id-1" ... messages were send during the blackhole.
// This means they were requested twice, but only received back once.
value.get() shouldBe 1
} else {
// The "a", "b", "c" messages were send before the blackhole.
// This means they got a proper send-and-request cycle
value.get() shouldBe 0
}
}
}
}
enterBarrier("measure-shard-home-latency")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.cluster.sharding
import java.util.concurrent.atomic.AtomicInteger

import scala.annotation.nowarn
import scala.collection.concurrent.TrieMap

import akka.actor.ActorRef
import akka.actor.Address
Expand All @@ -19,8 +20,10 @@ class ClusterShardingInstrumentationSpecTelemetry(@nowarn("msg=never used") syst
val shardRegionBufferSizeCounter = new AtomicInteger(0)
val beginShardHandoffDurationCounter = new AtomicInteger(0)
val finishShardHandoffDurationCounter = new AtomicInteger(0)
val shardHomeRequests = new AtomicInteger(0)
val shardHomeResponses = new AtomicInteger(0)

val shardHomeRequests = new TrieMap[String, AtomicInteger]()

val dropMessageCounter = new AtomicInteger(0)

override def shardRegionBufferSize(
selfAddress: Address,
Expand Down Expand Up @@ -54,14 +57,21 @@ class ClusterShardingInstrumentationSpecTelemetry(@nowarn("msg=never used") syst
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
shardId: String): Unit =
shardHomeRequests.incrementAndGet()
shardId: String): Unit = shardHomeRequests.getOrElseUpdate(shardId, new AtomicInteger(0)).incrementAndGet()

override def receivedShardHome(
selfAddress: Address,
shardRegionActor: ActorRef,
typeName: String,
shardId: String): Unit = shardHomeResponses.incrementAndGet()
shardId: String): Unit =
shardHomeRequests.get(shardId) match {
case Some(value) =>
value.decrementAndGet()
case None => () // should not happen
}

override def messageDropped(selfAddress: Address, self: ActorRef, typeName: String): Unit =
dropMessageCounter.incrementAndGet()

override def dependencies: Seq[String] = Nil
}