Skip to content

Commit 233dad1

Browse files
committed
=act Extract AtomicCancellable in Scheduler.
1 parent 861a188 commit 233dad1

File tree

2 files changed

+48
-58
lines changed

2 files changed

+48
-58
lines changed

akka-actor/src/main/scala/akka/actor/LightArrayRevolverScheduler.scala

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import scala.util.control.NonFatal
1616

1717
import com.typesafe.config.Config
1818

19+
import akka.actor.Scheduler.AtomicCancellable
1920
import akka.dispatch.AbstractNodeQueue
2021
import akka.event.LoggingAdapter
2122
import akka.util.Helpers
@@ -102,48 +103,23 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
102103
override def schedule(initialDelay: FiniteDuration, delay: FiniteDuration, runnable: Runnable)(
103104
implicit executor: ExecutionContext): Cancellable = {
104105
checkMaxDelay(roundUp(delay).toNanos)
105-
try new AtomicReference[Cancellable](InitialRepeatMarker) with Cancellable { self =>
106-
compareAndSet(
107-
InitialRepeatMarker,
106+
new AtomicCancellable(InitialRepeatMarker) { self =>
107+
final override protected def next(): Cancellable =
108108
schedule(
109109
executor,
110110
new AtomicLong(clock() + initialDelay.toNanos) with Runnable {
111111
override def run(): Unit = {
112112
try {
113113
runnable.run()
114114
val driftNanos = clock() - getAndAdd(delay.toNanos)
115-
if (self.get != null)
115+
if (self.get() != null)
116116
swap(schedule(executor, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1))))
117117
} catch {
118118
case _: SchedulerException => // ignore failure to enqueue or terminated target actor
119119
}
120120
}
121121
},
122-
roundUp(initialDelay)))
123-
124-
@tailrec private def swap(c: Cancellable): Unit = {
125-
get match {
126-
case null => if (c != null) c.cancel()
127-
case old => if (!compareAndSet(old, c)) swap(c)
128-
}
129-
}
130-
131-
final def cancel(): Boolean = {
132-
@tailrec def tailrecCancel(): Boolean = {
133-
get match {
134-
case null => false
135-
case c =>
136-
if (c.cancel()) compareAndSet(c, null)
137-
else compareAndSet(c, null) || tailrecCancel()
138-
}
139-
}
140-
141-
tailrecCancel()
142-
}
143-
144-
override def isCancelled: Boolean = get == null
145-
} catch {
146-
case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause)
122+
roundUp(initialDelay))
147123
}
148124
}
149125

akka-actor/src/main/scala/akka/actor/Scheduler.scala

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import scala.concurrent.ExecutionContext
1212
import scala.concurrent.duration._
1313
import scala.util.control.NoStackTrace
1414

15+
import akka.actor.Scheduler.AtomicCancellable
1516
import akka.annotation.InternalApi
1617
import akka.util.JavaDurationConverters
1718

@@ -71,48 +72,23 @@ trait Scheduler {
7172
*/
7273
def scheduleWithFixedDelay(initialDelay: FiniteDuration, delay: FiniteDuration)(runnable: Runnable)(
7374
implicit executor: ExecutionContext): Cancellable = {
74-
try new AtomicReference[Cancellable](Cancellable.initialNotCancelled) with Cancellable { self =>
75-
compareAndSet(
76-
Cancellable.initialNotCancelled,
75+
new AtomicCancellable(Cancellable.initialNotCancelled) {
76+
final override protected def next(): Cancellable =
7777
scheduleOnce(
7878
initialDelay,
7979
new Runnable {
8080
override def run(): Unit = {
8181
try {
8282
runnable.run()
83-
if (self.get != null)
83+
if (get != null)
8484
swap(scheduleOnce(delay, this))
8585
} catch {
8686
// ignore failure to enqueue or terminated target actor
8787
case _: SchedulerException =>
8888
case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] =>
8989
}
9090
}
91-
}))
92-
93-
@tailrec private def swap(c: Cancellable): Unit = {
94-
get match {
95-
case null => if (c != null) c.cancel()
96-
case old => if (!compareAndSet(old, c)) swap(c)
97-
}
98-
}
99-
100-
final def cancel(): Boolean = {
101-
@tailrec def tailrecCancel(): Boolean = {
102-
get match {
103-
case null => false
104-
case c =>
105-
if (c.cancel()) compareAndSet(c, null)
106-
else compareAndSet(c, null) || tailrecCancel()
107-
}
108-
}
109-
110-
tailrecCancel()
111-
}
112-
113-
override def isCancelled: Boolean = get == null
114-
} catch {
115-
case SchedulerException(msg) => throw new IllegalStateException(msg)
91+
})
11692
}
11793
}
11894

@@ -561,4 +537,42 @@ object Scheduler {
561537
* a custom implementation of `Scheduler` must also implement this.
562538
*/
563539
trait TaskRunOnClose extends Runnable
540+
541+
private[akka] abstract class AtomicCancellable(initialValue: Cancellable)
542+
extends AtomicReference[Cancellable](initialValue)
543+
with Cancellable { self =>
544+
545+
try {
546+
compareAndSet(initialValue, next())
547+
} catch {
548+
case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause)
549+
}
550+
551+
protected def next(): Cancellable
552+
553+
@tailrec final protected def swap(c: Cancellable): Unit = {
554+
get match {
555+
case null => if (c != null) c.cancel()
556+
case old =>
557+
if (!compareAndSet(old, c))
558+
swap(c)
559+
}
560+
}
561+
562+
final def cancel(): Boolean = {
563+
@tailrec def tailrecCancel(): Boolean = {
564+
get match {
565+
case null => false
566+
case c =>
567+
if (c.cancel()) compareAndSet(c, null)
568+
else compareAndSet(c, null) || tailrecCancel()
569+
}
570+
}
571+
572+
tailrecCancel()
573+
}
574+
575+
final override def isCancelled: Boolean = get == null
576+
577+
}
564578
}

0 commit comments

Comments
 (0)