Skip to content

Commit dfca939

Browse files
authored
feat: virtual thread executor support (#32689)
1 parent 37cbc4f commit dfca939

File tree

7 files changed

+295
-7
lines changed

7 files changed

+295
-7
lines changed

akka-actor-tests/src/test/scala/akka/dispatch/ForkJoinPoolStarvationSpec.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
package akka.dispatch
66

77
import com.typesafe.config.ConfigFactory
8-
98
import akka.actor.{ Actor, Props }
109
import akka.testkit.{ AkkaSpec, ImplicitSender }
10+
import akka.util.JavaVersion
1111

1212
object ForkJoinPoolStarvationSpec {
1313
val config = ConfigFactory.parseString("""
@@ -52,8 +52,7 @@ class ForkJoinPoolStarvationSpec extends AkkaSpec(ForkJoinPoolStarvationSpec.con
5252

5353
"not starve tasks arriving from external dispatchers under high internal traffic" in {
5454
// TODO issue #31117: starvation with JDK 17 FJP
55-
val javaSpecVersion = System.getProperty("java.specification.version").toInt
56-
if (javaSpecVersion >= 17)
55+
if (JavaVersion.majorVersion >= 17)
5756
pending
5857

5958
// Two busy actors that will occupy the threads of the dispatcher
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright (C) 2009-2025 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.dispatch
6+
7+
import akka.actor.Actor
8+
import akka.actor.ActorSystem
9+
import akka.actor.Props
10+
import akka.testkit.TestKit
11+
import akka.testkit.TestProbe
12+
import akka.util.JavaVersion
13+
import com.typesafe.config.ConfigFactory
14+
import org.scalatest.matchers.should.Matchers
15+
import org.scalatest.wordspec.AnyWordSpec
16+
17+
object VirtualThreadDispatcherSpec {
18+
final case class ThreadInfo(virtual: Boolean, name: String)
19+
20+
object ThreadInfoActor {
21+
def props() = Props(new ThreadInfoActor)
22+
}
23+
private class ThreadInfoActor extends Actor {
24+
override def receive: Receive = {
25+
case "give-me-info" =>
26+
sender() ! reflectiveVirtualThreadInfo()
27+
}
28+
}
29+
30+
private def reflectiveVirtualThreadInfo(): ThreadInfo = {
31+
val thread = Thread.currentThread()
32+
// can't use methods directly or test won't compile on jdk < 21
33+
val isVirtualMethod = thread.getClass.getMethod("isVirtual")
34+
val isVirtual = isVirtualMethod.invoke(thread).asInstanceOf[Boolean]
35+
ThreadInfo(isVirtual, thread.getName)
36+
}
37+
}
38+
39+
class VirtualThreadDispatcherSpec extends AnyWordSpec with Matchers {
40+
import VirtualThreadDispatcherSpec._
41+
42+
"The virtual thread support" should {
43+
44+
"run tasks on virtual threads" in {
45+
if (JavaVersion.majorVersion < 21) {
46+
// loom not available yet here
47+
pending
48+
} else {
49+
implicit val system: ActorSystem = ActorSystem(
50+
classOf[VirtualThreadDispatcherSpec].getSimpleName,
51+
ConfigFactory.parseString("""
52+
my-vt-dispatcher {
53+
type = "Dispatcher"
54+
executor = virtual-thread-executor
55+
}
56+
""").withFallback(ConfigFactory.load()))
57+
58+
try {
59+
val vtDispatcher = system.dispatchers.lookup("my-vt-dispatcher")
60+
vtDispatcher shouldBe a[BatchingExecutor]
61+
62+
val threadIsVirtualProbe = TestProbe()
63+
vtDispatcher.execute(() => {
64+
threadIsVirtualProbe.ref ! reflectiveVirtualThreadInfo()
65+
})
66+
val info = threadIsVirtualProbe.expectMsgType[ThreadInfo]
67+
info.virtual shouldBe true
68+
info.name should endWith("my-vt-dispatcher")
69+
} finally {
70+
TestKit.shutdownActorSystem(system)
71+
}
72+
73+
}
74+
}
75+
76+
"can be used as default dispatcher" in {
77+
if (JavaVersion.majorVersion < 21) {
78+
// loom not available yet here
79+
pending
80+
} else {
81+
// not necessarily a good idea because of the virtual thread per task overhead, but to know it works
82+
// and to cover running actors on it
83+
implicit val system: ActorSystem = ActorSystem(
84+
classOf[VirtualThreadDispatcherSpec].getSimpleName,
85+
ConfigFactory.parseString("""
86+
akka.actor.default-dispatcher.executor="virtual-thread-executor"
87+
""").withFallback(ConfigFactory.load()))
88+
try {
89+
val echo = system.actorOf(ThreadInfoActor.props())
90+
val responseProbe = TestProbe()
91+
echo.tell("give-me-info", responseProbe.ref)
92+
val info = responseProbe.expectMsgType[ThreadInfo]
93+
info.virtual shouldBe true
94+
info.name should endWith("akka.actor.default-dispatcher")
95+
} finally {
96+
TestKit.shutdownActorSystem(system)
97+
}
98+
}
99+
}
100+
101+
"can be configured with a fallback for work on all JVMs" in {
102+
implicit val system: ActorSystem = ActorSystem(
103+
classOf[VirtualThreadDispatcherSpec].getSimpleName,
104+
ConfigFactory.parseString("""
105+
my-vt-dispatcher {
106+
type = "Dispatcher"
107+
executor = virtual-thread-executor
108+
virtual-thread-executor {
109+
fallback="fork-join-executor"
110+
}
111+
}
112+
""").withFallback(ConfigFactory.load()))
113+
114+
try {
115+
val dispatcher = system.dispatchers.lookup("my-vt-dispatcher")
116+
val threadInfoProbe = TestProbe()
117+
dispatcher.execute(() => {
118+
threadInfoProbe.ref ! "ok"
119+
})
120+
threadInfoProbe.expectMsg("ok")
121+
} finally {
122+
TestKit.shutdownActorSystem(system)
123+
}
124+
}
125+
}
126+
127+
}

akka-actor/src/main/resources/reference.conf

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ akka {
370370
# - "fork-join-executor" requires a "fork-join-executor" section
371371
# - "thread-pool-executor" requires a "thread-pool-executor" section
372372
# - "affinity-pool-executor" requires an "affinity-pool-executor" section
373+
# - "virtual-thread-executor" requires a "virtual-thread-executor" section
373374
# - A FQCN of a class extending ExecutorServiceConfigurator
374375
executor = "default-executor"
375376

@@ -527,6 +528,31 @@ akka {
527528
allow-core-timeout = on
528529
}
529530

531+
# This will be used if you have set "executor = "virtual-thread-executor"
532+
# The virtual thread executor can only be used on JDK 21 an newer and runs each
533+
# task in a virtual thread.
534+
#
535+
# Note that while running actors on this will make sure they do not interfer with each other by starving
536+
# the dispatcher, an individual actor should still not be blocked since that means it cannot act on messages
537+
# while blocked, for example system messages such as termination of the actor. Using the virtual-thread-executor
538+
# comes at the price of some overhead, since each scheduled task is wrapped in a virtual thread instance, in
539+
# benchmarks this has shown up as around 30% lower throughput, a higher allocation rate and 10% more time spent on GC
540+
#
541+
# For other use cases, such as future calls invoking blocking work, or using as blocking-io-dispatcher the executor
542+
# may be a more sound choice.
543+
#
544+
# It is not possible to tune the carrier thread pool per executor however it is possible to globally
545+
# control the JVM carrier thread pool size through system properties. See JVM
546+
# Thread documentation for details: https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/lang/Thread.html
547+
virtual-thread-executor {
548+
# If run on a mixture of JDK versions, some not supporting virtual threads (JDK 17 and earlier),
549+
# this can be used to specify an alternative executor to make sure an application always works
550+
# by default this is empty so that expecting to be able to run virtual threads when it is not possible
551+
# will not go unnoticed.
552+
fallback = ""
553+
}
554+
555+
530556
# How long time the dispatcher will wait for new actors until it shuts down
531557
shutdown-timeout = 1s
532558

akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,19 @@ package akka.dispatch
66

77
import java.{ util => ju }
88
import java.util.concurrent._
9-
109
import scala.annotation.nowarn
1110
import scala.annotation.tailrec
1211
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor }
1312
import scala.concurrent.duration.{ Duration, FiniteDuration }
1413
import scala.util.control.NonFatal
15-
1614
import com.typesafe.config.Config
17-
1815
import akka.actor._
1916
import akka.annotation.InternalStableApi
2017
import akka.dispatch.affinity.AffinityPoolConfigurator
2118
import akka.dispatch.sysmsg._
2219
import akka.event.EventStream
2320
import akka.event.Logging.{ Debug, Error, LogEventException }
21+
import akka.util.JavaVersion
2422
import akka.util.{ Index, Unsafe }
2523

2624
final case class Envelope private (message: Any, sender: ActorRef) {
@@ -365,6 +363,19 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites:
365363
new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
366364
case "affinity-pool-executor" =>
367365
new AffinityPoolConfigurator(config.getConfig("affinity-pool-executor"), prerequisites)
366+
case "virtual-thread-executor" =>
367+
val executorConfig = config.getConfig("virtual-thread-executor")
368+
if (VirtualThreadConfigurator.virtualThreadsSupported()) {
369+
new VirtualThreadConfigurator(executorConfig, prerequisites)
370+
} else {
371+
val fallbackExecutorName = executorConfig.getString("fallback")
372+
if (fallbackExecutorName.isEmpty)
373+
throw new RuntimeException(
374+
s"Dispatcher configured to use virtual threads, but JVM version ${JavaVersion.majorVersion} does not support that. " +
375+
"Use a newer Java version (21 or later), or configure 'virtual-thread-executor.fallback' for an alternative executor on older Java versions.")
376+
else
377+
configurator(fallbackExecutorName)
378+
}
368379

369380
case fqcn =>
370381
val args = List(classOf[Config] -> config, classOf[DispatcherPrerequisites] -> prerequisites)
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright (C) 2009-2025 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.dispatch
6+
7+
import akka.actor.DynamicAccess
8+
import akka.annotation.InternalApi
9+
import akka.util.JavaVersion
10+
import com.typesafe.config.Config
11+
12+
import java.util.concurrent.ExecutorService
13+
import java.util.concurrent.Executors
14+
import java.util.concurrent.ThreadFactory
15+
16+
/**
17+
* INTERNAL API
18+
*/
19+
@InternalApi
20+
private[akka] object VirtualThreadConfigurator {
21+
22+
def virtualThreadsSupported(): Boolean = JavaVersion.majorVersion >= 21
23+
24+
// Note: since we still support JDK 11 and 17, we need to access the factory method reflectively
25+
private def createVirtualThreadFactory(
26+
dynamicAccess: DynamicAccess,
27+
virtualThreadName: String,
28+
uncaughtExceptionHandler: Option[Thread.UncaughtExceptionHandler]): ThreadFactory = {
29+
val ofVirtualMethod =
30+
try {
31+
classOf[Thread].getMethod("ofVirtual")
32+
} catch {
33+
case _: NoSuchMethodError =>
34+
throw new IllegalStateException("Virtual thread executors only supported on JDK 21 and newer")
35+
}
36+
val ofVirtual = ofVirtualMethod.invoke(null)
37+
val ofVirtualInterface = dynamicAccess.getClassFor[AnyRef]("java.lang.Thread$Builder$OfVirtual").get
38+
39+
// thread names
40+
val ofVirtualWithName =
41+
if (virtualThreadName.nonEmpty) {
42+
val nameMethod = ofVirtualInterface.getMethod("name", classOf[String])
43+
nameMethod.invoke(ofVirtual, virtualThreadName)
44+
} else ofVirtual
45+
46+
// uncaught exception handler
47+
val ofVirtualWithUEH = uncaughtExceptionHandler match {
48+
case Some(ueh) =>
49+
val uncaughtExceptionHandlerMethod =
50+
ofVirtualInterface.getMethod("uncaughtExceptionHandler", classOf[Thread.UncaughtExceptionHandler])
51+
uncaughtExceptionHandlerMethod.invoke(ofVirtualWithName, ueh)
52+
case None => ofVirtualWithName
53+
}
54+
val factoryMethod = ofVirtualInterface.getMethod("factory")
55+
factoryMethod.invoke(ofVirtualWithUEH).asInstanceOf[ThreadFactory]
56+
}
57+
58+
private def threadPerTaskExecutor(threadFactory: ThreadFactory): ExecutorService = {
59+
val newThreadPerTaskMethod = classOf[Executors].getMethod("newThreadPerTaskExecutor", classOf[ThreadFactory])
60+
newThreadPerTaskMethod.invoke(null, threadFactory).asInstanceOf[ExecutorService]
61+
}
62+
63+
private class VirtualThreadExecutorServiceFactory(tf: ThreadFactory) extends ExecutorServiceFactory {
64+
override def createExecutorService: ExecutorService = threadPerTaskExecutor(tf)
65+
}
66+
67+
}
68+
69+
/**
70+
* INTERNAL API
71+
*/
72+
@InternalApi
73+
private[akka] class VirtualThreadConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
74+
extends ExecutorServiceConfigurator(config, prerequisites) {
75+
import VirtualThreadConfigurator._
76+
77+
override def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
78+
79+
val tf = threadFactory match {
80+
case m: MonitorableThreadFactory =>
81+
// add the dispatcher id to the thread names
82+
val virtualThreadFactory =
83+
createVirtualThreadFactory(prerequisites.dynamicAccess, m.name + "-" + id, Some(m.exceptionHandler))
84+
85+
// Note: daemonic false not allowed for virtual threads, so we ignore that
86+
m.contextClassLoader.fold[ThreadFactory](virtualThreadFactory)(classLoader =>
87+
(r: Runnable) => {
88+
val virtualThread = virtualThreadFactory.newThread(r)
89+
virtualThread.setContextClassLoader(classLoader)
90+
virtualThread
91+
})
92+
93+
case _ => createVirtualThreadFactory(prerequisites.dynamicAccess, "-" + id, None)
94+
}
95+
new VirtualThreadExecutorServiceFactory(tf)
96+
}
97+
98+
}

akka-bench-jmh/src/main/scala/akka/actor/ActorBenchmark.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class ActorBenchmark {
4545
"akka.actor.JCToolsMailbox"))
4646
var mailbox = ""
4747

48-
@Param(Array("fjp-dispatcher")) // @Param(Array("fjp-dispatcher", "affinity-dispatcher"))
48+
@Param(Array("fjp-dispatcher", "vt-dispatcher")) // @Param(Array("fjp-dispatcher", "affinity-dispatcher"))
4949
var dispatcher = ""
5050

5151
implicit var system: ActorSystem = _
@@ -85,6 +85,11 @@ class ActorBenchmark {
8585
throughput = $tpt
8686
mailbox-type = "$mailbox"
8787
}
88+
vt-dispatcher {
89+
executor = "virtual-thread-executor"
90+
throughput = $tpt
91+
mailbox-type = "$mailbox"
92+
}
8893
}
8994
"""))
9095
}

akka-docs/src/main/paradox/typed/dispatchers.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,28 @@ applications.
330330

331331
For a similar discussion specifically about Akka HTTP, refer to @extref[Handling blocking operations in Akka HTTP](akka.http:handling-blocking-operations-in-akka-http-routes.html).
332332

333+
### Solution: Virtual threads dispatcher for blocking operations
334+
335+
If running on Java 21 or later, it is possible to use virtual threads for a blocking dispatcher, configure
336+
the executor of the dispatcher to be `virtual-thread-executor`.
337+
338+
The virtual thread executor will run every task in a virtual thread, which can detach from of the OS-level thread
339+
when it is waiting for a blocking operation, much like how an async task allows threads to be handed back to
340+
a thread pool, until some task completes.
341+
342+
Re-configuring the built-in blocking dispatcher to use virtual threads can be done like this:
343+
344+
```ruby
345+
akka.actor.default-blocking-io-dispatcher {
346+
executor = "virtual-thread-executor"
347+
}
348+
```
349+
350+
Note that there is a difference in behavior compared to using a thread pool dispatcher in that there is no limit
351+
to how many virtual threads can block, for example hitting a service and waiting for a response,
352+
while the threadpool executor puts an upper limit (16 by default) on how many threads are actually in flight,
353+
once that limit has been reached, additional tasks are queued until a thread becomes available.
354+
333355
### Available solutions to blocking operations
334356

335357
The non-exhaustive list of adequate solutions to the “blocking problem”

0 commit comments

Comments
 (0)