Skip to content

Commit 10d86aa

Browse files
authored
perf: Optimize source apply when empty or one elem seq (#32858)
1 parent 85f128b commit 10d86aa

File tree

3 files changed

+46
-5
lines changed

3 files changed

+46
-5
lines changed

akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import akka.stream.Attributes.Attribute
2020
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
2121
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
2222
import akka.stream.testkit.Utils.TE
23+
import akka.stream.testkit.scaladsl.TestSource
2324

2425
// Debug loglevel to diagnose https://github.com/akka/akka-core/issues/30469
2526
class FlowFlatMapPrefixSpec extends StreamSpec("akka.loglevel = debug") {
@@ -654,13 +655,14 @@ class FlowFlatMapPrefixSpec extends StreamSpec("akka.loglevel = debug") {
654655
}
655656

656657
"complete when downstream cancels before pulling and upstream does not produce" in {
657-
val fSeq = Source(List.empty[Int])
658+
val (probe, fSeq) = TestSource[Int]()
658659
.flatMapPrefixMat(1) { prefix =>
659660
Flow[Int].mapMaterializedValue(_ => prefix)
660-
}(Keep.right)
661-
.to(Sink.cancelled)
661+
}(Keep.both)
662+
.toMat(Sink.cancelled)(Keep.left)
662663
.withAttributes(attributes)
663664
.run()
665+
probe.sendComplete()
664666

665667
if (att.propagateToNestedMaterialization) {
666668
fSeq.futureValue should equal(Nil)
@@ -670,7 +672,7 @@ class FlowFlatMapPrefixSpec extends StreamSpec("akka.loglevel = debug") {
670672
}
671673

672674
"complete when downstream cancels before pulling and upstream does not produce, prefix=0" in {
673-
val fSeq = Source(List.empty[Int])
675+
val fSeq = TestSource[Int]()
674676
.flatMapPrefixMat(0) { prefix =>
675677
Flow[Int].mapMaterializedValue(_ => prefix)
676678
}(Keep.right)

akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,35 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
8484
}
8585
}
8686

87+
"Source from iterable" must {
88+
"produce optimized source for no elements" in {
89+
val source = Source(Nil)
90+
source should ===(Source.empty)
91+
val result = source.runWith(Sink.seq)
92+
result.futureValue should ===(Seq.empty)
93+
}
94+
95+
"produce optimized source for one element Vector" in {
96+
val source = Source(Vector(1))
97+
val result = source.runWith(Sink.seq)
98+
result.futureValue should ===(immutable.Seq(1))
99+
source.getAttributes.nameLifted should ===(Some("singleSource"))
100+
}
101+
102+
"produce optimized source for one element List" in {
103+
val source = Source(List(1))
104+
val result = source.runWith(Sink.seq)
105+
result.futureValue should ===(immutable.Seq(1))
106+
source.getAttributes.nameLifted should ===(Some("singleSource"))
107+
}
108+
109+
"produce all elements fed to it" in {
110+
val source = Source(List(1, 2, 3))
111+
val result = source.runWith(Sink.seq)
112+
result.futureValue should ===(immutable.Seq(1, 2, 3))
113+
}
114+
}
115+
87116
"Composite Source" must {
88117
"merge from many inputs" in {
89118
val probes = immutable.Seq.fill(5)(TestPublisher.manualProbe[Int]())

akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,17 @@ object Source {
353353
* beginning) regardless of when they subscribed.
354354
*/
355355
def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
356-
fromGraph(new IterableSource[T](iterable))
356+
iterable match {
357+
case Nil => empty
358+
case head :: Nil => single(head)
359+
case s: IterableOnce[T] =>
360+
s.knownSize match {
361+
case 0 => empty
362+
case 1 => single(s.head)
363+
case _ => fromGraph(new IterableSource[T](iterable))
364+
}
365+
case _ => fromGraph(new IterableSource[T](iterable))
366+
}
357367

358368
/**
359369
* Starts a new `Source` from the given `Future`. The stream will consist of

0 commit comments

Comments
 (0)