File tree Expand file tree Collapse file tree 4 files changed +45
-0
lines changed
java/org/apache/pekko/stream/io
scala/org/apache/pekko/stream/scaladsl
stream/src/main/scala/org/apache/pekko/stream Expand file tree Collapse file tree 4 files changed +45
-0
lines changed Original file line number Diff line number Diff line change @@ -44,4 +44,12 @@ public void mustBeAbleToUseAsJavaStream() throws Exception {
4444 java .util .stream .Stream <Integer > javaStream = Source .from (list ).runWith (streamSink , system );
4545 assertEquals (list , javaStream .collect (Collectors .toList ()));
4646 }
47+
48+ @ Test
49+ public void mustBeAbleToUseAsJavaStreamOnSink () throws Exception {
50+ final List <Integer > list = Arrays .asList (1 , 2 , 3 );
51+ java .util .stream .Stream <Integer > javaStream =
52+ Source .from (list ).runWith (Sink .asJavaStream (), system );
53+ assertEquals (list , javaStream .collect (Collectors .toList ()));
54+ }
4755}
Original file line number Diff line number Diff line change @@ -32,6 +32,9 @@ class SinkAsJavaStreamSpec extends StreamSpec(UnboundedMailboxConfig) {
3232 " work in happy case" in {
3333 val javaSource = Source (1 to 100 ).runWith(StreamConverters .asJavaStream())
3434 javaSource.count() should === (100L )
35+ //
36+ Source (1 to 100 ).runWith(Sink .asJavaStream())
37+ .count() should === (100L )
3538 }
3639
3740 " fail if parent stream is failed" in {
Original file line number Diff line number Diff line change @@ -212,6 +212,23 @@ object Sink {
212212 def asPublisher [T ](fanout : AsPublisher ): Sink [T , Publisher [T ]] =
213213 new Sink (scaladsl.Sink .asPublisher(fanout == AsPublisher .WITH_FANOUT ))
214214
215+ /**
216+ * Creates a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink.
217+ * Elements emitted through the stream will be available for reading through the Java 8 ``Stream``.
218+ *
219+ * The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java
220+ * ``Stream`` will cancel the inflow of this ``Sink``.
221+ *
222+ * Java 8 ``Stream`` throws exception in case reactive stream failed.
223+ *
224+ * Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream.
225+ * As it is interacting wit blocking API the implementation runs on a separate dispatcher
226+ * configured through the ``pekko.stream.blocking-io-dispatcher``.
227+ *
228+ * @since 2.0.0
229+ */
230+ def asJavaStream [T ](): Sink [T , java.util.stream.Stream [T ]] = new Sink (scaladsl.StreamConverters .asJavaStream())
231+
215232 /**
216233 * A `Sink` that will invoke the given procedure for each received element. The sink is materialized
217234 * into a [[java.util.concurrent.CompletionStage ]] which will be completed with `Success` when reaching the
Original file line number Diff line number Diff line change @@ -292,6 +292,23 @@ object Sink {
292292 if (fanout) new FanoutPublisherSink [T ](DefaultAttributes .fanoutPublisherSink, shape(" FanoutPublisherSink" ))
293293 else new PublisherSink [T ](DefaultAttributes .publisherSink, shape(" PublisherSink" )))
294294
295+ /**
296+ * Creates a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink.
297+ * Elements emitted through the stream will be available for reading through the Java 8 ``Stream``.
298+ *
299+ * The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java
300+ * ``Stream`` will cancel the inflow of this ``Sink``.
301+ *
302+ * If the Java 8 ``Stream`` throws exception the Pekko stream is cancelled.
303+ *
304+ * Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream.
305+ * As it is interacting wit blocking API the implementation runs on a separate dispatcher
306+ * configured through the ``pekko.stream.blocking-io-dispatcher``.
307+ *
308+ * @since 2.0.0
309+ */
310+ def asJavaStream [T ](): Sink [T , java.util.stream.Stream [T ]] = StreamConverters .asJavaStream()
311+
295312 /**
296313 * A `Sink` that will consume the stream and discard the elements.
297314 */
You can’t perform that action at this time.
0 commit comments