Skip to content

Commit 68f0587

Browse files
committed
feat: Add support for virtualize fork join pool
1 parent 603397b commit 68f0587

File tree

6 files changed

+186
-11
lines changed

6 files changed

+186
-11
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* license agreements; and to You under the Apache License, version 2.0:
4+
*
5+
* https://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* This file is part of the Apache Pekko project, which was derived from Akka.
8+
*/
9+
10+
/*
11+
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
12+
*/
13+
14+
package org.apache.pekko.dispatch
15+
16+
import com.typesafe.config.ConfigFactory
17+
18+
import org.apache.pekko
19+
import pekko.actor.{ Actor, Props }
20+
import pekko.testkit.{ ImplicitSender, PekkoSpec }
21+
import pekko.util.JavaVersion
22+
23+
object ForkJoinPoolVirtualzedSpec {
24+
val config = ConfigFactory.parseString("""
25+
|virtual {
26+
| task-dispatcher {
27+
| mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
28+
| throughput = 5
29+
| fork-join-executor {
30+
| parallelism-factor = 2
31+
| parallelism-max = 2
32+
| parallelism-min = 2
33+
| virtualize = on
34+
| }
35+
| }
36+
|}
37+
""".stripMargin)
38+
39+
class ThreadNameActor extends Actor {
40+
41+
override def receive = {
42+
case "ping" =>
43+
sender() ! Thread.currentThread().getName
44+
}
45+
}
46+
47+
}
48+
49+
class ForkJoinPoolVirtualzedSpec extends PekkoSpec(ForkJoinPoolVirtualzedSpec.config) with ImplicitSender {
50+
import ForkJoinPoolVirtualzedSpec._
51+
52+
"PekkoForkJoinPool" must {
53+
54+
"support virtualization with Virtual Thread" in {
55+
val actor = system.actorOf(Props(new ThreadNameActor).withDispatcher("virtual.task-dispatcher"))
56+
57+
for (_ <- 1 to Iterations) {
58+
// External task submission via the default dispatcher
59+
actor ! "ping"
60+
expectMsgPF() { case name: String => name should contains("virtual-thread") }
61+
}
62+
}
63+
64+
}
65+
}

actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala

-2
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@ object VirtualThreadPoolDispatcherSpec {
4343
class VirtualThreadPoolDispatcherSpec extends PekkoSpec(VirtualThreadPoolDispatcherSpec.config) with ImplicitSender {
4444
import VirtualThreadPoolDispatcherSpec._
4545

46-
val Iterations = 1000
47-
4846
"VirtualThreadPool support" must {
4947

5048
"handle simple dispatch" in {

actor/src/main/resources/reference.conf

+5
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,11 @@ pekko {
487487
# This config is new in Pekko v1.1.0 and only has an effect if you are running with JDK 9 and above.
488488
# Read the documentation on `java.util.concurrent.ForkJoinPool` to find out more. Default in hex is 0x7fff.
489489
maximum-pool-size = 32767
490+
491+
# This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above.
492+
# Virtualize this dispatcher as a virtual-thread-executor
493+
# Valid values are: `on`, `off`
494+
virtualize = off
490495
}
491496

492497
# This will be used if you have set "executor = "thread-pool-executor""

actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala

+23-9
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,18 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
8989
val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
9090
val parallelism: Int,
9191
val asyncMode: Boolean,
92-
val maxPoolSize: Int)
92+
val maxPoolSize: Int,
93+
val virtualize: Boolean)
9394
extends ExecutorServiceFactory {
9495

9596
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
9697
parallelism: Int,
97-
asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap)
98+
asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap, false)
99+
100+
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
101+
parallelism: Int,
102+
asyncMode: Boolean,
103+
maxPoolSize: Int) = this(threadFactory, parallelism, asyncMode, maxPoolSize, false)
98104

99105
private def pekkoJdk9ForkJoinPoolClassOpt: Option[Class[_]] =
100106
Try(Class.forName("org.apache.pekko.dispatch.PekkoJdk9ForkJoinPool")).toOption
@@ -116,12 +122,19 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
116122
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) =
117123
this(threadFactory, parallelism, asyncMode = true)
118124

119-
def createExecutorService: ExecutorService = pekkoJdk9ForkJoinPoolHandleOpt match {
120-
case Some(handle) =>
121-
handle.invoke(parallelism, threadFactory, maxPoolSize,
122-
MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService]
123-
case _ =>
124-
new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)
125+
def createExecutorService: ExecutorService = {
126+
val forkJoinPool = pekkoJdk9ForkJoinPoolHandleOpt match {
127+
case Some(handle) =>
128+
handle.invoke(parallelism, threadFactory, maxPoolSize,
129+
MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService]
130+
case _ =>
131+
new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)
132+
}
133+
if (virtualize) {
134+
new VirtualizedExecutorService("pekko", forkJoinPool)
135+
} else {
136+
forkJoinPool
137+
}
125138
}
126139
}
127140

@@ -149,6 +162,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
149162
config.getDouble("parallelism-factor"),
150163
config.getInt("parallelism-max")),
151164
asyncMode,
152-
config.getInt("maximum-pool-size"))
165+
config.getInt("maximum-pool-size"),
166+
config.getBoolean("virtualize"))
153167
}
154168
}

actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala

+4
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ object ThreadPoolConfig {
6464
* Function0 without the fun stuff (mostly for the sake of the Java API side of things)
6565
*/
6666
trait ExecutorServiceFactory {
67+
68+
/**
69+
* Create a new ExecutorService
70+
*/
6771
def createExecutorService: ExecutorService
6872
}
6973

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.dispatch
19+
20+
import org.apache.pekko.annotation.InternalApi
21+
22+
import java.util
23+
import java.util.concurrent.{ Callable, ExecutorService, Future, TimeUnit }
24+
25+
/**
26+
* A virtualized executor service that creates a new virtual thread for each task.
27+
* Will shut down the underlying executor service when this executor is being shutdown.
28+
*
29+
* INTERNAL API
30+
*/
31+
@InternalApi
32+
final class VirtualizedExecutorService(prefix: String, underlying: ExecutorService) extends ExecutorService {
33+
private val executor = VirtualThreadSupport.newThreadPerTaskExecutor(prefix, underlying)
34+
35+
override def shutdown(): Unit = {
36+
executor.shutdown()
37+
underlying.shutdown()
38+
}
39+
40+
override def shutdownNow(): util.List[Runnable] = {
41+
executor.shutdownNow()
42+
underlying.shutdownNow()
43+
}
44+
45+
override def isShutdown: Boolean = {
46+
executor.isShutdown || underlying.isShutdown
47+
}
48+
49+
override def isTerminated: Boolean = {
50+
executor.isTerminated && underlying.isTerminated
51+
}
52+
53+
override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = {
54+
executor.awaitTermination(timeout, unit) && underlying.awaitTermination(timeout, unit)
55+
}
56+
57+
override def submit[T](task: Callable[T]): Future[T] = {
58+
executor.submit(task)
59+
}
60+
61+
override def submit[T](task: Runnable, result: T): Future[T] = {
62+
executor.submit(task, result)
63+
}
64+
65+
override def submit(task: Runnable): Future[_] = {
66+
executor.submit(task)
67+
}
68+
69+
override def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = {
70+
executor.invokeAll(tasks)
71+
}
72+
73+
override def invokeAll[T](
74+
tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = {
75+
executor.invokeAll(tasks, timeout, unit)
76+
}
77+
78+
override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = {
79+
executor.invokeAny(tasks)
80+
}
81+
82+
override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = {
83+
executor.invokeAny(tasks, timeout, unit)
84+
}
85+
86+
override def execute(command: Runnable): Unit = {
87+
executor.execute(command)
88+
}
89+
}

0 commit comments

Comments
 (0)