Skip to content

Commit 025892d

Browse files
authored
Merge pull request #828 from amesgen/merge-into-main
Merge `series/0.9` into `main`
2 parents c6d52c4 + 7eb8467 commit 025892d

File tree

4 files changed

+23
-27
lines changed

4 files changed

+23
-27
lines changed

build.sbt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ ThisBuild / mergifyLabelPaths += "docs" -> file("docs")
3636

3737
val catsV = "2.9.0"
3838
val catsEffectV = "3.4.6"
39-
val fs2V = "3.5.0"
39+
val fs2V = "3.6.0"
4040
val scodecV = "1.1.35"
4141
val http4sV = "1.0.0-M39"
4242
val reactiveStreamsV = "1.0.4"
@@ -57,10 +57,8 @@ val coreDeps = Seq(
5757
"org.typelevel" %% "cats-effect-kernel" % catsEffectV,
5858
"org.typelevel" %% "cats-effect-std" % catsEffectV,
5959
"co.fs2" %% "fs2-core" % fs2V,
60-
"co.fs2" %% "fs2-reactive-streams" % fs2V,
6160
"org.http4s" %% "http4s-client" % http4sV,
6261
"org.http4s" %% "http4s-core" % http4sV,
63-
"org.reactivestreams" % "reactive-streams" % reactiveStreamsV,
6462
"org.scodec" %% "scodec-bits" % scodecV,
6563
"org.typelevel" %% "vault" % vaultV,
6664
"org.typelevel" %% "case-insensitive" % caseInsensitiveV

core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import cats.implicits._
2323
import fs2.Chunk
2424
import fs2.Stream
2525
import fs2.concurrent.SignallingRef
26-
import fs2.interop.reactivestreams._
26+
import fs2.interop.flow
2727
import org.http4s.Entity
2828
import org.http4s.Header
2929
import org.http4s.Headers
@@ -32,7 +32,6 @@ import org.http4s.Request
3232
import org.http4s.Response
3333
import org.http4s.Status
3434
import org.http4s.client.Client
35-
import org.reactivestreams.FlowAdapters
3635
import org.typelevel.ci.CIString
3736

3837
import java.net.URI
@@ -69,8 +68,8 @@ object JdkHttpClient {
6968
case Entity.Strict(bytes) =>
7069
Resource.pure[F, BodyPublisher](BodyPublishers.ofInputStream(() => bytes.toInputStream))
7170
case Entity.Default(body, _) =>
72-
StreamUnicastPublisher(body.chunks.map(_.toByteBuffer))
73-
.map(FlowAdapters.toFlowPublisher(_))
71+
flow
72+
.toPublisher(body.chunks.map(_.toByteBuffer))
7473
.map { publisher =>
7574
if (req.isChunked)
7675
BodyPublishers.fromPublisher(publisher)
@@ -198,24 +197,20 @@ object JdkHttpClient {
198197
}.uncancelable
199198
}
200199
.flatMap { case (subscription, res) =>
201-
val body: Stream[F, util.List[ByteBuffer]] =
202-
Stream
203-
.eval(StreamSubscriber[F, util.List[ByteBuffer]](1))
204-
.flatMap(s =>
205-
s.sub.stream(
206-
// Complete the TrybleDeferred so that we indicate we have
207-
// subscribed to the Publisher.
208-
//
209-
// This only happens _after_ someone attempts to pull from the
210-
// body and will never happen if the body is never pulled
211-
// from. In that case, the AlwaysCancelingSubscriber handles
212-
// cleanup.
213-
F.uncancelable { _ =>
214-
subscription.complete(()) *>
215-
F.delay(FlowAdapters.toPublisher(res.body).subscribe(s))
216-
}
217-
)
218-
)
200+
val body =
201+
flow.fromPublisher[F, util.List[ByteBuffer]](1) { subscriber =>
202+
// Complete the TrybleDeferred so that we indicate we have
203+
// subscribed to the Publisher.
204+
//
205+
// This only happens _after_ someone attempts to pull from the
206+
// body and will never happen if the body is never pulled
207+
// from. In that case, the AlwaysCancelingSubscriber handles
208+
// cleanup.
209+
F.uncancelable { _ =>
210+
subscription.complete(()) *>
211+
F.delay(res.body.subscribe(subscriber))
212+
}
213+
}
219214
Resource(
220215
(F.fromEither(Status.fromInt(res.statusCode)), SignallingRef[F, Boolean](false)).mapN {
221216
case (status, signal) =>

core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import org.http4s.websocket.WebSocketFrame
3131
import org.typelevel.ci._
3232
import scodec.bits.ByteVector
3333

34+
import scala.concurrent.duration._
35+
3436
class JdkWSClientSpec extends CatsEffectSuite {
3537

3638
val webSocket: IOFixture[WSClient[IO]] =
@@ -149,6 +151,7 @@ class JdkWSClientSpec extends CatsEffectSuite {
149151
.map(s => WSRequest(httpToWsUri(s.baseUri)))
150152
_ <- server.use { req =>
151153
webSocket().connect(req).use(conn => conn.send(WSFrame.Text("hi ember"))) *>
154+
IO.sleep(100.millis) *> // quick sleep to collect the close frame
152155
webSocket().connectHighLevel(req).use { conn =>
153156
conn.send(WSFrame.Text("hey ember"))
154157
}

docs/index.html

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
<!DOCTYPE html>
22
<meta charset="utf-8">
3-
<meta http-equiv="refresh" content="0; URL=0.8">
4-
<link rel="canonical" href="0.8">
3+
<meta http-equiv="refresh" content="0; URL=0.9">
4+
<link rel="canonical" href="0.9">

0 commit comments

Comments
 (0)