Skip to content

Commit bc9c42b

Browse files
authored
chore: Fix zipWithIndex when use in graphdsl. (#32626)
1 parent b931dfd commit bc9c42b

File tree

2 files changed

+45
-7
lines changed

2 files changed

+45
-7
lines changed

akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithIndexSpec.scala

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44

55
package akka.stream.scaladsl
66

7-
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, Materializer }
7+
import akka.stream.testkit.scaladsl.TestSink
8+
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, Materializer, UniformFanInShape }
89
import akka.stream.testkit.{ StreamSpec, TestSubscriber }
10+
911
import scala.annotation.nowarn
1012

1113
@nowarn // keep unused imports
@@ -39,6 +41,38 @@ class FlowZipWithIndexSpec extends StreamSpec {
3941
probe.expectComplete()
4042
}
4143

44+
"will works in GraphDSL" in {
45+
import akka.stream.ClosedShape
46+
val pickMaxOfThree = GraphDSL.create() { implicit b =>
47+
import GraphDSL.Implicits._
48+
49+
val zip1 = b.add(ZipWith[Int, Int, Int](math.max))
50+
val zip2 = b.add(ZipWith[Int, Int, Int](math.max))
51+
zip1.out ~> zip2.in0
52+
53+
UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
54+
}
55+
56+
val resultSink = TestSink[(Int, Long)]()
57+
58+
val g = RunnableGraph.fromGraph(GraphDSL.createGraph(resultSink) { implicit b => sink =>
59+
import GraphDSL.Implicits._
60+
61+
// importing the partial graph will return its shape (inlets & outlets)
62+
val pm3 = b.add(pickMaxOfThree)
63+
64+
Source.single(1) ~> pm3.in(0)
65+
Source.single(2) ~> pm3.in(1)
66+
Source.single(3) ~> pm3.in(2)
67+
pm3.out.zipWithIndex ~> sink.in
68+
ClosedShape
69+
})
70+
val p = g.run()
71+
p.request(1)
72+
p.expectNext((3, 0L))
73+
p.expectComplete()
74+
}
75+
4276
"work in fruit example" in {
4377
//#zip-with-index
4478
Source(List("apple", "orange", "banana")).zipWithIndex.runWith(Sink.foreach(println))

akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,8 +1080,9 @@ trait FlowOps[+Out, +Mat] {
10801080
* @param onComplete a function that transforms the ongoing state into an optional output element
10811081
*/
10821082
def statefulMap[S, T](create: () => S)(f: (S, Out) => (S, T), onComplete: S => Option[T]): Repr[T] =
1083-
via(new StatefulMap[S, Out, T](create, f, onComplete))
1084-
.withAttributes(DefaultAttributes.statefulMap and SourceLocation.forLambda(f))
1083+
via(
1084+
new StatefulMap[S, Out, T](create, f, onComplete)
1085+
.withAttributes(DefaultAttributes.statefulMap and SourceLocation.forLambda(f)))
10851086

10861087
/**
10871088
* Transform each stream element with the help of a resource.
@@ -1116,8 +1117,8 @@ trait FlowOps[+Out, +Mat] {
11161117
*/
11171118
def mapWithResource[R, T](create: () => R)(f: (R, Out) => T, close: R => Option[T]): Repr[T] =
11181119
via(
1119-
new StatefulMap[R, Out, T](create, (resource, out) => (resource, f(resource, out)), resource => close(resource)))
1120-
.withAttributes(DefaultAttributes.mapWithResource and SourceLocation.forLambda(f))
1120+
new StatefulMap[R, Out, T](create, (resource, out) => (resource, f(resource, out)), resource => close(resource))
1121+
.withAttributes(DefaultAttributes.mapWithResource and SourceLocation.forLambda(f)))
11211122

11221123
/**
11231124
* Transform each input element into an `Iterable` of output elements that is
@@ -2981,8 +2982,11 @@ trait FlowOps[+Out, +Mat] {
29812982
* '''Cancels when''' downstream cancels
29822983
*/
29832984
def zipWithIndex: Repr[(Out, Long)] =
2984-
statefulMap[Long, (Out, Long)](() => 0L)((index, out) => (index + 1L, (out, index)), _ => None)
2985-
.withAttributes(DefaultAttributes.zipWithIndex)
2985+
via(
2986+
new StatefulMap[Long, Out, (Out, Long)](
2987+
() => 0L,
2988+
(index, out) => (index + 1L, (out, index)),
2989+
ConstantFun.scalaAnyToNone).withAttributes(DefaultAttributes.zipWithIndex))
29862990

29872991
/**
29882992
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].

0 commit comments

Comments
 (0)