Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions docs/src/main/paradox/stream/operators/Sink/asJavaPublisher.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Sink.asJavaPublisher

Integration with Java Flow API, materializes into a `java.util.concurrent.Flow.Publisher`.

@ref[Sink operators](../index.md#sink-operators)

## Signature

@apidoc[Sink.asJavaPublisher](Sink$) { scala="#asJavaPublisher[T](fanout:Boolean):org.apache.pekko.stream.scaladsl.Sink[T,java.util.concurrent.Flow.Publisher[T]]" java="#asJavaPublisher(org.apache.pekko.stream.javadsl.asJavaPublisher)" }



## Description

This method gives you the capability to publish the data from the `Sink` through a Java Flow [Publisher](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.Publisher.html).
Generally, in Pekko Streams a `Sink` is considered a subscriber, which consumes the data from source. To integrate with other Reactive Stream implementations `Sink.asJavaPublisher` provides a `Publisher` materialized value when run.
Now, the data from this publisher can be consumed by subscribing to it. We can control if we allow more than one downstream subscriber from the single running Pekko stream through the `fanout` parameter.
If you want to support a ReactiveStreams Publisher, there is @ref[Sink.asPublisher](asPublisher.md).

## Example

In the example we are using a source and then creating a Publisher. After that, we see that when `fanout` is true multiple subscribers can subscribe to it,
but when it is false only the first subscriber will be able to subscribe and others will be rejected.

Scala
: @@snip [AsPublisher.scala](/docs/src/test/scala/docs/stream/operators/sink/AsPublisher.scala) { #asPublisher }

Java
: @@snip [SinkDocExamples.java](/docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #asJavaPublisher }

## Reactive Streams semantics

@@@div { .callout }

**emits** the materialized publisher

**completes** after the source is consumed and materialized publisher is created

@@@
5 changes: 1 addition & 4 deletions docs/src/main/paradox/stream/operators/Sink/asPublisher.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ Integration with Reactive Streams, materializes into a `org.reactivestreams.Publ
This method gives you the capability to publish the data from the `Sink` through a Reactive Streams [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html).
Generally, in Pekko Streams a `Sink` is considered a subscriber, which consumes the data from source. To integrate with other Reactive Stream implementations `Sink.asPublisher` provides a `Publisher` materialized value when run.
Now, the data from this publisher can be consumed by subscribing to it. We can control if we allow more than one downstream subscriber from the single running Pekko stream through the `fanout` parameter.
In Java 9, the Reactive Stream API was included in the JDK, and `Publisher` is available through [Flow.Publisher](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.Publisher.html).
Since those APIs are identical but exist at different package namespaces and does not depend on the Reactive Streams package a separate publisher sink for those is available
through @scala[`org.apache.pekko.stream.scaladsl.JavaFlowSupport.Sink#asPublisher`]@java[`org.apache.pekko.stream.javadsl.JavaFlowSupport.Sink#asPublisher`].

If you want to support [Flow.Publisher](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.Publisher.html), there is @ref[Sink.asJavaPublisher](asJavaPublisher.md).

## Example

Expand Down
49 changes: 49 additions & 0 deletions docs/src/main/paradox/stream/operators/Source/asJavaSubscriber.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Source.asJavaSubscriber

Integration with Java Flow API, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).

@ref[Source operators](../index.md#source-operators)

## Signature

Scala
: @@snip[JavaFlowSupport.scala](/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala) { #asSubscriber }

Java
: @@snip[AsSubscriber.java](/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java) { #api }

## Description

If you want to create a @apidoc[Source] that gets its elements from another library that supports
the Java Flow API, you can use `Source.asJavaSubscriber`.
Each time this @apidoc[Source] is materialized, it produces a materialized value of type
@javadoc[java.util.concurrent.Flow.Subscriber](java.util.concurrent.Flow.Subscriber).
This @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) can be attached to a
Java Flow @javadoc[Publisher](java.util.concurrent.Flow.Publisher)
to populate it.

If the API you want to consume elements from provides a @javadoc[Publisher](java.util.concurrent.Flow.Publisher) instead of accepting a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber), see @ref[fromPublisher](fromPublisher.md).

@@@ note

Reactive Streams users: we prefer @javadoc[java.util.concurrent.Flow](java.util.concurrent.Flow) but you may still use the [org.reactivestreams](https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams) library with @apidoc[Source.asSubscriber](Source$) { scala="#asSubscriber[T]:org.apache.pekko.stream.scaladsl.Source[T,org.reactivestreams.Subscriber[T]]" java="#asSubscriber()" }.

@@@

## Example

Suppose we use a database client that supports the Java Flow API,
we could create a @apidoc[Source] that queries the database for its rows. That @apidoc[Source] can then
be used for further processing, for example creating a @apidoc[Source] that contains the names of the
rows.

Note that since the database is queried for each materialization, the `rowSource` can be safely re-used.
Because both the database driver and Pekko Streams support Java Flow API,
backpressure is applied throughout the stream, preventing us from running out of memory when the database
rows are consumed slower than they are produced by the database.

Scala
: @@snip [AsSubscriber.scala](/docs/src/test/scala/docs/stream/operators/source/AsSubscriber.scala) { #imports #example }

Java
: @@snip [AsSubscriber.java](/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java) { #imports #example }
20 changes: 7 additions & 13 deletions docs/src/main/paradox/stream/operators/Source/asSubscriber.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Source.asSubscriber

Integration with Reactive Streams, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).
Integration with Reactive Streams, materializes into a @javadoc[Subscriber](org.reactivestreams.Subscriber).

@ref[Source operators](../index.md#source-operators)

Expand All @@ -10,25 +10,19 @@ Scala
: @@snip[JavaFlowSupport.scala](/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala) { #asSubscriber }

Java
: @@snip[JavaFlowSupport.java](/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java) { #api }
: @@snip[AsSubscriber.java](/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java) { #api }

## Description

If you want to create a @apidoc[Source] that gets its elements from another library that supports
[Reactive Streams](https://www.reactive-streams.org/), you can use `JavaFlowSupport.Source.asSubscriber`.
[Reactive Streams](https://www.reactive-streams.org/), you can use `Source.asSubscriber`.
Each time this @apidoc[Source] is materialized, it produces a materialized value of type
@javadoc[java.util.concurrent.Flow.Subscriber](java.util.concurrent.Flow.Subscriber).
This @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) can be attached to a
[Reactive Streams](https://www.reactive-streams.org/) @javadoc[Publisher](java.util.concurrent.Flow.Publisher)
@javadoc[org.reactivestreams.Subscriber](org.reactivestreams.Subscriber).
This @javadoc[Subscriber](org.reactivestreams.Subscriber) can be attached to a
[Reactive Streams](https://www.reactive-streams.org/) @javadoc[Publisher](org.reactivestreams.Publisher)
to populate it.

If the API you want to consume elements from provides a @javadoc[Publisher](java.util.concurrent.Flow.Publisher) instead of accepting a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber), see @ref[fromPublisher](fromPublisher.md).

@@@ note

Reactive Streams users: we prefer @javadoc[java.util.concurrent.Flow](java.util.concurrent.Flow) you may still use the [org.reactivestreams](https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams) library with @apidoc[Source.asSubscriber](Source$) { scala="#asSubscriber[T]:org.apache.pekko.stream.scaladsl.Source[T,org.reactivestreams.Subscriber[T]]" java="#asSubscriber()" }.

@@@
If the API you want to consume elements from provides a @javadoc[Publisher](org.reactivestreams.Publisher) instead of accepting a @javadoc[Subscriber](org.reactivestreams.Subscriber), see @ref[fromPublisher](fromPublisher.md).

## Example

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Source.fromPublisher

Integration with Reactive Streams, subscribes to a @javadoc[Publisher](java.util.concurrent.Flow.Publisher).
Integration with Reactive Streams, subscribes to a @javadoc[Java Flow Publisher](java.util.concurrent.Flow.Publisher) or a @javadoc[org.reactivestreams Publisher](org.reactivestreams.Publisher).

@ref[Source operators](../index.md#source-operators)

Expand All @@ -15,12 +15,10 @@ Java

## Description

If you want to create a @apidoc[Source] that gets its elements from another library that supports
[Reactive Streams](https://www.reactive-streams.org/), you can use `JavaFlowSupport.Source.fromPublisher`.
This source will produce the elements from the @javadoc[Publisher](java.util.concurrent.Flow.Publisher),
This source will produce the elements from the @javadoc[Java Flow Publisher](java.util.concurrent.Flow.Publisher) or the @javadoc[org.reactivestreams Publisher](org.reactivestreams.Publisher),
and coordinate backpressure as needed.

If the API you want to consume elements from accepts a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) instead of providing a @javadoc[Publisher](java.util.concurrent.Flow.Publisher), see @ref[asSubscriber](asSubscriber.md).
If the API you want to consume elements from accepts a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) instead of providing a @javadoc[Publisher](java.util.concurrent.Flow.Publisher), see @ref[asJavaSubscriber](asJavaSubscriber.md).

@@@ note

Expand Down
5 changes: 4 additions & 1 deletion docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ These built-in sources are available from @scala[`org.apache.pekko.stream.scalad

| |Operator|Description|
|--|--|--|
|Source|<a name="asjavasubscriber"></a>@ref[asJavaSubscriber](Source/asJavaSubscriber.md)|Integration with Java Flow API, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).|
|Source|<a name="assourcewithcontext"></a>@ref[asSourceWithContext](Source/asSourceWithContext.md)|Extracts context data from the elements of a `Source` so that it can be turned into a `SourceWithContext` which can propagate that context per element along a stream.|
|Source|<a name="assubscriber"></a>@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).|
|Source|<a name="assubscriber"></a>@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a @javadoc[Subscriber](org.reactivestreams.Subscriber).|
|Source|<a name="combine"></a>@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.|
|Source|<a name="completionstage"></a>@ref[completionStage](Source/completionStage.md)|Send the single value of the `CompletionStage` when it completes and there is demand.|
|Source|<a name="completionstagesource"></a>@ref[completionStageSource](Source/completionStageSource.md)|Streams the elements of an asynchronous source once its given *completion* operator completes.|
Expand Down Expand Up @@ -51,6 +52,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl

| |Operator|Description|
|--|--|--|
|Sink|<a name="asjavapublisher"></a>@ref[asPublisher](Sink/asJavaPublisher.md)|Integration with Java Flow API, materializes into a `java.util.concurrent.Flow.Publisher`.|
|Sink|<a name="aspublisher"></a>@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.|
|Sink|<a name="cancelled"></a>@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream|
|Sink|<a name="collect"></a>@ref[collect](Sink/collect.md)|Collect all input elements using a Java @javadoc[Collector](java.util.stream.Collector).|
Expand Down Expand Up @@ -410,6 +412,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [askWithContext](ActorFlow/askWithContext.md)
* [askWithStatus](ActorFlow/askWithStatus.md)
* [askWithStatusAndContext](ActorFlow/askWithStatusAndContext.md)
* [asJavaPublisher](Sink/asJavaPublisher.md)
* [asOutputStream](StreamConverters/asOutputStream.md)
* [asPublisher](Sink/asPublisher.md)
* [asSourceWithContext](Source/asSourceWithContext.md)
Expand Down
10 changes: 3 additions & 7 deletions docs/src/main/paradox/stream/reactive-streams-interop.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,10 @@ back pressure.
Since Java 9 the APIs of Reactive Streams has been included in the Java Standard library, under the `java.util.concurrent.Flow`
namespace. For Java 8 there is instead a separate Reactive Streams artifact with the same APIs in the package `org.reactivestreams`.

Pekko streams provides interoperability for both these two API versions, the Reactive Streams interfaces directly through factories on the
regular `Source` and `Sink` APIs. For the Java 9 and later built in interfaces there is a separate set of factories in
@scala[`org.apache.pekko.stream.scaladsl.JavaFlowSupport`]@java[`org.apache.pekko.stream.javadsl.JavaFlowSupport`].
Pekko streams provides interoperability for both of these API versions directly through factories on the
regular `Source` and `Sink` APIs.

In the following samples the standalone Reactive Stream API factories has been used but each such call can be replaced with the
corresponding method from `JavaFlowSupport` and the JDK @scala[`java.util.concurrent.Flow._`]@java[`java.util.concurrent.Flow.*`] interfaces.

Note that it is not possible to use `JavaFlowSupport` on Java 8 since the needed interfaces simply is not available in the Java standard library.
In the following samples, the standalone Reactive Stream API factories has been used but the code needed to use the `java.util.concurrent.Flow` equivalents is very similar.

The two most important interfaces in Reactive Streams are the `Publisher` and `Subscriber`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,11 @@

// #imports
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.JavaFlowSupport;
// #imports
import org.apache.pekko.stream.javadsl.Source;

public interface AsSubscriber {
// We are 'faking' the JavaFlowSupport API here so we can include the signature as a snippet in
// the API,
// because we're not publishing those (jdk9+) classes in our API docs yet.
static class JavaFlowSupport {
public static final class Source {
public
// #api
static <T> org.apache.pekko.stream.javadsl.Source<T, Subscriber<T>> asSubscriber()
// #api
{
return org.apache.pekko.stream.javadsl.JavaFlowSupport.Source.<T>asSubscriber();
}
}
}

static class Row {
public String getField(String fieldName) {
throw new UnsupportedOperationException("Not implemented in sample");
Expand All @@ -54,7 +37,7 @@ Publisher<Row> fetchRows() {
// #example
class Example {
Source<Row, NotUsed> rowSource =
JavaFlowSupport.Source.<Row>asSubscriber()
Source.<Row>asJavaSubscriber()
.mapMaterializedValue(
subscriber -> {
// For each materialization, fetch the rows from the database:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,11 @@
// #imports
import java.util.concurrent.Flow.Publisher;
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.JavaFlowSupport;
import org.apache.pekko.stream.javadsl.Source;

// #imports

public interface FromPublisher {
// We are 'faking' the JavaFlowSupport API here so we can include the signature as a snippet in
// the API,
// because we're not publishing those (jdk9+) classes in our API docs yet.
static class JavaFlowSupport {
public static final class Source {
public
// #api
static <T> org.apache.pekko.stream.javadsl.Source<T, NotUsed> fromPublisher(
Publisher<T> publisher)
// #api
{
return org.apache.pekko.stream.javadsl.JavaFlowSupport.Source.<T>fromPublisher(publisher);
}
}
}

static class Row {
public String getField(String fieldName) {
throw new UnsupportedOperationException("Not implemented in sample");
Expand All @@ -58,8 +41,7 @@ public Source<String, NotUsed> names() {
// A new subscriber will subscribe to the supplied publisher for each
// materialization, so depending on whether the database client supports
// this the Source can be materialized more than once.
return JavaFlowSupport.Source.<Row>fromPublisher(databaseClient.fetchRows())
.map(row -> row.getField("name"));
return Source.<Row>fromPublisher(databaseClient.fetchRows()).map(row -> row.getField("name"));
}
}
// #example
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import java.util.concurrent.Flow.Publisher
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Source
import pekko.stream.scaladsl.JavaFlowSupport

//#imports

Expand All @@ -35,7 +34,7 @@ object AsSubscriber {

// #example
val rowSource: Source[Row, NotUsed] =
JavaFlowSupport.Source.asSubscriber
Source.asJavaSubscriber
.mapMaterializedValue((subscriber: Subscriber[Row]) => {
// For each materialization, fetch the rows from the database:
val rows: Publisher[Row] = databaseClient.fetchRows()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import java.util.concurrent.Flow.Publisher
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Source
import pekko.stream.scaladsl.JavaFlowSupport

//#imports

Expand All @@ -38,7 +37,7 @@ object FromPublisher {
// A new subscriber will subscribe to the supplied publisher for each
// materialization, so depending on whether the database client supports
// this the Source can be materialized more than once.
JavaFlowSupport.Source.fromPublisher(databaseClient.fetchRows())
Source.fromPublisher(databaseClient.fetchRows())
.map(row => row.name)
// #example
}
6 changes: 3 additions & 3 deletions project/VerifyJDK9Classes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ object VerifyJDK9Classes {
|object VerifyJDK9Classes {
| def main(args: Array[String]): Unit = {
| import org.apache.pekko.actor.ActorSystem
| import org.apache.pekko.stream.scaladsl.{ JavaFlowSupport, Source }
| import org.apache.pekko.stream.scaladsl.{ Sink, Source }
|
| import java.lang.System.exit
| import scala.concurrent.Await
| import scala.concurrent.duration.DurationInt
| implicit val system: ActorSystem = ActorSystem.create("test")
| val future = Source(1 to 3).runWith(
| JavaFlowSupport.Sink.asPublisher[Int](fanout = false).mapMaterializedValue { p =>
| JavaFlowSupport.Source.fromPublisher(p).runFold(0)(_ + _)
| Sink.asJavaPublisher[Int](fanout = false).mapMaterializedValue { p =>
| Source.fromPublisher(p).runFold(0)(_ + _)
| })
|
| val result = Await.result(future, 3.seconds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ import java.util.concurrent.{ Flow => JavaFlow }

import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.{ JavaFlowSupport, Sink, Source }
import pekko.stream.scaladsl.{ Sink, Source }

import org.reactivestreams._

class IterablePublisherViaJavaFlowPublisherTest extends PekkoPublisherVerification[Int] {

override def createPublisher(elements: Long): Publisher[Int] = {
val sourceViaJavaFlowPublisher: JavaFlow.Publisher[Int] = Source(iterable(elements))
.runWith(JavaFlowSupport.Sink.asPublisher(fanout = false))
.runWith(Sink.asJavaPublisher(fanout = false))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, much clear


val javaFlowPublisherIntoPekkoSource: Source[Int, NotUsed] =
JavaFlowSupport.Source.fromPublisher(sourceViaJavaFlowPublisher)
Source.fromPublisher(sourceViaJavaFlowPublisher)

javaFlowPublisherIntoPekkoSource
.runWith(Sink.asPublisher(false)) // back as RS Publisher
Expand Down
Loading
Loading