Skip to content

Commit 8188c2d

Browse files
committed
Add close on ActorSystem
1 parent 16ca91c commit 8188c2d

File tree

33 files changed

+120
-100
lines changed

33 files changed

+120
-100
lines changed

actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/ActorSystemStub.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ import com.typesafe.config.{ Config, ConfigFactory }
113113
override def terminate(): Unit = terminationPromise.trySuccess(Done)
114114
override def whenTerminated: Future[Done] = terminationPromise.future
115115
override def getWhenTerminated: CompletionStage[Done] = whenTerminated.asJava
116+
override def close(): Unit = {
117+
terminate()
118+
Await.result(whenTerminated, scala.concurrent.duration.Duration.Inf)
119+
}
116120
override val startTime: Long = System.currentTimeMillis()
117121
override def uptime: Long = System.currentTimeMillis() - startTime
118122
override def threadFactory: java.util.concurrent.ThreadFactory = new ThreadFactory {

actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ class ActorSystemSpec extends PekkoSpec(ActorSystemSpec.config) with ImplicitSen
263263

264264
"throw RejectedExecutionException when shutdown" in {
265265
val system2 = ActorSystem("RejectedExecution-1", PekkoSpec.testConf)
266-
Await.ready(system2.terminate(), 10.seconds)
266+
system2.close()
267267

268268
intercept[RejectedExecutionException] {
269269
system2.registerOnTermination { println("IF YOU SEE THIS THEN THERE'S A BUG HERE") }

actor-tests/src/test/scala/org/apache/pekko/dispatch/DispatcherShutdownSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class DispatcherShutdownSpec extends AnyWordSpec with Matchers {
4545
val system = ActorSystem("DispatcherShutdownSpec")
4646
threadCount should be > 0
4747

48-
Await.ready(system.terminate(), 1.second)
48+
system.close()
4949
Await.ready(Future(pekko.Done)(system.dispatcher), 1.second)
5050

5151
TestKit.awaitCond(threadCount == 0, 3.second)

actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package org.apache.pekko.actor.typed
1515

16-
import java.util.concurrent.{ CompletionStage, ThreadFactory }
16+
import java.util.concurrent.{ CompletionStage, ThreadFactory, TimeoutException }
1717

1818
import scala.concurrent.{ ExecutionContextExecutor, Future }
1919

@@ -42,7 +42,7 @@ import com.typesafe.config.{ Config, ConfigFactory }
4242
* Not for user extension.
4343
*/
4444
@DoNotInherit
45-
abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with ClassicActorSystemProvider {
45+
abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with ClassicActorSystemProvider with AutoCloseable {
4646
this: InternalRecipientRef[T] =>
4747

4848
/**
@@ -147,6 +147,26 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with ClassicA
147147
*/
148148
def getWhenTerminated: CompletionStage[Done]
149149

150+
/**
151+
* Terminates this actor system by running [[pekko.actor.CoordinatedShutdown]] with reason
152+
* [[pekko.actor.CoordinatedShutdown.ActorSystemTerminateReason]]. This method will block
153+
* until either the actor system is terminated or
154+
* `pekko.coordinated-shutdown.close-actor-system-timeout` timeout duration is
155+
* passed, in which case a [[TimeoutException]] is thrown.
156+
*
157+
* If `pekko.coordinated-shutdown.run-by-actor-system-terminate` is configured to `off`
158+
* it will not run `CoordinatedShutdown`, but the `ActorSystem` and its actors
159+
* will still be terminated.
160+
*
161+
* This will stop the guardian actor, which in turn
162+
* will recursively stop all its child actors, and finally the system guardian
163+
* (below which the logging actors reside) and then execute all registered
164+
* termination handlers (see [[pekko.actor.ActorSystem.registerOnTermination]]).
165+
* @since 1.3.0
166+
*/
167+
@throws(classOf[TimeoutException])
168+
override def close(): Unit
169+
150170
/**
151171
* The deadLetter address is a destination that will accept (and discard)
152172
* every message sent to it.

actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/adapter/ActorSystemAdapter.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ import org.slf4j.{ Logger, LoggerFactory }
122122
override lazy val getWhenTerminated: CompletionStage[pekko.Done] =
123123
whenTerminated.asJava
124124

125+
override def close(): Unit = system.close()
126+
125127
override def systemActorOf[U](behavior: Behavior[U], name: String, props: Props): ActorRef[U] = {
126128
val ref = system.systemActorOf(
127129
PropsAdapter(
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Add close to ActorSystem
19+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.pekko.actor.ActorSystem.close")
20+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.pekko.actor.ExtendedActorSystem.close")

actor/src/main/resources/reference.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1244,6 +1244,9 @@ pekko {
12441244
# Terminate the ActorSystem in the last phase actor-system-terminate.
12451245
terminate-actor-system = on
12461246

1247+
# The timeout that will be used when calling .close on an ActorSystem
1248+
close-actor-system-timeout = 60 s
1249+
12471250
# Exit the JVM (System.exit(0)) in the last phase actor-system-terminate
12481251
# if this is set to 'on'. It is done after termination of the
12491252
# ActorSystem if terminate-actor-system=on, otherwise it is done

actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import java.util.concurrent.atomic.AtomicReference
2020

2121
import scala.annotation.tailrec
2222
import scala.collection.immutable
23-
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future, Promise }
23+
import scala.concurrent.{ Await, ExecutionContext, ExecutionContextExecutor, Future, Promise }
2424
import scala.concurrent.blocking
2525
import scala.concurrent.duration.Duration
2626
import scala.jdk.CollectionConverters._
@@ -524,7 +524,7 @@ object ActorSystem {
524524
* extending [[pekko.actor.ExtendedActorSystem]] instead, but beware that you
525525
* are completely on your own in that case!
526526
*/
527-
abstract class ActorSystem extends ActorRefFactory with ClassicActorSystemProvider {
527+
abstract class ActorSystem extends ActorRefFactory with ClassicActorSystemProvider with AutoCloseable {
528528
import ActorSystem._
529529

530530
/**
@@ -677,6 +677,26 @@ abstract class ActorSystem extends ActorRefFactory with ClassicActorSystemProvid
677677
*/
678678
def terminate(): Future[Terminated]
679679

680+
/**
681+
* Terminates this actor system by running [[CoordinatedShutdown]] with reason
682+
* [[CoordinatedShutdown.ActorSystemTerminateReason]]. This method will block
683+
* until either the actor system is terminated or
684+
* `pekko.coordinated-shutdown.close-actor-system-timeout` timeout duration is
685+
* passed, in which case a [[TimeoutException]] is thrown.
686+
*
687+
* If `pekko.coordinated-shutdown.run-by-actor-system-terminate` is configured to `off`
688+
* it will not run `CoordinatedShutdown`, but the `ActorSystem` and its actors
689+
* will still be terminated.
690+
*
691+
* This will stop the guardian actor, which in turn
692+
* will recursively stop all its child actors, and finally the system guardian
693+
* (below which the logging actors reside) and then execute all registered
694+
* termination handlers (see [[ActorSystem#registerOnTermination]]).
695+
* @since 1.3.0
696+
*/
697+
@throws(classOf[TimeoutException])
698+
override def close(): Unit
699+
680700
/**
681701
* Returns a Future which will be completed after the ActorSystem has been terminated
682702
* and termination hooks have been executed. If you registered any callback with
@@ -1080,6 +1100,13 @@ private[pekko] class ActorSystemImpl(
10801100
whenTerminated
10811101
}
10821102

1103+
override def close(): Unit = {
1104+
terminate()
1105+
val duration = Duration(settings.config.getDuration("coordinated-shutdown.close-actor-system-timeout").toMillis,
1106+
TimeUnit.MILLISECONDS)
1107+
Await.result(whenTerminated, duration)
1108+
}
1109+
10831110
override private[pekko] def finalTerminate(): Unit = {
10841111
terminating = true
10851112
// these actions are idempotent

bench-jmh/src/main/scala/org/apache/pekko/actor/ActorBenchmark.scala

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@ package org.apache.pekko.actor
1515

1616
import java.util.concurrent.TimeUnit
1717

18-
import scala.concurrent.Await
19-
import scala.concurrent.duration._
20-
2118
import BenchmarkActors._
2219
import org.openjdk.jmh.annotations._
2320

@@ -100,10 +97,8 @@ class ActorBenchmark {
10097
}
10198

10299
@TearDown(Level.Trial)
103-
def shutdown(): Unit = {
104-
system.terminate()
105-
Await.ready(system.whenTerminated, 15.seconds)
106-
}
100+
def shutdown(): Unit =
101+
system.close()
107102

108103
@Benchmark
109104
@OperationsPerInvocation(totalMessages)

bench-jmh/src/main/scala/org/apache/pekko/actor/ActorCreationBenchmark.scala

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@ package org.apache.pekko.actor
1515

1616
import java.util.concurrent.TimeUnit
1717

18-
import scala.concurrent.Await
19-
import scala.concurrent.duration._
20-
2118
import org.openjdk.jmh.annotations._
2219

2320
/*
@@ -46,10 +43,8 @@ class ActorCreationBenchmark {
4643
}
4744

4845
@TearDown(Level.Trial)
49-
def shutdown(): Unit = {
50-
system.terminate()
51-
Await.ready(system.whenTerminated, 15.seconds)
52-
}
46+
def shutdown(): Unit =
47+
system.close()
5348

5449
@Benchmark
5550
@OutputTimeUnit(TimeUnit.MICROSECONDS)

0 commit comments

Comments
 (0)