@@ -58,12 +58,32 @@ object ThreadPoolConfig {
58
58
def reusableQueue (queue : BlockingQueue [Runnable ]): QueueFactory = () => queue
59
59
60
60
def reusableQueue (queueFactory : QueueFactory ): QueueFactory = reusableQueue(queueFactory())
61
+
62
+ // TODO remove in Pekko 2.0 after change to normal class
63
+ def unapply (config : ThreadPoolConfig )
64
+ : Option [(Boolean , Int , Int , Duration , ThreadPoolConfig .QueueFactory , RejectedExecutionHandler )] =
65
+ Some ((config.allowCorePoolTimeout, config.corePoolSize, config.maxPoolSize, config.threadTimeout,
66
+ config.queueFactory, config.rejectionPolicy))
67
+
68
+ // TODO remove in Pekko 2.0 after change to normal class
69
+ def apply (allowCorePoolTimeout : Boolean ,
70
+ corePoolSize : Int ,
71
+ maxPoolSize : Int ,
72
+ threadTimeout : Duration ,
73
+ queueFactory : ThreadPoolConfig .QueueFactory ,
74
+ rejectionPolicy : RejectedExecutionHandler ): ThreadPoolConfig =
75
+ ThreadPoolConfig (allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy,
76
+ virtualize = false )
61
77
}
62
78
63
79
/**
64
80
* Function0 without the fun stuff (mostly for the sake of the Java API side of things)
65
81
*/
66
82
trait ExecutorServiceFactory {
83
+
84
+ /**
85
+ * Create a new ExecutorService
86
+ */
67
87
def createExecutorService : ExecutorService
68
88
}
69
89
@@ -77,14 +97,26 @@ trait ExecutorServiceFactoryProvider {
77
97
/**
78
98
* A small configuration DSL to create ThreadPoolExecutors that can be provided as an ExecutorServiceFactoryProvider to Dispatcher
79
99
*/
100
+ // TODO don't use case class for this in 2.0, it's not a good fit
80
101
final case class ThreadPoolConfig (
81
102
allowCorePoolTimeout : Boolean = ThreadPoolConfig .defaultAllowCoreThreadTimeout,
82
103
corePoolSize : Int = ThreadPoolConfig .defaultCorePoolSize,
83
104
maxPoolSize : Int = ThreadPoolConfig .defaultMaxPoolSize,
84
105
threadTimeout : Duration = ThreadPoolConfig .defaultTimeout,
85
106
queueFactory : ThreadPoolConfig .QueueFactory = ThreadPoolConfig .linkedBlockingQueue(),
86
- rejectionPolicy : RejectedExecutionHandler = ThreadPoolConfig .defaultRejectionPolicy)
107
+ rejectionPolicy : RejectedExecutionHandler = ThreadPoolConfig .defaultRejectionPolicy,
108
+ virtualize : Boolean )
87
109
extends ExecutorServiceFactoryProvider {
110
+ // TODO remove in Pekko 2.0 after change to normal class
111
+ def this (allowCorePoolTimeout : Boolean ,
112
+ corePoolSize : Int ,
113
+ maxPoolSize : Int ,
114
+ threadTimeout : Duration ,
115
+ queueFactory : ThreadPoolConfig .QueueFactory ,
116
+ rejectionPolicy : RejectedExecutionHandler ) =
117
+ this (allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy,
118
+ virtualize = false )
119
+
88
120
// Written explicitly to permit non-inlined defn; this is necessary for downstream instrumentation that stores extra
89
121
// context information on the config
90
122
@ noinline
@@ -94,13 +126,31 @@ final case class ThreadPoolConfig(
94
126
maxPoolSize : Int = maxPoolSize,
95
127
threadTimeout : Duration = threadTimeout,
96
128
queueFactory : ThreadPoolConfig .QueueFactory = queueFactory,
97
- rejectionPolicy : RejectedExecutionHandler = rejectionPolicy
129
+ rejectionPolicy : RejectedExecutionHandler = rejectionPolicy,
130
+ virtualize : Boolean = virtualize
131
+ ): ThreadPoolConfig =
132
+ ThreadPoolConfig (allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy,
133
+ virtualize)
134
+
135
+ // TODO remove in Pekko 2.0 after change to normal class
136
+ @ noinline
137
+ def copy (allowCorePoolTimeout : Boolean ,
138
+ corePoolSize : Int ,
139
+ maxPoolSize : Int ,
140
+ threadTimeout : Duration ,
141
+ queueFactory : ThreadPoolConfig .QueueFactory ,
142
+ rejectionPolicy : RejectedExecutionHandler
98
143
): ThreadPoolConfig =
99
- ThreadPoolConfig (allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy)
144
+ ThreadPoolConfig (allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy,
145
+ virtualize)
146
+
147
+ class ThreadPoolExecutorServiceFactory (val id : String ,
148
+ val threadFactory : ThreadFactory ) extends ExecutorServiceFactory {
149
+
150
+ def this (threadFactory : ThreadFactory ) = this (null , threadFactory)
100
151
101
- class ThreadPoolExecutorServiceFactory (val threadFactory : ThreadFactory ) extends ExecutorServiceFactory {
102
152
def createExecutorService : ExecutorService = {
103
- val service : ThreadPoolExecutor = new ThreadPoolExecutor (
153
+ val executor : ThreadPoolExecutor = new ThreadPoolExecutor (
104
154
corePoolSize,
105
155
maxPoolSize,
106
156
threadTimeout.length,
@@ -110,18 +160,19 @@ final case class ThreadPoolConfig(
110
160
rejectionPolicy) with LoadMetrics {
111
161
def atFullThrottle (): Boolean = this .getActiveCount >= this .getPoolSize
112
162
}
113
- service .allowCoreThreadTimeOut(allowCorePoolTimeout)
114
- service
163
+ executor .allowCoreThreadTimeOut(allowCorePoolTimeout)
164
+ if (virtualize) new VirtualizedExecutorService (id, executor) else executor
115
165
}
116
166
}
167
+
117
168
def createExecutorServiceFactory (id : String , threadFactory : ThreadFactory ): ExecutorServiceFactory = {
118
169
val tf = threadFactory match {
119
170
case m : MonitorableThreadFactory =>
120
171
// add the dispatcher id to the thread names
121
172
m.withName(m.name + " -" + id)
122
173
case other => other
123
174
}
124
- new ThreadPoolExecutorServiceFactory (tf)
175
+ new ThreadPoolExecutorServiceFactory (id, tf)
125
176
}
126
177
}
127
178
@@ -178,6 +229,9 @@ final case class ThreadPoolConfigBuilder(config: ThreadPoolConfig) {
178
229
def setQueueFactory (newQueueFactory : QueueFactory ): ThreadPoolConfigBuilder =
179
230
this .copy(config = config.copy(queueFactory = newQueueFactory))
180
231
232
+ def setVirtualize (virtualize : Boolean ): ThreadPoolConfigBuilder =
233
+ this .copy(config = config.copy(virtualize = virtualize))
234
+
181
235
def configure (fs : Option [Function [ThreadPoolConfigBuilder , ThreadPoolConfigBuilder ]]* ): ThreadPoolConfigBuilder =
182
236
fs.foldLeft(this )((c, f) => f.map(_(c)).getOrElse(c))
183
237
}
0 commit comments