@@ -159,19 +159,6 @@ private[akka] object FlattenConcat {
159159 def materialize (): Unit = ()
160160 }
161161
162- private final class InflightSingleSource [T ](elem : T ) extends InflightSource [T ] {
163- private var _hasNext = true
164- override def hasNext : Boolean = _hasNext
165- override def next (): T =
166- if (_hasNext) {
167- _hasNext = false
168- elem
169- } else throw new NoSuchElementException (" next called after completion" )
170- override def tryPull (): Unit = ()
171- override def cancel (cause : Throwable ): Unit = ()
172- override def isClosed : Boolean = ! hasNext
173- }
174-
175162 private final class InflightIteratorSource [T ](iterator : Iterator [T ]) extends InflightSource [T ] {
176163 override def hasNext : Boolean = iterator.hasNext
177164 override def next (): T = iterator.next()
@@ -235,7 +222,9 @@ private[akka] final class FlattenConcat[T, M](parallelism: Int)
235222 override def createLogic (enclosingAttributes : Attributes ) =
236223 new GraphStageLogic (shape) with InHandler with OutHandler {
237224 import FlattenConcat ._
238- private var queue : BufferImpl [InflightSource [T ]] = _
225+ // InflightSource[T] or SingleSource[T]
226+ // AnyRef here to avoid lift the SingleSource[T] to InflightSource[T]
227+ private var queue : BufferImpl [AnyRef ] = _
239228 private val invokeCb : InflightSource [T ] => Unit =
240229 getAsyncCallback[InflightSource [T ]](futureSourceCompleted).invoke
241230
@@ -274,17 +263,23 @@ private[akka] final class FlattenConcat[T, M](parallelism: Int)
274263
275264 override def onPull (): Unit = {
276265 if (queue.nonEmpty) {
277- val currentSource = queue.peek()
278266 // purge if possible
279- if (currentSource.hasNext) {
280- push(out, currentSource.next())
281- if (currentSource.isClosed) {
282- handleCurrentSourceClosed(currentSource)
283- }
284- } else if (currentSource.isClosed) {
285- handleCurrentSourceClosed(currentSource)
286- } else {
287- currentSource.tryPull()
267+ queue.peek() match {
268+ case singleSource : SingleSource [T ] @ unchecked =>
269+ push(out, singleSource.elem)
270+ removeSource()
271+ case currentSource : InflightSource [T ] @ unchecked =>
272+ if (currentSource.hasNext) {
273+ push(out, currentSource.next())
274+ if (currentSource.isClosed) {
275+ handleCurrentSourceClosed(currentSource)
276+ }
277+ } else if (currentSource.isClosed) {
278+ handleCurrentSourceClosed(currentSource)
279+ } else {
280+ currentSource.tryPull()
281+ }
282+ case _ => throw new IllegalStateException (" Should not reach here." )
288283 }
289284 } else if (! hasBeenPulled(in)) {
290285 tryPull(in)
@@ -308,18 +303,18 @@ private[akka] final class FlattenConcat[T, M](parallelism: Int)
308303 private def cancelInflightSources (cause : Throwable ): Unit = {
309304 if (queue.nonEmpty) {
310305 var source = queue.dequeue()
311- while (source ne null ) {
312- source.cancel(cause)
306+ while (( source ne null ) && (source. isInstanceOf [ InflightSource [ T ] @ unchecked]) ) {
307+ source.asInstanceOf [ InflightSource [ T ]]. cancel(cause)
313308 source = queue.dequeue()
314309 }
315310 }
316311 }
317312
318- private def addSourceElem ( elem : T ): Unit = {
313+ private def addSource ( singleSource : SingleSource [ T ] ): Unit = {
319314 if (isAvailable(out) && queue.isEmpty) {
320- push(out, elem)
315+ push(out, singleSource. elem)
321316 } else {
322- queue.enqueue(new InflightSingleSource (elem) )
317+ queue.enqueue(singleSource )
323318 }
324319 }
325320
@@ -397,7 +392,7 @@ private[akka] final class FlattenConcat[T, M](parallelism: Int)
397392 (TraversalBuilder .getValuePresentedSource(source): @ nowarn(" cat=lint-infer-any" )) match {
398393 case OptionVal .Some (graph) =>
399394 graph match {
400- case single : SingleSource [T ] @ unchecked => addSourceElem (single.elem )
395+ case single : SingleSource [T ] @ unchecked => addSource (single)
401396 case futureSource : FutureSource [T ] @ unchecked =>
402397 val future = futureSource.future
403398 future.value match {
@@ -415,32 +410,45 @@ private[akka] final class FlattenConcat[T, M](parallelism: Int)
415410
416411 }
417412
413+ private def removeSource (): Unit = {
414+ queue.dequeue()
415+ pullIfNeeded()
416+ }
417+
418418 private def removeSource (source : InflightSource [T ]): Unit = {
419419 if (queue.nonEmpty && (source eq queue.peek())) {
420420 // only dequeue if it's the current emitting source
421421 val s = queue.dequeue()
422422 if (s != source) {
423423 throw new IllegalStateException (" Should not reach here." )
424424 }
425- if (isClosed(in)) {
426- if (queue.isEmpty) {
427- completeStage()
428- } else {
429- // pull the new emitting source
430- val nextSource = queue.peek()
431- nextSource.tryPull()
432- }
425+ pullIfNeeded()
426+ } // not the head source, just ignore
427+ }
428+
429+ private def pullIfNeeded () : Unit = {
430+ if (isClosed(in)) {
431+ if (queue.isEmpty) {
432+ completeStage()
433433 } else {
434- if (queue.nonEmpty) {
435- // pull the new emitting source
436- val nextSource = queue.peek()
437- nextSource.tryPull()
438- }
439- if (! hasBeenPulled(in)) {
440- tryPull(in)
441- }
434+ tryPullNextSourceInQueue()
442435 }
443- } // not the head source, just ignore
436+ } else {
437+ if (queue.nonEmpty) {
438+ tryPullNextSourceInQueue()
439+ }
440+ if (! hasBeenPulled(in)) {
441+ tryPull(in)
442+ }
443+ }
444+ }
445+
446+ private def tryPullNextSourceInQueue (): Unit = {
447+ // pull the new emitting source
448+ val nextSource = queue.peek()
449+ if (nextSource.isInstanceOf [InflightSource [T ] @ unchecked]) {
450+ nextSource.asInstanceOf [InflightSource [T ]].tryPull()
451+ }
444452 }
445453
446454 setHandlers(in, out, this )
0 commit comments