feat: telemetry for shard handoff#32888
Conversation
| typeName: String): Unit = | ||
| instrumentations.foreach(_.incrementShardRegionBufferSize(selfAddress, shardRegionActor, typeName)) | ||
|
|
||
| override def beginnShardHandoff(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit = |
| import Internal._ | ||
|
|
||
| private val instrumentation = ClusterShardingInstrumentationProvider.get(context.system).instrumentation | ||
| val cluster = Cluster(context.system) |
| private val instrumentation = ClusterShardingInstrumentationProvider.get(context.system).instrumentation | ||
| val cluster = Cluster(context.system) | ||
|
|
||
| instrumentation.beginnShardHandoff(cluster.selfAddress, self, typeName) |
There was a problem hiding this comment.
might be good to include shard since this is a handoff for a specific typeName and shard
| if (ok) { | ||
| instrumentation.finishedShardHandoffOk(cluster.selfAddress, self, typeName) | ||
| } else { | ||
| instrumentation.finishedShardHandoffNotOk(cluster.selfAddress, self, typeName) |
There was a problem hiding this comment.
could consider having only finishedShardHandoff with ok: Boolean as parameter?
| } | ||
|
|
||
| override def beginnShardHandoff(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit = { | ||
| println("ok jea >?>>") |
| selfAddress: Address, | ||
| shardRegionActor: ActorRef, | ||
| typeName: String): Unit = { | ||
| counter.incrementAndGet() |
There was a problem hiding this comment.
perhaps counter should have a more specific name now?
| override def finishedShardHandoffOk(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit = { | ||
| println("ok jea >?>> FINNNNNNN") | ||
| val currentValue = shardHandoffDuration.get() | ||
| shardHandoffDuration.set(System.currentTimeMillis() - currentValue) |
There was a problem hiding this comment.
similar comment here, avoid clock?
f44207d to
0fbb370
Compare
cead4d7 to
4f3f356
Compare
| selfAddress: Address, | ||
| shardCoordinatorActor: ActorRef, | ||
| typeName: String, | ||
| shard: String): Unit = beginShardHandoffDurationCounter.incrementAndGet() |
There was a problem hiding this comment.
That's not latency though, it is a counter. Latency would be a timestamp at begin to and then one at finish to calculate the time it took.
There was a problem hiding this comment.
Ah, it is just the test counting NVM!
There was a problem hiding this comment.
No, thanks for pointing out. I now avoid the term "latency" at all, as you are correct here!
...-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala
Outdated
Show resolved
Hide resolved
9c2e143 to
b0272ea
Compare
9b2a842 to
e5d7987
Compare
No description provided.