From b315cb7c80cd1f9f7972abc88086938b33ac9464 Mon Sep 17 00:00:00 2001 From: Massimo Siani Date: Sun, 24 Jul 2022 19:20:09 +0200 Subject: [PATCH] add a client implementation --- build.sbt | 1 + docs/index.md | 36 ++++++++++-- .../vanilla/HttpClientSingleRequest.scala | 56 +++++++++++++++++++ ...{Main.scala => VanillaAkkaHttpRoute.scala} | 36 ++++++------ .../scala/natchez/akka/http/AkkaRequest.scala | 5 ++ .../natchez/akka/http/NatchezAkkaHttp.scala | 46 +++++++++++++++ 6 files changed, 157 insertions(+), 23 deletions(-) create mode 100644 examples/vanilla-akka/shared/src/main/scala/natchez/akka/http/examples/vanilla/HttpClientSingleRequest.scala rename examples/vanilla-akka/shared/src/main/scala/natchez/akka/http/examples/vanilla/{Main.scala => VanillaAkkaHttpRoute.scala} (53%) diff --git a/build.sbt b/build.sbt index e0201ff..e9a29e5 100644 --- a/build.sbt +++ b/build.sbt @@ -32,6 +32,7 @@ lazy val core = crossProject(JVMPlatform) name := "natchez-akka-http", description := "Integration for Natchez and Akka Http", libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-actor" % akka % Optional, "com.typesafe.akka" %% "akka-http" % akkaHttp % Optional, "org.tpolecat" %%% "natchez-core" % natchez, ), diff --git a/docs/index.md b/docs/index.md index 57e495e..1c53e1d 100644 --- a/docs/index.md +++ b/docs/index.md @@ -4,7 +4,7 @@ An integration library for Natchez and Akka Http. -Only a server middleware has been implemented so far. +Inspired by [natchez-http4s](https://github.com/tpolecat/natchez-http4s). ## Installation @@ -13,14 +13,17 @@ Add the following to your `build.sbt` libraryDependencies ++= Seq("io.github.massimosiani" %% "natchez-akka-http" % ) ``` -## Plain Akka Http +## Server side + +### Plain Akka Http If you only have the routes, then follow the example in the -[vanilla akka http example](https://github.com/massimosiani/natchez-akka-http/tree/main/examples/vanilla-akka). +[VanillaAkkaHttpRoute class](https://github.com/massimosiani/natchez-akka-http/tree/main/examples/vanilla-akka). If the request contains a kernel, the entry point will create a continuation, otherwise a root span will be created. -Run `sbt exampleVanillaAkkaJVM/run`, open a browser and go to `localhost:8080/hello`. +Run `sbt "exampleVanillaAkkaJVM/runMain natchez.akka.http.examples.vanilla.VanillaAkkaHttpRoute"`, +open a browser and go to `localhost:8080/hello`. You should see something similar in your console. Notice that the `children` section will always be empty. @@ -42,14 +45,14 @@ Notice that the `children` section will always be empty. } ``` -## Services built with cats effect IO or tagless final +### Services built with cats effect IO or tagless final If you can build your services using cats effect `IO` or tagless final style, e.g. using [tapir](https://tapir.softwaremill.com/en/latest/), build the corresponding services with a `Trace` constraint in scope. In this case, the `children` section will be filled. -### Example: tapir +#### Example: tapir The main idea is writing all services forgetting about `Future`, and mapping the `IO` to `Future` at the very end. @@ -84,3 +87,24 @@ A full, working example can be found in the [tapir example](https://github.com/m ] } ``` + +## Client side + +For an example, see [HttpClientSingleRequest class](https://github.com/massimosiani/natchez-akka-http/tree/main/examples/vanilla-akka). +If you run `sbt "exampleVanillaAkkaJVM/runMain natchez.akka.http.examples.vanilla.HttpClientSingleRequest"`, +you should see something similar in your console: + +```json +{ + "name" : "example-http-client-single-request-root-span", + "service" : "example-http-client-single-request", + "timestamp" : "2022-07-24T17:07:51.490422799Z", + "duration_ms" : 6, + "trace.span_id" : "ce1973e4-eed5-4e38-9201-caae09ea7b1d", + "trace.parent_id" : null, + "trace.trace_id" : "b10a567d-0d80-4ddd-b70d-4e7f00146256", + "exit.case" : "succeeded", + "children" : [ + ] +} +``` diff --git a/examples/vanilla-akka/shared/src/main/scala/natchez/akka/http/examples/vanilla/HttpClientSingleRequest.scala b/examples/vanilla-akka/shared/src/main/scala/natchez/akka/http/examples/vanilla/HttpClientSingleRequest.scala new file mode 100644 index 0000000..3e74c70 --- /dev/null +++ b/examples/vanilla-akka/shared/src/main/scala/natchez/akka/http/examples/vanilla/HttpClientSingleRequest.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2022 Massimo Siani + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package natchez.akka.http.examples.vanilla + +import akka.actor.ActorSystem +import akka.http.scaladsl.model.{HttpRequest, HttpResponse} +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import natchez.akka.http.NatchezAkkaHttp +import natchez.log.Log +import natchez.{EntryPoint, Trace} +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} + +object HttpClientSingleRequest { + def main(args: Array[String]): Unit = { + implicit val system: ActorSystem = ActorSystem("SingleRequest") + implicit val executionContext: ExecutionContext = system.dispatcher + + // usually you already have these defined + implicit val log: Logger[IO] = Slf4jLogger.getLoggerFromName("example-logger") + val ep: EntryPoint[IO] = Log.entryPoint[IO]("example-http-client-single-request") + implicit val trace: Trace[IO] = ep + .root("example-http-client-single-request-root-span") + .use { rootSpan => + Trace.ioTrace(rootSpan) + } + .unsafeRunSync() + + // send your request. Note that you get a Future back (plain akka) + val responseFuture: Future[HttpResponse] = + NatchezAkkaHttp.clientSingleRequest(HttpRequest(uri = "http://akka.io")) + + responseFuture.onComplete { + case Success(res) => println(res) + case Failure(_) => sys.error("something wrong") + } + } +} diff --git a/examples/vanilla-akka/shared/src/main/scala/natchez/akka/http/examples/vanilla/Main.scala b/examples/vanilla-akka/shared/src/main/scala/natchez/akka/http/examples/vanilla/VanillaAkkaHttpRoute.scala similarity index 53% rename from examples/vanilla-akka/shared/src/main/scala/natchez/akka/http/examples/vanilla/Main.scala rename to examples/vanilla-akka/shared/src/main/scala/natchez/akka/http/examples/vanilla/VanillaAkkaHttpRoute.scala index 87c4c4f..5bad65b 100644 --- a/examples/vanilla-akka/shared/src/main/scala/natchez/akka/http/examples/vanilla/Main.scala +++ b/examples/vanilla-akka/shared/src/main/scala/natchez/akka/http/examples/vanilla/VanillaAkkaHttpRoute.scala @@ -32,25 +32,27 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger import scala.concurrent.ExecutionContextExecutor import scala.io.StdIn -object Main extends App { - // adapted from the Akka Http documentation - implicit private val system: ActorSystem = ActorSystem("my-system") - implicit private val executionContext: ExecutionContextExecutor = system.dispatcher - - private val route = - path("hello") { - get { - complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "

Say hello to akka-http

")) +object VanillaAkkaHttpRoute { + def main(args: Array[String]): Unit = { + // adapted from the Akka Http documentation + implicit val system: ActorSystem = ActorSystem("my-system") + implicit val executionContext: ExecutionContextExecutor = system.dispatcher + + val route = + path("hello") { + get { + complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "

Say hello to akka-http

")) + } } - } - implicit private val log: Logger[IO] = Slf4jLogger.getLoggerFromName("example-logger") - private val ep: EntryPoint[IO] = Log.entryPoint[IO]("example-service") - private val tracedRoute: Route = NatchezAkkaHttp.legacyServer(ep)(route)(IORuntime.global) + implicit val log: Logger[IO] = Slf4jLogger.getLoggerFromName("example-logger") + val ep: EntryPoint[IO] = Log.entryPoint[IO]("example-service") + val tracedRoute: Route = NatchezAkkaHttp.legacyServer(ep)(route)(IORuntime.global) - private val bindingFuture = Http().newServerAt("localhost", 8080).bind(tracedRoute) + val bindingFuture = Http().newServerAt("localhost", 8080).bind(tracedRoute) - println("Server now online. Please navigate to http://localhost:8080/hello\nPress RETURN to stop...") - StdIn.readLine() - bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate()) + println("Server now online. Please navigate to http://localhost:8080/hello\nPress RETURN to stop...") + StdIn.readLine() + bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate()) + } } diff --git a/natchez-akka-http/shared/src/main/scala/natchez/akka/http/AkkaRequest.scala b/natchez-akka-http/shared/src/main/scala/natchez/akka/http/AkkaRequest.scala index 72abc6c..b013bb6 100644 --- a/natchez-akka-http/shared/src/main/scala/natchez/akka/http/AkkaRequest.scala +++ b/natchez-akka-http/shared/src/main/scala/natchez/akka/http/AkkaRequest.scala @@ -17,9 +17,14 @@ package natchez.akka.http import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.model.headers.RawHeader import natchez.Kernel object AkkaRequest { + private[http] def withKernelHeaders(request: HttpRequest, kernel: Kernel): HttpRequest = request.withHeaders( + kernel.toHeaders.map { case (k, v) => RawHeader(k, v) }.toSeq ++ request.headers + ) // prioritize request headers over kernel ones + private[http] def toKernel(request: HttpRequest): Kernel = { val headers = request.headers val traceId = "X-Natchez-Trace-Id" diff --git a/natchez-akka-http/shared/src/main/scala/natchez/akka/http/NatchezAkkaHttp.scala b/natchez-akka-http/shared/src/main/scala/natchez/akka/http/NatchezAkkaHttp.scala index baff801..f767076 100644 --- a/natchez-akka-http/shared/src/main/scala/natchez/akka/http/NatchezAkkaHttp.scala +++ b/natchez-akka-http/shared/src/main/scala/natchez/akka/http/NatchezAkkaHttp.scala @@ -16,6 +16,8 @@ package natchez.akka.http +import akka.actor.ClassicActorSystemProvider +import akka.http.scaladsl.Http import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.http.scaladsl.server.RouteResult.{Complete, Rejected} import akka.http.scaladsl.server.{RequestContext, Route} @@ -28,8 +30,42 @@ import cats.syntax.all.* import natchez.* import natchez.akka.http.AkkaRequest.toKernel +import scala.concurrent.Future + object NatchezAkkaHttp { + /** Adds the current span's kernel to the outgoing request, performs the request in a span called + * `akka-http-client-request`, and adds the following fields to that span. + * - "client.http.method" -> "GET", "PUT", etc. + * - "client.http.uri" -> request URI + * - "client.http.status_code" -> "200", "403", etc. + */ + def clientSingleRequest( + request: HttpRequest + )(implicit T: Trace[IO], as: ClassicActorSystemProvider, ioRuntime: IORuntime): Future[HttpResponse] = T + .span("akka-http-client-request") { + for { + kernel <- T.kernel + _ <- T.put("client.http.uri" -> request.uri.path.toString(), "client.http.method" -> request.method.name) + sendingReq = AkkaRequest.withKernelHeaders(request, kernel) + res <- IO.fromFuture(IO.delay(Http().singleRequest(sendingReq))) + _ <- T.put("client.http.status_code" -> res.status.intValue().toString) + } yield res + } + .unsafeToFuture() + + /** Does not allow to use the current span beyond the http layer. + * + * Adds the following standard fields to the current span: + * - "http.method" -> "GET", "PUT", etc. + * - "http.url" -> request URI (not URL) + * - "http.status_code" -> "200", "403", etc. + * - "error" -> true // only present in case of error + * + * In addition the following non-standard fields are added in case of error: + * - "error.message" -> Exception message + * - "cancelled" -> true // only present in case of cancellation + */ def legacyServer(entryPoint: EntryPoint[IO])(routes: Route)(implicit ioRuntime: IORuntime): Route = { requestContext => val request = requestContext.request @@ -40,6 +76,16 @@ object NatchezAkkaHttp { .unsafeToFuture() } + /** Adds the following standard fields to the current span: + * - "http.method" -> "GET", "PUT", etc. + * - "http.url" -> request URI (not URL) + * - "http.status_code" -> "200", "403", etc. + * - "error" -> true // only present in case of error + * + * In addition the following non-standard fields are added in case of error: + * - "error.message" -> Exception message + * - "cancelled" -> true // only present in case of cancellation + */ def server[F[_]: Async: Trace](routes: F[Route])(implicit D: Dispatcher[F]): F[Route] = Sync[F].delay { (requestContext: RequestContext) => D.unsafeToFuture(routes.flatMap(route => add(route, requestContext))) }