Skip to content

Commit fb1968d

Browse files
author
Alexandre Curreli
committed
Added BlockingExecutor + separate executor for callbacks
1 parent 238c3c1 commit fb1968d

File tree

8 files changed

+204
-109
lines changed

8 files changed

+204
-109
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ organization := "com.livestream"
66

77
name := "scredis"
88

9-
version := "1.1.1"
9+
version := "1.1.2"
1010

1111
scalaVersion := "2.9.3"
1212

src/main/resources/reference.conf

Lines changed: 70 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -128,41 +128,79 @@ scredis {
128128
# This parameter has no effect unless auto-pipeline is enabled.
129129
timer-thread-naming-pattern = "scredis-$p-pipeliner"
130130

131-
# The default execution context is a FixedThreadPool. The following parameters have no
132-
# effect if a custom execution context is provided with the Redis.withExecutionContext()
133-
# factory method.
131+
# The default executors are CachedThreadPools with a maximum number of concurrent tasks.
134132
executors {
135-
# Sets the number of threads in the fixed thread pool.
136-
threads = 10
137133

138-
# Defines the maximum number of asynchronous tasks that can reside in the queue before
139-
# the executor starts blocking. This prevents OutOfMemory exceptions to occur under super
140-
# high load scenarios where commands are queued faster than processed.
141-
queue-capacity = 10000
134+
# The internal thread pool is the one scredis uses to run asynchronous commands.
135+
# The following parameters have no effect if a custom execution context is provided with
136+
# the Redis.withExecutionContext() factory method.
137+
internal {
138+
# Defines the maximum number of asynchronous tasks (single command or pipeline) that can run
139+
# simultaneously before the executor starts blocking. This prevents OutOfMemory exceptions
140+
# to occur under super high load scenarios where commands are queued faster than processed.
141+
# This parameter basically determines the maximum number of threads that can be created.
142+
max-concurrent = 8
143+
144+
# Sets the name of the threads in the thread pool.
145+
# The "$p" (pool number) and "$t" (thread number) variable can be used in the pattern and
146+
# will be replaced at runtime.
147+
threads-naming-pattern = "scredis-$p-worker-$t"
148+
149+
# Determines whether all internal threads should be created as daemon threads.
150+
#
151+
# Daemon threads cannot prevent an application from terminating. When all non-daemon
152+
# threads (such as the main thread) terminate, all daemon threads are automatically
153+
# terminated and the application itself terminates. As a consequence, if Redis.quit()
154+
# is not invoked before the application terminates, all queued or processing commands will
155+
# be dropped.
156+
#
157+
# In contrast, non-daemon threads will prevent the application from terminating unless there
158+
# is an explicit invocation of the Redis.quit() method (which will terminate all internal
159+
# threads) or if the application is shutdown using System.exit().
160+
daemon-threads = true
161+
162+
# Sets the priority of all threads in the thread pool.
163+
# "min" - lowest priority
164+
# "normal" - normal priority
165+
# "max" - highest priority
166+
threads-priority = normal
167+
}
142168

143-
# Sets the name of the threads in the thread pool.
144-
# The "$p" (pool number) and "$t" (thread number) variable can be used in the pattern and
145-
# will be replaced at runtime.
146-
threads-naming-pattern = "scredis-$p-worker-$t"
147-
148-
# Determines whether all internal threads should be created as daemon threads.
149-
#
150-
# Daemon threads cannot prevent an application from terminating. When all non-daemon
151-
# threads (such as the main thread) terminate, all daemon threads are automatically
152-
# terminated and the application itself terminates. As a consequence, if Redis.quit()
153-
# is not invoked before the application terminates, all queued or processing commands will
154-
# be dropped.
155-
#
156-
# In contrast, non-daemon threads will prevent the application from terminating unless there
157-
# is an explicit invocation of the Redis.quit() method (which will terminate all internal
158-
# threads) or if the application is shutdown using System.exit().
159-
daemon-threads = true
160-
161-
# Sets the priority of all threads in the thread pool.
162-
# "min" - lowest priority
163-
# "normal" - normal priority
164-
# "max" - highest priority
165-
threads-priority = normal
169+
# The callback thread pool is used when registering callbacks on asynchronous commands.
170+
# It is an implicit public member of the Redis instance, i.e. Redis.ec and can be imported
171+
# as follows "import redis.ec". Note that the callback executor is lazily defined, meaning
172+
# that it will only be initialized (and shutdown) if it is explicitly imported.
173+
callback {
174+
# Defines the maximum number of asynchronous tasks (single command or pipeline) that can run
175+
# simultaneously before the executor starts blocking. This prevents OutOfMemory exceptions
176+
# to occur under super high load scenarios where commands are queued faster than processed.
177+
# This parameter basically determines the maximum number of threads that can be created.
178+
max-concurrent = 16
179+
180+
# Sets the name of the threads in the thread pool.
181+
# The "$p" (pool number) and "$t" (thread number) variable can be used in the pattern and
182+
# will be replaced at runtime.
183+
threads-naming-pattern = "scredis-$p-callback-$t"
184+
185+
# Determines whether all internal threads should be created as daemon threads.
186+
#
187+
# Daemon threads cannot prevent an application from terminating. When all non-daemon
188+
# threads (such as the main thread) terminate, all daemon threads are automatically
189+
# terminated and the application itself terminates. As a consequence, if Redis.quit()
190+
# is not invoked before the application terminates, all queued or processing commands will
191+
# be dropped.
192+
#
193+
# In contrast, non-daemon threads will prevent the application from terminating unless there
194+
# is an explicit invocation of the Redis.quit() method (which will terminate all internal
195+
# threads) or if the application is shutdown using System.exit().
196+
daemon-threads = true
197+
198+
# Sets the priority of all threads in the thread pool.
199+
# "min" - lowest priority
200+
# "normal" - normal priority
201+
# "max" - highest priority
202+
threads-priority = normal
203+
}
166204
}
167205
}
168206

src/main/scala/scredis/Redis.scala

Lines changed: 61 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import akka.util.duration._
2626

2727
import scredis.commands.async._
2828
import scredis.exceptions._
29+
import scredis.util.BlockingExecutor
2930

3031
import scala.collection.mutable.ListBuffer
3132

@@ -139,19 +140,36 @@ final class Redis private[scredis] (
139140
private val timerTask = if(IsAutomaticPipeliningEnabled) Some((
140141
new Timer(
141142
config.Async.TimerThreadNamingPattern.replace("$p", PoolNumber),
142-
config.Async.Executors.DaemonThreads
143+
true
143144
),
144145
new TimerTask() {
145146
def run(): Unit = executePipelineIfNeeded()
146147
}
147148
)) else None
148149

149-
private val executorOpt = ecOpt match {
150+
private val internalExecutorOpt = ecOpt match {
150151
case Some(_) => None
151-
case None => Some(newThreadPool())
152+
case None => Some(
153+
newThreadPool(
154+
config.Async.Executors.Internal.ThreadsNamingPattern,
155+
config.Async.Executors.Internal.DaemonThreads,
156+
config.Async.Executors.Internal.ThreadsPriority
157+
)
158+
)
152159
}
160+
private val interalExecutionContext = ecOpt.getOrElse(
161+
BlockingExecutor(internalExecutorOpt.get, config.Async.Executors.Internal.MaxConcurrent)
162+
)
153163

154-
implicit val ec = ecOpt.getOrElse(ExecutionContext.fromExecutorService(executorOpt.get))
164+
private var shouldShutdownCallbackExecutor = false
165+
private lazy val callbackExecutor: ExecutorService = {
166+
shouldShutdownCallbackExecutor = true
167+
newThreadPool(
168+
config.Async.Executors.Callback.ThreadsNamingPattern,
169+
config.Async.Executors.Callback.DaemonThreads,
170+
config.Async.Executors.Callback.ThreadsPriority
171+
)
172+
}
155173

156174
private val pool = ClientPool(config)
157175

@@ -170,23 +188,26 @@ final class Redis private[scredis] (
170188
config.Client.Sleep
171189
)
172190

173-
private def newThreadPool(): ExecutorService = {
191+
lazy implicit val ec = ecOpt.getOrElse(
192+
BlockingExecutor(callbackExecutor, config.Async.Executors.Callback.MaxConcurrent)
193+
)
194+
195+
private def execute[A](f: => A): Future[A] = Future {
196+
f
197+
}(interalExecutionContext)
198+
199+
private def newThreadPool(
200+
namingPattern: String, daemon: Boolean, priority: Int
201+
): ExecutorService = {
174202
val threadFactory = new BasicThreadFactory.Builder()
175203
.namingPattern(
176-
config.Async.Executors.ThreadsNamingPattern.replace("$p", PoolNumber).replace("$t", "%d")
204+
namingPattern.replace("$p", PoolNumber).replace("$t", "%d")
177205
)
178-
.daemon(config.Async.Executors.DaemonThreads)
179-
.priority(config.Async.Executors.ThreadsPriority)
206+
.daemon(daemon)
207+
.priority(priority)
180208
.build()
181209

182-
new ThreadPoolExecutor(
183-
config.Async.Executors.Threads,
184-
config.Async.Executors.Threads,
185-
0L,
186-
TimeUnit.MILLISECONDS,
187-
new LinkedBlockingQueue[Runnable](config.Async.Executors.QueueCapacity),
188-
threadFactory
189-
)
210+
Executors.newCachedThreadPool(threadFactory)
190211
}
191212

192213
private def nextIndex(): Int = synchronized {
@@ -273,7 +294,7 @@ final class Redis private[scredis] (
273294
-1
274295
}
275296
}
276-
if(executeIndex >= 0) Future { executePipeline(executeIndex) }
297+
if(executeIndex >= 0) execute { executePipeline(executeIndex) }
277298
} catch {
278299
case e: Throwable => logger.error("An unexpected error occurred", e)
279300
} finally {
@@ -293,7 +314,11 @@ final class Redis private[scredis] (
293314
val (future, executeIndex) = synchronized {
294315
val commands = this.commands.get(index)
295316
val future = try {
296-
val promise = Promise[Any]()
317+
/*
318+
* Akka promises need explicit execution context.
319+
* Cannot re-use internal execution context otherwise it can create deadlocks.
320+
*/
321+
val promise = Promise[Any]()(ec)
297322
commands += ((body, opts, promise))
298323
promise.future.asInstanceOf[Future[A]]
299324
} catch {
@@ -311,7 +336,7 @@ final class Redis private[scredis] (
311336
}
312337
(future, executeIndex)
313338
}
314-
if(executeIndex >= 0) Future { executePipeline(executeIndex) }
339+
if(executeIndex >= 0) execute { executePipeline(executeIndex) }
315340
future
316341
} else {
317342
withClientAsync(body)
@@ -335,7 +360,7 @@ final class Redis private[scredis] (
335360
def borrowClient(): Client = pool.borrowClient()
336361
def returnClient(client: Client): Unit = pool.returnClient(client)
337362
def withClient[A](body: Client => A): A = pool.withClient(body)
338-
def withClientAsync[A](body: Client => A): Future[A] = Future {
363+
def withClientAsync[A](body: Client => A): Future[A] = execute {
339364
withClient(body)
340365
}
341366

@@ -357,7 +382,7 @@ final class Redis private[scredis] (
357382
*/
358383
override def select(
359384
db: Int
360-
)(implicit opts: CommandOptions = DefaultCommandOptions): Future[Unit] = Future {
385+
)(implicit opts: CommandOptions = DefaultCommandOptions): Future[Unit] = execute {
361386
selectInternal(db)
362387
}
363388

@@ -375,7 +400,7 @@ final class Redis private[scredis] (
375400
*/
376401
override def auth(password: String)(
377402
implicit opts: CommandOptions = DefaultCommandOptions
378-
): Future[Unit] = Future {
403+
): Future[Unit] = execute {
379404
authInternal(if(password.isEmpty) None else Some(password))
380405
}
381406

@@ -420,7 +445,7 @@ final class Redis private[scredis] (
420445
}
421446

422447
try {
423-
executorOpt.foreach { executor =>
448+
internalExecutorOpt.foreach { executor =>
424449
executor.shutdown()
425450
awaitTerminationOpt.foreach { timeout =>
426451
if (timeout.isFinite) {
@@ -432,9 +457,18 @@ final class Redis private[scredis] (
432457
}
433458
} catch {
434459
case e: Throwable => logger.error(
435-
"An error occurred while shutting down default executor", e
460+
"An error occurred while shutting down internal executor", e
436461
)
437462
}
463+
if (shouldShutdownCallbackExecutor) {
464+
try {
465+
callbackExecutor.shutdown()
466+
} catch {
467+
case e: Throwable => logger.error(
468+
"An error occurred while shutting down callback executor", e
469+
)
470+
}
471+
}
438472
try {
439473
pool.close()
440474
} catch {
@@ -444,7 +478,9 @@ final class Redis private[scredis] (
444478
}
445479
}
446480

447-
if(IsAutomaticPipeliningEnabled) nextIndex()
481+
if (IsAutomaticPipeliningEnabled) {
482+
nextIndex()
483+
}
448484

449485
timerTask.foreach {
450486
case (timer, task) => timer.scheduleAtFixedRate(task, Interval.toMillis, Interval.toMillis)

src/main/scala/scredis/RedisConfig.scala

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -177,17 +177,33 @@ class RedisConfig(config: Config = ConfigFactory.load().getConfig("scredis")) {
177177
val TimerThreadNamingPattern = async.getString("timer-thread-naming-pattern")
178178

179179
object Executors {
180-
val Threads = asyncExecutors.getInt("threads")
181-
val QueueCapacity = asyncExecutors.getInt("queue-capacity")
182-
val ThreadsNamingPattern = asyncExecutors.getString("threads-naming-pattern")
183-
val DaemonThreads = asyncExecutors.getBoolean("daemon-threads")
184-
val ThreadsPriority = asyncExecutors.getString("threads-priority").toLowerCase match {
185-
case "min" => Thread.MIN_PRIORITY
186-
case "normal" => Thread.NORM_PRIORITY
187-
case "max" => Thread.MAX_PRIORITY
188-
case _ => throw new IllegalArgumentException(
189-
"threads-priority can only be min, normal or max"
190-
)
180+
object Internal {
181+
private val config = asyncExecutors.getConfig("internal")
182+
val MaxConcurrent = config.getInt("max-concurrent")
183+
val ThreadsNamingPattern = config.getString("threads-naming-pattern")
184+
val DaemonThreads = config.getBoolean("daemon-threads")
185+
val ThreadsPriority = config.getString("threads-priority").toLowerCase match {
186+
case "min" => Thread.MIN_PRIORITY
187+
case "normal" => Thread.NORM_PRIORITY
188+
case "max" => Thread.MAX_PRIORITY
189+
case _ => throw new IllegalArgumentException(
190+
"threads-priority can only be min, normal or max"
191+
)
192+
}
193+
}
194+
object Callback {
195+
private val config = asyncExecutors.getConfig("callback")
196+
val MaxConcurrent = config.getInt("max-concurrent")
197+
val ThreadsNamingPattern = config.getString("threads-naming-pattern")
198+
val DaemonThreads = config.getBoolean("daemon-threads")
199+
val ThreadsPriority = config.getString("threads-priority").toLowerCase match {
200+
case "min" => Thread.MIN_PRIORITY
201+
case "normal" => Thread.NORM_PRIORITY
202+
case "max" => Thread.MAX_PRIORITY
203+
case _ => throw new IllegalArgumentException(
204+
"threads-priority can only be min, normal or max"
205+
)
206+
}
191207
}
192208
}
193209
}

src/main/scala/scredis/commands/KeysCommands.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import scredis.CommandOptions
2121
import scredis.protocol.Protocol
2222
import scredis.parsing._
2323
import scredis.parsing.Implicits._
24-
import scredis.util.ScanLikeIterator
2524

2625
import scala.collection.mutable.ListBuffer
2726

src/main/scala/scredis/commands/async/TransactionalCommands.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ import scala.collection.generic.CanBuildFrom
3232
* @define m [[scredis.TransactionalClient]]
3333
*/
3434
trait TransactionalCommands extends Async {
35-
36-
protected implicit val ec: ExecutionContext
3735

3836
protected def force(opts: CommandOptions): CommandOptions = CommandOptions(
3937
opts.timeout,
@@ -91,7 +89,8 @@ trait TransactionalCommands extends Async {
9189
*/
9290
def pipelinedN[A, B[_] <: Traversable[_]](body: PipelineClient => B[Future[A]])(
9391
implicit opts: CommandOptions = DefaultCommandOptions,
94-
cbf: CanBuildFrom[B[Future[A]], A, B[A]]
92+
cbf: CanBuildFrom[B[Future[A]], A, B[A]],
93+
ec: ExecutionContext
9594
): Future[B[A]] = async(_.pipelinedN(body))(force(opts))
9695

9796
/**
@@ -164,7 +163,8 @@ trait TransactionalCommands extends Async {
164163
*/
165164
def transactionalN[A, B[_] <: Traversable[_]](body: TransactionalClient => B[Future[A]])(
166165
implicit opts: CommandOptions = DefaultCommandOptions,
167-
cbf: CanBuildFrom[B[Future[A]], A, B[A]]
166+
cbf: CanBuildFrom[B[Future[A]], A, B[A]],
167+
ec: ExecutionContext
168168
): Future[B[A]] = async(_.transactionalN(body))(force(opts))
169169

170170
/**

0 commit comments

Comments
 (0)