@@ -19,15 +19,19 @@ import akka.stream.Attributes.SourceLocation
1919import akka .stream .impl .{ Buffer => BufferImpl }
2020import akka .stream .impl .ActorSubscriberMessage
2121import akka .stream .impl .ActorSubscriberMessage .OnError
22+ import akka .stream .impl .EmptySource
2223import akka .stream .impl .Stages .DefaultAttributes
2324import akka .stream .impl .SubscriptionTimeoutException
2425import akka .stream .impl .TraversalBuilder
26+ import akka .stream .impl .fusing .GraphStages .IterableSource
2527import akka .stream .impl .fusing .GraphStages .SingleSource
2628import akka .stream .scaladsl ._
2729import akka .stream .stage ._
2830import akka .util .OptionVal
2931import akka .util .ccompat .JavaConverters ._
3032
33+ import scala .annotation .nowarn
34+
3135/**
3236 * INTERNAL API
3337 */
@@ -39,11 +43,11 @@ import akka.util.ccompat.JavaConverters._
3943 override def initialAttributes = DefaultAttributes .flattenMerge
4044 override val shape = FlowShape (in, out)
4145
42- override def createLogic (enclosingAttributes : Attributes ) =
43- new GraphStageLogic (shape) with OutHandler with InHandler {
46+ override def createLogic (enclosingAttributes : Attributes ): GraphStageLogic with InHandler with OutHandler =
47+ new GraphStageLogic (shape) with InHandler with OutHandler {
4448 var sources = Set .empty[SubSinkInlet [T ]]
45- var pendingSingleSources = 0
46- def activeSources = sources.size + pendingSingleSources
49+ var pendingDirectPushSources = 0
50+ def activeSources : Int = sources.size + pendingDirectPushSources
4751
4852 // To be able to optimize for SingleSource without materializing them the queue may hold either
4953 // SubSinkInlet[T] or SingleSource
@@ -60,15 +64,32 @@ import akka.util.ccompat.JavaConverters._
6064 case single : SingleSource [T ] @ unchecked =>
6165 push(out, single.elem)
6266 removeSource(single)
67+ case iterableSource : IterableSource [T ] @ unchecked =>
68+ handleIterableSource(iterableSource)
6369 case other =>
6470 throw new IllegalStateException (s " Unexpected source type in queue: ' ${other.getClass}' " )
6571 }
6672 }
6773
74+ @ nowarn(" msg=deprecated" )
75+ private def handleIterableSource (iterableSource : IterableSource [T ]): Unit = {
76+ val elements = iterableSource.elements
77+ if (elements.hasDefiniteSize) {
78+ if (elements.isEmpty) {
79+ removeSource(iterableSource)
80+ } else if (elements.size == 1 ) {
81+ push(out, elements.head)
82+ removeSource(iterableSource)
83+ }
84+ } else {
85+ emitMultiple(out, elements, () => removeSource(iterableSource))
86+ }
87+ }
88+
6889 override def onPush (): Unit = {
6990 val source = grab(in)
7091 addSource(source)
71- if (activeSources < breadth) tryPull(in)
92+ if (activeSources < breadth && ! hasBeenPulled(in) ) tryPull(in)
7293 }
7394
7495 override def onUpstreamFinish (): Unit = if (activeSources == 0 ) completeStage()
@@ -87,47 +108,73 @@ import akka.util.ccompat.JavaConverters._
87108 def addSource (source : Graph [SourceShape [T ], M ]): Unit = {
88109 // If it's a SingleSource or wrapped such we can push the element directly instead of materializing it.
89110 // Have to use AnyRef because of OptionVal null value.
90- TraversalBuilder .getSingleSource(source.asInstanceOf [Graph [SourceShape [AnyRef ], M ]]) match {
91- case OptionVal .Some (single) =>
92- if (isAvailable(out) && queue.isEmpty) {
93- push(out, single.elem.asInstanceOf [T ])
94- } else {
95- queue.enqueue(single)
96- pendingSingleSources += 1
97- }
98- case _ =>
99- val sinkIn = new SubSinkInlet [T ](" FlattenMergeSink" )
100- sinkIn.setHandler(new InHandler {
101- override def onPush (): Unit = {
102- if (isAvailable(out)) {
103- push(out, sinkIn.grab())
104- sinkIn.pull()
111+ TraversalBuilder .getDirectPushableSource(source.asInstanceOf [Graph [SourceShape [AnyRef ], M ]]) match {
112+ case OptionVal .Some (s) =>
113+ s.asInstanceOf [GraphStage [SourceShape [T ]]] match {
114+ case single : SingleSource [T ] @ unchecked =>
115+ if (isAvailable(out) && queue.isEmpty) {
116+ push(out, single.elem)
105117 } else {
106- queue.enqueue(sinkIn)
118+ queue.enqueue(single)
119+ pendingDirectPushSources += 1
107120 }
108- }
109- override def onUpstreamFinish (): Unit = if (! sinkIn.isAvailable) removeSource(sinkIn)
110- })
111- sinkIn.pull()
112- sources += sinkIn
113- val graph = Source .fromGraph(source).to(sinkIn.sink)
114- interpreter.subFusingMaterializer.materialize(graph, defaultAttributes = enclosingAttributes)
121+ case iterable : IterableSource [T ] @ unchecked =>
122+ pendingDirectPushSources += 1
123+ if (isAvailable(out) && queue.isEmpty) {
124+ handleIterableSource(iterable)
125+ } else {
126+ queue.enqueue(iterable)
127+ }
128+ case EmptySource =>
129+ tryPullOrComplete()
130+ case _ =>
131+ addSourceWithMaterialization(source)
132+ }
133+ case _ =>
134+ addSourceWithMaterialization(source)
115135 }
116136 }
117137
138+ private def addSourceWithMaterialization (source : Graph [SourceShape [T ], M ]): Unit = {
139+ val sinkIn = new SubSinkInlet [T ](" FlattenMergeSink" )
140+ sinkIn.setHandler(new InHandler {
141+ override def onPush (): Unit = {
142+ if (isAvailable(out)) {
143+ push(out, sinkIn.grab())
144+ sinkIn.pull()
145+ } else {
146+ queue.enqueue(sinkIn)
147+ }
148+ }
149+ override def onUpstreamFinish (): Unit = if (! sinkIn.isAvailable) removeSource(sinkIn)
150+ })
151+ sinkIn.pull()
152+ sources += sinkIn
153+ val graph = Source .fromGraph(source).to(sinkIn.sink)
154+ interpreter.subFusingMaterializer.materialize(graph, defaultAttributes = enclosingAttributes)
155+ }
156+
118157 def removeSource (src : AnyRef ): Unit = {
119158 val pullSuppressed = activeSources == breadth
120159 src match {
121160 case sub : SubSinkInlet [T ] @ unchecked =>
122161 sources -= sub
123- case _ : SingleSource [_] =>
124- pendingSingleSources -= 1
162+ case _ : SingleSource [_] | _ : IterableSource [_] =>
163+ pendingDirectPushSources -= 1
125164 case other => throw new IllegalArgumentException (s " Unexpected source type: ' ${other.getClass}' " )
126165 }
127166 if (pullSuppressed) tryPull(in)
128167 if (activeSources == 0 && isClosed(in)) completeStage()
129168 }
130169
170+ private def tryPullOrComplete (): Unit = {
171+ if (activeSources < breadth) {
172+ tryPull(in)
173+ } else if (activeSources == 0 && isClosed(in)) {
174+ completeStage()
175+ }
176+ }
177+
131178 override def postStop (): Unit = sources.foreach(_.cancel())
132179
133180 }
0 commit comments