Skip to content

Commit cead4d7

Browse files
more tweaks based on pr review
1 parent 63256c6 commit cead4d7

File tree

5 files changed

+52
-47
lines changed

5 files changed

+52
-47
lines changed

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,7 @@ object ShardCoordinator {
648648
private val instrumentation = ClusterShardingInstrumentationProvider.get(context.system).instrumentation
649649
private val cluster = Cluster(context.system)
650650

651-
instrumentation.beginShardHandoff(cluster.selfAddress, self, typeName)
651+
instrumentation.beginShardHandoff(cluster.selfAddress, self, typeName, shard)
652652

653653
regions.foreach { region =>
654654
region ! BeginHandOff(shard)
@@ -713,11 +713,7 @@ object ShardCoordinator {
713713
}
714714

715715
def done(ok: Boolean): Unit = {
716-
if (ok) {
717-
instrumentation.finishedShardHandoffOk(cluster.selfAddress, self, typeName)
718-
} else {
719-
instrumentation.finishedShardHandoffNotOk(cluster.selfAddress, self, typeName)
720-
}
716+
instrumentation.finishedShardHandoff(cluster.selfAddress, self, typeName, shard, ok)
721717
context.parent ! RebalanceDone(shard, ok)
722718
context.stop(self)
723719
}

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/ClusterShardingInstrumentation.scala

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -94,17 +94,20 @@ class ClusterShardingTelemetryEnsemble(val instrumentations: Seq[ClusterSharding
9494
typeName: String): Unit =
9595
instrumentations.foreach(_.incrementShardRegionBufferSize(selfAddress, shardRegionActor, typeName))
9696

97-
override def beginShardHandoff(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit =
98-
instrumentations.foreach(_.beginShardHandoff(selfAddress, shardCoordinatorActor, typeName))
99-
100-
override def finishedShardHandoffOk(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit =
101-
instrumentations.foreach(_.finishedShardHandoffOk(selfAddress, shardCoordinatorActor, typeName))
97+
override def beginShardHandoff(
98+
selfAddress: Address,
99+
shardCoordinatorActor: ActorRef,
100+
typeName: String,
101+
shard: String): Unit =
102+
instrumentations.foreach(_.beginShardHandoff(selfAddress, shardCoordinatorActor, typeName, shard))
102103

103-
override def finishedShardHandoffNotOk(
104+
override def finishedShardHandoff(
104105
selfAddress: Address,
105106
shardCoordinatorActor: ActorRef,
106-
typeName: String): Unit =
107-
instrumentations.foreach(_.finishedShardHandoffNotOk(selfAddress, shardCoordinatorActor, typeName))
107+
typeName: String,
108+
shard: String,
109+
ok: Boolean): Unit =
110+
instrumentations.foreach(_.finishedShardHandoff(selfAddress, shardCoordinatorActor, typeName, shard, ok))
108111

109112
override def dependencies: immutable.Seq[String] =
110113
instrumentations.flatMap(_.dependencies)
@@ -133,11 +136,18 @@ class EmptyClusterShardingInstrumentation extends ClusterShardingInstrumentation
133136
shardRegionActor: ActorRef,
134137
typeName: String): Unit = ()
135138

136-
override def beginShardHandoff(selfAddress: Address, shardRegionActor: ActorRef, typeName: String): Unit = ()
137-
138-
override def finishedShardHandoffOk(selfAddress: Address, self: ActorRef, typeName: String): Unit = ()
139+
override def beginShardHandoff(
140+
selfAddress: Address,
141+
shardRegionActor: ActorRef,
142+
typeName: String,
143+
shard: String): Unit = ()
139144

140-
override def finishedShardHandoffNotOk(selfAddress: Address, self: ActorRef, typeName: String): Unit = ()
145+
override def finishedShardHandoff(
146+
selfAddress: Address,
147+
self: ActorRef,
148+
typeName: String,
149+
shard: String,
150+
ok: Boolean): Unit = ()
141151

142152
override def dependencies: immutable.Seq[String] = Nil
143153
}
@@ -158,11 +168,14 @@ trait ClusterShardingInstrumentation {
158168
*/
159169
def incrementShardRegionBufferSize(selfAddress: Address, shardRegionActor: ActorRef, typeName: String): Unit
160170

161-
def beginShardHandoff(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit
171+
def beginShardHandoff(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String, shard: String): Unit
162172

163-
def finishedShardHandoffOk(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit
164-
165-
def finishedShardHandoffNotOk(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit
173+
def finishedShardHandoff(
174+
selfAddress: Address,
175+
shardCoordinatorActor: ActorRef,
176+
typeName: String,
177+
shard: String,
178+
ok: Boolean): Unit
166179

167180
/**
168181
* Optional dependencies for this instrumentation.

akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ abstract class ClusterShardingInstrumentationSpec
8383

8484
private val counter = ClusterShardingInstrumentationProvider(system).instrumentation
8585
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
86-
.counter
86+
.shardRegionBufferSizeCounter
8787

8888
override implicit val patienceConfig: PatienceConfig = {
8989
import akka.testkit.TestDuration

akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingInstrumentationSpecTelemetry.scala

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package akka.cluster.sharding
66

77
import java.util.concurrent.atomic.AtomicInteger
8-
import java.util.concurrent.atomic.AtomicLong
98

109
import scala.annotation.nowarn
1110

@@ -17,43 +16,37 @@ import akka.cluster.sharding.internal.ClusterShardingInstrumentation
1716
class ClusterShardingInstrumentationSpecTelemetry(@nowarn("msg=never used") system: ExtendedActorSystem)
1817
extends ClusterShardingInstrumentation {
1918

20-
val counter = new AtomicInteger(0)
21-
val shardHandoffDuration = new AtomicLong(0)
19+
val shardRegionBufferSizeCounter = new AtomicInteger(0)
20+
val beginShardHandoffDurationCounter = new AtomicInteger(0)
21+
val finishShardHandoffDurationCounter = new AtomicInteger(0)
2222

2323
override def shardRegionBufferSize(
2424
selfAddress: Address,
2525
shardRegionActor: ActorRef,
2626
typeName: String,
2727
size: Int): Unit = {
28-
counter.set(size)
28+
shardRegionBufferSizeCounter.set(size)
2929
}
3030

3131
override def incrementShardRegionBufferSize(
3232
selfAddress: Address,
3333
shardRegionActor: ActorRef,
3434
typeName: String): Unit = {
35-
counter.incrementAndGet()
35+
shardRegionBufferSizeCounter.incrementAndGet()
3636
}
3737

38-
override def beginShardHandoff(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit = {
39-
println("ok jea >?>>")
40-
shardHandoffDuration.set(System.currentTimeMillis())
41-
println(s"ok jea >?>> ${shardHandoffDuration.get()}")
42-
}
43-
44-
override def finishedShardHandoffOk(selfAddress: Address, shardCoordinatorActor: ActorRef, typeName: String): Unit = {
45-
println("ok jea >?>> FINNNNNNN")
46-
val currentValue = shardHandoffDuration.get()
47-
shardHandoffDuration.set(System.currentTimeMillis() - currentValue)
48-
println(s"ok jea >?>> FINNNNNNN ${shardHandoffDuration.get()}")
49-
}
38+
override def beginShardHandoff(
39+
selfAddress: Address,
40+
shardCoordinatorActor: ActorRef,
41+
typeName: String,
42+
shard: String): Unit = beginShardHandoffDurationCounter.incrementAndGet()
5043

51-
override def finishedShardHandoffNotOk(
44+
override def finishedShardHandoff(
5245
selfAddress: Address,
5346
shardCoordinatorActor: ActorRef,
54-
typeName: String): Unit = {
55-
println("ok jea >?>> NOT FINNNNNNN")
56-
}
47+
typeName: String,
48+
shard: String,
49+
ok: Boolean): Unit = finishShardHandoffDurationCounter.incrementAndGet()
5750

5851
override def dependencies: Seq[String] = Nil
5952
}

akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/RollingUpdateShardAllocationSpec.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,11 +226,14 @@ abstract class RollingUpdateShardAllocationSpec
226226
"verify instrumentation" in {
227227
// only run on oldest - Shard Coordinator
228228
runOn(third) {
229-
val shardHandoffDuration = ClusterShardingInstrumentationProvider(system).instrumentation
229+
val beginShardHandoffDurationCounter = ClusterShardingInstrumentationProvider(system).instrumentation
230230
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
231-
.shardHandoffDuration
231+
.beginShardHandoffDurationCounter
232+
val finishShardHandoffDurationCounter = ClusterShardingInstrumentationProvider(system).instrumentation
233+
.asInstanceOf[ClusterShardingInstrumentationSpecTelemetry]
234+
.finishShardHandoffDurationCounter
232235

233-
shardHandoffDuration.get() > 0 shouldBe true
236+
beginShardHandoffDurationCounter.get() shouldBe finishShardHandoffDurationCounter.get()
234237
}
235238
enterBarrier("shard-handoff-latency")
236239
}

0 commit comments

Comments
 (0)