Skip to content

Commit f9ad446

Browse files
committed
Fix uncurried stream ops in javadsl
1 parent cd55767 commit f9ad446

File tree

7 files changed

+42
-43
lines changed

7 files changed

+42
-43
lines changed

stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1254,7 +1254,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
12541254
*
12551255
* '''Cancels when''' downstream cancels
12561256
*/
1257-
def groupedWeighted(minWeight: Long)(
1257+
def groupedWeighted(minWeight: Long,
12581258
costFn: java.util.function.Function[Out, java.lang.Long]): javadsl.Flow[In, java.util.List[Out], Mat] =
12591259
new Flow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step
12601260

@@ -1311,7 +1311,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
13111311
*
13121312
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
13131313
*/
1314-
def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.Flow[In, Out, Mat] = {
1314+
def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): javadsl.Flow[In, Out, Mat] = {
13151315
new Flow(delegate.limitWeighted(n)(costFn.apply))
13161316
}
13171317

@@ -1355,7 +1355,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
13551355
*
13561356
* '''Cancels when''' downstream cancels
13571357
*/
1358-
def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
1358+
def scan[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
13591359
new Flow(delegate.scan(zero)(f.apply))
13601360

13611361
/**
@@ -1386,7 +1386,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
13861386
*
13871387
* See also [[#scan]]
13881388
*/
1389-
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
1389+
def scanAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
13901390
new Flow(delegate.scanAsync(zero) { (out, in) =>
13911391
f(out, in).asScala
13921392
})
@@ -1412,7 +1412,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
14121412
*
14131413
* '''Cancels when''' downstream cancels
14141414
*/
1415-
def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
1415+
def fold[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
14161416
new Flow(delegate.fold(zero)(f.apply))
14171417

14181418
/**
@@ -1464,7 +1464,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
14641464
*
14651465
* '''Cancels when''' downstream cancels
14661466
*/
1467-
def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
1467+
def foldAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
14681468
new Flow(delegate.foldAsync(zero) { (out, in) =>
14691469
f(out, in).asScala
14701470
})
@@ -2636,7 +2636,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
26362636
@deprecated(
26372637
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
26382638
since = "1.1.0")
2639-
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
2639+
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
26402640
new SubFlow(delegate.splitWhen(substreamCancelStrategy)(p.test))
26412641

26422642
/**
@@ -2697,7 +2697,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
26972697
@deprecated(
26982698
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
26992699
since = "1.1.0")
2700-
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
2700+
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
27012701
new SubFlow(delegate.splitAfter(substreamCancelStrategy)(p.test))
27022702

27032703
/**
@@ -3497,7 +3497,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
34973497
*
34983498
* '''Cancels when''' downstream cancels
34993499
*/
3500-
def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U)(
3500+
def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U,
35013501
matF: (Mat, Mat2) => Mat3): Flow[In, Pair[A, U], Mat3] =
35023502
new Flow(delegate.zipAllMat(that, thisElem, thatElem)(matF).map { case (a, u) => Pair.create(a, u) })
35033503

@@ -4169,7 +4169,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
41694169
* from downstream. It fails with the same error when received error message from
41704170
* downstream.
41714171
*/
4172-
def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Flow[In, Out, M] =
4172+
def watchTermination[M](matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Flow[In, Out, M] =
41734173
new Flow(delegate.watchTermination()((left, right) => matF(left, right.asJava)))
41744174

41754175
/**
@@ -4180,7 +4180,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
41804180
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
41814181
*/
41824182
@deprecated("Use monitor() or monitorMat(combine) instead", "Akka 2.5.17")
4183-
def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Flow[In, Out, M] =
4183+
def monitor[M](combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Flow[In, Out, M] =
41844184
new Flow(delegate.monitorMat(combinerToScala(combine)))
41854185

41864186
/**
@@ -4504,7 +4504,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
45044504
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
45054505
*/
45064506
@ApiMayChange
4507-
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
4507+
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
45084508
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
45094509
harvest: function.Function[Agg, Emit],
45104510
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Flow[In, Emit, Mat] =

stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -742,7 +742,7 @@ object GraphDSL extends GraphCreate {
742742
new GenericGraph(s, gbuilder.delegate.result(s))
743743
}
744744

745-
final class Builder[+Mat]()(private[stream] implicit val delegate: scaladsl.GraphDSL.Builder[Mat]) { self =>
745+
final class Builder[+Mat](private[stream] implicit val delegate: scaladsl.GraphDSL.Builder[Mat]) { self =>
746746
import pekko.stream.scaladsl.GraphDSL.Implicits._
747747

748748
/**

stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ object Sink {
204204
* normal end of the stream, or completed with `Failure` if there is a failure signaled in
205205
* the stream.
206206
*/
207-
def foreachAsync[T](parallelism: Int)(
207+
def foreachAsync[T](parallelism: Int,
208208
f: function.Function[T, CompletionStage[Void]]): Sink[T, CompletionStage[Done]] =
209209
new Sink(
210210
scaladsl.Sink
@@ -225,7 +225,7 @@ object Sink {
225225
@deprecated(
226226
"Use `foreachAsync` instead, it allows you to choose how to run the procedure, by calling some other API returning a CompletionStage or using CompletableFuture.supplyAsync.",
227227
since = "Akka 2.5.17")
228-
def foreachParallel[T](parallel: Int)(f: function.Procedure[T])(
228+
def foreachParallel[T](parallel: Int, f: function.Procedure[T],
229229
ec: ExecutionContext): Sink[T, CompletionStage[Done]] =
230230
new Sink(scaladsl.Sink.foreachParallel(parallel)(f.apply)(ec).toCompletionStage())
231231

stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1987,7 +1987,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
19871987
*
19881988
* '''Cancels when''' downstream cancels
19891989
*/
1990-
def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U)(
1990+
def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U,
19911991
matF: (Mat, Mat2) => Mat3): Source[Pair[A, U], Mat3] =
19921992
new Source(delegate.zipAllMat(that, thisElem, thatElem)(matF).map { case (a, u) => Pair.create(a, u) })
19931993

@@ -3002,7 +3002,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
30023002
*
30033003
* '''Cancels when''' downstream cancels
30043004
*/
3005-
def groupedWeighted(minWeight: Long)(costFn: java.util.function.Function[Out, java.lang.Long])
3005+
def groupedWeighted(minWeight: Long, costFn: java.util.function.Function[Out, java.lang.Long])
30063006
: javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
30073007
new Source(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava))
30083008

@@ -3055,9 +3055,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
30553055
*
30563056
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
30573057
*/
3058-
def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.Source[Out, Mat] = {
3058+
def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): javadsl.Source[Out, Mat] =
30593059
new Source(delegate.limitWeighted(n)(costFn.apply))
3060-
}
30613060

30623061
/**
30633062
* Apply a sliding window over the stream and return the windows as groups of elements, with the last group
@@ -3099,7 +3098,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
30993098
*
31003099
* '''Cancels when''' downstream cancels
31013100
*/
3102-
def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
3101+
def scan[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
31033102
new Source(delegate.scan(zero)(f.apply))
31043103

31053104
/**
@@ -3130,7 +3129,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
31303129
*
31313130
* See also [[FlowOps#scan]]
31323131
*/
3133-
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
3132+
def scanAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
31343133
new Source(delegate.scanAsync(zero) { (out, in) =>
31353134
f(out, in).asScala
31363135
})
@@ -3156,7 +3155,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
31563155
*
31573156
* '''Cancels when''' downstream cancels
31583157
*/
3159-
def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
3158+
def fold[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
31603159
new Source(delegate.fold(zero)(f.apply))
31613160

31623161
/**
@@ -3206,7 +3205,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
32063205
*
32073206
* '''Cancels when''' downstream cancels
32083207
*/
3209-
def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
3208+
def foldAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
32103209
new Source(delegate.foldAsync(zero) { (out, in) =>
32113210
f(out, in).asScala
32123211
})
@@ -4142,7 +4141,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
41424141
@deprecated(
41434142
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
41444143
since = "1.1.0")
4145-
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubSource[Out, Mat] =
4144+
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubSource[Out, Mat] =
41464145
new SubSource(delegate.splitWhen(substreamCancelStrategy)(p.test))
41474146

41484147
/**
@@ -4202,7 +4201,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
42024201
@deprecated(
42034202
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
42044203
since = "1.1.0")
4205-
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubSource[Out, Mat] =
4204+
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubSource[Out, Mat] =
42064205
new SubSource(delegate.splitAfter(substreamCancelStrategy)(p.test))
42074206

42084207
/**
@@ -4738,7 +4737,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
47384737
* from downstream. It fails with the same error when received error message from
47394738
* downstream.
47404739
*/
4741-
def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] =
4740+
def watchTermination[M](matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] =
47424741
new Source(delegate.watchTermination()((left, right) => matF(left, right.asJava)))
47434742

47444743
/**
@@ -4748,7 +4747,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
47484747
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
47494748
*/
47504749
@deprecated("Use monitor() or monitorMat(combine) instead", "Akka 2.5.17")
4751-
def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] =
4750+
def monitor[M](combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] =
47524751
new Source(delegate.monitorMat(combinerToScala(combine)))
47534752

47544753
/**
@@ -5052,7 +5051,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
50525051
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
50535052
*/
50545053
@ApiMayChange
5055-
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
5054+
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
50565055
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
50575056
harvest: function.Function[Agg, Emit],
50585057
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Source[Emit, Mat] =

stream/src/main/scala/org/apache/pekko/stream/javadsl/StreamConverters.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ object StreamConverters {
267267
* Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able
268268
* to handle multiple invocations.
269269
*/
270-
def javaCollectorParallelUnordered[T, R](parallelism: Int)(
270+
def javaCollectorParallelUnordered[T, R](parallelism: Int,
271271
collector: function.Creator[Collector[T, _ <: Any, R]]): Sink[T, CompletionStage[R]] =
272272
new Sink(
273273
scaladsl.StreamConverters

stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,7 @@ class SubFlow[In, Out, Mat](
638638
*
639639
* '''Cancels when''' downstream cancels
640640
*/
641-
def groupedWeighted(minWeight: Long)(
641+
def groupedWeighted(minWeight: Long,
642642
costFn: function.Function[Out, java.lang.Long]): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
643643
new SubFlow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step
644644

@@ -691,7 +691,7 @@ class SubFlow[In, Out, Mat](
691691
*
692692
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
693693
*/
694-
def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.SubFlow[In, Out, Mat] = {
694+
def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): javadsl.SubFlow[In, Out, Mat] = {
695695
new SubFlow(delegate.limitWeighted(n)(costFn.apply))
696696
}
697697

@@ -735,7 +735,7 @@ class SubFlow[In, Out, Mat](
735735
*
736736
* '''Cancels when''' downstream cancels
737737
*/
738-
def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
738+
def scan[T](zero: T, f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
739739
new SubFlow(delegate.scan(zero)(f.apply))
740740

741741
/**
@@ -766,7 +766,7 @@ class SubFlow[In, Out, Mat](
766766
*
767767
* See also [[#scan]]
768768
*/
769-
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] =
769+
def scanAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] =
770770
new SubFlow(delegate.scanAsync(zero) { (out, in) =>
771771
f(out, in).asScala
772772
})
@@ -792,7 +792,7 @@ class SubFlow[In, Out, Mat](
792792
*
793793
* '''Cancels when''' downstream cancels
794794
*/
795-
def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
795+
def fold[T](zero: T, f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
796796
new SubFlow(delegate.fold(zero)(f.apply))
797797

798798
/**
@@ -844,7 +844,7 @@ class SubFlow[In, Out, Mat](
844844
*
845845
* '''Cancels when''' downstream cancels
846846
*/
847-
def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] =
847+
def foldAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] =
848848
new SubFlow(delegate.foldAsync(zero) { (out, in) =>
849849
f(out, in).asScala
850850
})
@@ -3031,7 +3031,7 @@ class SubFlow[In, Out, Mat](
30313031
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
30323032
*/
30333033
@ApiMayChange
3034-
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
3034+
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
30353035
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
30363036
harvest: function.Function[Agg, Emit],
30373037
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubFlow[In, Emit, Mat] =

stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,7 @@ class SubSource[Out, Mat](
629629
*
630630
* '''Cancels when''' downstream cancels
631631
*/
632-
def groupedWeighted(minWeight: Long)(
632+
def groupedWeighted(minWeight: Long,
633633
costFn: function.Function[Out, java.lang.Long]): SubSource[java.util.List[Out @uncheckedVariance], Mat] =
634634
new SubSource(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step
635635

@@ -697,7 +697,7 @@ class SubSource[Out, Mat](
697697
*
698698
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
699699
*/
700-
def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.SubSource[Out, Mat] = {
700+
def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): javadsl.SubSource[Out, Mat] = {
701701
new SubSource(delegate.limitWeighted(n)(costFn.apply))
702702
}
703703

@@ -726,7 +726,7 @@ class SubSource[Out, Mat](
726726
*
727727
* '''Cancels when''' downstream cancels
728728
*/
729-
def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] =
729+
def scan[T](zero: T, f: function.Function2[T, Out, T]): SubSource[T, Mat] =
730730
new SubSource(delegate.scan(zero)(f.apply))
731731

732732
/**
@@ -757,7 +757,7 @@ class SubSource[Out, Mat](
757757
*
758758
* See also [[#scan]]
759759
*/
760-
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] =
760+
def scanAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] =
761761
new SubSource(delegate.scanAsync(zero) { (out, in) =>
762762
f(out, in).asScala
763763
})
@@ -783,7 +783,7 @@ class SubSource[Out, Mat](
783783
*
784784
* '''Cancels when''' downstream cancels
785785
*/
786-
def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] =
786+
def fold[T](zero: T, f: function.Function2[T, Out, T]): SubSource[T, Mat] =
787787
new SubSource(delegate.fold(zero)(f.apply))
788788

789789
/**
@@ -831,7 +831,7 @@ class SubSource[Out, Mat](
831831
*
832832
* '''Cancels when''' downstream cancels
833833
*/
834-
def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] =
834+
def foldAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] =
835835
new SubSource(delegate.foldAsync(zero) { (out, in) =>
836836
f(out, in).asScala
837837
})
@@ -3002,7 +3002,7 @@ class SubSource[Out, Mat](
30023002
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
30033003
*/
30043004
@ApiMayChange
3005-
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
3005+
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
30063006
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
30073007
harvest: function.Function[Agg, Emit],
30083008
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubSource[Emit, Mat] =

0 commit comments

Comments
 (0)