@@ -683,13 +683,6 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
683683 final def completeStage (): Unit =
684684 internalCompleteStage(SubscriptionWithCancelException .StageWasCompleted , OptionVal .None )
685685
686- // Variable used from `OutHandler.onDownstreamFinish` to carry over cancellation cause in cases where
687- // `OutHandler` implementations call `super.onDownstreamFinished()`.
688- /**
689- * INTERNAL API
690- */
691- @ InternalApi private [stream] var lastCancellationCause : Throwable = _
692-
693686 /**
694687 * Automatically invokes [[cancel ]] or [[complete ]] on all the input or output ports that have been called,
695688 * then marks the stage as stopped.
@@ -1893,34 +1886,14 @@ trait OutHandler {
18931886 @ throws(classOf [Exception ])
18941887 def onPull (): Unit
18951888
1896- /**
1897- * Called when the output port will no longer accept any new elements. After this callback no other callbacks will
1898- * be called for this port.
1899- */
1900- @ throws(classOf [Exception ])
1901- @ deprecatedOverriding(" Override `def onDownstreamFinish(cause: Throwable)`, instead." , since = " 2.6.0" ) // warns when overriding
1902- @ deprecated(" Call onDownstreamFinish with a cancellation cause." , since = " 2.6.0" ) // warns when calling
1903- def onDownstreamFinish (): Unit = {
1904- val thisStage = GraphInterpreter .currentInterpreter.activeStage
1905- require(
1906- thisStage.lastCancellationCause ne null ,
1907- " onDownstreamFinish() must not be called without a cancellation cause" )
1908- thisStage.cancelStage(thisStage.lastCancellationCause)
1909- }
1910-
19111889 /**
19121890 * Called when the output port will no longer accept any new elements. After this callback no other callbacks will
19131891 * be called for this port.
19141892 */
19151893 @ throws(classOf [Exception ])
19161894 def onDownstreamFinish (cause : Throwable ): Unit = {
19171895 val thisStage = GraphInterpreter .currentInterpreter.activeStage
1918- try {
1919- require(cause ne null , " Cancellation cause must not be null" )
1920- require(thisStage.lastCancellationCause eq null , " onDownstreamFinish(cause) must not be called recursively" )
1921- thisStage.lastCancellationCause = cause
1922- (onDownstreamFinish(): @ nowarn(" msg=deprecated" )) // if not overridden, call old deprecated variant
1923- } finally thisStage.lastCancellationCause = null
1896+ thisStage.cancelStage(cause)
19241897 }
19251898}
19261899
0 commit comments