diff --git a/build.sbt b/build.sbt index c28e0e22..66851106 100644 --- a/build.sbt +++ b/build.sbt @@ -36,7 +36,7 @@ ThisBuild / mergifyLabelPaths += "docs" -> file("docs") val catsV = "2.9.0" val catsEffectV = "3.4.6" -val fs2V = "3.5.0" +val fs2V = "3.6.0" val scodecV = "1.1.35" val http4sV = "1.0.0-M39" val reactiveStreamsV = "1.0.4" @@ -57,10 +57,8 @@ val coreDeps = Seq( "org.typelevel" %% "cats-effect-kernel" % catsEffectV, "org.typelevel" %% "cats-effect-std" % catsEffectV, "co.fs2" %% "fs2-core" % fs2V, - "co.fs2" %% "fs2-reactive-streams" % fs2V, "org.http4s" %% "http4s-client" % http4sV, "org.http4s" %% "http4s-core" % http4sV, - "org.reactivestreams" % "reactive-streams" % reactiveStreamsV, "org.scodec" %% "scodec-bits" % scodecV, "org.typelevel" %% "vault" % vaultV, "org.typelevel" %% "case-insensitive" % caseInsensitiveV diff --git a/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala b/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala index e8ced211..5c94137e 100644 --- a/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala +++ b/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala @@ -23,7 +23,7 @@ import cats.implicits._ import fs2.Chunk import fs2.Stream import fs2.concurrent.SignallingRef -import fs2.interop.reactivestreams._ +import fs2.interop.flow import org.http4s.Entity import org.http4s.Header import org.http4s.Headers @@ -32,7 +32,6 @@ import org.http4s.Request import org.http4s.Response import org.http4s.Status import org.http4s.client.Client -import org.reactivestreams.FlowAdapters import org.typelevel.ci.CIString import java.net.URI @@ -69,8 +68,8 @@ object JdkHttpClient { case Entity.Strict(bytes) => Resource.pure[F, BodyPublisher](BodyPublishers.ofInputStream(() => bytes.toInputStream)) case Entity.Default(body, _) => - StreamUnicastPublisher(body.chunks.map(_.toByteBuffer)) - .map(FlowAdapters.toFlowPublisher(_)) + flow + .toPublisher(body.chunks.map(_.toByteBuffer)) .map { publisher => if (req.isChunked) BodyPublishers.fromPublisher(publisher) @@ -198,24 +197,20 @@ object JdkHttpClient { }.uncancelable } .flatMap { case (subscription, res) => - val body: Stream[F, util.List[ByteBuffer]] = - Stream - .eval(StreamSubscriber[F, util.List[ByteBuffer]](1)) - .flatMap(s => - s.sub.stream( - // Complete the TrybleDeferred so that we indicate we have - // subscribed to the Publisher. - // - // This only happens _after_ someone attempts to pull from the - // body and will never happen if the body is never pulled - // from. In that case, the AlwaysCancelingSubscriber handles - // cleanup. - F.uncancelable { _ => - subscription.complete(()) *> - F.delay(FlowAdapters.toPublisher(res.body).subscribe(s)) - } - ) - ) + val body = + flow.fromPublisher[F, util.List[ByteBuffer]](1) { subscriber => + // Complete the TrybleDeferred so that we indicate we have + // subscribed to the Publisher. + // + // This only happens _after_ someone attempts to pull from the + // body and will never happen if the body is never pulled + // from. In that case, the AlwaysCancelingSubscriber handles + // cleanup. + F.uncancelable { _ => + subscription.complete(()) *> + F.delay(res.body.subscribe(subscriber)) + } + } Resource( (F.fromEither(Status.fromInt(res.statusCode)), SignallingRef[F, Boolean](false)).mapN { case (status, signal) => diff --git a/core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala b/core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala index 1ebb3a32..15d8284b 100644 --- a/core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala +++ b/core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala @@ -31,6 +31,8 @@ import org.http4s.websocket.WebSocketFrame import org.typelevel.ci._ import scodec.bits.ByteVector +import scala.concurrent.duration._ + class JdkWSClientSpec extends CatsEffectSuite { val webSocket: IOFixture[WSClient[IO]] = @@ -149,6 +151,7 @@ class JdkWSClientSpec extends CatsEffectSuite { .map(s => WSRequest(httpToWsUri(s.baseUri))) _ <- server.use { req => webSocket().connect(req).use(conn => conn.send(WSFrame.Text("hi ember"))) *> + IO.sleep(100.millis) *> // quick sleep to collect the close frame webSocket().connectHighLevel(req).use { conn => conn.send(WSFrame.Text("hey ember")) } diff --git a/docs/index.html b/docs/index.html index 27e9de85..7138aa18 100644 --- a/docs/index.html +++ b/docs/index.html @@ -1,4 +1,4 @@ - - + +