Skip to content

Commit 068a6c3

Browse files
author
Alexandre Curreli
committed
Added awaitTermination method + Redis now waits for all actors to be terminated before shutting down ActorSystem
1 parent 6178d2c commit 068a6c3

File tree

8 files changed

+110
-60
lines changed

8 files changed

+110
-60
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ organization := "com.livestream"
66

77
name := "scredis"
88

9-
version := "2.0.2"
9+
version := "2.0.3"
1010

1111
scalaVersion := "2.11.2"
1212

src/main/scala/scredis/BlockingClient.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ final class BlockingClient(
163163
*/
164164
def quit()(implicit timeout: Duration): Try[Unit] = sendBlocking(Quit())
165165

166+
watchTermination()
167+
166168
}
167169

168170
/**

src/main/scala/scredis/Client.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ final class Client(
137137
RedisConfig(configName, path)
138138
)
139139

140+
watchTermination()
141+
140142
}
141143

142144
/**

src/main/scala/scredis/Redis.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,12 +255,15 @@ final class Redis(
255255
if (shouldShutdownBlockingClient) {
256256
try {
257257
blocking.quit()(5 seconds)
258+
blocking.awaitTermination(3 seconds)
258259
} catch {
259260
case e: Throwable => logger.error("Could not shutdown blocking client", e)
260261
}
261262
}
262263
val future = if (shouldShutdownSubscriberClient) {
263-
subscriber.quit()
264+
subscriber.quit().map { _ =>
265+
subscriber.awaitTermination(3 seconds)
266+
}
264267
} else {
265268
Future.successful(())
266269
}
@@ -269,6 +272,7 @@ final class Redis(
269272
}.flatMap { _ =>
270273
super.quit()
271274
}.map { _ =>
275+
awaitTermination(3 seconds)
272276
system.shutdown()
273277
}
274278
}
@@ -292,6 +296,8 @@ final class Redis(
292296
super.select(database)
293297
}
294298

299+
watchTermination()
300+
295301
}
296302

297303
/**

src/main/scala/scredis/SubscriberClient.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@ final class SubscriberClient(
148148
*/
149149
def quit(): Future[Unit] = shutdown()
150150

151+
watchTermination()
152+
151153
}
152154

153155
/**

src/main/scala/scredis/io/AbstractAkkaConnection.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import scredis.protocol.requests.ServerRequests.{ ClientSetName, Shutdown }
1111
import scala.concurrent.{ ExecutionContext, Future }
1212
import scala.concurrent.duration._
1313

14+
import java.util.concurrent.{ CountDownLatch, TimeUnit }
15+
1416
abstract class AbstractAkkaConnection(
1517
protected val system: ActorSystem,
1618
protected val host: String,
@@ -29,10 +31,14 @@ abstract class AbstractAkkaConnection(
2931
protected val akkaDecoderDispatcherPath: String
3032
) extends Connection with LazyLogging {
3133

34+
private val shutdownLatch = new CountDownLatch(1)
35+
3236
@volatile protected var isShuttingDown = false
3337

3438
override implicit val dispatcher = system.dispatcher
3539

40+
protected val listenerActor: ActorRef
41+
3642
protected def updateState(request: Request[_]): Unit = request match {
3743
case Auth(password) => if (password.isEmpty) {
3844
passwordOpt = None
@@ -53,4 +59,37 @@ abstract class AbstractAkkaConnection(
5359
protected def getDatabase: Int = database
5460
protected def getNameOpt: Option[String] = nameOpt
5561

62+
protected def watchTermination(): Unit = system.actorOf(
63+
Props(
64+
classOf[WatchActor],
65+
listenerActor,
66+
shutdownLatch
67+
)
68+
)
69+
70+
/**
71+
* Waits for all the internal actors to be shutdown.
72+
*
73+
* @note This method is usually called after issuing a QUIT command
74+
*
75+
* @param timeout amount of time to wait
76+
*/
77+
def awaitTermination(timeout: Duration = Duration.Inf): Unit = {
78+
if (timeout.isFinite) {
79+
shutdownLatch.await(timeout.toMillis, TimeUnit.MILLISECONDS)
80+
} else {
81+
shutdownLatch.await()
82+
}
83+
}
84+
85+
}
86+
87+
class WatchActor(actor: ActorRef, shutdownLatch: CountDownLatch) extends Actor {
88+
def receive: Receive = {
89+
case Terminated(_) => {
90+
shutdownLatch.countDown()
91+
context.stop(self)
92+
}
93+
}
94+
context.watch(actor)
5695
}

src/main/scala/scredis/io/IOActor.scala

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,6 @@ class IOActor(
7474
become(awaitingAbort)
7575
}
7676

77-
protected def stop(): Unit = {
78-
logger.trace("Stopping Actor...")
79-
//listenerActor ! PoisonPill
80-
context.stop(self)
81-
}
82-
8377
protected def encode(request: Request[_]): Int = {
8478
request.encode()
8579
request.encoded match {

src/main/scala/scredis/io/ListenerActor.scala

Lines changed: 57 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class ListenerActor(
5454
private var initializationRequestsCount = 0
5555
private var isConnecting = false
5656
private var isShuttingDown = false
57+
private var isShuttingDownBeforeConnected = false
5758
private var isReceiveTimeout = false
5859
private var timeoutCancellableOpt: Option[Cancellable] = None
5960

@@ -284,7 +285,7 @@ class ListenerActor(
284285
case request: Quit => {
285286
request.success(())
286287
failAllQueuedRequests(RedisIOException("Connection has been shutdown by QUIT command"))
287-
shutdown()
288+
isShuttingDownBeforeConnected = true
288289
}
289290
case request: Request[_] => {
290291
queuedRequests.addLast(request)
@@ -305,60 +306,64 @@ class ListenerActor(
305306
case Connected => {
306307
isConnecting = false
307308

308-
onConnect()
309-
310-
val authRequestOpt = passwordOpt.map { password =>
311-
Auth(password)
312-
}
313-
val selectRequestOpt = if (database > 0) {
314-
Some(Select(database))
309+
if (isShuttingDownBeforeConnected) {
310+
shutdown()
315311
} else {
316-
None
317-
}
318-
val setNameRequestOpt = nameOpt.map { name =>
319-
ServerRequests.ClientSetName(name)
320-
}
321-
322-
val authFuture = authRequestOpt match {
323-
case Some(request) => request.future
324-
case None => Future.successful(())
325-
}
326-
val selectFuture = selectRequestOpt match {
327-
case Some(request) => request.future
328-
case None => Future.successful(())
329-
}
330-
val setNameFuture = setNameRequestOpt match {
331-
case Some(request) => request.future
332-
case None => Future.successful(())
333-
}
334-
335-
val requests = List[Option[Request[Unit]]](
336-
authRequestOpt, selectRequestOpt, setNameRequestOpt
337-
).flatten
338-
339-
initializationRequestsCount = requests.size
340-
341-
if (initializationRequestsCount > 0) {
342-
send(requests: _*)
343-
become(initializing)
344-
} else {
345-
onInitialized()
346-
sendAllQueuedRequests()
347-
if (isShuttingDown) {
348-
become(shuttingDown)
312+
onConnect()
313+
314+
val authRequestOpt = passwordOpt.map { password =>
315+
Auth(password)
316+
}
317+
val selectRequestOpt = if (database > 0) {
318+
Some(Select(database))
349319
} else {
350-
become(initialized)
320+
None
321+
}
322+
val setNameRequestOpt = nameOpt.map { name =>
323+
ServerRequests.ClientSetName(name)
324+
}
325+
326+
val authFuture = authRequestOpt match {
327+
case Some(request) => request.future
328+
case None => Future.successful(())
329+
}
330+
val selectFuture = selectRequestOpt match {
331+
case Some(request) => request.future
332+
case None => Future.successful(())
333+
}
334+
val setNameFuture = setNameRequestOpt match {
335+
case Some(request) => request.future
336+
case None => Future.successful(())
337+
}
338+
339+
val requests = List[Option[Request[Unit]]](
340+
authRequestOpt, selectRequestOpt, setNameRequestOpt
341+
).flatten
342+
343+
initializationRequestsCount = requests.size
344+
345+
if (initializationRequestsCount > 0) {
346+
send(requests: _*)
347+
become(initializing)
348+
} else {
349+
onInitialized()
350+
sendAllQueuedRequests()
351+
if (isShuttingDown) {
352+
become(shuttingDown)
353+
} else {
354+
become(initialized)
355+
}
356+
}
357+
358+
authFuture.recover {
359+
case e: Throwable => logger.error(s"Could not authenticate to $remote", e)
360+
}
361+
selectFuture.recover {
362+
case e: Throwable => logger.error(s"Could not select database '$database' in $remote", e)
363+
}
364+
setNameFuture.recover {
365+
case e: Throwable => logger.error(s"Could not set client name in $remote", e)
351366
}
352-
}
353-
354-
authFuture.recover {
355-
case e: Throwable => logger.error(s"Could not authenticate to $remote", e)
356-
}
357-
selectFuture.recover {
358-
case e: Throwable => logger.error(s"Could not select database '$database' in $remote", e)
359-
}
360-
setNameFuture.recover {
361-
case e: Throwable => logger.error(s"Could not set client name in $remote", e)
362367
}
363368
}
364369
case ReceiveTimeout =>

0 commit comments

Comments
 (0)