Skip to content

Commit c572708

Browse files
authored
perf: Optimization for already completed future sources (#32857)
* perf: Optimization for already completed future sources * test coverage * special handling of future null for Java Void case
1 parent b4cc42e commit c572708

File tree

2 files changed

+73
-6
lines changed

2 files changed

+73
-6
lines changed

akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import org.scalatest.time.Span
1313
import scala.annotation.nowarn
1414
import scala.concurrent.Await
1515
import scala.concurrent.Future
16+
import scala.concurrent.Promise
1617
//#imports
1718
import akka.stream._
1819

@@ -517,7 +518,63 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
517518
}
518519
}
519520

521+
"Source.future" must {
522+
"optimize already completed future" in {
523+
val future = Future.successful("done")
524+
val source = Source.future(future)
525+
source.getAttributes.nameLifted should ===(Some("singleSource"))
526+
source.runWith(Sink.head).futureValue should ===("done")
527+
}
528+
529+
"optimize already failed future" in {
530+
val future = Future.failed(TE("boom"))
531+
val source = Source.future(future)
532+
source.getAttributes.nameLifted should ===(Some("failedSource"))
533+
source.runWith(Sink.head).failed.futureValue shouldBe a[TE]
534+
}
535+
536+
"handle regular future" in {
537+
val promise = Promise[String]()
538+
val source = Source.future(promise.future)
539+
source.getAttributes.nameLifted should ===(Some("futureSource"))
540+
promise.success("done")
541+
source.runWith(Sink.head).futureValue should ===("done")
542+
}
543+
}
544+
520545
"Source.futureSource" must {
546+
"optimize already completed future" in {
547+
val future = Future.successful(Source.single("done"))
548+
val source = Source.futureSource(future)
549+
source.getAttributes.nameLifted should ===(Some("singleSource"))
550+
source.runWith(Sink.head).futureValue should ===("done")
551+
}
552+
553+
"pass along materialized value for already completed future" in {
554+
val future = Future.successful(Source.single("done").mapMaterializedValue(_ => "materializedValue"))
555+
val source = Source.futureSource(future)
556+
source.toMat(Sink.ignore)(Keep.left).run().futureValue should ===("materializedValue")
557+
}
558+
559+
"handle already failed future" in {
560+
val future = Future.failed[Source[String, NotUsed]](TE("boom"))
561+
val source = Source.futureSource(future)
562+
val (futureMat, streamResult) = source.toMat(Sink.head)(Keep.both).run()
563+
564+
streamResult.failed.futureValue should ===(TE("boom"))
565+
futureMat.failed.futureValue should ===(TE("boom"))
566+
}
567+
568+
"handle later failed future" in {
569+
val promise = Promise[Source[String, NotUsed]]()
570+
val source = Source.futureSource(promise.future)
571+
promise.failure(TE("boom"))
572+
573+
val (futureMat, streamResult) = source.toMat(Sink.head)(Keep.both).run()
574+
575+
streamResult.failed.futureValue should ===(TE("boom"))
576+
futureMat.failed.futureValue should ===(TE("boom"))
577+
}
521578

522579
"not cancel substream twice" in {
523580
val result = Source

akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,14 @@
55
package akka.stream.scaladsl
66

77
import java.util.concurrent.CompletionStage
8-
98
import scala.annotation.nowarn
109
import scala.annotation.tailrec
1110
import scala.annotation.unchecked.uncheckedVariance
1211
import scala.collection.immutable
1312
import scala.jdk.FutureConverters._
1413
import scala.concurrent.{ Future, Promise }
1514
import scala.concurrent.duration.FiniteDuration
16-
1715
import org.reactivestreams.{ Publisher, Subscriber }
18-
1916
import akka.{ Done, NotUsed }
2017
import akka.actor.{ ActorRef, Cancellable }
2118
import akka.annotation.InternalApi
@@ -27,6 +24,9 @@ import akka.stream.impl.fusing.GraphStages._
2724
import akka.stream.stage.GraphStageWithMaterializedValue
2825
import akka.util.ConstantFun
2926

27+
import scala.util.Failure
28+
import scala.util.Success
29+
3030
/**
3131
* A `Source` is a set of stream processing steps that has one open output. It can comprise
3232
* any number of internal sources and transformations that are wired together, or it can be
@@ -515,7 +515,12 @@ object Source {
515515
* The stream fails if the `Future` is completed with a failure.
516516
*/
517517
def future[T](futureElement: Future[T]): Source[T, NotUsed] =
518-
fromGraph(new FutureSource[T](futureElement))
518+
futureElement.value match {
519+
case Some(Success(null)) => empty
520+
case Some(Success(value)) => single(value)
521+
case Some(Failure(cause)) => failed(cause)
522+
case None => fromGraph(new FutureSource[T](futureElement))
523+
}
519524

520525
/**
521526
* Never emits any elements, never completes and never fails.
@@ -537,8 +542,13 @@ object Source {
537542
* Turn a `Future[Source]` into a source that will emit the values of the source when the future completes successfully.
538543
* If the `Future` is completed with a failure the stream is failed.
539544
*/
540-
def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] =
541-
fromGraph(new FutureFlattenSource(futureSource))
545+
def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] = {
546+
futureSource.value match {
547+
case Some(Success(source)) => source.mapMaterializedValue(Future.successful)
548+
case Some(Failure(exc)) => failed(exc).mapMaterializedValue(_ => Future.failed(exc))
549+
case _ => fromGraph(new FutureFlattenSource(futureSource))
550+
}
551+
}
542552

543553
/**
544554
* Defers invoking the `create` function to create a single element until there is downstream demand.

0 commit comments

Comments
 (0)