diff --git a/build.sbt b/build.sbt index 49aa1666..7fb889a7 100644 --- a/build.sbt +++ b/build.sbt @@ -23,7 +23,7 @@ lazy val docs = project .enablePlugins(Http4sOrgSitePlugin) .dependsOn(core) .settings(docsSettings) - .settings(libraryDependencies ++= blazeServer) + .settings(libraryDependencies ++= emberServer) ThisBuild / mergifyStewardConfig := Some( MergifyStewardConfig( @@ -35,21 +35,19 @@ ThisBuild / mergifyRequiredJobs += "site" ThisBuild / mergifyLabelPaths += "docs" -> file("docs") val catsV = "2.9.0" -val catsEffectV = "3.3.14" -val fs2V = "3.3.0" +val catsEffectV = "3.4.1" +val fs2V = "3.4.0" val scodecV = "1.1.34" val http4sV = "1.0.0-M37" val reactiveStreamsV = "1.0.4" val vaultV = "3.3.0" val caseInsensitiveV = "1.3.0" -val http4sBlazeV = "1.0.0-M36" val munitV = "1.0.0-M7" val munitCatsEffectV = "2.0.0-M3" -val javaWebsocketV = "1.5.3" -val blazeServer = Seq( - "org.http4s" %% "http4s-blaze-server" % http4sBlazeV, +val emberServer = Seq( + "org.http4s" %% "http4s-ember-server" % http4sV, "org.http4s" %% "http4s-dsl" % http4sV ) @@ -66,9 +64,8 @@ val coreDeps = Seq( "org.scodec" %% "scodec-bits" % scodecV, "org.typelevel" %% "vault" % vaultV, "org.typelevel" %% "case-insensitive" % caseInsensitiveV -) ++ (blazeServer ++ Seq( +) ++ (emberServer ++ Seq( "org.http4s" %% "http4s-client-testkit" % http4sV, - "org.java-websocket" % "Java-WebSocket" % javaWebsocketV, "org.scalameta" %% "munit" % munitV, "org.typelevel" %% "munit-cats-effect" % munitCatsEffectV )).map(_ % Test) @@ -89,15 +86,6 @@ ThisBuild / githubWorkflowJavaVersions := Seq("11", "17").map(JavaSpec.temurin(_ ThisBuild / tlCiReleaseBranches := Seq("main") ThisBuild / tlSitePublishBranch := Some("main") -ThisBuild / resolvers += "SOSSS".at("https://s01.oss.sonatype.org/content/repositories/snapshots") - -ThisBuild / libraryDependencySchemes ++= Seq( - "org.http4s" %% "http4s-core" % "always", - "org.http4s" %% "http4s-server" % "always", - "org.http4s" %% "http4s-blaze-core" % "always", - "org.http4s" %% "http4s-blaze-server" % "always" -) - lazy val docsSettings = Seq( tlSiteApiModule := Some((core / projectID).value), @@ -108,6 +96,7 @@ lazy val docsSettings = Versions( currentVersion = Version("1.x", "1.x"), olderVersions = Seq( + Version("0.8.x", "0.8"), Version("0.7.x", "0.7"), Version("0.6.x", "0.6.0-M7"), Version("0.5.x", "0.5.0"), diff --git a/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala b/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala index 8dfca0a7..e8ced211 100644 --- a/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala +++ b/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala @@ -18,7 +18,6 @@ package org.http4s.jdkhttpclient import cats._ import cats.effect._ -import cats.effect.std.Dispatcher import cats.effect.syntax.all._ import cats.implicits._ import fs2.Chunk @@ -39,6 +38,7 @@ import org.typelevel.ci.CIString import java.net.URI import java.net.http.HttpClient import java.net.http.HttpRequest +import java.net.http.HttpRequest.BodyPublisher import java.net.http.HttpRequest.BodyPublishers import java.net.http.HttpResponse import java.net.http.HttpResponse.BodyHandlers @@ -49,8 +49,7 @@ import scala.jdk.CollectionConverters._ object JdkHttpClient { - /** Creates a `Client` from an `HttpClient`. Note that the creation of an `HttpClient` is a side - * effect. + /** Creates a `Client` from an `HttpClient`. * * @param jdkHttpClient * The `HttpClient`. @@ -62,39 +61,36 @@ object JdkHttpClient { def apply[F[_]]( jdkHttpClient: HttpClient, ignoredHeaders: Set[CIString] = restrictedHeaders - )(implicit F: Async[F]): Resource[F, Client[F]] = Dispatcher[F].map { dispatcher => - def convertRequest(req: Request[F]): F[HttpRequest] = - convertHttpVersionFromHttp4s[F](req.httpVersion).map { version => - val rb = HttpRequest.newBuilder - .method( - req.method.name, - req.entity match { - case Entity.Empty => BodyPublishers.noBody() - case Entity.Strict(bytes) => - BodyPublishers.ofInputStream(() => bytes.toInputStream) - case Entity.Default(body, _) => - val publisher = FlowAdapters.toFlowPublisher( - StreamUnicastPublisher(body.chunks.map(_.toByteBuffer), dispatcher) - ) - - if (req.isChunked) - BodyPublishers.fromPublisher(publisher) - else - req.contentLength match { - case Some(length) if length > 0L => - BodyPublishers.fromPublisher(publisher, length) - case _ => BodyPublishers.noBody - } + )(implicit F: Async[F]): Client[F] = { + def convertRequest(req: Request[F]): Resource[F, HttpRequest] = for { + version <- Resource.eval(convertHttpVersionFromHttp4s[F](req.httpVersion)) + bodyPublisher <- req.entity match { + case Entity.Empty => Resource.pure[F, BodyPublisher](BodyPublishers.noBody()) + case Entity.Strict(bytes) => + Resource.pure[F, BodyPublisher](BodyPublishers.ofInputStream(() => bytes.toInputStream)) + case Entity.Default(body, _) => + StreamUnicastPublisher(body.chunks.map(_.toByteBuffer)) + .map(FlowAdapters.toFlowPublisher(_)) + .map { publisher => + if (req.isChunked) + BodyPublishers.fromPublisher(publisher) + else + req.contentLength match { + case Some(length) if length > 0L => + BodyPublishers.fromPublisher(publisher, length) + case _ => BodyPublishers.noBody + } } - ) - .uri(URI.create(req.uri.renderString)) - .version(version) - val headers = req.headers.headers.iterator - .filterNot(h => ignoredHeaders.contains(h.name)) - .flatMap(h => Iterator(h.name.toString, h.value)) - .toArray - (if (headers.isEmpty) rb else rb.headers(headers: _*)).build } + rb = HttpRequest.newBuilder + .method(req.method.name, bodyPublisher) + .uri(URI.create(req.uri.renderString)) + .version(version) + headers = req.headers.headers.iterator + .filterNot(h => ignoredHeaders.contains(h.name)) + .flatMap(h => Iterator(h.name.toString, h.value)) + .toArray + } yield (if (headers.isEmpty) rb else rb.headers(headers: _*)).build // Convert the JDK HttpResponse into a http4s Response value. // @@ -247,7 +243,7 @@ object JdkHttpClient { Client[F] { req => for { - req <- Resource.eval(convertRequest(req)) + req <- convertRequest(req) res = F.fromCompletableFuture( F.delay(jdkHttpClient.sendAsync(req, BodyHandlers.ofPublisher)) ) @@ -258,11 +254,11 @@ object JdkHttpClient { /** A `Client` wrapping the default `HttpClient`. */ - def simple[F[_]](implicit F: Async[F]): Resource[F, Client[F]] = - Resource.eval(defaultHttpClient[F]).flatMap(apply(_)) + def simple[F[_]](implicit F: Async[F]): F[Client[F]] = + defaultHttpClient[F].map(apply(_)) private[jdkhttpclient] def defaultHttpClient[F[_]](implicit F: Async[F]): F[HttpClient] = - F.executionContext.flatMap { ec => + F.executor.flatMap { exec => F.delay { val builder = HttpClient.newBuilder() // workaround for https://github.com/http4s/http4s-jdk-http-client/issues/200 @@ -272,10 +268,7 @@ object JdkHttpClient { builder.sslParameters(params) } - ec match { - case exec: util.concurrent.Executor => builder.executor(exec) - case _ => builder.executor(ec.execute(_)) - } + builder.executor(exec) builder.build() } diff --git a/core/src/main/scala/org/http4s/jdkhttpclient/JdkWSClient.scala b/core/src/main/scala/org/http4s/jdkhttpclient/JdkWSClient.scala index 9982104e..97af9d24 100644 --- a/core/src/main/scala/org/http4s/jdkhttpclient/JdkWSClient.scala +++ b/core/src/main/scala/org/http4s/jdkhttpclient/JdkWSClient.scala @@ -26,7 +26,6 @@ import fs2.CompositeFailure import fs2.Stream import org.http4s.Header import org.http4s.client.websocket._ -import org.http4s.internal.unsafeToCompletionStage import org.typelevel.ci._ import scodec.bits.ByteVector @@ -46,118 +45,120 @@ object JdkWSClient { /** Create a new `WSClient` backed by a JDK 11+ http client. */ def apply[F[_]]( jdkHttpClient: HttpClient - )(implicit F: Async[F]): Resource[F, WSClient[F]] = Dispatcher[F].map { dispatcher => + )(implicit F: Async[F]): WSClient[F] = WSClient(respondToPings = false) { req => - Resource - .make { - for { - wsBuilder <- F.delay { - val builder = jdkHttpClient.newWebSocketBuilder() - val (subprotocols, hs) = req.headers.headers.partitionEither { - case Header.Raw(ci"Sec-WebSocket-Protocol", p) => Left(p) - case h => Right(h) - } - hs.foreach { h => builder.header(h.name.toString, h.value); () } - subprotocols match { - case head :: tail => builder.subprotocols(head, tail: _*) - case Nil => - } - builder - } - queue <- Queue.unbounded[F, Either[Throwable, WSFrame]] - closedDef <- Deferred[F, Unit] - handleReceive = - (wsf: Either[Throwable, WSFrame]) => - unsafeToCompletionStage( - queue.offer(wsf) *> (wsf match { - case Left(_) | Right(_: WSFrame.Close) => closedDef.complete(()).void - case _ => F.unit - }), - dispatcher - ) - wsListener = new JWebSocket.Listener { - override def onOpen(webSocket: JWebSocket): Unit = () - override def onClose(webSocket: JWebSocket, statusCode: Int, reason: String) - : CompletionStage[_] = - // The output side of this connection will be closed when the returned CompletionStage completes. - // Therefore, we return a never completing CompletionStage, so we can control when the output will - // be closed (as it is allowed to continue sending frames (as few as possible) after a close frame - // has been received). - handleReceive(WSFrame.Close(statusCode, reason).asRight) - .thenCompose[Nothing](_ => new CompletableFuture[Nothing]) - override def onText(webSocket: JWebSocket, data: CharSequence, last: Boolean) - : CompletionStage[_] = - handleReceive(WSFrame.Text(data.toString, last).asRight) - override def onBinary(webSocket: JWebSocket, data: ByteBuffer, last: Boolean) - : CompletionStage[_] = - handleReceive(WSFrame.Binary(ByteVector(data), last).asRight) - override def onPing(webSocket: JWebSocket, message: ByteBuffer): CompletionStage[_] = - handleReceive(WSFrame.Ping(ByteVector(message)).asRight) - override def onPong(webSocket: JWebSocket, message: ByteBuffer): CompletionStage[_] = - handleReceive(WSFrame.Pong(ByteVector(message)).asRight) - override def onError(webSocket: JWebSocket, error: Throwable): Unit = { - handleReceive(error.asLeft); () + Dispatcher.sequential.flatMap { dispatcher => + Resource + .make { + for { + wsBuilder <- F.delay { + val builder = jdkHttpClient.newWebSocketBuilder() + val (subprotocols, hs) = req.headers.headers.partitionEither { + case Header.Raw(ci"Sec-WebSocket-Protocol", p) => Left(p) + case h => Right(h) + } + hs.foreach { h => builder.header(h.name.toString, h.value); () } + subprotocols match { + case head :: tail => builder.subprotocols(head, tail: _*) + case Nil => + } + builder } - } - webSocket <- F.fromCompletableFuture( - F.delay(wsBuilder.buildAsync(URI.create(req.uri.renderString), wsListener)) - ) - sendSem <- Semaphore[F](1L) - } yield (webSocket, queue, closedDef, sendSem) - } { case (webSocket, queue, _, _) => - for { - isOutputOpen <- F.delay(!webSocket.isOutputClosed) - closeOutput = F.fromCompletableFuture( - F.delay(webSocket.sendClose(JWebSocket.NORMAL_CLOSURE, "")) - ) - _ <- - closeOutput - .whenA(isOutputOpen) - .recover { case e: IOException if e.getMessage == "closed output" => () } - .onError { case e: IOException => - for { - errs <- Stream - .repeatEval(queue.tryTake) - .unNoneTerminate - .collect { case Left(e) => e } - .compile - .toList - _ <- F.raiseError[Unit](CompositeFailure.fromList(errs) match { - case Some(cf) => cf - case None => e + queue <- Queue.unbounded[F, Either[Throwable, WSFrame]] + closedDef <- Deferred[F, Unit] + handleReceive = + (wsf: Either[Throwable, WSFrame]) => + dispatcher.unsafeToCompletableFuture( + queue.offer(wsf) *> (wsf match { + case Left(_) | Right(_: WSFrame.Close) => closedDef.complete(()).void + case _ => F.unit }) - } yield () + ) + wsListener = new JWebSocket.Listener { + override def onOpen(webSocket: JWebSocket): Unit = () + override def onClose(webSocket: JWebSocket, statusCode: Int, reason: String) + : CompletionStage[_] = + // The output side of this connection will be closed when the returned CompletionStage completes. + // Therefore, we return a never completing CompletionStage, so we can control when the output will + // be closed (as it is allowed to continue sending frames (as few as possible) after a close frame + // has been received). + handleReceive(WSFrame.Close(statusCode, reason).asRight) + .thenCompose[Nothing](_ => new CompletableFuture[Nothing]) + override def onText(webSocket: JWebSocket, data: CharSequence, last: Boolean) + : CompletionStage[_] = + handleReceive(WSFrame.Text(data.toString, last).asRight) + override def onBinary(webSocket: JWebSocket, data: ByteBuffer, last: Boolean) + : CompletionStage[_] = + handleReceive(WSFrame.Binary(ByteVector(data), last).asRight) + override def onPing(webSocket: JWebSocket, message: ByteBuffer) + : CompletionStage[_] = + handleReceive(WSFrame.Ping(ByteVector(message)).asRight) + override def onPong(webSocket: JWebSocket, message: ByteBuffer) + : CompletionStage[_] = + handleReceive(WSFrame.Pong(ByteVector(message)).asRight) + override def onError(webSocket: JWebSocket, error: Throwable): Unit = { + handleReceive(error.asLeft); () } - } yield () - } - .map { case (webSocket, queue, closedDef, sendSem) => - // sending will throw if done in parallel - val rawSend = (wsf: WSFrame) => - F.fromCompletableFuture(F.delay(wsf match { - case WSFrame.Text(text, last) => webSocket.sendText(text, last) - case WSFrame.Binary(data, last) => webSocket.sendBinary(data.toByteBuffer, last) - case WSFrame.Ping(data) => webSocket.sendPing(data.toByteBuffer) - case WSFrame.Pong(data) => webSocket.sendPong(data.toByteBuffer) - case WSFrame.Close(statusCode, reason) => webSocket.sendClose(statusCode, reason) - })) - .void - new WSConnection[F] { - override def send(wsf: WSFrame) = - sendSem.permit.use(_ => rawSend(wsf)) - override def sendMany[G[_]: Foldable, A <: WSFrame](wsfs: G[A]) = - sendSem.permit.use(_ => wsfs.traverse_(rawSend)) - override def receive = closedDef.tryGet.flatMap { - case None => F.delay(webSocket.request(1)) *> queue.take.rethrow.map(_.some) - case Some(()) => none[WSFrame].pure[F] + } + webSocket <- F.fromCompletableFuture( + F.delay(wsBuilder.buildAsync(URI.create(req.uri.renderString), wsListener)) + ) + sendSem <- Semaphore[F](1L) + } yield (webSocket, queue, closedDef, sendSem) + } { case (webSocket, queue, _, _) => + for { + isOutputOpen <- F.delay(!webSocket.isOutputClosed) + closeOutput = F.fromCompletableFuture( + F.delay(webSocket.sendClose(JWebSocket.NORMAL_CLOSURE, "")) + ) + _ <- + closeOutput + .whenA(isOutputOpen) + .recover { case e: IOException if e.getMessage == "closed output" => () } + .onError { case e: IOException => + for { + errs <- Stream + .repeatEval(queue.tryTake) + .unNoneTerminate + .collect { case Left(e) => e } + .compile + .toList + _ <- F.raiseError[Unit](CompositeFailure.fromList(errs) match { + case Some(cf) => cf + case None => e + }) + } yield () + } + } yield () + } + .map { case (webSocket, queue, closedDef, sendSem) => + // sending will throw if done in parallel + val rawSend = (wsf: WSFrame) => + F.fromCompletableFuture(F.delay(wsf match { + case WSFrame.Text(text, last) => webSocket.sendText(text, last) + case WSFrame.Binary(data, last) => webSocket.sendBinary(data.toByteBuffer, last) + case WSFrame.Ping(data) => webSocket.sendPing(data.toByteBuffer) + case WSFrame.Pong(data) => webSocket.sendPong(data.toByteBuffer) + case WSFrame.Close(statusCode, reason) => webSocket.sendClose(statusCode, reason) + })) + .void + new WSConnection[F] { + override def send(wsf: WSFrame) = + sendSem.permit.use(_ => rawSend(wsf)) + override def sendMany[G[_]: Foldable, A <: WSFrame](wsfs: G[A]) = + sendSem.permit.use(_ => wsfs.traverse_(rawSend)) + override def receive = closedDef.tryGet.flatMap { + case None => F.delay(webSocket.request(1)) *> queue.take.rethrow.map(_.some) + case Some(()) => none[WSFrame].pure[F] + } + override def subprotocol = + webSocket.getSubprotocol.some.filter(_.nonEmpty) } - override def subprotocol = - webSocket.getSubprotocol.some.filter(_.nonEmpty) } - } + } } - } /** A `WSClient` wrapping the default `HttpClient`. */ - def simple[F[_]](implicit F: Async[F]): Resource[F, WSClient[F]] = - Resource.eval(JdkHttpClient.defaultHttpClient[F]).flatMap(apply(_)) + def simple[F[_]](implicit F: Async[F]): F[WSClient[F]] = + JdkHttpClient.defaultHttpClient[F].map(apply(_)) } diff --git a/core/src/test/scala/org/http4s/jdkhttpclient/BodyLeakExample.scala b/core/src/test/scala/org/http4s/jdkhttpclient/BodyLeakExample.scala index a8982ee5..7da879a8 100644 --- a/core/src/test/scala/org/http4s/jdkhttpclient/BodyLeakExample.scala +++ b/core/src/test/scala/org/http4s/jdkhttpclient/BodyLeakExample.scala @@ -19,9 +19,10 @@ package org.http4s.jdkhttpclient import cats.data._ import cats.effect._ import cats.syntax.all._ +import com.comcast.ip4s._ import org.http4s._ -import org.http4s.blaze.server.BlazeServerBuilder import org.http4s.client._ +import org.http4s.ember.server.EmberServerBuilder import org.http4s.syntax.all._ // This is a *manual* test for the body leak fixed in #335 @@ -45,11 +46,12 @@ object BodyLeakExample extends IOApp { ) override def run(args: List[String]): IO[ExitCode] = - BlazeServerBuilder[IO] - .bindLocal(8080) + EmberServerBuilder + .default[IO] + .withPort(port"8080") .withHttpApp(app) - .resource - .product(JdkHttpClient.simple[IO]) + .build + .product(Resource.eval(JdkHttpClient.simple[IO])) .use { case (_, client) => for { counter <- Ref.of[IO, Long](0L) diff --git a/core/src/test/scala/org/http4s/jdkhttpclient/CompletableFutureTerminationTest.scala b/core/src/test/scala/org/http4s/jdkhttpclient/CompletableFutureTerminationTest.scala index ba5c7563..18a5b00d 100644 --- a/core/src/test/scala/org/http4s/jdkhttpclient/CompletableFutureTerminationTest.scala +++ b/core/src/test/scala/org/http4s/jdkhttpclient/CompletableFutureTerminationTest.scala @@ -20,9 +20,10 @@ import cats.data._ import cats.effect._ import cats.effect.std.Semaphore import cats.syntax.all._ +import com.comcast.ip4s._ import munit.CatsEffectSuite import org.http4s._ -import org.http4s.blaze.server._ +import org.http4s.ember.server._ import org.http4s.server._ import java.net.URI @@ -174,7 +175,8 @@ object CompletableFutureTerminationTest { semaphore: Semaphore[F], gotRequest: Semaphore[F] )(implicit F: Async[F]): Resource[F, Server] = - BlazeServerBuilder[F] + EmberServerBuilder + .default[F] .withHttpApp( Kleisli( Function.const( @@ -183,8 +185,9 @@ object CompletableFutureTerminationTest { ) ) ) - .bindAny() - .resource + .withShutdownTimeout(1.second) + .withPort(port"0") + .build /** Just a scala wrapper class to make it easier to generate a [[java.util.function.BiFunction]]. */ diff --git a/core/src/test/scala/org/http4s/jdkhttpclient/DeadlockWorkaround.scala b/core/src/test/scala/org/http4s/jdkhttpclient/DeadlockWorkaround.scala index a538a07b..e6d903bb 100644 --- a/core/src/test/scala/org/http4s/jdkhttpclient/DeadlockWorkaround.scala +++ b/core/src/test/scala/org/http4s/jdkhttpclient/DeadlockWorkaround.scala @@ -29,7 +29,7 @@ class DeadlockWorkaround extends CatsEffectSuite { test("fail to connect via TLSv1.3 on Java 11") { if (Runtime.version().feature() > 11) IO.pure(true) else - JdkHttpClient.simple[IO].product(JdkWSClient.simple[IO]).use { case (http, ws) => + (JdkHttpClient.simple[IO], JdkWSClient.simple[IO]).flatMapN { (http, ws) => def testSSLFailure(r: IO[Unit]) = r.intercept[SSLHandshakeException] testSSLFailure(http.expect[Unit](uri"https://tls13.1d.pw")) *> testSSLFailure(ws.connectHighLevel(WSRequest(uri"wss://tls13.1d.pw")).use(_ => IO.unit)) diff --git a/core/src/test/scala/org/http4s/jdkhttpclient/JdkHttpClientSpec.scala b/core/src/test/scala/org/http4s/jdkhttpclient/JdkHttpClientSpec.scala index 56fbcadb..7e473f73 100644 --- a/core/src/test/scala/org/http4s/jdkhttpclient/JdkHttpClientSpec.scala +++ b/core/src/test/scala/org/http4s/jdkhttpclient/JdkHttpClientSpec.scala @@ -26,7 +26,7 @@ import org.http4s.client.testkit.testroutes.GetRoutes import org.typelevel.ci._ class JdkHttpClientSpec extends ClientRouteTestBattery("JdkHttpClient") { - def clientResource: Resource[IO, Client[IO]] = JdkHttpClient.simple[IO] + def clientResource: Resource[IO, Client[IO]] = Resource.eval(JdkHttpClient.simple[IO]) // regression test for https://github.com/http4s/http4s-jdk-http-client/issues/395 test("Don't error with empty body and explicit Content-Length: 0") { diff --git a/core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala b/core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala index f10dabad..1ebb3a32 100644 --- a/core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala +++ b/core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala @@ -18,40 +18,35 @@ package org.http4s.jdkhttpclient import cats.effect._ import cats.implicits._ +import com.comcast.ip4s._ import fs2.Stream import munit.CatsEffectSuite import munit.catseffect.IOFixture import org.http4s._ -import org.http4s.blaze.server.BlazeServerBuilder import org.http4s.client.websocket._ import org.http4s.dsl.io._ +import org.http4s.ember.server.EmberServerBuilder import org.http4s.implicits._ import org.http4s.websocket.WebSocketFrame -import org.java_websocket.WebSocket -import org.java_websocket.handshake.ClientHandshake -import org.java_websocket.server.WebSocketServer import org.typelevel.ci._ import scodec.bits.ByteVector -import java.net.InetSocketAddress -import java.nio.ByteBuffer -import scala.concurrent.duration._ - class JdkWSClientSpec extends CatsEffectSuite { val webSocket: IOFixture[WSClient[IO]] = - ResourceSuiteLocalFixture("webSocket", JdkWSClient.simple[IO]) + ResourceSuiteLocalFixture("webSocket", Resource.eval(JdkWSClient.simple[IO])) val echoServerUri: IOFixture[Uri] = ResourceSuiteLocalFixture( "echoServerUri", - BlazeServerBuilder[IO] - .bindAny() + EmberServerBuilder + .default[IO] + .withPort(port"0") .withHttpWebSocketApp { wsb => HttpRoutes .of[IO] { case GET -> Root => wsb.build(identity) } .orNotFound } - .resource + .build .map(s => httpToWsUri(s.baseUri)) ) @@ -73,7 +68,7 @@ class JdkWSClientSpec extends CatsEffectSuite { WSFrame.Text("bar"), WSFrame.Binary(ByteVector(3, 99, 12)), WSFrame.Text("foo"), - WSFrame.Close(1000, "goodbye") + WSFrame.Close(1000, "") ) ) } @@ -135,44 +130,37 @@ class JdkWSClientSpec extends CatsEffectSuite { } test("automatically close the connection") { + val closeFrame = WebSocketFrame.Close(1000, "").toTry.get val frames = for { ref <- Ref[IO].of(List.empty[WebSocketFrame]) - finished <- Deferred[IO, Unit] - // we use Java-Websocket because Blaze has a bug concerning the handling of Close frames and shutting down - server = new WebSocketServer(new InetSocketAddress("localhost", 8080)) { - override def onOpen(conn: WebSocket, handshake: ClientHandshake) = () - override def onClose(conn: WebSocket, code: Int, reason: String, remote: Boolean) = - ref - .update(_ :+ WebSocketFrame.Close(code, reason).fold(throw _, identity)) - .unsafeRunSync() - override def onMessage(conn: WebSocket, message: String) = - ref.update(_ :+ WebSocketFrame.Text(message)).unsafeRunSync() - override def onMessage(conn: WebSocket, message: ByteBuffer) = - ref.update(_ :+ WebSocketFrame.Binary(ByteVector(message))).unsafeRunSync() - override def onError(conn: WebSocket, ex: Exception) = println(s"WS error $ex") - override def onStart() = { - val req = WSRequest(uri"ws://localhost:8080") - val p = for { - _ <- webSocket().connect(req).use(conn => conn.send(WSFrame.Text("hi blaze"))) - _ <- IO.sleep(1.seconds) - _ <- webSocket().connectHighLevel(req).use { conn => - conn.send(WSFrame.Text("hey blaze")) + server = EmberServerBuilder + .default[IO] + .withPort(port"0") + .withHttpWebSocketApp { wsb => + HttpRoutes + .of[IO] { case GET -> Root => + wsb + .withOnClose(ref.update(_ :+ closeFrame)) + .build(_.evalTap(f => ref.update(_ :+ f))) } - _ <- IO.sleep(1.seconds) - _ <- finished.complete(()) - } yield () - p.unsafeRunAsync(_ => ()) + .orNotFound } + .build + .map(s => WSRequest(httpToWsUri(s.baseUri))) + _ <- server.use { req => + webSocket().connect(req).use(conn => conn.send(WSFrame.Text("hi ember"))) *> + webSocket().connectHighLevel(req).use { conn => + conn.send(WSFrame.Text("hey ember")) + } } - frames <- IO(server.start()) - .bracket(_ => finished.get *> ref.get)(_ => IO(server.stop(0))) + frames <- ref.get } yield frames frames.assertEquals( List( - WebSocketFrame.Text("hi blaze"), - WebSocketFrame.Close(1000, "").fold(throw _, identity), - WebSocketFrame.Text("hey blaze"), - WebSocketFrame.Close(1000, "").fold(throw _, identity) + WebSocketFrame.Text("hi ember"), + closeFrame, + WebSocketFrame.Text("hey ember"), + closeFrame ) ) } @@ -186,8 +174,9 @@ class JdkWSClientSpec extends CatsEffectSuite { Ref[IO] .of(None: Option[Headers]) .flatMap { ref => - BlazeServerBuilder[IO] - .bindAny() + EmberServerBuilder + .default[IO] + .withPort(port"0") .withHttpWebSocketApp { wsb => HttpRoutes .of[IO] { case r @ GET -> Root => @@ -195,7 +184,7 @@ class JdkWSClientSpec extends CatsEffectSuite { } .orNotFound } - .resource + .build .use { server => webSocket() .connect(WSRequest(httpToWsUri(server.baseUri)).withHeaders(sentHeaders)) diff --git a/docs/README.md b/docs/README.md index 1142c798..f91d96de 100644 --- a/docs/README.md +++ b/docs/README.md @@ -50,7 +50,7 @@ import org.http4s.jdkhttpclient.JdkHttpClient // It comes for free with `cats.effect.IOApp`: import cats.effect.unsafe.implicits.global -val client: Resource[IO, Client[IO]] = JdkHttpClient.simple[IO] +val client: IO[Client[IO]] = JdkHttpClient.simple[IO] ``` #### Custom clients @@ -63,15 +63,15 @@ in an effect, as it creates a default executor and SSL context: import java.net.{InetSocketAddress, ProxySelector} import java.net.http.HttpClient -val client0: Resource[IO, Client[IO]] = Resource.eval(IO.executionContext.flatMap { ec => +val client0: IO[Client[IO]] = IO.executor.flatMap { exec => IO { HttpClient.newBuilder() .version(HttpClient.Version.HTTP_2) .proxy(ProxySelector.of(new InetSocketAddress("www-proxy", 8080))) - .executor(ec.execute(_)) + .executor(exec) .build() } -}).flatMap(JdkHttpClient(_)) +}.map(JdkHttpClient(_)) ``` ### Sharing @@ -89,7 +89,7 @@ def fetchStatus[F[_]](c: Client[F], uri: Uri): F[Status] = c.status(Request[F](Method.GET, uri = uri)) client - .use(c => fetchStatus(c, uri"https://http4s.org/")) + .flatMap(c => fetchStatus(c, uri"https://http4s.org/")) .unsafeRunSync() ``` @@ -101,7 +101,7 @@ create a new `HttpClient` instance on every invocation: ```scala mdoc def fetchStatusInefficiently[F[_]: Async](uri: Uri): F[Status] = - JdkHttpClient.simple[F].use(_.status(Request[F](Method.GET, uri = uri))) + JdkHttpClient.simple[F].flatMap(_.status(Request[F](Method.GET, uri = uri))) ``` @:@ @@ -138,12 +138,11 @@ import org.http4s.client.websocket._ import org.http4s.jdkhttpclient._ val (http, webSocket) = - Resource.eval(IO(HttpClient.newHttpClient())) - .flatMap { httpClient => - (JdkHttpClient[IO](httpClient), JdkWSClient[IO](httpClient)).tupled + IO(HttpClient.newHttpClient()) + .map { httpClient => + (JdkHttpClient[IO](httpClient), JdkWSClient[IO](httpClient)) } - // in almost all cases, it is better to call `use` instead - .allocated.map(_._1).unsafeRunSync() + .unsafeRunSync() ``` If you do not need an HTTP client, you can also call `JdkWSClient.simple[IO]` as above. @@ -186,13 +185,14 @@ We use the "high-level" connection mode to build a simple websocket app. ```scala mdoc:invisible import org.http4s.dsl.io._ import org.http4s.implicits._ -import org.http4s.blaze.server.BlazeServerBuilder -val echoServer = BlazeServerBuilder[IO] - .bindAny() +import org.http4s.ember.server.EmberServerBuilder +import com.comcast.ip4s._ +val echoServer = EmberServerBuilder.default[IO] + .withPort(port"0") .withHttpWebSocketApp(wsb => HttpRoutes.of[IO] { case GET -> Root => wsb.build(identity) }.orNotFound) - .resource + .build .map(s => s.baseUri.copy(scheme = scheme"ws".some)) ``` diff --git a/docs/index.html b/docs/index.html index e312eacc..27e9de85 100644 --- a/docs/index.html +++ b/docs/index.html @@ -1,4 +1,4 @@ - - + +