Open
Description
As originally raised in http4s/http4s#7502
The following code:
Click to expand
import cats.effect.{IO, IOApp}
import cats.effect.IO.asyncForIO
import cats.effect.kernel.Resource
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.client.Client
import scala.concurrent.duration.*
object Demo extends IOApp.Simple:
val clientResource: Resource[IO, Client[IO]] = BlazeClientBuilder[IO].resource
def query: IO[Int] = clientResource.use(
_
.statusFromString(s"https://postman-echo.com/get")
.map(_.code)
.flatTap(s => IO.println(s"request: returned with status: $s"))
)
override def run: IO[Unit] =
fs2.Stream
.range(1, 100)
.metered(1.seconds)
.evalMap(_ => query)
.compile
.drain
throws the following logs:
java.lang.IllegalStateException: supervisor already shutdown
at get @ fs2.internal.Scope.openScope(Scope.scala:275)
at get @ fs2.internal.Scope.openScope(Scope.scala:275)
at unique @ fs2.Compiler$Target$ConcurrentTarget.unique(Compiler.scala:194)
Note that you also see after each request:
INFO org.http4s.blaze.client.PoolManager - Shutting down connection pool: curAllocated=1 idleQueues.size=1 waitQueue.size=0 maxWaitQueueLimit=256 closed=false
if I change the code to:
Click to expand
import cats.effect.{IO, IOApp, Resource}
import cats.effect.IO.asyncForIO
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.client.Client
import scala.concurrent.duration.*
object Demo extends IOApp.Simple:
val clientResource: Resource[IO, Client[IO]] = BlazeClientBuilder[IO].resource
def query(client: Client[IO]): IO[Int] = client
.statusFromString(s"https://postman-echo.com/get")
.map(_.code)
.flatTap(s => IO.println(s"request: returned with status: $s"))
override def run: IO[Unit] =
clientResource.use(c =>
fs2.Stream
.range(1, 100)
.metered(1.seconds)
.evalMap(_ => query(c))
.compile
.drain
)
ie,
- fs2.Stream
- .range(1, 100)
- .metered(1.seconds)
- .evalMap(_ => query)
- .compile
- .drain
+ clientResource.use(c =>
+ fs2.Stream
+ .range(1, 100)
+ .metered(1.seconds)
+ .evalMap(_ => query(c))
+ .compile
+ .drain
+ )
The problem goes away. I also checked this with the ember client and not seeing the issue there:
Click to expand
import cats.effect.{IO, IOApp, Resource}
import cats.effect.IO.asyncForIO
import org.http4s.ember.client.EmberClientBuilder
import org.http4s.client.Client
import scala.concurrent.duration.*
object Demo extends IOApp.Simple:
val clientResource: Resource[IO, Client[IO]] = EmberClientBuilder.default[IO].build
def query: IO[Int] = clientResource.use(
_
.statusFromString(s"https://postman-echo.com/get")
.map(_.code)
.flatTap(s => IO.println(s"request: returned with status: $s"))
)
override def run: IO[Unit] =
fs2.Stream
.range(1, 100)
.metered(1.seconds)
.evalMap(_ => query)
.compile
.drain
Metadata
Assignees
Labels
No labels