@@ -683,13 +683,6 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
683683 final def completeStage (): Unit =
684684 internalCompleteStage(SubscriptionWithCancelException .StageWasCompleted , OptionVal .None )
685685
686- // Variable used from `OutHandler.onDownstreamFinish` to carry over cancellation cause in cases where
687- // `OutHandler` implementations call `super.onDownstreamFinished()`.
688- /**
689- * INTERNAL API
690- */
691- @ InternalApi private [stream] var lastCancellationCause : Throwable = _
692-
693686 /**
694687 * Automatically invokes [[cancel ]] or [[complete ]] on all the input or output ports that have been called,
695688 * then marks the stage as stopped.
@@ -1770,66 +1763,6 @@ abstract class TimerGraphStageLogic(_shape: Shape) extends GraphStageLogic(_shap
17701763 scheduleAtFixedRate(timerKey, initialDelay.asScala, interval.asScala)
17711764 }
17721765
1773- /**
1774- * Schedule timer to call [[#onTimer ]] periodically with the given interval after the specified
1775- * initial delay.
1776- * Any existing timer with the same key will automatically be canceled before
1777- * adding the new timer.
1778- */
1779- @ deprecated(
1780- " Use scheduleWithFixedDelay or scheduleAtFixedRate instead. This has the same semantics as " +
1781- " scheduleAtFixedRate, but scheduleWithFixedDelay is often preferred." ,
1782- since = " 2.6.0" )
1783- final protected def schedulePeriodicallyWithInitialDelay (
1784- timerKey : Any ,
1785- initialDelay : FiniteDuration ,
1786- interval : FiniteDuration ): Unit =
1787- scheduleAtFixedRate(timerKey, initialDelay, interval)
1788-
1789- /**
1790- * Schedule timer to call [[#onTimer ]] periodically with the given interval after the specified
1791- * initial delay.
1792- * Any existing timer with the same key will automatically be canceled before
1793- * adding the new timer.
1794- */
1795- @ deprecated(
1796- " Use scheduleWithFixedDelay or scheduleAtFixedRate instead. This has the same semantics as " +
1797- " scheduleAtFixedRate, but scheduleWithFixedDelay is often preferred." ,
1798- since = " 2.6.0" )
1799- final protected def schedulePeriodicallyWithInitialDelay (
1800- timerKey : Any ,
1801- initialDelay : java.time.Duration ,
1802- interval : java.time.Duration ): Unit = {
1803- import akka .util .JavaDurationConverters ._
1804- schedulePeriodicallyWithInitialDelay(timerKey, initialDelay.asScala, interval.asScala)
1805- }
1806-
1807- /**
1808- * Schedule timer to call [[#onTimer ]] periodically with the given interval.
1809- * Any existing timer with the same key will automatically be canceled before
1810- * adding the new timer.
1811- */
1812- @ deprecated(
1813- " Use scheduleWithFixedDelay or scheduleAtFixedRate instead. This has the same semantics as " +
1814- " scheduleAtFixedRate, but scheduleWithFixedDelay is often preferred." ,
1815- since = " 2.6.0" )
1816- final protected def schedulePeriodically (timerKey : Any , interval : FiniteDuration ): Unit =
1817- schedulePeriodicallyWithInitialDelay(timerKey, interval, interval)
1818-
1819- /**
1820- * Schedule timer to call [[#onTimer ]] periodically with the given interval.
1821- * Any existing timer with the same key will automatically be canceled before
1822- * adding the new timer.
1823- */
1824- @ deprecated(
1825- " Use scheduleWithFixedDelay or scheduleAtFixedRate instead. This has the same semantics as " +
1826- " scheduleAtFixedRate, but scheduleWithFixedDelay is often preferred." ,
1827- since = " 2.6.0" )
1828- final protected def schedulePeriodically (timerKey : Any , interval : java.time.Duration ): Unit = {
1829- import akka .util .JavaDurationConverters ._
1830- schedulePeriodically(timerKey, interval.asScala)
1831- }
1832-
18331766 /**
18341767 * Cancel timer, ensuring that the [[#onTimer ]] is not subsequently called.
18351768 *
@@ -1893,34 +1826,14 @@ trait OutHandler {
18931826 @ throws(classOf [Exception ])
18941827 def onPull (): Unit
18951828
1896- /**
1897- * Called when the output port will no longer accept any new elements. After this callback no other callbacks will
1898- * be called for this port.
1899- */
1900- @ throws(classOf [Exception ])
1901- @ deprecatedOverriding(" Override `def onDownstreamFinish(cause: Throwable)`, instead." , since = " 2.6.0" ) // warns when overriding
1902- @ deprecated(" Call onDownstreamFinish with a cancellation cause." , since = " 2.6.0" ) // warns when calling
1903- def onDownstreamFinish (): Unit = {
1904- val thisStage = GraphInterpreter .currentInterpreter.activeStage
1905- require(
1906- thisStage.lastCancellationCause ne null ,
1907- " onDownstreamFinish() must not be called without a cancellation cause" )
1908- thisStage.cancelStage(thisStage.lastCancellationCause)
1909- }
1910-
19111829 /**
19121830 * Called when the output port will no longer accept any new elements. After this callback no other callbacks will
19131831 * be called for this port.
19141832 */
19151833 @ throws(classOf [Exception ])
19161834 def onDownstreamFinish (cause : Throwable ): Unit = {
19171835 val thisStage = GraphInterpreter .currentInterpreter.activeStage
1918- try {
1919- require(cause ne null , " Cancellation cause must not be null" )
1920- require(thisStage.lastCancellationCause eq null , " onDownstreamFinish(cause) must not be called recursively" )
1921- thisStage.lastCancellationCause = cause
1922- (onDownstreamFinish(): @ nowarn(" msg=deprecated" )) // if not overridden, call old deprecated variant
1923- } finally thisStage.lastCancellationCause = null
1836+ thisStage.cancelStage(cause)
19241837 }
19251838}
19261839
0 commit comments