14
14
package org .apache .pekko .dispatch
15
15
16
16
import com .typesafe .config .Config
17
+ import org .apache .pekko
18
+ import pekko .dispatch .VirtualThreadSupport .newVirtualThreadFactory
19
+ import pekko .util .JavaVersion
17
20
18
21
import java .lang .invoke .{ MethodHandle , MethodHandles , MethodType }
19
- import java .util .concurrent .{ ExecutorService , ForkJoinPool , ForkJoinTask , ThreadFactory }
22
+ import java .util .concurrent .{ Executor , ExecutorService , ForkJoinPool , ForkJoinTask , ThreadFactory }
20
23
import scala .util .Try
21
24
22
- import org .apache .pekko .util .JavaVersion
23
-
24
25
object ForkJoinExecutorConfigurator {
25
26
26
27
/**
@@ -86,15 +87,28 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
86
87
}
87
88
88
89
class ForkJoinExecutorServiceFactory (
90
+ val id : String ,
89
91
val threadFactory : ForkJoinPool .ForkJoinWorkerThreadFactory ,
90
92
val parallelism : Int ,
91
93
val asyncMode : Boolean ,
92
- val maxPoolSize : Int )
94
+ val maxPoolSize : Int ,
95
+ val virtualize : Boolean )
93
96
extends ExecutorServiceFactory {
97
+ def this (threadFactory : ForkJoinPool .ForkJoinWorkerThreadFactory ,
98
+ parallelism : Int ,
99
+ asyncMode : Boolean ,
100
+ maxPoolSize : Int ,
101
+ virtualize : Boolean ) =
102
+ this (null , threadFactory, parallelism, asyncMode, maxPoolSize, virtualize)
94
103
95
104
def this (threadFactory : ForkJoinPool .ForkJoinWorkerThreadFactory ,
96
105
parallelism : Int ,
97
- asyncMode : Boolean ) = this (threadFactory, parallelism, asyncMode, ForkJoinPoolConstants .MaxCap )
106
+ asyncMode : Boolean ) = this (threadFactory, parallelism, asyncMode, ForkJoinPoolConstants .MaxCap , false )
107
+
108
+ def this (threadFactory : ForkJoinPool .ForkJoinWorkerThreadFactory ,
109
+ parallelism : Int ,
110
+ asyncMode : Boolean ,
111
+ maxPoolSize : Int ) = this (threadFactory, parallelism, asyncMode, maxPoolSize, false )
98
112
99
113
private def pekkoJdk9ForkJoinPoolClassOpt : Option [Class [_]] =
100
114
Try (Class .forName(" org.apache.pekko.dispatch.PekkoJdk9ForkJoinPool" )).toOption
@@ -116,12 +130,50 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
116
130
def this (threadFactory : ForkJoinPool .ForkJoinWorkerThreadFactory , parallelism : Int ) =
117
131
this (threadFactory, parallelism, asyncMode = true )
118
132
119
- def createExecutorService : ExecutorService = pekkoJdk9ForkJoinPoolHandleOpt match {
120
- case Some (handle) =>
121
- handle.invoke(parallelism, threadFactory, maxPoolSize,
122
- MonitorableThreadFactory .doNothing, asyncMode).asInstanceOf [ExecutorService ]
123
- case _ =>
124
- new PekkoForkJoinPool (parallelism, threadFactory, MonitorableThreadFactory .doNothing, asyncMode)
133
+ def createExecutorService : ExecutorService = {
134
+ val tf = if (virtualize && JavaVersion .majorVersion >= 21 ) {
135
+ threadFactory match {
136
+ // we need to use the thread factory to create carrier thread
137
+ case m : MonitorableThreadFactory => new MonitorableCarrierThreadFactory (m.name)
138
+ case _ => threadFactory
139
+ }
140
+ } else threadFactory
141
+
142
+ val pool = pekkoJdk9ForkJoinPoolHandleOpt match {
143
+ case Some (handle) =>
144
+ // carrier Thread only exists in JDK 17+
145
+ handle.invoke(parallelism, tf, maxPoolSize, MonitorableThreadFactory .doNothing, asyncMode)
146
+ .asInstanceOf [ExecutorService with LoadMetrics ]
147
+ case _ =>
148
+ new PekkoForkJoinPool (parallelism, tf, MonitorableThreadFactory .doNothing, asyncMode)
149
+ }
150
+
151
+ if (virtualize && JavaVersion .majorVersion >= 21 ) {
152
+ // when virtualized, we need enhanced thread factory
153
+ val factory : ThreadFactory = threadFactory match {
154
+ case MonitorableThreadFactory (name, _, contextClassLoader, exceptionHandler, _) =>
155
+ new ThreadFactory {
156
+ private val vtFactory = newVirtualThreadFactory(name, pool) // use the pool as the scheduler
157
+
158
+ override def newThread (r : Runnable ): Thread = {
159
+ val vt = vtFactory.newThread(r)
160
+ vt.setUncaughtExceptionHandler(exceptionHandler)
161
+ contextClassLoader.foreach(vt.setContextClassLoader)
162
+ vt
163
+ }
164
+ }
165
+ case _ => newVirtualThreadFactory(prerequisites.settings.name, pool); // use the pool as the scheduler
166
+ }
167
+ // wrap the pool with virtualized executor service
168
+ new VirtualizedExecutorService (
169
+ factory, // the virtual thread factory
170
+ pool, // the underlying pool
171
+ (_ : Executor ) => pool.atFullThrottle(), // the load metrics provider, we use the pool itself
172
+ cascadeShutdown = true // cascade shutdown
173
+ )
174
+ } else {
175
+ pool
176
+ }
125
177
}
126
178
}
127
179
@@ -143,12 +195,14 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
143
195
}
144
196
145
197
new ForkJoinExecutorServiceFactory (
198
+ id,
146
199
validate(tf),
147
200
ThreadPoolConfig .scaledPoolSize(
148
201
config.getInt(" parallelism-min" ),
149
202
config.getDouble(" parallelism-factor" ),
150
203
config.getInt(" parallelism-max" )),
151
204
asyncMode,
152
- config.getInt(" maximum-pool-size" ))
205
+ config.getInt(" maximum-pool-size" ),
206
+ config.getBoolean(" virtualize" ))
153
207
}
154
208
}
0 commit comments