Skip to content

Commit eea5865

Browse files
committed
Document new interop with reactive streams
The documentation still mentions only the deprecated reactive streams interop features. Now interop with Java Flow can be done directly on the `Stream` class and companion object. This should be reflected in the documentation.
1 parent a3f737b commit eea5865

File tree

1 file changed

+40
-11
lines changed

1 file changed

+40
-11
lines changed

Diff for: site/guide.md

+40-11
Original file line numberDiff line numberDiff line change
@@ -782,43 +782,72 @@ def rows[F[_]](h: CSVHandle)(implicit F: Async[F]): Stream[F,Row] = {
782782
See [`Queue`](https://github.com/typelevel/cats-effect/blob/series/3.x/std/shared/src/main/scala/cats/effect/std/Queue.scala)
783783
for more useful methods. Most concurrent queues in cats effect support tracking their size, which is handy for implementing size-based throttling of the producer.
784784

785-
### Reactive streams
785+
### Java Flow and reactive streams
786786

787-
The [reactive streams initiative](http://www.reactive-streams.org/) is complicated, mutable and unsafe - it is not something that is desired for use over fs2.
787+
The [reactive streams initiative](http://www.reactive-streams.org/) and its successor `java.util.concurrent.Flow` are complicated, mutable and unsafe - it is not something that is desired for use over fs2.
788788
But there are times when we need use fs2 in conjunction with a different streaming library, and this is where reactive streams shines.
789789

790-
Any reactive streams system can interoperate with any other reactive streams system by exposing an `org.reactivestreams.Publisher` or an `org.reactivestreams.Subscriber`.
790+
Any reactive streams system can interoperate with any other reactive streams system by exposing a `Publisher` or a `Subscriber`.
791791

792-
The `reactive-streams` library provides instances of reactive streams compliant publishers and subscribers to ease interoperability with other streaming libraries.
792+
FS2 provides instances of reactive streams compliant publishers and subscribers to ease interoperability with other streaming libraries.
793793

794-
#### Usage
794+
#### Usage with Java Flow
795795

796796
You may require the following imports:
797797

798798
```scala mdoc:reset
799799
import fs2._
800-
import fs2.interop.reactivestreams._
801800
import cats.effect.{IO, Resource}
801+
import java.util.concurrent.Flow.Publisher
802+
```
803+
804+
To convert a `Stream` into a downstream `java.util.concurrent.Flow.Publisher`:
805+
806+
```scala mdoc
807+
val stream = Stream(1, 2, 3).covary[IO]
808+
stream.toPublisherResource
809+
```
810+
811+
To convert an upstream `java.util.concurrent.Publisher` into a `Stream`:
812+
813+
```scala mdoc
814+
val publisher: Resource[IO, Publisher[Int]] = Stream(1, 2, 3).covary[IO].toPublisherResource
815+
publisher.use { p =>
816+
Stream.fromPublisher[IO](p, chunkSize = 10).compile.toList
817+
}
818+
```
819+
820+
#### Usage with reactive streams
821+
822+
If your are integrating a library that is still using `org.reactivestreams.Publisher`, you can use `org.reactivestreams.FlowAdapters` to convert to Java Flow.
823+
824+
You may require the following imports:
825+
826+
```scala mdoc:reset
827+
import fs2._
828+
import cats.effect.{IO, Resource}
829+
import org.reactivestreams.{FlowAdapters, Publisher}
802830
```
803831

804832
To convert a `Stream` into a downstream unicast `org.reactivestreams.Publisher`:
805833

806834
```scala mdoc
807835
val stream = Stream(1, 2, 3).covary[IO]
808-
stream.toUnicastPublisher
836+
stream.toPublisherResource.map(FlowAdapters.toPublisher[Int])
809837
```
810838

811839
To convert an upstream `org.reactivestreams.Publisher` into a `Stream`:
812840

813841
```scala mdoc
814-
val publisher: Resource[IO, StreamUnicastPublisher[IO, Int]] = Stream(1, 2, 3).covary[IO].toUnicastPublisher
842+
val publisher: Resource[IO, Publisher[Int]] = Stream(1, 2, 3).covary[IO].toPublisherResource.map(FlowAdapters.toPublisher[Int])
815843
publisher.use { p =>
816-
p.toStream[IO].compile.toList
844+
Stream
845+
.fromPublisher[IO](FlowAdapters.toFlowPublisher(p), chunkSize = 10)
846+
.compile
847+
.toList
817848
}
818849
```
819850

820-
A unicast publisher must have a single subscriber only.
821-
822851
### Learning more
823852

824853
Want to learn more?

0 commit comments

Comments
 (0)