Conversation
```scala
private val verboseDebug = context.system.settings.config.getBoolean("akka.cluster.sharding.verbose-debug-logging")

private val instrumentation =
  ClusterShardingInstrumentationProvider.get(context.system).instrumentation("shard_region", typeName)
```
What is the purpose of the scope parameter?
Wouldn't it be easier to include the typeName in the shardBufferSize and increaseShardBufferSize methods?
Then a single ClusterShardingInstrumentationProvider instance can be used, instead of creating a new one for each shard.
> What is the purpose of the scope parameter?
Idea is to have it as an attribute in the metric, to drill down by component ("shard_region" or "shard").
Yes, I can move it to a single instance in the extension and pass in the params.
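The single-instance approach agreed on above could be sketched roughly like this: one instrumentation object held by the provider, with `typeName` passed on each call instead of being captured per shard region. The trait shape and the `RecordingInstrumentation` class are illustrative, not the PR's actual classes:

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical SPI shape: typeName travels with every call, so a single
// instrumentation instance can serve all shard regions in the system.
trait ShardingInstrumentation {
  def shardBufferSize(typeName: String, size: Int): Unit
}

// Illustrative in-memory implementation, e.g. for a spec assertion.
final class RecordingInstrumentation extends ShardingInstrumentation {
  val sizes = TrieMap.empty[String, Int]
  override def shardBufferSize(typeName: String, size: Int): Unit =
    sizes.put(typeName, size)
}
```

With this shape, the extension creates one `RecordingInstrumentation`-style instance at startup and every region reports through it, keyed by its entity type name.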
```scala
} else {
  shardBuffers.append(shardId, msg, snd)
  instrumentation.shardBufferSize(totBufSize + 1)
```
increaseShardBufferSize isn't used? What's the plan — should it always report the size, or increment/decrement?
The idea was to use +1 / -1 where possible, instead of having to calculate the size each time.
```scala
}

val typeName = "GiveMeYourHome"
val initiallyOnForth = "on-fourth"
```
```scala
runOn(second) {
  val probe = TestProbe()
  (1 to 100).foreach { n =>
    shardRegion.tell(Get(s"id-$n"), probe.ref)
```
In the warmup you use "id1" and here "id-1". Those are different shard ids, but perhaps make it clearer by using something completely different in the warmup, such as "a", "b", "c".
I think there is a race condition in this test. You use a shared counter for all shards, which is reset to 0 when any shard is started. So it could increase the counter for id-1, but then the shard actor for id-2 is started, resetting the counter to 0 again.
That would trigger if we created more than one node in addition to the coordinator? Or is there more than one instance of ShardRegion in this test?
Right, I was thinking wrong, it's reset when the region is started, and we only have one region (typeName) per jvm here.
```scala
}
eventually {
  ClusterShardInstrumentatioSpecConfig.counter.get() shouldBe 100
}
```
The test could continue by removing the blackhole and seeing that the buffer size decreases to 0 again.
Yes, good! And also dropping messages if the buffer is full.
```scala
@InternalStableApi
class EventsourcedInstrumentationProvider(system: ExtendedActorSystem) extends Extension {
  private val fqcnConfigPath = "akka.persistence.telemetry.eventsourced.instrumentations"
```
```scala
val second = role("second")
testTransport(on = true)

val counter = new AtomicInteger()
```
Even though we have isolation by separate JVMs for each node here, it would be nice to not use a global counter, but to place the counter inside SpecClusterShardingTelemetry. From the test you can access it with:

ClusterShardingInstrumentationProvider(system).instrumentation.asInstanceOf[SpecClusterShardingTelemetry].counter
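A minimal sketch of that suggestion, assuming the spec defines its own telemetry implementation; the callback name `entityMessageDelivered` is hypothetical, standing in for whichever SPI hook the spec implementation increments on:

```scala
import java.util.concurrent.atomic.AtomicInteger

// The counter lives on the telemetry instance rather than in a global
// object, so each ActorSystem (one per multi-jvm node) gets its own.
trait ClusterShardingTelemetry {
  def entityMessageDelivered(): Unit // hypothetical callback name
}

final class SpecClusterShardingTelemetry extends ClusterShardingTelemetry {
  val counter = new AtomicInteger(0)
  override def entityMessageDelivered(): Unit = counter.incrementAndGet()
}
```

The test then reaches the counter through the provider and a downcast, as shown in the accessor above, instead of through a shared companion-object field.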
```scala
    dropped,
    shard)
// better to decrease by "dropped" to avoid calculating the size?
instrumentation.shardBufferSize(scope, typeName, shardBuffers.size)
```
What is the purpose of the scope parameter? Isn't that always "shard_region"? Is it some kind of "might be good in the future"? If we have more buffers in sharding that we want to instrument, we can have explicit methods for them in the SPI?
Shouldn't this be shardBuffers.totalSize instead of shardBuffers.size?

> // better to decrease by "dropped" to avoid calculating the size?

This drop should be rare, so the performance of calculating the size is not a reason, but it might be better to have symmetry in the SPI with increase and decrease:

```scala
def shardBufferSize(typeName: String, size: Int): Unit
def incrementShardBufferSize(typeName: String, delta: Int): Unit
def decrementShardBufferSize(typeName: String, delta: Int): Unit
```

Then you can use decrement from deliver too.
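Taken together, the symmetric SPI proposed in this thread could be sketched as below. The trait methods follow the signatures quoted above; the gauge-keeping `GaugeInstrumentation` implementation is illustrative (not the PR's actual class), showing how an absolute setter and deltas would interact:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Absolute setter plus increment/decrement deltas, keyed by entity type.
trait ShardRegionInstrumentation {
  def shardBufferSize(typeName: String, size: Int): Unit
  def incrementShardBufferSize(typeName: String, delta: Int): Unit
  def decrementShardBufferSize(typeName: String, delta: Int): Unit
}

// Illustrative implementation maintaining a single gauge, e.g. for
// asserting buffer growth and drain in a test.
final class GaugeInstrumentation extends ShardRegionInstrumentation {
  private val gauge = new AtomicInteger(0)
  def current: Int = gauge.get()
  override def shardBufferSize(typeName: String, size: Int): Unit =
    gauge.set(size)
  override def incrementShardBufferSize(typeName: String, delta: Int): Unit =
    gauge.addAndGet(delta)
  override def decrementShardBufferSize(typeName: String, delta: Int): Unit =
    gauge.addAndGet(-delta)
}
```

With this shape, buffering a message calls increment with delta 1, delivering a buffered message calls decrement with delta 1, and the drop path can call decrement with the full `dropped` count, avoiding any recomputation of the total size.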
Sounds good.
But then this would be shardRegionBufferSize for now here, and later we add shardBufferSize once we add instrumentation to akka.cluster.sharding.Shard?
> shouldn't this be shardBuffers.totalSize instead of shardBuffers.size?

Ouch, yeah! Good catch. I think this scenario is just not triggered yet in this test case?
```scala
override def shardRegionBufferSize(
    selfAddress: Address,
    shardRegionActor: ActorRef,
```
Do we need the selfAddress and shardRegionActor? Is that because Cinnamon has that existing structure?
Yes, in the current version both are used. I'll let @pvlugter share his thoughts.
I didn't work on cluster sharding instrumentation originally, but if you're integrating with what's there already, these will be for the identity and for accessing metadata.
But you can also have this new telemetry be completely separate. You'll mostly just want the entity type for a metric label.
To me it feels like Cinnamon should already know the ActorSystem, and thereby the address, and I don't see why this metric should be coupled to shardRegionActor. The address + typeName should be enough to create a unique key. However, if that is needed because it makes it easier on the Cinnamon side, then so be it.
Also not sure why self address is being passed and why the actor ref is being used. I see that the address is already accessed automatically from the actor system for some cluster instrumentation, and agree that the entity type name is what should be used for identifying.
Ok, then I remove it again (I thought somewhere Cinnamon needed it).
You'll likely need it if you're cross-integrating this instrumentation with the existing sharding instrumentation. Otherwise you can define this SPI cleanly, which I think is preferable.
patriknw left a comment:

LGTM, even though I don't see the full rationale for the address and shardRegionActor parameters.
force-pushed f44207d to 0fbb370
force-pushed 9c2e143 to b0272ea
johanandren left a comment:

LGTM with a few leftover todos dropped.
```scala
  typeName,
  dropped,
  shard)
// better to decrease by "dropped" to avoid calculating the size?
```
Co-authored-by: Johan Andrén <johan@markatta.com>
* feat: metric for dropped messages in Shard Region buffer
Running a nightly based on this branch: https://github.com/akka/akka-core/actions/runs/22094044273
No description provided.