Skip to content

Commit 0e99b81

Browse files
committed
chore: Tweak withAttribuets in Flow
1 parent 96f70c4 commit 0e99b81

File tree

3 files changed

+103
-19
lines changed

3 files changed

+103
-19
lines changed

stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala

+25-4
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@ import scala.util.control.NoStackTrace
2424

2525
import org.apache.pekko
2626
import pekko.Done
27-
import pekko.stream.AbruptStageTerminationException
28-
import pekko.stream.ActorAttributes
29-
import pekko.stream.ActorMaterializer
30-
import pekko.stream.Supervision
27+
import pekko.stream.{ AbruptStageTerminationException, ActorAttributes, ActorMaterializer, ClosedShape, Supervision }
3128
import pekko.stream.testkit.StreamSpec
3229
import pekko.stream.testkit.TestSubscriber
3330
import pekko.stream.testkit.Utils.TE
@@ -434,4 +431,28 @@ class FlowStatefulMapSpec extends StreamSpec {
434431
closedCounter.get() shouldBe 1
435432
}
436433
}
434+
435+
"support junction output ports" in {
436+
val source = Source(List((1, 1), (2, 2)))
437+
val g = RunnableGraph.fromGraph(GraphDSL.createGraph(TestSink.probe[(Int, Int)]) { implicit b => sink =>
438+
import GraphDSL.Implicits._
439+
val unzip = b.add(Unzip[Int, Int]())
440+
val zip = b.add(Zip[Int, Int]())
441+
val s = b.add(source)
442+
// format: OFF
443+
s ~> unzip.in
444+
unzip.out0 ~> zip.in0
445+
unzip.out1 ~> zip.in1
446+
zip.out.statefulMap(() => None)((_, elem) => (None, elem), _ => None) ~> sink.in
447+
// format: ON
448+
449+
ClosedShape
450+
})
451+
g.run()
452+
.request(2)
453+
.expectNext((1, 1))
454+
.expectNext((2, 2))
455+
.expectComplete()
456+
}
457+
437458
}

stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala

+64-3
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,23 @@
1717

1818
package org.apache.pekko.stream
1919

20-
import org.apache.pekko.actor.typed.ActorSystem
21-
import org.apache.pekko.actor.typed.scaladsl.Behaviors
22-
import org.apache.pekko.stream.scaladsl.{ Flow, FlowWithContext, Keep, Sink, Source, SourceWithContext }
20+
import org.apache.pekko
21+
import pekko.actor.typed.ActorSystem
22+
import pekko.actor.typed.scaladsl.Behaviors
23+
import pekko.stream.scaladsl.{
24+
Flow,
25+
FlowWithContext,
26+
GraphDSL,
27+
Keep,
28+
RunnableGraph,
29+
Sink,
30+
Source,
31+
SourceWithContext,
32+
Unzip,
33+
Zip
34+
}
35+
import pekko.stream.testkit.scaladsl.TestSink
36+
2337
import org.scalacheck.{ Arbitrary, Gen }
2438
import org.scalatest.BeforeAndAfterAll
2539
import org.scalatest.concurrent.ScalaFutures
@@ -29,6 +43,7 @@ import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks
2943

3044
import java.time.Instant
3145
import java.util.concurrent.Executors
46+
3247
import scala.annotation.nowarn
3348
import scala.concurrent.duration.{ DurationInt, FiniteDuration }
3449
import scala.concurrent.{ blocking, ExecutionContext, Future }
@@ -439,6 +454,52 @@ class MapAsyncPartitionedSpec
439454
.futureValue shouldBe Seq(1 -> "A")
440455
}
441456

457+
it should "support junction output ports with mapAsyncPartitioned" in {
458+
val source = Source(List((1, 1), (2, 2)))
459+
val g = RunnableGraph.fromGraph(GraphDSL.createGraph(TestSink.probe[(Int, Int)](system.classicSystem)) {
460+
implicit b => sink =>
461+
import GraphDSL.Implicits._
462+
val unzip = b.add(Unzip[Int, Int]())
463+
val zip = b.add(Zip[Int, Int]())
464+
val s = b.add(source)
465+
// format: OFF
466+
s ~> unzip.in
467+
unzip.out0 ~> zip.in0
468+
unzip.out1 ~> zip.in1
469+
zip.out.mapAsyncPartitioned(1)(_ => 1)((elem, _) => Future.successful(elem)) ~> sink.in
470+
// format: ON
471+
ClosedShape
472+
})
473+
g.run()
474+
.request(2)
475+
.expectNext((1, 1))
476+
.expectNext((2, 2))
477+
.expectComplete()
478+
}
479+
480+
it should "support junction output ports with mapAsyncPartitionedUnordered" in {
481+
val source = Source(List((1, 1), (2, 2)))
482+
val g = RunnableGraph.fromGraph(GraphDSL.createGraph(TestSink.probe[(Int, Int)](system.classicSystem)) {
483+
implicit b => sink =>
484+
import GraphDSL.Implicits._
485+
val unzip = b.add(Unzip[Int, Int]())
486+
val zip = b.add(Zip[Int, Int]())
487+
val s = b.add(source)
488+
// format: OFF
489+
s ~> unzip.in
490+
unzip.out0 ~> zip.in0
491+
unzip.out1 ~> zip.in1
492+
zip.out.mapAsyncPartitionedUnordered(1)(_ => 1)((elem, _) => Future.successful(elem)) ~> sink.in
493+
// format: ON
494+
ClosedShape
495+
})
496+
g.run()
497+
.request(2)
498+
.expectNext((1, 1))
499+
.expectNext((2, 2))
500+
.expectComplete()
501+
}
502+
442503
private implicit class MapWrapper[K, V](map: Map[K, V]) {
443504
@nowarn("msg=deprecated")
444505
def mapValues2[W](f: V => W) = map.mapValues(f)

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala

+14-12
Original file line numberDiff line numberDiff line change
@@ -1149,7 +1149,8 @@ trait FlowOps[+Out, +Mat] {
11491149
* @param onComplete a function that transforms the ongoing state into an optional output element
11501150
*/
11511151
def statefulMap[S, T](create: () => S)(f: (S, Out) => (S, T), onComplete: S => Option[T]): Repr[T] =
1152-
via(new StatefulMap[S, Out, T](create, f, onComplete).withAttributes(DefaultAttributes.statefulMap))
1152+
via(new StatefulMap[S, Out, T](create, f, onComplete)
1153+
.withAttributes(DefaultAttributes.statefulMap and SourceLocation.forLambda(f)))
11531154

11541155
/**
11551156
* Transform each stream element with the help of a resource.
@@ -1358,12 +1359,12 @@ trait FlowOps[+Out, +Mat] {
13581359
def mapAsyncPartitioned[T, P](parallelism: Int)(
13591360
partitioner: Out => P)(
13601361
f: (Out, P) => Future[T]): Repr[T] = {
1361-
(if (parallelism == 1) {
1362-
via(MapAsyncUnordered(1, elem => f(elem, partitioner(elem))))
1363-
} else {
1364-
via(new MapAsyncPartitioned(parallelism, orderedOutput = true, partitioner, f))
1365-
})
1366-
.withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f))
1362+
val graph: Graph[FlowShape[Out, T], _] = if (parallelism == 1) {
1363+
MapAsyncUnordered(1, elem => f(elem, partitioner(elem)))
1364+
} else {
1365+
new MapAsyncPartitioned(parallelism, orderedOutput = true, partitioner, f)
1366+
}
1367+
via(graph.withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f)))
13671368
}
13681369

13691370
/**
@@ -1396,11 +1397,12 @@ trait FlowOps[+Out, +Mat] {
13961397
def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
13971398
partitioner: Out => P)(
13981399
f: (Out, P) => Future[T]): Repr[T] = {
1399-
(if (parallelism == 1) {
1400-
via(MapAsyncUnordered(1, elem => f(elem, partitioner(elem))))
1401-
} else {
1402-
via(new MapAsyncPartitioned(parallelism, orderedOutput = false, partitioner, f))
1403-
}).withAttributes(DefaultAttributes.mapAsyncPartitionUnordered and SourceLocation.forLambda(f))
1400+
val graph: Graph[FlowShape[Out, T], _] = if (parallelism == 1) {
1401+
MapAsyncUnordered(1, elem => f(elem, partitioner(elem)))
1402+
} else {
1403+
new MapAsyncPartitioned(parallelism, orderedOutput = false, partitioner, f)
1404+
}
1405+
via(graph.withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f)))
14041406
}
14051407

14061408
/**

0 commit comments

Comments
 (0)