Skip to content

Document new interop with reactive streams #3558

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 41 additions & 12 deletions site/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ This is the official FS2 guide. It gives an overview of the library and its feat
* [Exercises (concurrency)](#exercises-concurrency)
* [Interruption](#interruption)
* [Talking to the external world](#talking-to-the-external-world)
* [Reactive streams](#reactive-streams)
* [Java Flow and reactive streams](#java-flow-and-reactive-streams)
* [Learning more](#learning-more)
* [Appendixes](#appendixes)

Expand Down Expand Up @@ -782,43 +782,72 @@ def rows[F[_]](h: CSVHandle)(implicit F: Async[F]): Stream[F,Row] = {
See [`Queue`](https://github.com/typelevel/cats-effect/blob/series/3.x/std/shared/src/main/scala/cats/effect/std/Queue.scala)
for more useful methods. Most concurrent queues in cats effect support tracking their size, which is handy for implementing size-based throttling of the producer.

### Reactive streams
### Java Flow and reactive streams

The [reactive streams initiative](http://www.reactive-streams.org/) is complicated, mutable and unsafe - it is not something that is desired for use over fs2.
The [reactive streams initiative](http://www.reactive-streams.org/) and its successor `java.util.concurrent.Flow` are complicated, mutable and unsafe - it is not something that is desired for use over fs2.
But there are times when we need use fs2 in conjunction with a different streaming library, and this is where reactive streams shines.

Any reactive streams system can interoperate with any other reactive streams system by exposing an `org.reactivestreams.Publisher` or an `org.reactivestreams.Subscriber`.
Any reactive streams system can interoperate with any other reactive streams system by exposing a `Publisher` or a `Subscriber`.

The `reactive-streams` library provides instances of reactive streams compliant publishers and subscribers to ease interoperability with other streaming libraries.
FS2 provides instances of reactive streams compliant publishers and subscribers to ease interoperability with other streaming libraries.

#### Usage
#### Usage with Java Flow

You may require the following imports:

```scala mdoc:reset
import fs2._
import fs2.interop.reactivestreams._
import cats.effect.{IO, Resource}
import java.util.concurrent.Flow.Publisher
```

To convert a `Stream` into a downstream `java.util.concurrent.Flow.Publisher`:

```scala mdoc
val stream = Stream(1, 2, 3).covary[IO]
stream.toPublisherResource
```

To convert an upstream `java.util.concurrent.Publisher` into a `Stream`:

```scala mdoc
val publisher: Resource[IO, Publisher[Int]] = Stream(1, 2, 3).covary[IO].toPublisherResource
publisher.use { p =>
Stream.fromPublisher[IO](p, chunkSize = 10).compile.toList
}
```

#### Usage with reactive streams

If your are integrating a library that is still using `org.reactivestreams.Publisher`, you can use `org.reactivestreams.FlowAdapters` to convert to Java Flow.

You may require the following imports:

```scala mdoc:reset
import fs2._
import cats.effect.{IO, Resource}
import org.reactivestreams.{FlowAdapters, Publisher}
```

To convert a `Stream` into a downstream unicast `org.reactivestreams.Publisher`:

```scala mdoc
val stream = Stream(1, 2, 3).covary[IO]
stream.toUnicastPublisher
stream.toPublisherResource.map(FlowAdapters.toPublisher[Int])
```

To convert an upstream `org.reactivestreams.Publisher` into a `Stream`:

```scala mdoc
val publisher: Resource[IO, StreamUnicastPublisher[IO, Int]] = Stream(1, 2, 3).covary[IO].toUnicastPublisher
val publisher: Resource[IO, Publisher[Int]] = Stream(1, 2, 3).covary[IO].toPublisherResource.map(FlowAdapters.toPublisher[Int])
publisher.use { p =>
p.toStream[IO].compile.toList
Stream
.fromPublisher[IO](FlowAdapters.toFlowPublisher(p), chunkSize = 10)
.compile
.toList
}
```

A unicast publisher must have a single subscriber only.

### Learning more

Want to learn more?
Expand Down