Skip to content

Commit 5ee022b

Browse files
committed
ProducerMessage instrumentation for Alpakka + Kafka applications
1 parent 3f0ff88 commit 5ee022b

File tree

4 files changed

+102
-2
lines changed

4 files changed

+102
-2
lines changed

build.sbt

+20-2
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ val instrumentationProjects = Seq[ProjectReference](
143143
`kamon-caffeine`,
144144
`kamon-lagom`,
145145
`kamon-finagle`,
146-
`kamon-aws-sdk`
146+
`kamon-aws-sdk`,
147+
`kamon-alpakka-kafka`
147148
)
148149

149150
lazy val instrumentation = (project in file("instrumentation"))
@@ -526,7 +527,7 @@ lazy val `kamon-tapir` = (project in file("instrumentation/kamon-tapir"))
526527
.enablePlugins(JavaAgent)
527528
.settings(
528529
instrumentationSettings,
529-
crossScalaVersions := Seq("2.12.11", "2.13.1"),
530+
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
530531
libraryDependencies ++= Seq(
531532
kanelaAgent % "provided",
532533
"com.softwaremill.sttp.tapir" %% "tapir-core" % "0.17.9" % "provided",
@@ -615,6 +616,22 @@ lazy val `kamon-aws-sdk` = (project in file("instrumentation/kamon-aws-sdk"))
615616
)
616617
).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test")
617618

619+
lazy val `kamon-alpakka-kafka` = (project in file("instrumentation/kamon-alpakka-kafka"))
620+
.disablePlugins(AssemblyPlugin)
621+
.enablePlugins(JavaAgent)
622+
.settings(instrumentationSettings)
623+
.settings(
624+
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
625+
libraryDependencies ++= Seq(
626+
kanelaAgent % "provided",
627+
"com.typesafe.akka" %% "akka-stream-kafka" % "2.1.1" % "provided",
628+
"com.typesafe.akka" %% "akka-stream" % "2.6.19" % "provided",
629+
630+
scalatest % "test",
631+
logbackClassic % "test"
632+
)
633+
).dependsOn(`kamon-core`, `kamon-akka`, `kamon-testkit` % "test")
634+
618635
/**
619636
* Reporters
620637
*/
@@ -912,6 +929,7 @@ lazy val `kamon-bundle-dependencies-2-12-and-up` = (project in file("bundle/kamo
912929
`kamon-akka-grpc`,
913930
`kamon-finagle`,
914931
`kamon-tapir`,
932+
`kamon-alpakka-kafka`
915933
)
916934

917935
lazy val `kamon-bundle` = (project in file("bundle/kamon-bundle"))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# ===================================== #
2+
# Kamon Alpakka Reference Configuration #
3+
# ===================================== #
4+
5+
kamon.instrumentation.alpakka {
6+
7+
}
8+
9+
kanela {
10+
modules {
11+
alpakka {
12+
13+
name = "Alpakka"
14+
description = "PREVIEW. Provides context propagation for Alpakka applications"
15+
instrumentations = [
16+
"kamon.instrumentation.alpakka.kafka.ProducerMessageInstrumentation"
17+
]
18+
19+
within = [
20+
"akka.kafka.ProducerMessage\\$Message",
21+
"akka.kafka.ProducerMessage\\$MultiMessage",
22+
"akka.kafka.internal.DefaultProducerStageLogic"
23+
]
24+
}
25+
}
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* ==========================================================================================
3+
* Copyright © 2013-2022 The Kamon Project <https://kamon.io/>
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
6+
* except in compliance with the License. You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software distributed under the
11+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
12+
* either express or implied. See the License for the specific language governing permissions
13+
* and limitations under the License.
14+
* ==========================================================================================
15+
*/
16+
17+
package kamon
18+
package instrumentation
19+
package alpakka
20+
package kafka
21+
22+
import kamon.Kamon
23+
import kamon.context.Storage
24+
import kamon.context.Storage.Scope
25+
import kamon.instrumentation.context.HasContext
26+
import kanela.agent.api.instrumentation.InstrumentationBuilder
27+
import kanela.agent.libs.net.bytebuddy.asm.Advice
28+
29+
class ProducerMessageInstrumentation extends InstrumentationBuilder {
30+
31+
/**
32+
* Captures the current context the a Message or MultiMessage is created and restores it while
33+
* the ProducerLogic is running, so the proper context gets propagated to the Kafka Producer.
34+
*/
35+
onTypes("akka.kafka.ProducerMessage$Message", "akka.kafka.ProducerMessage$MultiMessage")
36+
.mixin(classOf[HasContext.MixinWithInitializer])
37+
38+
onTypes("akka.kafka.internal.DefaultProducerStageLogic", "akka.kafka.internal.CommittingProducerSinkStageLogic")
39+
.advise(method("produce"), ProduceWithEnvelopeContext)
40+
}
41+
42+
object ProduceWithEnvelopeContext {
43+
44+
@Advice.OnMethodEnter
45+
def enter(@Advice.Argument(0) envelope: Any): Storage.Scope = {
46+
envelope match {
47+
case hasContext: HasContext => Kamon.storeContext(hasContext.context)
48+
case _ => Scope.Empty
49+
}
50+
}
51+
52+
@Advice.OnMethodExit(onThrowable = classOf[Throwable])
53+
def exit(@Advice.Enter scope: Storage.Scope): Unit =
54+
scope.close()
55+
}

project/Build.scala

+1
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ object AssemblyTweaks extends AutoPlugin {
272272
assembly / assemblyMergeStrategy := {
273273
case s if s.startsWith("LICENSE") => MergeStrategy.discard
274274
case s if s.startsWith("about") => MergeStrategy.discard
275+
case "version.conf" => MergeStrategy.concat
275276
case x => (assembly / assemblyMergeStrategy).value(x)
276277
}
277278
) ++ inConfig(Shaded)(Defaults.configSettings)

0 commit comments

Comments
 (0)