diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java index 62024a0aad4..8188b906c01 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java @@ -15,6 +15,7 @@ import io.vertx.core.impl.transports.NioTransport; import io.vertx.core.internal.VertxBootstrap; import io.vertx.core.eventbus.impl.clustered.DefaultNodeSelector; +import io.vertx.core.spi.*; import io.vertx.core.spi.context.executor.EventExecutorProvider; import io.vertx.core.spi.file.FileResolver; import io.vertx.core.file.impl.FileResolverImpl; @@ -22,18 +23,16 @@ import io.vertx.core.internal.logging.LoggerFactory; import io.vertx.core.json.JsonObject; import io.vertx.core.spi.transport.Transport; -import io.vertx.core.spi.ExecutorServiceFactory; -import io.vertx.core.spi.VertxMetricsFactory; -import io.vertx.core.spi.VertxServiceProvider; -import io.vertx.core.spi.VertxThreadFactory; -import io.vertx.core.spi.VertxTracerFactory; import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.eventbus.impl.clustered.NodeSelector; import io.vertx.core.spi.metrics.VertxMetrics; import io.vertx.core.spi.tracing.VertxTracer; +import java.time.Duration; import java.util.Collection; import java.util.List; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; /** * Bootstrap implementation. @@ -53,7 +52,7 @@ public class VertxBootstrapImpl implements VertxBootstrap { private NodeSelector clusterNodeSelector; private VertxTracerFactory tracerFactory; private VertxTracer tracer; - private VertxThreadFactory threadFactory; + private ThreadFactoryProvider threadFactoryProvider; private ExecutorServiceFactory executorServiceFactory; private VertxMetricsFactory metricsFactory; private VertxMetrics metrics; @@ -199,14 +198,25 @@ public VertxBootstrapImpl fileResolver(FileResolver resolver) { } public VertxThreadFactory threadFactory() { - return threadFactory; + throw new UnsupportedOperationException(); } public VertxBootstrapImpl threadFactory(VertxThreadFactory factory) { - this.threadFactory = factory; + this.threadFactoryProvider = factory.provider(); return this; } + @Override + public VertxBootstrap threadFactoryProvider(ThreadFactoryProvider provider) { + this.threadFactoryProvider = provider; + return this; + } + + @Override + public ThreadFactoryProvider threadFactoryProvider() { + return threadFactoryProvider; + } + public ExecutorServiceFactory executorServiceFactory() { return executorServiceFactory; } @@ -237,7 +247,7 @@ private VertxImpl instantiateVertx(ClusterManager clusterManager, NodeSelector n tr, transportUnavailabilityCause, fileResolver, - threadFactory, + threadFactoryProvider, executorServiceFactory, eventExecutorProvider, enableShadowContext); @@ -316,10 +326,10 @@ private void initFileResolver() { } private void initThreadFactory() { - if (threadFactory != null) { + if (threadFactoryProvider != null) { return; } - threadFactory = VertxThreadFactory.INSTANCE; + threadFactoryProvider = VertxThreadFactory.INSTANCE.provider(); } private void initExecutorServiceFactory() { diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java index fab8d506866..dcb8ce1348a 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -50,6 +50,7 @@ import io.vertx.core.net.impl.tcp.NetClientBuilder; import io.vertx.core.net.impl.tcp.NetServerImpl; import io.vertx.core.net.impl.tcp.NetServerInternal; +import io.vertx.core.spi.ThreadFactoryProvider; import io.vertx.core.spi.context.executor.EventExecutorProvider; import io.vertx.core.spi.context.storage.AccessMode; import io.vertx.core.spi.context.storage.ContextLocal; @@ -77,6 +78,7 @@ import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -154,7 +156,7 @@ private static ThreadFactory virtualThreadFactory() { final WorkerPool workerPool; final WorkerPool internalWorkerPool; final WorkerPool virtualThreadWorkerPool; - private final VertxThreadFactory threadFactory; + private final ThreadFactoryProvider threadFactoryProvider; private final ExecutorServiceFactory executorServiceFactory; private final ThreadFactory eventLoopThreadFactory; private final EventLoopGroup eventLoopGroup; @@ -183,7 +185,7 @@ private static ThreadFactory virtualThreadFactory() { VertxImpl(VertxOptions options, ClusterManager clusterManager, NodeSelector nodeSelector, VertxMetrics metrics, VertxTracer tracer, Transport transport, Throwable transportUnavailabilityCause, - FileResolver fileResolver, VertxThreadFactory threadFactory, ExecutorServiceFactory executorServiceFactory, + FileResolver fileResolver, ThreadFactoryProvider threadFactoryProvider, ExecutorServiceFactory executorServiceFactory, EventExecutorProvider eventExecutorProvider, boolean enableShadowContext) { // Sanity check if (Vertx.currentContext() != null) { @@ -196,14 +198,14 @@ private static ThreadFactory virtualThreadFactory() { BlockedThreadChecker checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval(), options.getBlockedThreadCheckIntervalUnit(), options.getWarningExceptionTime(), options.getWarningExceptionTimeUnit()); long maxEventLoopExecuteTime = options.getMaxEventLoopExecuteTime(); TimeUnit maxEventLoopExecuteTimeUnit = options.getMaxEventLoopExecuteTimeUnit(); - ThreadFactory acceptorEventLoopThreadFactory = createThreadFactory(threadFactory, checker, useDaemonThread, maxEventLoopExecuteTime, maxEventLoopExecuteTimeUnit, "vert.x-acceptor-thread-", false); + ThreadFactory acceptorEventLoopThreadFactory = createThreadFactory(threadFactoryProvider, checker, useDaemonThread, maxEventLoopExecuteTime, maxEventLoopExecuteTimeUnit, "vert.x-acceptor-thread-", false); TimeUnit maxWorkerExecuteTimeUnit = options.getMaxWorkerExecuteTimeUnit(); long maxWorkerExecuteTime = options.getMaxWorkerExecuteTime(); - ThreadFactory workerThreadFactory = createThreadFactory(threadFactory, checker, useDaemonThread, maxWorkerExecuteTime, maxWorkerExecuteTimeUnit, "vert.x-worker-thread-", true); + ThreadFactory workerThreadFactory = createThreadFactory(threadFactoryProvider, checker, useDaemonThread, maxWorkerExecuteTime, maxWorkerExecuteTimeUnit, "vert.x-worker-thread-", true); ExecutorService workerExec = executorServiceFactory.createExecutor(workerThreadFactory, workerPoolSize, workerPoolSize); PoolMetrics workerPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null; - ThreadFactory internalWorkerThreadFactory = createThreadFactory(threadFactory, checker, useDaemonThread, maxWorkerExecuteTime, maxWorkerExecuteTimeUnit, "vert.x-internal-blocking-", true); + ThreadFactory internalWorkerThreadFactory = createThreadFactory(threadFactoryProvider, checker, useDaemonThread, maxWorkerExecuteTime, maxWorkerExecuteTimeUnit, "vert.x-internal-blocking-", true); ExecutorService internalWorkerExec = executorServiceFactory.createExecutor(internalWorkerThreadFactory, internalBlockingPoolSize, internalBlockingPoolSize); PoolMetrics internalBlockingPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-internal-blocking", internalBlockingPoolSize) : null; @@ -215,7 +217,7 @@ private static ThreadFactory virtualThreadFactory() { closeFuture = new CloseFuture(log); maxEventLoopExecTime = maxEventLoopExecuteTime; maxEventLoopExecTimeUnit = maxEventLoopExecuteTimeUnit; - eventLoopThreadFactory = createThreadFactory(threadFactory, checker, useDaemonThread, maxEventLoopExecTime, maxEventLoopExecTimeUnit, "vert.x-eventloop-thread-", false); + eventLoopThreadFactory = createThreadFactory(threadFactoryProvider, checker, useDaemonThread, maxEventLoopExecTime, maxEventLoopExecTimeUnit, "vert.x-eventloop-thread-", false); eventLoopGroup = transport.eventLoopGroup(Transport.IO_EVENT_LOOP_GROUP, options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO); // The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections // under a lot of load @@ -231,7 +233,7 @@ private static ThreadFactory virtualThreadFactory() { this.checker = checker; this.useDaemonThread = useDaemonThread; this.executorServiceFactory = executorServiceFactory; - this.threadFactory = threadFactory; + this.threadFactoryProvider = threadFactoryProvider; this.metrics = metrics; this.transport = transport; this.transportUnavailabilityCause = transportUnavailabilityCause; @@ -1106,7 +1108,7 @@ private synchronized WorkerPool createSharedWorkerPool(CloseFuture closeFuture, throw new IllegalArgumentException("maxExecuteTime must be > 0"); } WorkerPool shared = createSharedResource("__vertx.shared.workerPools", name, closeFuture, cf -> { - ThreadFactory workerThreadFactory = createThreadFactory(threadFactory, checker, useDaemonThread, maxExecuteTime, maxExecuteTimeUnit, name + "-", true); + ThreadFactory workerThreadFactory = createThreadFactory(threadFactoryProvider, checker, useDaemonThread, maxExecuteTime, maxExecuteTimeUnit, name + "-", true); ExecutorService workerExec = executorServiceFactory.createExecutor(workerThreadFactory, poolSize, poolSize); PoolMetrics workerMetrics = metrics != null ? metrics.createPoolMetrics("worker", name, poolSize) : null; WorkerPool pool = new WorkerPool(workerExec, workerMetrics); @@ -1130,14 +1132,17 @@ public WorkerPool wrapWorkerPool(ExecutorService executor) { return new WorkerPool(executor, workerMetrics); } - private ThreadFactory createThreadFactory(VertxThreadFactory threadFactory, BlockedThreadChecker checker, Boolean useDaemonThread, long maxExecuteTime, TimeUnit maxExecuteTimeUnit, String prefix, boolean worker) { - AtomicInteger threadCount = new AtomicInteger(0); + private ThreadFactory createThreadFactory(ThreadFactoryProvider threadFactoryProvider, BlockedThreadChecker checker, Boolean useDaemonThread, long maxExecuteTime, TimeUnit maxExecuteTimeUnit, String prefix, boolean worker) { + ThreadFactory factory = threadFactoryProvider.threadFactory(prefix, worker, Duration.ofMillis(maxExecuteTimeUnit.toMillis(maxExecuteTime))); return runnable -> { - VertxThread thread = threadFactory.newVertxThread(runnable, prefix + threadCount.getAndIncrement(), worker, maxExecuteTime, maxExecuteTimeUnit); - thread.owner = VertxImpl.this; - checker.registerThread(thread, thread.info); - if (useDaemonThread != null && thread.isDaemon() != useDaemonThread) { - thread.setDaemon(useDaemonThread); + Thread thread = factory.newThread(runnable); + if (thread instanceof VertxThread) { + VertxThread vertxThread = (VertxThread) thread; + vertxThread.owner = VertxImpl.this; + checker.registerThread(thread, vertxThread.info); + if (useDaemonThread != null && thread.isDaemon() != useDaemonThread) { + thread.setDaemon(useDaemonThread); + } } return thread; }; diff --git a/vertx-core/src/main/java/io/vertx/core/internal/VertxBootstrap.java b/vertx-core/src/main/java/io/vertx/core/internal/VertxBootstrap.java index f17d38f42f5..f756c81a5b5 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/VertxBootstrap.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/VertxBootstrap.java @@ -14,10 +14,7 @@ import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.impl.VertxBootstrapImpl; -import io.vertx.core.spi.ExecutorServiceFactory; -import io.vertx.core.spi.VertxMetricsFactory; -import io.vertx.core.spi.VertxThreadFactory; -import io.vertx.core.spi.VertxTracerFactory; +import io.vertx.core.spi.*; import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.context.executor.EventExecutorProvider; import io.vertx.core.spi.file.FileResolver; @@ -124,17 +121,17 @@ static VertxBootstrap create() { VertxBootstrap executorServiceFactory(ExecutorServiceFactory factory); /** - * @return the {@code VertxThreadFactory} to use + * Set the {@code ThreadFactoryProvider} instance to use. + * + * @param provider the provider + * @return this builder instance */ - VertxThreadFactory threadFactory(); + VertxBootstrap threadFactoryProvider(ThreadFactoryProvider provider); /** - * Set the {@code VertxThreadFactory} instance to use. - * - * @param factory the metrics - * @return this builder instance + * @return the bootstrap thread factory provider */ - VertxBootstrap threadFactory(VertxThreadFactory factory); + ThreadFactoryProvider threadFactoryProvider(); /** * @return the transport to use diff --git a/vertx-core/src/main/java/io/vertx/core/spi/ThreadFactoryProvider.java b/vertx-core/src/main/java/io/vertx/core/spi/ThreadFactoryProvider.java new file mode 100644 index 00000000000..0ba078c18aa --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/spi/ThreadFactoryProvider.java @@ -0,0 +1,10 @@ +package io.vertx.core.spi; + +import java.time.Duration; +import java.util.concurrent.ThreadFactory; + +public interface ThreadFactoryProvider { + + ThreadFactory threadFactory(String prefix, boolean worker, Duration maxExecuteTime); + +} diff --git a/vertx-core/src/main/java/io/vertx/core/spi/VertxThreadFactory.java b/vertx-core/src/main/java/io/vertx/core/spi/VertxThreadFactory.java index 5df5b7e7518..8336235fd10 100644 --- a/vertx-core/src/main/java/io/vertx/core/spi/VertxThreadFactory.java +++ b/vertx-core/src/main/java/io/vertx/core/spi/VertxThreadFactory.java @@ -26,12 +26,18 @@ public interface VertxThreadFactory extends VertxServiceProvider { @Override default void init(VertxBootstrap builder) { - if (builder.threadFactory() == null) { - builder.threadFactory(this); + if (builder.threadFactoryProvider() == null) { + builder.threadFactoryProvider(provider()); } } default VertxThread newVertxThread(Runnable target, String name, boolean worker, long maxExecTime, TimeUnit maxExecTimeUnit) { return new VertxThread(target, name, worker, maxExecTime, maxExecTimeUnit); } + + default ThreadFactoryProvider provider() { + return (prefix, worker, maxExecuteTime) -> + target -> + newVertxThread(target, prefix, worker, maxExecuteTime.toMillis(), TimeUnit.MILLISECONDS); + } } diff --git a/vertx-core/src/test/java/io/vertx/tests/vertx/ThreadFactoryProviderTest.java b/vertx-core/src/test/java/io/vertx/tests/vertx/ThreadFactoryProviderTest.java new file mode 100644 index 00000000000..0315f3d7e75 --- /dev/null +++ b/vertx-core/src/test/java/io/vertx/tests/vertx/ThreadFactoryProviderTest.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2011-2025 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.tests.vertx; + +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.FastThreadLocalThread; +import io.vertx.core.Vertx; +import io.vertx.core.impl.VertxThread; +import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.VertxBootstrap; +import io.vertx.core.internal.VertxInternal; +import io.vertx.test.core.AsyncTestBase; +import org.junit.Test; + +import java.util.concurrent.ThreadFactory; + +public class ThreadFactoryProviderTest extends AsyncTestBase { + + @Test + public void testFastThreadLocalFactoryProvider() throws Exception { + VertxBootstrap factory = VertxBootstrap.create(); + VertxInternal vertx = (VertxInternal) factory + .threadFactoryProvider((prefix, worker, maxExecuteTime) -> new ThreadFactory() { + @Override + public Thread newThread(Runnable target) { + return new FastThreadLocalThread(target, prefix); + } + }) + .init() + .vertx(); + ContextInternal context = vertx.createEventLoopContext(); + context.runOnContext(v -> { + assertFalse(Thread.currentThread() instanceof VertxThread); + assertSame(context, Vertx.currentContext()); + EventLoop eventLoop = context.nettyEventLoop(); + assertTrue(eventLoop.inEventLoop()); + testComplete(); + }); + await(); + } +} diff --git a/vertx-core/src/test/java/io/vertx/tests/vertx/VertxBootstrapTest.java b/vertx-core/src/test/java/io/vertx/tests/vertx/VertxBootstrapTest.java index 8b02db47cc5..94c032acff5 100644 --- a/vertx-core/src/test/java/io/vertx/tests/vertx/VertxBootstrapTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/vertx/VertxBootstrapTest.java @@ -174,7 +174,7 @@ public VertxThread newVertxThread(Runnable target, String name, boolean worker, } }; factory - .threadFactory(tf) + .threadFactoryProvider(tf.provider()) .executorServiceFactory(new CustomExecutorServiceFactory()) .init() .vertx()