Skip to content

Commit abcdc63

Browse files
authored
Merge pull request #1 from kamon-io/master
merge master from kamon-io
2 parents c661016 + afd6c82 commit abcdc63

File tree

19 files changed

+573
-695
lines changed

19 files changed

+573
-695
lines changed

build.sbt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -748,8 +748,8 @@ lazy val `kamon-opentelemetry` = (project in file("reporters/kamon-opentelemetry
748748
.settings(
749749
crossScalaVersions += `scala_3_version`,
750750
libraryDependencies ++= Seq(
751-
"io.opentelemetry" % "opentelemetry-proto" % "0.17.1",
752-
"io.grpc" % "grpc-netty" % "1.36.0",
751+
"io.opentelemetry" % "opentelemetry-exporter-otlp-http-trace" % "1.11.0",
752+
"io.opentelemetry" % "opentelemetry-exporter-otlp-trace" % "1.11.0",
753753

754754
scalatest % "test",
755755
logbackClassic % "test"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package kamon.instrumentation.akka.http;
2+
3+
import akka.NotUsed;
4+
import akka.http.scaladsl.model.HttpRequest;
5+
import akka.http.scaladsl.model.HttpResponse;
6+
import akka.stream.scaladsl.Flow;
7+
import kanela.agent.libs.net.bytebuddy.asm.Advice;
8+
9+
public class FlowOpsMapAsyncAdvice {
10+
11+
public static class EndpointInfo {
12+
public final String listenInterface;
13+
public final int listenPort;
14+
15+
public EndpointInfo(String listenInterface, int listenPort) {
16+
this.listenInterface = listenInterface;
17+
this.listenPort = listenPort;
18+
}
19+
}
20+
21+
public static ThreadLocal<EndpointInfo> currentEndpoint = new ThreadLocal<>();
22+
23+
@Advice.OnMethodExit
24+
public static void onExit(@Advice.Return(readOnly = false) akka.stream.scaladsl.FlowOps returnedFlow) {
25+
EndpointInfo bindAndHandlerEndpoint = currentEndpoint.get();
26+
27+
if(bindAndHandlerEndpoint != null) {
28+
returnedFlow = ServerFlowWrapper.apply(
29+
(Flow<HttpRequest, HttpResponse, NotUsed>) returnedFlow,
30+
bindAndHandlerEndpoint.listenInterface,
31+
bindAndHandlerEndpoint.listenPort
32+
);
33+
}
34+
}
35+
}

instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2ExtBindAndHandleAdvice.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ public static void onEnter(@Advice.Argument(value = 0, readOnly = false) Functio
2929
@Advice.Argument(1) String iface,
3030
@Advice.Argument(2) Integer port) {
3131

32+
FlowOpsMapAsyncAdvice.currentEndpoint.set(new FlowOpsMapAsyncAdvice.EndpointInfo(iface, port));
3233
handler = new Http2BlueprintInterceptor.HandlerWithEndpoint(iface, port, handler);
3334
}
35+
36+
@Advice.OnMethodExit
37+
public static void onExit() {
38+
FlowOpsMapAsyncAdvice.currentEndpoint.remove();
39+
}
3440
}

instrumentation/kamon-akka-http/src/main/resources/reference.conf

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,9 @@ kanela.modules {
245245

246246
within = [
247247
"akka.http.*",
248-
"akka.grpc.internal.*"
248+
"akka.grpc.internal.*",
249+
"akka.stream.scaladsl.Flow",
250+
"akka.stream.scaladsl.FlowOps"
249251
]
250252
}
251253
}

instrumentation/kamon-akka-http/src/main/scala-2.11/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,13 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {
7777
.intercept(method("redirect"), classOf[ResolveOperationNameOnRouteInterceptor])
7878
.intercept(method("failWith"), classOf[ResolveOperationNameOnRouteInterceptor])
7979

80+
/**
81+
* Support for HTTP/1 and HTTP/2 at the same time.
82+
*/
83+
84+
onType("akka.stream.scaladsl.Flow")
85+
.advise(method("mapAsync"), classOf[FlowOpsMapAsyncAdvice])
86+
8087
}
8188

8289
trait HasMatchingContext {

instrumentation/kamon-akka-http/src/main/scala-2.12/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ import akka.NotUsed
4141
import akka.http.scaladsl.server.RouteResult.Rejected
4242
import akka.stream.scaladsl.Flow
4343
import kamon.context.Context
44-
import kanela.agent.libs.net.bytebuddy.asm.Advice
4544
import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic
4645

4746
import scala.collection.immutable
@@ -103,6 +102,14 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {
103102
onType("akka.http.scaladsl.Http2Ext")
104103
.advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice])
105104

105+
106+
/**
107+
* Support for HTTP/1 and HTTP/2 at the same time.
108+
*
109+
*/
110+
111+
onType("akka.stream.scaladsl.FlowOps")
112+
.advise(method("mapAsync"), classOf[FlowOpsMapAsyncAdvice])
106113
}
107114

108115
trait HasMatchingContext {

instrumentation/kamon-akka-http/src/main/scala-2.13/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import akka.NotUsed
2525
import akka.http.scaladsl.server.RouteResult.Rejected
2626
import akka.stream.scaladsl.Flow
2727
import kamon.context.Context
28-
import kanela.agent.libs.net.bytebuddy.asm.Advice
2928
import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic
3029

3130
import scala.collection.immutable
@@ -86,6 +85,13 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {
8685
onType("akka.http.scaladsl.Http2Ext")
8786
.advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice])
8887

88+
/**
89+
* Support for HTTP/1 and HTTP/2 at the same time.
90+
*
91+
*/
92+
93+
onType("akka.stream.scaladsl.FlowOps")
94+
.advise(method("mapAsync"), classOf[FlowOpsMapAsyncAdvice])
8995
}
9096

9197
trait HasMatchingContext {

instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerTracingSpec.scala

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import akka.stream.ActorMaterializer
2121
import kamon.tag.Lookups.{plain, plainBoolean, plainLong}
2222
import kamon.testkit._
2323
import kamon.trace.Span.Mark
24-
import okhttp3.{OkHttpClient, Request}
24+
import okhttp3.{OkHttpClient, Protocol, Request}
2525
import org.scalatest._
2626
import org.scalatest.concurrent.{Eventually, ScalaFutures}
2727
import org.scalatest.matchers.should.Matchers
@@ -30,6 +30,7 @@ import org.scalatest.wordspec.AnyWordSpecLike
3030
import java.util.UUID
3131
import javax.net.ssl.{HostnameVerifier, SSLSession}
3232
import scala.concurrent.duration._
33+
import scala.collection.JavaConverters._
3334

3435
class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with ScalaFutures with Inside with InitAndStopKamonAfterAll
3536
with MetricInspection.Syntax with Reconfigure with TestWebServer with Eventually with OptionValues with TestSpanReporter {
@@ -46,27 +47,35 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
4647
.hostnameVerifier(new HostnameVerifier { override def verify(s: String, sslSession: SSLSession): Boolean = true })
4748
.build()
4849

50+
val okHttp1ONly = new OkHttpClient.Builder()
51+
.sslSocketFactory(sslSocketFactory, trustManager)
52+
.protocols(List(Protocol.HTTP_1_1).asJava)
53+
.hostnameVerifier(new HostnameVerifier { override def verify(s: String, sslSession: SSLSession): Boolean = true })
54+
.build()
55+
4956
val timeoutTest: FiniteDuration = 5 second
5057
val interface = "127.0.0.1"
51-
val http1WebServer = startServer(interface, 8081, https = false)
52-
val http2WebServer = startServer(interface, 8082, https = true)
58+
val httpWebServer = startServer(interface, 8081, https = false)
59+
val httpsWebServer = startServer(interface, 8082, https = true)
5360

54-
testSuite("HTTP/1", http1WebServer)
55-
testSuite("HTTP/2", http2WebServer)
61+
testSuite("HTTP", httpWebServer, okHttp)
62+
testSuite("HTTPS", httpsWebServer, okHttp)
63+
testSuite("HTTPS with HTTP/1 only clients", httpsWebServer, okHttp1ONly)
5664

57-
def testSuite(httpVersion: String, server: WebServer) = {
65+
def testSuite(httpVersion: String, server: WebServer, client: OkHttpClient) = {
5866
val interface = server.interface
5967
val port = server.port
6068
val protocol = server.protocol
6169

6270
s"the Akka HTTP server instrumentation with ${httpVersion}" should {
6371
"create a server Span when receiving requests" in {
6472
val target = s"$protocol://$interface:$port/$dummyPathOk"
65-
okHttp.newCall(new Request.Builder().url(target).build()).execute()
73+
client.newCall(new Request.Builder().url(target).build()).execute()
74+
6675

6776
eventually(timeout(10 seconds)) {
6877
val span = testSpanReporter().nextSpan().value
69-
span.tags.get(plain("http.url")) shouldBe target
78+
span.tags.get(plain("http.url")) should endWith(s"$interface:$port/$dummyPathOk")
7079
span.metricTags.get(plain("component")) shouldBe "akka.http.server"
7180
span.metricTags.get(plain("http.method")) shouldBe "GET"
7281
span.metricTags.get(plainLong("http.status_code")) shouldBe 200L
@@ -76,7 +85,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
7685
"return the correct operation name with overloaded route" in {
7786
val target = s"$protocol://$interface:$port/some_endpoint"
7887

79-
okHttp.newCall(new Request.Builder()
88+
client.newCall(new Request.Builder()
8089
.get()
8190
.url(target).build())
8291
.execute()
@@ -89,7 +98,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
8998
val path = s"extraction/nested/42/fixed/anchor/32/${UUID.randomUUID().toString}/fixed/44/CafE"
9099
val expected = "/extraction/nested/{}/fixed/anchor/{}/{}/fixed/{}/{}"
91100
val target = s"$protocol://$interface:$port/$path"
92-
okHttp.newCall(new Request.Builder().url(target).build()).execute()
101+
client.newCall(new Request.Builder().url(target).build()).execute()
93102

94103
eventually(timeout(10 seconds)) {
95104
val span = testSpanReporter().nextSpan().value
@@ -101,7 +110,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
101110
val path = "extraction/segment/special**"
102111
val expected = "/extraction/segment/{}"
103112
val target = s"$protocol://$interface:$port/$path"
104-
val response = okHttp.newCall(new Request.Builder().url(target).build()).execute()
113+
val response = client.newCall(new Request.Builder().url(target).build()).execute()
105114

106115
response.code() shouldBe 200
107116
response.body().string() shouldBe "special**"
@@ -116,7 +125,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
116125
val path = "extraction/on-complete/42/more-path"
117126
val expected = "/extraction/on-complete/{}/more-path"
118127
val target = s"$protocol://$interface:$port/$path"
119-
okHttp.newCall(new Request.Builder().url(target).build()).execute()
128+
client.newCall(new Request.Builder().url(target).build()).execute()
120129

121130
eventually(timeout(10 seconds)) {
122131
val span = testSpanReporter().nextSpan().value
@@ -128,7 +137,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
128137
val path = "extraction/on-success/42/after"
129138
val expected = "/extraction/on-success/{}/after"
130139
val target = s"$protocol://$interface:$port/$path"
131-
okHttp.newCall(new Request.Builder().url(target).build()).execute()
140+
client.newCall(new Request.Builder().url(target).build()).execute()
132141

133142
eventually(timeout(10 seconds)) {
134143
val span = testSpanReporter().nextSpan().value
@@ -140,7 +149,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
140149
val path = "extraction/complete-or-recover-with/42/after"
141150
val expected = "/extraction/complete-or-recover-with/{}/after"
142151
val target = s"$protocol://$interface:$port/$path"
143-
okHttp.newCall(new Request.Builder().url(target).build()).execute()
152+
client.newCall(new Request.Builder().url(target).build()).execute()
144153

145154
eventually(timeout(10 seconds)) {
146155
val span = testSpanReporter().nextSpan().value
@@ -152,7 +161,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
152161
val path = "extraction/complete-or-recover-with-success/42/after"
153162
val expected = "/extraction/complete-or-recover-with-success/{}"
154163
val target = s"$protocol://$interface:$port/$path"
155-
okHttp.newCall(new Request.Builder().url(target).build()).execute()
164+
client.newCall(new Request.Builder().url(target).build()).execute()
156165

157166
eventually(timeout(10 seconds)) {
158167
val span = testSpanReporter().nextSpan().value
@@ -164,7 +173,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
164173
val path = s"v3/user/3/post/3"
165174
val expected = "/v3/user/{}/post/{}"
166175
val target = s"$protocol://$interface:$port/$path"
167-
okHttp.newCall(new Request.Builder().url(target).build()).execute()
176+
client.newCall(new Request.Builder().url(target).build()).execute()
168177

169178
eventually(timeout(10 seconds)) {
170179
val span = testSpanReporter().nextSpan().value
@@ -175,12 +184,12 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
175184

176185
"change the Span operation name when using the operationName directive" in {
177186
val target = s"$protocol://$interface:$port/$traceOk"
178-
okHttp.newCall(new Request.Builder().url(target).build()).execute()
187+
client.newCall(new Request.Builder().url(target).build()).execute()
179188

180189
eventually(timeout(10 seconds)) {
181190
val span = testSpanReporter().nextSpan().value
182191
span.operationName shouldBe "user-supplied-operation"
183-
span.tags.get(plain("http.url")) shouldBe target
192+
span.tags.get(plain("http.url")) should endWith(s"$interface:$port/$traceOk")
184193
span.metricTags.get(plain("component")) shouldBe "akka.http.server"
185194
span.metricTags.get(plain("http.method")) shouldBe "GET"
186195
span.metricTags.get(plainLong("http.status_code")) shouldBe 200L
@@ -189,12 +198,12 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
189198

190199
"mark spans as failed when request fails" in {
191200
val target = s"$protocol://$interface:$port/$dummyPathError"
192-
okHttp.newCall(new Request.Builder().url(target).build()).execute()
201+
client.newCall(new Request.Builder().url(target).build()).execute()
193202

194203
eventually(timeout(10 seconds)) {
195204
val span = testSpanReporter().nextSpan().value
196205
span.operationName shouldBe s"/$dummyPathError"
197-
span.tags.get(plain("http.url")) shouldBe target
206+
span.tags.get(plain("http.url")) should endWith(s"$interface:$port/$dummyPathError")
198207
span.metricTags.get(plain("component")) shouldBe "akka.http.server"
199208
span.metricTags.get(plain("http.method")) shouldBe "GET"
200209
span.metricTags.get(plainBoolean("error")) shouldBe true
@@ -204,12 +213,12 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
204213

205214
"change the operation name to 'unhandled' when the response status code is 404" in {
206215
val target = s"$protocol://$interface:$port/unknown-path"
207-
okHttp.newCall(new Request.Builder().url(target).build()).execute()
216+
client.newCall(new Request.Builder().url(target).build()).execute()
208217

209218
eventually(timeout(10 seconds)) {
210219
val span = testSpanReporter().nextSpan().value
211220
span.operationName shouldBe "unhandled"
212-
span.tags.get(plain("http.url")) shouldBe target
221+
span.tags.get(plain("http.url")) should endWith(s"$interface:$port/unknown-path")
213222
span.metricTags.get(plain("component")) shouldBe "akka.http.server"
214223
span.metricTags.get(plain("http.method")) shouldBe "GET"
215224
span.metricTags.get(plainBoolean("error")) shouldBe false
@@ -219,7 +228,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
219228

220229
"correctly time entity transfer timings" in {
221230
val target = s"$protocol://$interface:$port/$stream"
222-
okHttp.newCall(new Request.Builder().url(target).build()).execute()
231+
client.newCall(new Request.Builder().url(target).build()).execute()
223232

224233
val span = eventually(timeout(10 seconds)) {
225234
val span = testSpanReporter().nextSpan().value
@@ -231,14 +240,14 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
231240
case List(_ @ Mark(_, "http.response.ready")) =>
232241
}
233242

234-
span.tags.get(plain("http.url")) shouldBe target
243+
span.tags.get(plain("http.url")) should endWith(s"$interface:$port/$stream")
235244
span.metricTags.get(plain("component")) shouldBe "akka.http.server"
236245
span.metricTags.get(plain("http.method")) shouldBe "GET"
237246
}
238247

239248
"include the trace-id and keep all user-provided headers in the responses" in {
240249
val target = s"$protocol://$interface:$port/extra-header"
241-
val response = okHttp.newCall(new Request.Builder().url(target).build()).execute()
250+
val response = client.newCall(new Request.Builder().url(target).build()).execute()
242251

243252
response.headers().names() should contain allOf (
244253
"trace-id",
@@ -248,7 +257,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
248257

249258
"keep operation names provided by the HTTP Server instrumentation" in {
250259
val target = s"$protocol://$interface:$port/name-will-be-changed"
251-
okHttp.newCall(new Request.Builder().url(target).build()).execute()
260+
client.newCall(new Request.Builder().url(target).build()).execute()
252261

253262
eventually(timeout(10 seconds)) {
254263
val span = testSpanReporter().nextSpan().value
@@ -260,8 +269,8 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
260269

261270
override protected def afterAll(): Unit = {
262271
super.afterAll()
263-
http1WebServer.shutdown()
264-
http2WebServer.shutdown()
272+
httpWebServer.shutdown()
273+
httpsWebServer.shutdown()
265274
}
266275
}
267276

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
# Kamon OpenTelemetry Exporter
22
The exporter currently only provides a OpenTelemetry (OTLP) exporter for Kamon spans (metrics to be supported)
33

4-
The reporter relies on the [opentelemetry-proto](https://github.com/open-telemetry/opentelemetry-proto) library for the gRPC communication with an OpenTelemetry (OTLP) service.
4+
The reporter relies on the http/protobuf and grpc exporter modules from the [opentelemetry-java](https://github.com/open-telemetry/opentelemetry-java) project
55

66
## Trace Exporter
7-
Converts internal finished Kamon spans to OTEL proto format and exports them the to configured endpoint.
7+
Converts internal finished Kamon spans to OTEL proto format and exports them to the configured endpoint. See reference.conf for available configuration options
88

99
## Metrics Exporter
10-
To be implemented
10+
To be implemented

0 commit comments

Comments
 (0)