Skip to content

Commit 5776a32

Browse files
committed
feat: Add support for switching scheduler
1 parent b20ec82 commit 5776a32

File tree

8 files changed

+256
-15
lines changed

8 files changed

+256
-15
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* license agreements; and to You under the Apache License, version 2.0:
4+
*
5+
* https://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* This file is part of the Apache Pekko project, which was derived from Akka.
8+
*/
9+
10+
/*
11+
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
12+
*/
13+
14+
package org.apache.pekko.dispatch
15+
16+
import com.typesafe.config.ConfigFactory
17+
18+
import org.apache.pekko
19+
import pekko.actor.{ Actor, Props }
20+
import pekko.testkit.{ ImplicitSender, PekkoSpec }
21+
import pekko.util.JavaVersion
22+
23+
object ForkJoinPoolVirtualThreadSpec {
24+
val config = ConfigFactory.parseString("""
25+
|virtual {
26+
| task-dispatcher {
27+
| mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
28+
| throughput = 5
29+
| fork-join-executor {
30+
| parallelism-factor = 2
31+
| parallelism-max = 2
32+
| parallelism-min = 2
33+
| virtualize = on
34+
| }
35+
| }
36+
|}
37+
""".stripMargin)
38+
39+
class ThreadNameActor extends Actor {
40+
41+
override def receive = {
42+
case "ping" =>
43+
sender() ! Thread.currentThread().getName
44+
}
45+
}
46+
47+
}
48+
49+
class ForkJoinPoolVirtualThreadSpec extends PekkoSpec(ForkJoinPoolVirtualThreadSpec.config) with ImplicitSender {
50+
import ForkJoinPoolVirtualThreadSpec._
51+
52+
"PekkoForkJoinPool" must {
53+
54+
"support virtualization with Virtual Thread" in {
55+
val actor = system.actorOf(Props(new ThreadNameActor).withDispatcher("virtual.task-dispatcher"))
56+
for (_ <- 1 to 1000) {
57+
// External task submission via the default dispatcher
58+
actor ! "ping"
59+
expectMsgPF() { case name: String => name should include("virtual-thread-") }
60+
}
61+
}
62+
63+
}
64+
}

actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala

-2
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@ object VirtualThreadPoolDispatcherSpec {
4343
class VirtualThreadPoolDispatcherSpec extends PekkoSpec(VirtualThreadPoolDispatcherSpec.config) with ImplicitSender {
4444
import VirtualThreadPoolDispatcherSpec._
4545

46-
val Iterations = 1000
47-
4846
"VirtualThreadPool support" must {
4947

5048
"handle simple dispatch" in {

actor/src/main/resources/reference.conf

+5
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,11 @@ pekko {
487487
# This config is new in Pekko v1.1.0 and only has an effect if you are running with JDK 9 and above.
488488
# Read the documentation on `java.util.concurrent.ForkJoinPool` to find out more. Default in hex is 0x7fff.
489489
maximum-pool-size = 32767
490+
491+
# This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above.
492+
# Virtualize this dispatcher as a virtual-thread-executor
493+
# Valid values are: `on`, `off`
494+
virtualize = off
490495
}
491496

492497
# This will be used if you have set "executor = "thread-pool-executor""

actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala

+23-9
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,18 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
8989
val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
9090
val parallelism: Int,
9191
val asyncMode: Boolean,
92-
val maxPoolSize: Int)
92+
val maxPoolSize: Int,
93+
val virtualize: Boolean)
9394
extends ExecutorServiceFactory {
9495

9596
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
9697
parallelism: Int,
97-
asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap)
98+
asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap, false)
99+
100+
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
101+
parallelism: Int,
102+
asyncMode: Boolean,
103+
maxPoolSize: Int) = this(threadFactory, parallelism, asyncMode, maxPoolSize, false)
98104

99105
private def pekkoJdk9ForkJoinPoolClassOpt: Option[Class[_]] =
100106
Try(Class.forName("org.apache.pekko.dispatch.PekkoJdk9ForkJoinPool")).toOption
@@ -116,12 +122,19 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
116122
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) =
117123
this(threadFactory, parallelism, asyncMode = true)
118124

119-
def createExecutorService: ExecutorService = pekkoJdk9ForkJoinPoolHandleOpt match {
120-
case Some(handle) =>
121-
handle.invoke(parallelism, threadFactory, maxPoolSize,
122-
MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService]
123-
case _ =>
124-
new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)
125+
def createExecutorService: ExecutorService = {
126+
val forkJoinPool = pekkoJdk9ForkJoinPoolHandleOpt match {
127+
case Some(handle) =>
128+
handle.invoke(parallelism, threadFactory, maxPoolSize,
129+
MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService]
130+
case _ =>
131+
new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)
132+
}
133+
if (virtualize) {
134+
new VirtualizedExecutorService("pekko", forkJoinPool)
135+
} else {
136+
forkJoinPool
137+
}
125138
}
126139
}
127140

@@ -149,6 +162,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
149162
config.getDouble("parallelism-factor"),
150163
config.getInt("parallelism-max")),
151164
asyncMode,
152-
config.getInt("maximum-pool-size"))
165+
config.getInt("maximum-pool-size"),
166+
config.getBoolean("virtualize"))
153167
}
154168
}

actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala

+4
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ object ThreadPoolConfig {
6464
* Function0 without the fun stuff (mostly for the sake of the Java API side of things)
6565
*/
6666
trait ExecutorServiceFactory {
67+
68+
/**
69+
* Create a new ExecutorService
70+
*/
6771
def createExecutorService: ExecutorService
6872
}
6973

actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717

1818
package org.apache.pekko.dispatch
1919

20-
import org.apache.pekko.annotation.InternalApi
21-
import org.apache.pekko.util.JavaVersion
20+
import org.apache.pekko
21+
import pekko.annotation.InternalApi
22+
import pekko.util.JavaVersion
2223

2324
import java.lang.invoke.{ MethodHandles, MethodType }
2425
import java.util.concurrent.{ ExecutorService, ThreadFactory }
@@ -34,8 +35,7 @@ private[dispatch] object VirtualThreadSupport {
3435
val isSupported: Boolean = JavaVersion.majorVersion >= 21
3536

3637
/**
37-
* Create a virtual thread factory with a executor, the executor will be used as the scheduler of
38-
* virtual thread.
38+
* Create a virtual thread factory with the default Virtual Thread executor.
3939
*/
4040
def newVirtualThreadFactory(prefix: String): ThreadFactory = {
4141
require(isSupported, "Virtual thread is not supported.")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.dispatch
19+
20+
import org.apache.pekko.annotation.InternalApi
21+
22+
import java.util.concurrent.{ ExecutorService, ThreadFactory }
23+
import scala.util.control.NonFatal
24+
25+
/**
26+
* TODO remove this class once we drop Java 8 support
27+
*/
28+
@InternalApi
29+
private[dispatch] object VirtualThreadSupportReflect {
30+
31+
/**
32+
* Create a virtual thread factory with given executor, the executor will be used as the scheduler of
33+
* virtual thread.
34+
*
35+
* The executor should run task on platform threads.
36+
*
37+
* returns null if not supported.
38+
*/
39+
def newThreadPerTaskExecutor(prefix: String, executor: ExecutorService): ExecutorService = {
40+
val factory = virtualThreadFactory(prefix, executor)
41+
VirtualThreadSupport.newThreadPerTaskExecutor(factory)
42+
}
43+
44+
private def virtualThreadFactory(prefix: String, executor: ExecutorService): ThreadFactory =
45+
try {
46+
val builderClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder")
47+
val ofVirtualClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder$OfVirtual")
48+
val ofVirtualMethod = classOf[Thread].getDeclaredMethod("ofVirtual")
49+
var builder = ofVirtualMethod.invoke(null)
50+
if (executor != null) {
51+
val clazz = builder.getClass
52+
val field = clazz.getDeclaredField("scheduler")
53+
field.setAccessible(true)
54+
field.set(builder, executor)
55+
}
56+
val nameMethod = ofVirtualClass.getDeclaredMethod("name", classOf[String], classOf[Long])
57+
val factoryMethod = builderClass.getDeclaredMethod("factory")
58+
builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", 0L)
59+
factoryMethod.invoke(builder).asInstanceOf[ThreadFactory]
60+
} catch {
61+
case NonFatal(e) =>
62+
throw new UnsupportedOperationException("Failed to create virtual thread factory", e)
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.dispatch
19+
20+
import org.apache.pekko.annotation.InternalApi
21+
22+
import java.util
23+
import java.util.concurrent.{ Callable, ExecutorService, Future, TimeUnit }
24+
25+
/**
26+
* A virtualized executor service that creates a new virtual thread for each task.
27+
* Will shut down the underlying executor service when this executor is being shutdown.
28+
*
29+
* INTERNAL API
30+
*/
31+
@InternalApi
32+
final class VirtualizedExecutorService(prefix: String, underlying: ExecutorService) extends ExecutorService {
33+
require(prefix ne null, "Parameter prefix must not be null or empty")
34+
require(underlying ne null, "Parameter underlying must not be null")
35+
36+
private val executor = VirtualThreadSupportReflect.newThreadPerTaskExecutor(prefix, underlying)
37+
38+
override def shutdown(): Unit = {
39+
executor.shutdown()
40+
underlying.shutdown()
41+
}
42+
43+
override def shutdownNow(): util.List[Runnable] = {
44+
executor.shutdownNow()
45+
underlying.shutdownNow()
46+
}
47+
48+
override def isShutdown: Boolean = {
49+
executor.isShutdown || underlying.isShutdown
50+
}
51+
52+
override def isTerminated: Boolean = {
53+
executor.isTerminated && underlying.isTerminated
54+
}
55+
56+
override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = {
57+
executor.awaitTermination(timeout, unit) && underlying.awaitTermination(timeout, unit)
58+
}
59+
60+
override def submit[T](task: Callable[T]): Future[T] = {
61+
executor.submit(task)
62+
}
63+
64+
override def submit[T](task: Runnable, result: T): Future[T] = {
65+
executor.submit(task, result)
66+
}
67+
68+
override def submit(task: Runnable): Future[_] = {
69+
executor.submit(task)
70+
}
71+
72+
override def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = {
73+
executor.invokeAll(tasks)
74+
}
75+
76+
override def invokeAll[T](
77+
tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = {
78+
executor.invokeAll(tasks, timeout, unit)
79+
}
80+
81+
override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = {
82+
executor.invokeAny(tasks)
83+
}
84+
85+
override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = {
86+
executor.invokeAny(tasks, timeout, unit)
87+
}
88+
89+
override def execute(command: Runnable): Unit = {
90+
executor.execute(command)
91+
}
92+
}

0 commit comments

Comments
 (0)