Skip to content

Commit 29a9085

Browse files
authored
publish gauges with akka cluster members per status (#1189)
* publish gauges with cluster members per status * only apply cluster instrumentation on Akka 2.5/2.6 * remove the bintray plugin * publish status and reachability gauges for Cluster members * use backwards-compatible version of Cluster.subscribe(...) * avoid registering member metrics without tags * put the cluster metrics behind a feature flag
1 parent 1212250 commit 29a9085

File tree

5 files changed

+193
-8
lines changed

5 files changed

+193
-8
lines changed

build.sbt

-4
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,6 @@ lazy val `kamon-twitter-future` = (project in file("instrumentation/kamon-twitte
196196
.enablePlugins(JavaAgent)
197197
.settings(instrumentationSettings)
198198
.settings(
199-
bintrayPackage := "kamon-futures",
200199
libraryDependencies ++= Seq(
201200
kanelaAgent % "provided",
202201
"com.twitter" %% "util-core" % "20.3.0" % "provided",
@@ -211,7 +210,6 @@ lazy val `kamon-scalaz-future` = (project in file("instrumentation/kamon-scalaz-
211210
.enablePlugins(JavaAgent)
212211
.settings(instrumentationSettings)
213212
.settings(
214-
bintrayPackage := "kamon-futures",
215213
libraryDependencies ++= Seq(
216214
kanelaAgent % "provided",
217215
"org.scalaz" %% "scalaz-concurrent" % "7.2.28" % "provided",
@@ -226,7 +224,6 @@ lazy val `kamon-scala-future` = (project in file("instrumentation/kamon-scala-fu
226224
.enablePlugins(JavaAgent)
227225
.settings(instrumentationSettings)
228226
.settings(
229-
bintrayPackage := "kamon-futures",
230227
libraryDependencies ++=Seq(
231228
kanelaAgent % "provided",
232229
scalatest % "test",
@@ -240,7 +237,6 @@ lazy val `kamon-cats-io` = (project in file("instrumentation/kamon-cats-io"))
240237
.enablePlugins(JavaAgent)
241238
.settings(instrumentationSettings)
242239
.settings(
243-
bintrayPackage := "kamon-futures",
244240
libraryDependencies ++= Seq(
245241
kanelaAgent % "provided",
246242
{

instrumentation/kamon-akka/src/common/resources/reference.conf

+14-1
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,17 @@ kamon.instrumentation.akka {
122122
# shards) will be sampled.
123123
shard-metrics-sample-interval = ${kamon.metric.tick-interval}
124124
}
125+
126+
cluster {
127+
128+
# !! EXPERIMENTAL !!
129+
#
130+
# Decides whether to expose the akka.cluster.[members|datacenters] metrics. These metrics are considered
131+
# experimental and must be explicitly enabled until a future release when they graduate to stable. The name of
132+
# this setting might change in the future.
133+
track-cluster-metrics = no
134+
135+
}
125136
}
126137
# Signals to akka that it should load KamonRemoteInstrument
127138
akka.remote.artery.advanced.instruments += "akka.remote.artery.KamonRemoteInstrument"
@@ -144,14 +155,16 @@ kanela.modules {
144155
"kamon.instrumentation.akka.instrumentations.akka_25.DispatcherInstrumentation",
145156
"kamon.instrumentation.akka.instrumentations.akka_26.DispatcherInstrumentation",
146157
"kamon.instrumentation.akka.instrumentations.akka_26.ActorMonitorInstrumentation",
147-
"kamon.instrumentation.akka.instrumentations.SchedulerInstrumentation"
158+
"kamon.instrumentation.akka.instrumentations.SchedulerInstrumentation",
159+
"kamon.instrumentation.akka.instrumentations.ClusterInstrumentation"
148160
]
149161

150162
within = [
151163
"^akka.dispatch..*",
152164
"^akka.event..*",
153165
"^akka.actor..*",
154166
"^akka.pattern..*",
167+
"^akka.cluster..*",
155168
"^akka.routing..*",
156169
"kamon.instrumentation.akka.instrumentations..*"
157170
]

instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaInstrumentation.scala

+5-2
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,16 @@ object AkkaInstrumentation {
8484
autoGrouping: Boolean,
8585
allowDoomsdayWildcards: Boolean,
8686
safeActorTrackFilter: Filter,
87-
safeActorStartTraceFilter: Filter
87+
safeActorStartTraceFilter: Filter,
88+
exposeClusterMetrics: Boolean
8889
)
8990

9091
object Settings {
9192

9293
def from(config: Config): Settings = {
9394
val akkaConfig = config.getConfig("kamon.instrumentation.akka")
9495
val allowDoomsdayWildcards = akkaConfig.getBoolean("filters.actors.doomsday-wildcard")
96+
val exposeClusterMetrics = akkaConfig.getBoolean("cluster.track-cluster-metrics")
9597

9698
val askPatternWarning = akkaConfig.getString("ask-pattern-timeout-warning") match {
9799
case "off" => Off
@@ -105,7 +107,8 @@ object AkkaInstrumentation {
105107
akkaConfig.getBoolean("auto-grouping"),
106108
allowDoomsdayWildcards,
107109
safeFilter(config.getConfig(TrackActorFilterName), allowDoomsdayWildcards),
108-
safeFilter(config.getConfig(StartTraceActorFilterName), allowDoomsdayWildcards)
110+
safeFilter(config.getConfig(StartTraceActorFilterName), allowDoomsdayWildcards),
111+
exposeClusterMetrics
109112
)
110113
}
111114

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package kamon.instrumentation
2+
package akka.instrumentations
3+
4+
import _root_.akka.actor.{Actor, Address, ExtendedActorSystem, Props}
5+
import _root_.akka.cluster.{Cluster, ClusterEvent, MemberStatus}
6+
import kamon.Kamon
7+
import kamon.instrumentation.akka.AkkaInstrumentation
8+
import kamon.metric.Gauge
9+
import kamon.tag.TagSet
10+
import kanela.agent.api.instrumentation.InstrumentationBuilder
11+
import kanela.agent.libs.net.bytebuddy.asm.Advice
12+
13+
import scala.collection.mutable
14+
15+
/**
  * Instrumentation definition for the Akka Cluster extension. When running on Akka 2.5 or 2.6 it advises the
  * single-argument `createExtension` method on `akka.cluster.Cluster$` with [[AfterClusterInitializationAdvice]].
  */
class ClusterInstrumentation extends InstrumentationBuilder with VersionFiltering {

  onAkka("2.5", "2.6") {
    // Target the companion object of akka.cluster.Cluster, which is where createExtension lives.
    val clusterCompanion = onType("akka.cluster.Cluster$")
    val createExtensionMethod = method("createExtension").and(takesArguments(1))

    clusterCompanion.advise(createExtensionMethod, AfterClusterInitializationAdvice)
  }
}
22+
23+
/**
  * Advice executed when the advised `createExtension` method exits. If the cluster metrics are enabled in the
  * instrumentation settings, it starts the state exporter actor and subscribes it to cluster domain events.
  */
object AfterClusterInitializationAdvice {

  /**
    * @param system           first argument of the advised method; used to create the exporter as a system actor
    * @param clusterExtension value returned by the advised method; the exporter gets subscribed to its events
    */
  @Advice.OnMethodExit
  def onClusterExtensionCreated(@Advice.Argument(0) system: ExtendedActorSystem, @Advice.Return clusterExtension: Cluster): Unit = {
    // Nothing is created or subscribed unless the feature flag is turned on.
    if (AkkaInstrumentation.settings().exposeClusterMetrics) {
      val exporterProps = Props[ClusterInstrumentation.ClusterStateExporter]
      val exporterRef = system.systemActorOf(exporterProps, "kamon-cluster-state-exporter")
      clusterExtension.subscribe(exporterRef, classOf[ClusterEvent.ClusterDomainEvent])
    }
  }
}
34+
35+
object ClusterInstrumentation {
36+
37+
/**
  * Actor that mirrors the cluster state into Kamon gauges. Every received cluster domain event triggers a full
  * refresh from the current state of the Cluster extension; a received `CurrentClusterState` message is used
  * directly as the state to publish.
  *
  * All gauges are tagged with the name of the actor system. Per-member gauges additionally carry a "member" tag
  * with the member's address and are cached in `monitoredNodes` so they can be removed later.
  */
class ClusterStateExporter extends Actor {
  private val clusterExtension = Cluster(context.system)
  private val clusterTags = TagSet.of("akka.system.name", context.system.name)

  private val joiningGauge = ClusterMembersJoining.withTags(clusterTags)
  private val weaklyUpGauge = ClusterMembersWeaklyUp.withTags(clusterTags)
  private val upGauge = ClusterMembersUp.withTags(clusterTags)
  private val leavingGauge = ClusterMembersLeaving.withTags(clusterTags)
  private val exitingGauge = ClusterMembersExiting.withTags(clusterTags)
  private val downGauge = ClusterMembersDown.withTags(clusterTags)
  private val removedGauge = ClusterMembersRemoved.withTags(clusterTags)
  private val totalGauge = ClusterMembersTotal.withTags(clusterTags)
  private val unreachableGauge = ClusterMembersUnreachable.withTags(clusterTags)
  private val unreachableDatacentersGauge = ClusterDatacentersUnreachable.withTags(clusterTags)

  // Address -> (status gauge, reachability gauge) for the members this node is currently monitoring.
  private val monitoredNodes = mutable.HashMap.empty[Address, (Gauge, Gauge)]

  override def receive: Receive = {
    case _: ClusterEvent.ClusterDomainEvent             => updateAllStates(clusterExtension.state)
    case initialState: ClusterEvent.CurrentClusterState => updateAllStates(initialState)
  }

  /** Refreshes every aggregate gauge and the per-member gauges from the given cluster state. */
  private def updateAllStates(clusterState: ClusterEvent.CurrentClusterState): Unit = {
    val allMembers = clusterState.members

    def membersIn(status: MemberStatus): Int =
      allMembers.count(_.status == status)

    joiningGauge.update(membersIn(MemberStatus.Joining))
    weaklyUpGauge.update(membersIn(MemberStatus.WeaklyUp))
    upGauge.update(membersIn(MemberStatus.Up))
    leavingGauge.update(membersIn(MemberStatus.Leaving))
    exitingGauge.update(membersIn(MemberStatus.Exiting))
    downGauge.update(membersIn(MemberStatus.Down))

    // The total deliberately leaves out members in the Removed status.
    val removedCount = membersIn(MemberStatus.Removed)
    removedGauge.update(removedCount)
    totalGauge.update(allMembers.size - removedCount)

    unreachableGauge.update(clusterState.unreachable.size)
    unreachableDatacentersGauge.update(clusterState.unreachableDataCenters.size)

    // Status and reachability gauges are only published for the members whose address is being monitored by
    // the failure detector of this node.
    val detector = clusterExtension.failureDetector
    val monitoredMembers = allMembers.filter(member => detector.isMonitoring(member.address))

    monitoredMembers.foreach { member =>
      val (statusGauge, reachabilityGauge) =
        monitoredNodes.getOrElseUpdate(member.address, createMemberGauges(member.address))

      val isUnreachable = clusterState.unreachable.contains(member)
      statusGauge.update(statusToGaugeValue(member.status))
      reachabilityGauge.update(if (isUnreachable) 1D else 0D)
    }

    // Drop the cached gauges of every address that is no longer monitored. The stale addresses are collected
    // into a separate list before the map is modified.
    val monitoredAddresses = monitoredMembers.iterator.map(_.address).toSet
    val staleAddresses = monitoredNodes.keySet.toList.filterNot(monitoredAddresses.contains)

    staleAddresses.foreach { staleAddress =>
      monitoredNodes.remove(staleAddress).foreach {
        case (statusGauge, reachabilityGauge) =>
          statusGauge.remove()
          reachabilityGauge.remove()
      }
    }
  }

  /** Creates the (status, reachability) gauge pair for a member, tagged with the member's address. */
  private def createMemberGauges(address: Address): (Gauge, Gauge) = {
    val memberTags = clusterTags.withTag("member", address.toString)
    (ClusterMemberStatus.withTags(memberTags), ClusterMemberReachability.withTags(memberTags))
  }

  /** Maps a member status to the number published on the status gauge; unknown statuses map to zero. */
  private def statusToGaugeValue(memberStatus: MemberStatus): Double = memberStatus match {
    case MemberStatus.Joining  => 1D
    case MemberStatus.WeaklyUp => 2D
    case MemberStatus.Up       => 3D
    case MemberStatus.Leaving  => 4D
    case MemberStatus.Exiting  => 5D
    case MemberStatus.Down     => 6D
    case MemberStatus.Removed  => 7D
    case _                     => 0D // Not expected to happen; kept as a safety net for statuses not listed above
  }
}
114+
115+
/** Gauge metric for the number of members in the Joining status. */
val ClusterMembersJoining =
  Kamon.gauge(
    name = "akka.cluster.members.joining.count",
    description = "Tracks the number of cluster members in the Joining state"
  )

/** Gauge metric for the number of members in the WeaklyUp status. */
val ClusterMembersWeaklyUp =
  Kamon.gauge(
    name = "akka.cluster.members.weakly-up.count",
    description = "Tracks the number of cluster members in the Weakly-Up state"
  )

/** Gauge metric for the number of members in the Up status. */
val ClusterMembersUp =
  Kamon.gauge(
    name = "akka.cluster.members.up.count",
    description = "Tracks the number of cluster members in the Up state"
  )

/** Gauge metric for the number of members in the Leaving status. */
val ClusterMembersLeaving =
  Kamon.gauge(
    name = "akka.cluster.members.leaving.count",
    description = "Tracks the number of cluster members in the Leaving state"
  )

/** Gauge metric for the number of members in the Exiting status. */
val ClusterMembersExiting =
  Kamon.gauge(
    name = "akka.cluster.members.exiting.count",
    description = "Tracks the number of cluster members in the Exiting state"
  )

/** Gauge metric for the number of members in the Down status. */
val ClusterMembersDown =
  Kamon.gauge(
    name = "akka.cluster.members.down.count",
    description = "Tracks the number of cluster members in the Down state"
  )

/** Gauge metric for the number of members in the Removed status. */
val ClusterMembersRemoved =
  Kamon.gauge(
    name = "akka.cluster.members.removed.count",
    description = "Tracks the number of cluster members in the Removed state"
  )

/** Gauge metric for the total number of members, not counting the Removed ones. */
val ClusterMembersTotal =
  Kamon.gauge(
    name = "akka.cluster.members.total.count",
    description = "Tracks the total number of cluster members, without including Removed members"
  )

/** Gauge metric for the number of members marked as unreachable. */
val ClusterMembersUnreachable =
  Kamon.gauge(
    name = "akka.cluster.members.unreachable.count",
    description = "Tracks the total number of cluster members marked as unreachable"
  )
159+
160+
/**
  * Gauge metric for the number of datacenters marked as unreachable. The description previously repeated the
  * text of the unreachable members gauge; it now describes datacenters, matching the metric name.
  */
val ClusterDatacentersUnreachable = Kamon.gauge(
  name = "akka.cluster.datacenters.unreachable.count",
  description = "Tracks the total number of cluster datacenters marked as unreachable"
)
164+
165+
/** Gauge metric carrying the numeric status of each monitored member. */
val ClusterMemberStatus =
  Kamon.gauge(
    name = "akka.cluster.members.status",
    description = "Tracks the current status of all monitored nodes by a cluster member"
  )

/** Gauge metric carrying the reachability of each monitored member. */
val ClusterMemberReachability =
  Kamon.gauge(
    name = "akka.cluster.members.reachability",
    description = "Tracks the current reachability status of all monitored nodes by a cluster member"
  )
174+
}

project/plugins.sbt

-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.10")
88
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.6")
99
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2")
1010
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.3.4")
11-
addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.4")
1211
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.2.0")
1312
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.8.1")
1413
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1")

0 commit comments

Comments
 (0)