Skip to content

Commit ea3ca90

Browse files
committed
chore: Avoid materialization FailedSource
1 parent 8103cbc commit ea3ca90

File tree

3 files changed

+8
-5
lines changed

3 files changed

+8
-5
lines changed

akka-stream/src/main/scala/akka/stream/impl/FailedSource.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
1212
/**
1313
* INTERNAL API
1414
*/
15-
@InternalApi private[akka] final class FailedSource[T](failure: Throwable) extends GraphStage[SourceShape[T]] {
15+
@InternalApi private[akka] final class FailedSource[T](val failure: Throwable) extends GraphStage[SourceShape[T]] {
1616
val out = Outlet[T]("FailedSource.out")
1717
override val shape = SourceShape(out)
1818

akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,8 @@ import akka.util.OptionVal
374374
*/
375375
def getValuePresentedSource[A >: Null](graph: Graph[SourceShape[A], _]): OptionVal[Graph[SourceShape[A], _]] = {
376376
def isValuePresentedSource(graph: Graph[SourceShape[_ <: A], _]): Boolean = graph match {
377-
case _: SingleSource[_] | _: FutureSource[_] | _: IterableSource[_] | _: JavaStreamSource[_, _] | EmptySource =>
377+
case _: SingleSource[_] | _: FutureSource[_] | _: IterableSource[_] | _: JavaStreamSource[_, _] | EmptySource |
378+
_: FailedSource[_] =>
378379
true
379380
case _ => false
380381
}

akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import akka.stream._
1313
import akka.stream.impl.{
1414
ActorSubscriberMessage,
1515
EmptySource,
16+
FailedSource,
1617
JavaStreamSource,
1718
SubscriptionTimeoutException,
1819
TraversalBuilder,
@@ -32,7 +33,7 @@ import scala.annotation.{ nowarn, tailrec }
3233
import scala.collection.immutable
3334
import scala.concurrent.{ ExecutionContext, Future }
3435
import scala.concurrent.duration.FiniteDuration
35-
import scala.util.Try
36+
import scala.util.{ Failure, Try }
3637
import scala.util.control.NonFatal
3738

3839
/**
@@ -402,8 +403,9 @@ private[akka] final class FlattenConcat[T, M](parallelism: Int)
402403
case iterable: IterableSource[T] @unchecked => addSourceElements(iterable.elements.iterator)
403404
case javaStream: JavaStreamSource[T, _] @unchecked =>
404405
addSourceElements(javaStream.open().iterator.asScala)
405-
case EmptySource => //Empty source is discarded
406-
case _ => attachAndMaterializeSource(source)
406+
case failed: FailedSource[T] @unchecked => addCompletedFutureElem(Failure(failed.failure))
407+
case EmptySource => //Empty source is discarded
408+
case _ => attachAndMaterializeSource(source)
407409
}
408410
case _ => attachAndMaterializeSource(source)
409411
}

0 commit comments

Comments
 (0)