diff --git a/build.sbt b/build.sbt index 8d464f8..1519042 100644 --- a/build.sbt +++ b/build.sbt @@ -13,20 +13,20 @@ * ========================================================================================= */ -val kamonCore = "io.kamon" %% "kamon-core" % "2.1.0" -val kamonTestkit = "io.kamon" %% "kamon-testkit" % "2.1.0" -val kamonCommon = "io.kamon" %% "kamon-instrumentation-common" % "2.1.0" +val kamonCore = "io.kamon" %% "kamon-core" % "2.2.3" +val kamonTestkit = "io.kamon" %% "kamon-testkit" % "2.2.3" +val kamonCommon = "io.kamon" %% "kamon-instrumentation-common" % "2.2.3" -val server = "org.http4s" %% "http4s-blaze-server" % "0.21.3" -val client = "org.http4s" %% "http4s-blaze-client" % "0.21.3" -val dsl = "org.http4s" %% "http4s-dsl" % "0.21.3" +val server = "org.http4s" %% "http4s-blaze-server" % "0.23.0-RC1" +val client = "org.http4s" %% "http4s-blaze-client" % "0.23.0-RC1" +val dsl = "org.http4s" %% "http4s-dsl" % "0.23.0-RC1" lazy val root = (project in file(".")) .settings(Seq( name := "kamon-http4s", - scalaVersion := "2.13.1", - crossScalaVersions := Seq("2.12.11", "2.13.1"))) + scalaVersion := "2.13.6", + crossScalaVersions := Seq("2.12.14", "2.13.6"))) .settings(resolvers += Resolver.bintrayRepo("kamon-io", "snapshots")) .settings(resolvers += Resolver.mavenLocal) .settings(scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match { diff --git a/project/build.properties b/project/build.properties index 06703e3..10fd9ee 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.3.9 +sbt.version=1.5.5 diff --git a/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala b/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala index ca802f2..c302cfe 100644 --- a/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala +++ b/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala @@ -38,13 +38,10 @@ object KamonSupport { Kamon.onReconfigure(newConfig => _instrumentation = instrumentation(newConfig)) - - def apply[F[_]](underlying: Client[F])(implicit F:Sync[F]): Client[F] = Client { request => - - for { - ctx <- Resource.liftF(F.delay(Kamon.currentContext())) - k <- kamonClient(underlying)(request)(ctx)(_instrumentation) - } yield k + def apply[F[_]](underlying: Client[F])(implicit F: Sync[F]): Client[F] = Client { request => + // this needs to run on the same thread as the caller, so can't be suspended in F + val ctx = Kamon.currentContext() + kamonClient(underlying)(request)(ctx)(_instrumentation) } @@ -54,9 +51,9 @@ object KamonSupport { (instrumentation: HttpClientInstrumentation) (implicit F:Sync[F]): Resource[F, Response[F]] = for { - requestHandler <- Resource.liftF(F.delay(instrumentation.createHandler(getRequestBuilder(request), ctx))) + requestHandler <- Resource.eval(F.delay(instrumentation.createHandler(getRequestBuilder(request), ctx))) response <- underlying.run(requestHandler.request).attempt - trackedResponse <- Resource.liftF(handleResponse(response, requestHandler)) + trackedResponse <- Resource.eval(handleResponse(response, requestHandler)) } yield trackedResponse def handleResponse[F[_]]( diff --git a/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala b/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala index a47f121..f88ae0f 100644 --- a/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala +++ b/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala @@ -56,7 +56,7 @@ object KamonSupport { private def getHandler[F[_]](instrumentation: HttpServerInstrumentation)(request: Request[F])(implicit F: Sync[F]): Resource[F, RequestHandler] = for { - handler <- Resource.liftF(F.delay(instrumentation.createHandler(buildRequestMessage(request)))) + handler <- Resource.eval(F.delay(instrumentation.createHandler(buildRequestMessage(request)))) _ <- processRequest(handler) _ <- withContext(handler) } yield handler diff --git a/src/main/scala/kamon/http4s/package.scala b/src/main/scala/kamon/http4s/package.scala index f08d786..9da40b5 100644 --- a/src/main/scala/kamon/http4s/package.scala +++ b/src/main/scala/kamon/http4s/package.scala @@ -3,7 +3,7 @@ package kamon import org.http4s.{Header, Headers, Request, Response, Status} import kamon.instrumentation.http.HttpMessage import kamon.instrumentation.http.HttpMessage.ResponseBuilder -import org.http4s.util.CaseInsensitiveString +import org.typelevel.ci.CIString package object http4s { @@ -11,7 +11,7 @@ package object http4s { def buildRequestMessage[F[_]](inner: Request[F]): HttpMessage.Request = new HttpMessage.Request { override def url: String = inner.uri.toString() - override def path: String = inner.uri.path + override def path: String = inner.uri.path.renderString override def method: String = inner.method.name @@ -19,11 +19,11 @@ package object http4s { override def port: Int = inner.uri.authority.flatMap(_.port).getOrElse(0) - override def read(header: String): Option[String] = inner.headers.get(CaseInsensitiveString(header)).map(_.value) + override def read(header: String): Option[String] = inner.headers.get(CIString(header)).map(_.head.value) override def readAll(): Map[String, String] = { val builder = Map.newBuilder[String, String] - inner.headers.foreach(h => builder += (h.name.value -> h.value)) + inner.headers.foreach(h => builder += (h.name.toString -> h.value)) builder.result() } } @@ -31,7 +31,7 @@ package object http4s { def errorResponseBuilder[F[_]]: HttpMessage.ResponseBuilder[Response[F]] = new ResponseBuilder[Response[F]] { override def write(header: String, value: String): Unit = () override def statusCode: Int = 500 - override def build(): Response[F] = new Response[F](status = Status.InternalServerError) + override def build(): Response[F] = Response[F](status = Status.InternalServerError) } //TODO both of these @@ -39,13 +39,13 @@ package object http4s { private var _headers = Headers.empty override def write(header: String, value: String): Unit = - _headers = _headers.put(Header(header, value)) + _headers = _headers.put(Header.Raw(CIString(header), value)) override def statusCode: Int = 404 - override def build(): Response[F] = new Response[F](status = Status.NotFound, headers = _headers) + override def build(): Response[F] = Response[F](status = Status.NotFound, headers = _headers) } - def getResponseBuilder[F[_]](response: Response[F]) = new HttpMessage.ResponseBuilder[Response[F]] { + def getResponseBuilder[F[_]](response: Response[F]): ResponseBuilder[Response[F]] = new HttpMessage.ResponseBuilder[Response[F]] { private var _headers = response.headers override def statusCode: Int = response.status.code @@ -53,7 +53,7 @@ package object http4s { override def build(): Response[F] = response.withHeaders(_headers) override def write(header: String, value: String): Unit = - _headers = _headers.put(Header(header, value)) + _headers = _headers.put(Header.Raw(CIString(header), value)) } @@ -63,11 +63,11 @@ package object http4s { override def build(): Request[F] = request.withHeaders(_headers) override def write(header: String, value: String): Unit = - _headers = _headers.put(Header(header, value)) + _headers = _headers.put(Header.Raw(CIString(header), value)) override def url: String = request.uri.toString() - override def path: String = request.uri.path + override def path: String = request.uri.path.renderString override def method: String = request.method.name @@ -75,11 +75,11 @@ package object http4s { override def port: Int = request.uri.authority.flatMap(_.port).getOrElse(0) - override def read(header: String): Option[String] = _headers.get(CaseInsensitiveString(header)).map(_.value) + override def read(header: String): Option[String] = _headers.get(CIString(header)).map(_.head.value) override def readAll(): Map[String, String] = { val builder = Map.newBuilder[String, String] - request.headers.foreach(h => builder += (h.name.value -> h.value)) + request.headers.foreach(h => builder += (h.name.toString -> h.value)) builder.result() } } diff --git a/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala b/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala index 16b8112..e65ba09 100644 --- a/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala +++ b/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala @@ -16,22 +16,22 @@ package kamon.http4s -import java.net.ConnectException - +import cats.effect.unsafe.implicits.global import cats.effect.{IO, Resource} import kamon.Kamon import kamon.http4s.middleware.client.KamonSupport +import kamon.tag.Lookups.{plain, plainLong} import kamon.testkit.TestSpanReporter import kamon.trace.Span -import org.http4s.{HttpRoutes, Response} import org.http4s.client._ import org.http4s.dsl.io._ import org.http4s.implicits._ +import org.http4s.{HttpRoutes, Response} import org.scalatest.concurrent.Eventually import org.scalatest.time.SpanSugar import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec} -import kamon.tag.Lookups.{plainLong, plain} +import java.net.ConnectException class ClientInstrumentationSpec extends WordSpec with Matchers @@ -57,7 +57,7 @@ class ClientInstrumentationSpec extends WordSpec client.expect[String]("/tracing/ok").unsafeRunSync() shouldBe "ok" } - eventually(timeout(2 seconds)) { + eventually(timeout(3 seconds)) { val span = testSpanReporter().nextSpan().value span.operationName shouldBe "/tracing/ok" @@ -74,7 +74,7 @@ class ClientInstrumentationSpec extends WordSpec "close and finish a span even if an exception is thrown by the client" in { val okSpan = Kamon.spanBuilder("client-exception").start() val client: Client[IO] = KamonSupport[IO]( - Client(_ => Resource.liftF(IO.raiseError[Response[IO]](new ConnectException("Connection Refused.")))) + Client(_ => Resource.eval(IO.raiseError[Response[IO]](new ConnectException("Connection Refused.")))) ) Kamon.runWithSpan(okSpan) { @@ -83,7 +83,7 @@ class ClientInstrumentationSpec extends WordSpec } } - eventually(timeout(2 seconds)) { + eventually(timeout(3 seconds)) { val span = testSpanReporter().nextSpan().value span.operationName shouldBe "/tracing/ok" span.kind shouldBe Span.Kind.Client @@ -102,7 +102,7 @@ class ClientInstrumentationSpec extends WordSpec client.expect[String]("/tracing/not-found").attempt.unsafeRunSync().isLeft shouldBe true } - eventually(timeout(2 seconds)) { + eventually(timeout(3 seconds)) { val span = testSpanReporter().nextSpan().value span.operationName shouldBe "/tracing/not-found" span.kind shouldBe Span.Kind.Client @@ -124,7 +124,7 @@ class ClientInstrumentationSpec extends WordSpec client.expect[String]("/tracing/error").attempt.unsafeRunSync().isLeft shouldBe true } - eventually(timeout(2 seconds)) { + eventually(timeout(3 seconds)) { val span = testSpanReporter().nextSpan().value span.operationName shouldBe "/tracing/error" diff --git a/src/test/scala/kamon/http4s/HttpMetricsSpec.scala b/src/test/scala/kamon/http4s/HttpMetricsSpec.scala index 1509476..c5cb9b0 100644 --- a/src/test/scala/kamon/http4s/HttpMetricsSpec.scala +++ b/src/test/scala/kamon/http4s/HttpMetricsSpec.scala @@ -17,22 +17,21 @@ package kamon.http4s import cats.effect._ +import cats.effect.unsafe.implicits.global +import cats.implicits._ +import kamon.http4s.middleware.server.KamonSupport +import kamon.instrumentation.http.HttpServerMetrics import kamon.testkit.InstrumentInspection import org.http4s.HttpRoutes +import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.blaze.server.BlazeServerBuilder +import org.http4s.client.Client import org.http4s.dsl.io._ +import org.http4s.implicits._ import org.http4s.server.Server -import org.http4s.server.blaze.BlazeServerBuilder import org.scalatest.concurrent.Eventually import org.scalatest.time.SpanSugar import org.scalatest.{Matchers, OptionValues, WordSpec} -import cats.implicits._ -import kamon.http4s.middleware.server.KamonSupport -import kamon.instrumentation.http.HttpServerMetrics -import org.http4s.client.blaze.BlazeClientBuilder -import org.http4s.client.Client - -import scala.concurrent.ExecutionContext -import org.http4s.implicits._ class HttpMetricsSpec extends WordSpec with Matchers @@ -42,11 +41,8 @@ class HttpMetricsSpec extends WordSpec with OptionValues { - implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) - implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) - val srv = - BlazeServerBuilder[IO] + BlazeServerBuilder[IO](global.compute) .bindLocal(43567) .withHttpApp(KamonSupport(HttpRoutes.of[IO] { case GET -> Root / "tracing" / "ok" => Ok("ok") @@ -56,16 +52,16 @@ class HttpMetricsSpec extends WordSpec .resource val client = - BlazeClientBuilder[IO](ExecutionContext.global).withMaxTotalConnections(10).resource + BlazeClientBuilder[IO](global.compute).withMaxTotalConnections(10).resource val metrics = - Resource.liftF(IO(HttpServerMetrics.of("http4s.server", "/127.0.0.1", 43567))) + Resource.eval(IO(HttpServerMetrics.of("http4s.server", "/127.0.0.1", 43567))) - def withServerAndClient[A](f: (Server[IO], Client[IO], HttpServerMetrics.HttpServerInstruments) => IO[A]): A = + def withServerAndClient[A](f: (Server, Client[IO], HttpServerMetrics.HttpServerInstruments) => IO[A]): A = (srv, client, metrics).tupled.use(f.tupled).unsafeRunSync() - private def get[F[_]: Sync](path: String)(server: Server[F], client: Client[F]): F[String] = { + private def get[F[_]: Concurrent](path: String)(server: Server, client: Client[F]): F[String] = { client.expect[String](s"http://127.0.0.1:${server.address.getPort}$path") } @@ -88,7 +84,7 @@ class HttpMetricsSpec extends WordSpec "track the response time with status code 2xx" in withServerAndClient { (server, client, serverMetrics) => val requests: IO[Unit] = List.fill(100)(get("/tracing/ok")(server, client)).sequence_ - val test = IO(serverMetrics.requestsSuccessful.value should be >= 0L) + val test = IO(serverMetrics.requestsSuccessful.value() should be >= 0L) requests *> test } @@ -96,7 +92,7 @@ class HttpMetricsSpec extends WordSpec "track the response time with status code 4xx" in withServerAndClient { (server, client, serverMetrics) => val requests: IO[Unit] = List.fill(100)(get("/tracing/not-found")(server, client).attempt).sequence_ - val test = IO(serverMetrics.requestsClientError.value should be >= 0L) + val test = IO(serverMetrics.requestsClientError.value() should be >= 0L) requests *> test } @@ -104,7 +100,7 @@ class HttpMetricsSpec extends WordSpec "track the response time with status code 5xx" in withServerAndClient { (server, client, serverMetrics) => val requests: IO[Unit] = List.fill(100)(get("/tracing/error")(server, client).attempt).sequence_ - val test = IO(serverMetrics.requestsServerError.value should be >= 0L) + val test = IO(serverMetrics.requestsServerError.value() should be >= 0L) requests *> test } diff --git a/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala b/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala index ef4e568..04e4446 100644 --- a/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala +++ b/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala @@ -16,25 +16,24 @@ package kamon.http4s -import cats.effect.{ContextShift, IO, Sync, Timer} +import cats.effect.unsafe.implicits.global +import cats.effect.{Concurrent, IO} +import cats.implicits._ import kamon.http4s.middleware.server.KamonSupport +import kamon.tag.Lookups.{plain, plainLong} +import kamon.testkit.TestSpanReporter import kamon.trace.Span -import org.http4s.{Headers, HttpRoutes} +import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.blaze.server.BlazeServerBuilder import org.http4s.client.Client -import org.http4s.client.blaze.BlazeClientBuilder import org.http4s.dsl.io._ -import org.http4s.server.{Server} -import org.http4s.server.blaze.BlazeServerBuilder +import org.http4s.implicits._ +import org.http4s.server.Server +import org.http4s.{Headers, HttpRoutes} import org.scalatest.concurrent.Eventually import org.scalatest.time.SpanSugar import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec} - -import scala.concurrent.ExecutionContext -import org.http4s.implicits._ -import cats.implicits._ -import kamon.testkit.TestSpanReporter -import kamon.tag.Lookups.{plain, plainLong} -import org.http4s.util.CaseInsensitiveString +import org.typelevel.ci.CIString class ServerInstrumentationSpec extends WordSpec with Matchers @@ -44,13 +43,9 @@ class ServerInstrumentationSpec extends WordSpec with TestSpanReporter with BeforeAndAfterAll { - implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) - implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) - val srv = - BlazeServerBuilder[IO] + BlazeServerBuilder[IO](global.compute) .bindAny() - .withExecutionContext(ExecutionContext.global) .withHttpApp(KamonSupport(HttpRoutes.of[IO] { case GET -> Root / "tracing" / "ok" => Ok("ok") case GET -> Root / "tracing" / "error" => InternalServerError("error!") @@ -60,22 +55,21 @@ class ServerInstrumentationSpec extends WordSpec ,"", 0).orNotFound) .resource - val client = - BlazeClientBuilder[IO](ExecutionContext.global).resource + val client = BlazeClientBuilder[IO](global.compute).resource - def withServerAndClient[A](f: (Server[IO], Client[IO]) => IO[A]): A = + def withServerAndClient[A](f: (Server, Client[IO]) => IO[A]): A = (srv, client).tupled.use(f.tupled).unsafeRunSync() - private def getResponse[F[_]: Sync](path: String)(server: Server[F], client: Client[F]): F[(String, Headers)] = { - client.get(s"http://127.0.0.1:${server.address.getPort}$path"){ r => - r.bodyAsText.compile.toList.map(_.mkString).map(_ -> r.headers) + private def getResponse[F[_]: Concurrent](path: String)(server: Server, client: Client[F]): F[(String, Headers)] = { + client.get(s"http://127.0.0.1:${server.address.getPort}$path") { r => + r.bodyText.compile.toList.map(_.mkString).map(_ -> r.headers) } } "The Server instrumentation" should { "propagate the current context and respond to the ok action" in withServerAndClient { (server, client) => val request = getResponse("/tracing/ok")(server, client).map { case (body, headers) => - headers.exists(_.name == CaseInsensitiveString("trace-id")) shouldBe true + headers.get(CIString("trace-id")).nonEmpty shouldBe true body should startWith("ok") } @@ -96,12 +90,12 @@ class ServerInstrumentationSpec extends WordSpec "propagate the current context and respond to the not-found action" in withServerAndClient { (server, client) => val request = getResponse("/tracing/not-found")(server, client).map { case (body, headers) => - headers.exists(_.name == CaseInsensitiveString("trace-id")) shouldBe true + headers.get(CIString("trace-id")).nonEmpty shouldBe true } val test = IO { eventually(timeout(5.seconds)) { - val span = testSpanReporter.nextSpan().value + val span = testSpanReporter().nextSpan().value span.operationName shouldBe "unhandled" span.kind shouldBe Span.Kind.Server @@ -116,13 +110,13 @@ class ServerInstrumentationSpec extends WordSpec "propagate the current context and respond to the error action" in withServerAndClient { (server, client) => val request = getResponse("/tracing/error")(server, client).map { case (body, headers) => - headers.exists(_.name == CaseInsensitiveString("trace-id")) shouldBe true + headers.get(CIString("trace-id")).nonEmpty shouldBe true body should startWith("error!") } val test = IO { eventually(timeout(5.seconds)) { - val span = testSpanReporter.nextSpan().value + val span = testSpanReporter().nextSpan().value span.operationName shouldBe "/tracing/error" span.kind shouldBe Span.Kind.Server @@ -139,13 +133,13 @@ class ServerInstrumentationSpec extends WordSpec val request = getResponse("/tracing/errorinternal")(server, client) /* TODO serviceErrorHandler kicks in and rewrites response, loosing trace information .map { case (body, headers) => - headers.exists(_.name == CaseInsensitiveString("trace-id")) shouldBe true + headers.get(CIString("trace-id")).nonEmpty shouldBe true } */ val test = IO { eventually(timeout(5.seconds)) { - val span = testSpanReporter.nextSpan().value + val span = testSpanReporter().nextSpan().value span.operationName shouldBe "/tracing/errorinternal" span.kind shouldBe Span.Kind.Server @@ -164,7 +158,7 @@ class ServerInstrumentationSpec extends WordSpec getResponse("/tracing/bazz/ok")(server, client) val test = IO { eventually(timeout(5.seconds)) { - val span = testSpanReporter.nextSpan().value + val span = testSpanReporter().nextSpan().value span.operationName shouldBe "/tracing/:name/ok" span.kind shouldBe Span.Kind.Server