Skip to content

Commit 50850b5

Browse files
committed
chore: Tweak withAttribuets in Flow
1 parent def84bf commit 50850b5

File tree

1 file changed

+14
-12
lines changed
  • stream/src/main/scala/org/apache/pekko/stream/scaladsl

1 file changed

+14
-12
lines changed

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala

+14-12
Original file line numberDiff line numberDiff line change
@@ -1149,7 +1149,8 @@ trait FlowOps[+Out, +Mat] {
11491149
* @param onComplete a function that transforms the ongoing state into an optional output element
11501150
*/
11511151
def statefulMap[S, T](create: () => S)(f: (S, Out) => (S, T), onComplete: S => Option[T]): Repr[T] =
1152-
via(new StatefulMap[S, Out, T](create, f, onComplete).withAttributes(DefaultAttributes.statefulMap))
1152+
via(new StatefulMap[S, Out, T](create, f, onComplete)
1153+
.withAttributes(DefaultAttributes.statefulMap and SourceLocation.forLambda(f)))
11531154

11541155
/**
11551156
* Transform each stream element with the help of a resource.
@@ -1358,12 +1359,12 @@ trait FlowOps[+Out, +Mat] {
13581359
def mapAsyncPartitioned[T, P](parallelism: Int)(
13591360
partitioner: Out => P)(
13601361
f: (Out, P) => Future[T]): Repr[T] = {
1361-
(if (parallelism == 1) {
1362-
via(MapAsyncUnordered(1, elem => f(elem, partitioner(elem))))
1363-
} else {
1364-
via(new MapAsyncPartitioned(parallelism, orderedOutput = true, partitioner, f))
1365-
})
1366-
.withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f))
1362+
val graph: Graph[FlowShape[Out, T], _] = if (parallelism == 1) {
1363+
MapAsyncUnordered(1, elem => f(elem, partitioner(elem)))
1364+
} else {
1365+
new MapAsyncPartitioned(parallelism, orderedOutput = true, partitioner, f)
1366+
}
1367+
via(graph.withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f)))
13671368
}
13681369

13691370
/**
@@ -1396,11 +1397,12 @@ trait FlowOps[+Out, +Mat] {
13961397
def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
13971398
partitioner: Out => P)(
13981399
f: (Out, P) => Future[T]): Repr[T] = {
1399-
(if (parallelism == 1) {
1400-
via(MapAsyncUnordered(1, elem => f(elem, partitioner(elem))))
1401-
} else {
1402-
via(new MapAsyncPartitioned(parallelism, orderedOutput = false, partitioner, f))
1403-
}).withAttributes(DefaultAttributes.mapAsyncPartitionUnordered and SourceLocation.forLambda(f))
1400+
val graph: Graph[FlowShape[Out, T], _] = if (parallelism == 1) {
1401+
MapAsyncUnordered(1, elem => f(elem, partitioner(elem)))
1402+
} else {
1403+
new MapAsyncPartitioned(parallelism, orderedOutput = false, partitioner, f)
1404+
}
1405+
via(graph.withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f)))
14041406
}
14051407

14061408
/**

0 commit comments

Comments
 (0)