@@ -16,7 +16,6 @@ package org.apache.pekko.stream.stage
1616import java .util .concurrent .{ CompletionStage , ConcurrentHashMap }
1717import java .util .concurrent .atomic .AtomicReference
1818
19- import scala .annotation .nowarn
2019import scala .annotation .tailrec
2120import scala .collection .{ immutable , mutable }
2221import scala .concurrent .{ Future , Promise }
@@ -697,13 +696,6 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
697696 final def completeStage (): Unit =
698697 internalCompleteStage(SubscriptionWithCancelException .StageWasCompleted , OptionVal .None )
699698
700- // Variable used from `OutHandler.onDownstreamFinish` to carry over cancellation cause in cases where
701- // `OutHandler` implementations call `super.onDownstreamFinished()`.
702- /**
703- * INTERNAL API
704- */
705- @ InternalApi private [stream] var lastCancellationCause : Throwable = _
706-
707699 /**
708700 * Automatically invokes [[cancel ]] or [[complete ]] on all the input or output ports that have been called,
709701 * then marks the stage as stopped.
@@ -1896,27 +1888,14 @@ trait OutHandler {
18961888 @ throws(classOf [Exception ])
18971889 def onPull (): Unit
18981890
1899- private def onDownstreamFinish (): Unit = {
1900- val thisStage = GraphInterpreter .currentInterpreter.activeStage
1901- require(
1902- thisStage.lastCancellationCause ne null ,
1903- " onDownstreamFinish() must not be called without a cancellation cause" )
1904- thisStage.cancelStage(thisStage.lastCancellationCause)
1905- }
1906-
19071891 /**
19081892 * Called when the output port will no longer accept any new elements. After this callback no other callbacks will
19091893 * be called for this port.
19101894 */
19111895 @ throws(classOf [Exception ])
19121896 def onDownstreamFinish (cause : Throwable ): Unit = {
1913- val thisStage = GraphInterpreter .currentInterpreter.activeStage
1914- try {
1915- require(cause ne null , " Cancellation cause must not be null" )
1916- require(thisStage.lastCancellationCause eq null , " onDownstreamFinish(cause) must not be called recursively" )
1917- thisStage.lastCancellationCause = cause
1918- onDownstreamFinish(): @ nowarn(" msg=deprecated" ) // if not overridden, call old deprecated variant
1919- } finally thisStage.lastCancellationCause = null
1897+ require(cause ne null , " Cancellation cause must not be null" )
1898+ GraphInterpreter .currentInterpreter.activeStage.cancelStage(cause)
19201899 }
19211900}
19221901
0 commit comments