Skip to content

Commit aa70383

Browse files
committed
feat: Add asJavaStream to Sink
1 parent 6e43658 commit aa70383

File tree

4 files changed

+41
-0
lines changed

4 files changed

+41
-0
lines changed

stream-tests/src/test/java/org/apache/pekko/stream/io/SinkAsJavaSourceTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,12 @@ public void mustBeAbleToUseAsJavaStream() throws Exception {
4444
java.util.stream.Stream<Integer> javaStream = Source.from(list).runWith(streamSink, system);
4545
assertEquals(list, javaStream.collect(Collectors.toList()));
4646
}
47+
48+
@Test
49+
public void mustBeAbleToUseAsJavaStreamOnSink() throws Exception {
50+
final List<Integer> list = Arrays.asList(1, 2, 3);
51+
java.util.stream.Stream<Integer> javaStream =
52+
Source.from(list).runWith(Sink.asJavaStream(), system);
53+
assertEquals(list, javaStream.collect(Collectors.toList()));
54+
}
4755
}

stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ class SinkAsJavaStreamSpec extends StreamSpec(UnboundedMailboxConfig) {
3232
"work in happy case" in {
3333
val javaSource = Source(1 to 100).runWith(StreamConverters.asJavaStream())
3434
javaSource.count() should ===(100L)
35+
//
36+
Source(1 to 100).runWith(Sink.asJavaStream())
37+
.count() should ===(100L)
3538
}
3639

3740
"fail if parent stream is failed" in {

stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,21 @@ object Sink {
212212
def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] =
213213
new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT))
214214

215+
/**
216+
* Creates a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink.
217+
* Elements emitted through the stream will be available for reading through the Java 8 ``Stream``.
218+
*
219+
* The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java
220+
* ``Stream`` will cancel the inflow of this ``Sink``.
221+
*
222+
* Java 8 ``Stream`` throws exception in case reactive stream failed.
223+
*
224+
* Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream.
225+
* As it is interacting wit blocking API the implementation runs on a separate dispatcher
226+
* configured through the ``pekko.stream.blocking-io-dispatcher``.
227+
*/
228+
def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = new Sink(scaladsl.StreamConverters.asJavaStream())
229+
215230
/**
216231
* A `Sink` that will invoke the given procedure for each received element. The sink is materialized
217232
* into a [[java.util.concurrent.CompletionStage]] which will be completed with `Success` when reaching the

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,21 @@ object Sink {
292292
if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink"))
293293
else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink")))
294294

295+
/**
296+
* Creates a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink.
297+
* Elements emitted through the stream will be available for reading through the Java 8 ``Stream``.
298+
*
299+
* The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java
300+
* ``Stream`` will cancel the inflow of this ``Sink``.
301+
*
302+
* If the Java 8 ``Stream`` throws exception the Pekko stream is cancelled.
303+
*
304+
* Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream.
305+
* As it is interacting wit blocking API the implementation runs on a separate dispatcher
306+
* configured through the ``pekko.stream.blocking-io-dispatcher``.
307+
*/
308+
def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = StreamConverters.asJavaStream()
309+
295310
/**
296311
* A `Sink` that will consume the stream and discard the elements.
297312
*/

0 commit comments

Comments
 (0)