Skip to content

Commit ea2d058

Browse files
committed
perf: Optimization for already completed future sources
1 parent 2567c41 commit ea2d058

File tree

1 file changed

+14
-6
lines changed

1 file changed

+14
-6
lines changed

akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,14 @@
55
package akka.stream.scaladsl
66

77
import java.util.concurrent.CompletionStage
8-
98
import scala.annotation.nowarn
109
import scala.annotation.tailrec
1110
import scala.annotation.unchecked.uncheckedVariance
1211
import scala.collection.immutable
1312
import scala.jdk.FutureConverters._
1413
import scala.concurrent.{ Future, Promise }
1514
import scala.concurrent.duration.FiniteDuration
16-
1715
import org.reactivestreams.{ Publisher, Subscriber }
18-
1916
import akka.{ Done, NotUsed }
2017
import akka.actor.{ ActorRef, Cancellable }
2118
import akka.annotation.InternalApi
@@ -27,6 +24,9 @@ import akka.stream.impl.fusing.GraphStages._
2724
import akka.stream.stage.GraphStageWithMaterializedValue
2825
import akka.util.ConstantFun
2926

27+
import scala.util.Failure
28+
import scala.util.Success
29+
3030
/**
3131
* A `Source` is a set of stream processing steps that has one open output. It can comprise
3232
* any number of internal sources and transformations that are wired together, or it can be
@@ -505,7 +505,11 @@ object Source {
505505
* The stream fails if the `Future` is completed with a failure.
506506
*/
507507
def future[T](futureElement: Future[T]): Source[T, NotUsed] =
508-
fromGraph(new FutureSource[T](futureElement))
508+
futureElement.value match {
509+
case Some(Success(value)) => single(value)
510+
case Some(Failure(cause)) => failed(cause)
511+
case None => fromGraph(new FutureSource[T](futureElement))
512+
}
509513

510514
/**
511515
* Never emits any elements, never completes and never fails.
@@ -527,8 +531,12 @@ object Source {
527531
* Turn a `Future[Source]` into a source that will emit the values of the source when the future completes successfully.
528532
* If the `Future` is completed with a failure the stream is failed.
529533
*/
530-
def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] =
531-
fromGraph(new FutureFlattenSource(futureSource))
534+
def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] = {
535+
futureSource.value match {
536+
case Some(Success(source)) => source.mapMaterializedValue(Future.successful)
537+
case _ => fromGraph(new FutureFlattenSource(futureSource))
538+
}
539+
}
532540

533541
/**
534542
* Defers invoking the `create` function to create a single element until there is downstream demand.

0 commit comments

Comments
 (0)