Skip to content

Commit b78e373

Browse files
committed
feat: Add support for switching scheduler
1 parent b20ec82 commit b78e373

15 files changed

+439
-28
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.dispatch
19+
20+
import com.typesafe.config.ConfigFactory
21+
22+
import org.apache.pekko
23+
import pekko.actor.{ Actor, Props }
24+
import pekko.testkit.{ ImplicitSender, PekkoSpec }
25+
import pekko.util.JavaVersion
26+
27+
object ForkJoinPoolVirtualThreadSpec {
28+
val config = ConfigFactory.parseString("""
29+
|virtual {
30+
| task-dispatcher {
31+
| mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
32+
| throughput = 5
33+
| fork-join-executor {
34+
| parallelism-factor = 2
35+
| parallelism-max = 2
36+
| parallelism-min = 2
37+
| virtualize = on
38+
| }
39+
| }
40+
|}
41+
""".stripMargin)
42+
43+
class ThreadNameActor extends Actor {
44+
45+
override def receive = {
46+
case "ping" =>
47+
sender() ! Thread.currentThread().getName
48+
}
49+
}
50+
51+
}
52+
53+
class ForkJoinPoolVirtualThreadSpec extends PekkoSpec(ForkJoinPoolVirtualThreadSpec.config) with ImplicitSender {
54+
import ForkJoinPoolVirtualThreadSpec._
55+
56+
"PekkoForkJoinPool" must {
57+
58+
"support virtualization with Virtual Thread" in {
59+
val actor = system.actorOf(Props(new ThreadNameActor).withDispatcher("virtual.task-dispatcher"))
60+
for (_ <- 1 to 1000) {
61+
actor ! "ping"
62+
expectMsgPF() { case name: String => name should include("virtual.task-dispatcher-virtual-thread") }
63+
}
64+
}
65+
66+
}
67+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.dispatch
19+
20+
import com.typesafe.config.ConfigFactory
21+
import org.apache.pekko
22+
import pekko.actor.{ Actor, Props }
23+
import pekko.testkit.{ ImplicitSender, PekkoSpec }
24+
25+
object ThreadPoolVirtualThreadSpec {
26+
val config = ConfigFactory.parseString("""
27+
|pekko.actor.default-dispatcher.executor = "thread-pool-executor"
28+
|virtual {
29+
| task-dispatcher {
30+
| mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
31+
| throughput = 1
32+
| thread-pool-executor {
33+
| fixed-pool-size = 2
34+
| virtualize = on
35+
| }
36+
| }
37+
|}
38+
""".stripMargin)
39+
40+
class ThreadNameActor extends Actor {
41+
42+
override def receive = {
43+
case "ping" =>
44+
sender() ! Thread.currentThread().getName
45+
}
46+
}
47+
48+
}
49+
50+
class ThreadPoolVirtualThreadSpec extends PekkoSpec(ThreadPoolVirtualThreadSpec.config) with ImplicitSender {
51+
import ThreadPoolVirtualThreadSpec._
52+
53+
"PekkoThreadPoolExecutor" must {
54+
55+
"support virtualization with Virtual Thread" in {
56+
val actor = system.actorOf(Props(new ThreadNameActor).withDispatcher("virtual.task-dispatcher"))
57+
for (_ <- 1 to 1000) {
58+
actor ! "ping"
59+
expectMsgPF() { case name: String => name should include("virtual.task-dispatcher-virtual-thread") }
60+
}
61+
}
62+
63+
}
64+
}

actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala

+5-5
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ object VirtualThreadPoolDispatcherSpec {
3434

3535
override def receive = {
3636
case "ping" =>
37-
sender() ! "All fine"
37+
sender() ! Thread.currentThread().getName
3838
}
3939
}
4040

@@ -43,14 +43,14 @@ object VirtualThreadPoolDispatcherSpec {
4343
class VirtualThreadPoolDispatcherSpec extends PekkoSpec(VirtualThreadPoolDispatcherSpec.config) with ImplicitSender {
4444
import VirtualThreadPoolDispatcherSpec._
4545

46-
val Iterations = 1000
47-
4846
"VirtualThreadPool support" must {
4947

5048
"handle simple dispatch" in {
5149
val innocentActor = system.actorOf(Props(new InnocentActor).withDispatcher("virtual-thread-dispatcher"))
52-
innocentActor ! "ping"
53-
expectMsg("All fine")
50+
for (_ <- 1 to 1000) {
51+
innocentActor ! "ping"
52+
expectMsgPF() { case name: String => name should include("virtual-thread") }
53+
}
5454
}
5555

5656
}

actor/src/main/resources/reference.conf

+12
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,12 @@ pekko {
487487
# This config is new in Pekko v1.1.0 and only has an effect if you are running with JDK 9 and above.
488488
# Read the documentation on `java.util.concurrent.ForkJoinPool` to find out more. Default in hex is 0x7fff.
489489
maximum-pool-size = 32767
490+
491+
# This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above,
492+
# When set to `on` but underlying runtime does not support virtual threads, an Exception will throw.
493+
# Virtualize this dispatcher as a virtual-thread-executor
494+
# Valid values are: `on`, `off`
495+
virtualize = off
490496
}
491497

492498
# This will be used if you have set "executor = "thread-pool-executor""
@@ -538,6 +544,12 @@ pekko {
538544

539545
# Allow core threads to time out
540546
allow-core-timeout = on
547+
548+
# This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above,
549+
# When set to `on` but underlying runtime does not support virtual threads, an Exception will throw.
550+
# Virtualize this dispatcher as a virtual-thread-executor
551+
# Valid values are: `on`, `off`
552+
virtualize = off
541553
}
542554

543555
# This will be used if you have set "executor = "virtual-thread-executor"

actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala

+5-1
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
444444
@unused prerequisites: DispatcherPrerequisites): ThreadPoolConfigBuilder = {
445445
import org.apache.pekko.util.Helpers.ConfigOps
446446
val builder =
447-
ThreadPoolConfigBuilder(ThreadPoolConfig())
447+
ThreadPoolConfigBuilder(ThreadPoolConfig(virtualize = false))
448448
.setKeepAliveTime(config.getMillisDuration("keep-alive-time"))
449449
.setAllowCoreThreadTimeout(config.getBoolean("allow-core-timeout"))
450450
.configure(Some(config.getInt("task-queue-size")).flatMap {
@@ -474,6 +474,10 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
474474
config.getInt("max-pool-size-max"))
475475
else
476476
builder.setFixedPoolSize(config.getInt("fixed-pool-size"))
477+
478+
if (config.getBoolean("virtualize")) {
479+
builder.setVirtualize(true)
480+
} else builder
477481
}
478482

479483
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory =

actor/src/main/scala/org/apache/pekko/dispatch/Dispatchers.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrer
390390
this.getClass,
391391
"PinnedDispatcher [%s] not configured to use ThreadPoolExecutor, falling back to default config.".format(
392392
config.getString("id"))))
393-
ThreadPoolConfig()
393+
ThreadPoolConfig(virtualize = false)
394394
}
395395

396396
/**

actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala

+31-9
Original file line numberDiff line numberDiff line change
@@ -86,15 +86,28 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
8686
}
8787

8888
class ForkJoinExecutorServiceFactory(
89+
val id: String,
8990
val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
9091
val parallelism: Int,
9192
val asyncMode: Boolean,
92-
val maxPoolSize: Int)
93+
val maxPoolSize: Int,
94+
val virtualize: Boolean)
9395
extends ExecutorServiceFactory {
96+
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
97+
parallelism: Int,
98+
asyncMode: Boolean,
99+
maxPoolSize: Int,
100+
virtualize: Boolean) =
101+
this(null, threadFactory, parallelism, asyncMode, maxPoolSize, virtualize)
102+
103+
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
104+
parallelism: Int,
105+
asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap, false)
94106

95107
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
96108
parallelism: Int,
97-
asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap)
109+
asyncMode: Boolean,
110+
maxPoolSize: Int) = this(threadFactory, parallelism, asyncMode, maxPoolSize, false)
98111

99112
private def pekkoJdk9ForkJoinPoolClassOpt: Option[Class[_]] =
100113
Try(Class.forName("org.apache.pekko.dispatch.PekkoJdk9ForkJoinPool")).toOption
@@ -116,12 +129,19 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
116129
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) =
117130
this(threadFactory, parallelism, asyncMode = true)
118131

119-
def createExecutorService: ExecutorService = pekkoJdk9ForkJoinPoolHandleOpt match {
120-
case Some(handle) =>
121-
handle.invoke(parallelism, threadFactory, maxPoolSize,
122-
MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService]
123-
case _ =>
124-
new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)
132+
def createExecutorService: ExecutorService = {
133+
val forkJoinPool = pekkoJdk9ForkJoinPoolHandleOpt match {
134+
case Some(handle) =>
135+
handle.invoke(parallelism, threadFactory, maxPoolSize,
136+
MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService]
137+
case _ =>
138+
new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)
139+
}
140+
if (virtualize) {
141+
new VirtualizedExecutorService(id, forkJoinPool)
142+
} else {
143+
forkJoinPool
144+
}
125145
}
126146
}
127147

@@ -143,12 +163,14 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
143163
}
144164

145165
new ForkJoinExecutorServiceFactory(
166+
id,
146167
validate(tf),
147168
ThreadPoolConfig.scaledPoolSize(
148169
config.getInt("parallelism-min"),
149170
config.getDouble("parallelism-factor"),
150171
config.getInt("parallelism-max")),
151172
asyncMode,
152-
config.getInt("maximum-pool-size"))
173+
config.getInt("maximum-pool-size"),
174+
config.getBoolean("virtualize"))
153175
}
154176
}

0 commit comments

Comments
 (0)