Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,24 @@
import io.vertx.core.impl.transports.NioTransport;
import io.vertx.core.internal.VertxBootstrap;
import io.vertx.core.eventbus.impl.clustered.DefaultNodeSelector;
import io.vertx.core.spi.*;
import io.vertx.core.spi.context.executor.EventExecutorProvider;
import io.vertx.core.spi.file.FileResolver;
import io.vertx.core.file.impl.FileResolverImpl;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.transport.Transport;
import io.vertx.core.spi.ExecutorServiceFactory;
import io.vertx.core.spi.VertxMetricsFactory;
import io.vertx.core.spi.VertxServiceProvider;
import io.vertx.core.spi.VertxThreadFactory;
import io.vertx.core.spi.VertxTracerFactory;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.eventbus.impl.clustered.NodeSelector;
import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.core.spi.tracing.VertxTracer;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
* Bootstrap implementation.
Expand All @@ -53,7 +52,7 @@ public class VertxBootstrapImpl implements VertxBootstrap {
private NodeSelector clusterNodeSelector;
private VertxTracerFactory tracerFactory;
private VertxTracer tracer;
private VertxThreadFactory threadFactory;
private ThreadFactoryProvider threadFactoryProvider;
private ExecutorServiceFactory executorServiceFactory;
private VertxMetricsFactory metricsFactory;
private VertxMetrics metrics;
Expand Down Expand Up @@ -199,14 +198,25 @@ public VertxBootstrapImpl fileResolver(FileResolver resolver) {
}

public VertxThreadFactory threadFactory() {
return threadFactory;
throw new UnsupportedOperationException();
}

public VertxBootstrapImpl threadFactory(VertxThreadFactory factory) {
this.threadFactory = factory;
this.threadFactoryProvider = factory.provider();
return this;
}

@Override
public VertxBootstrap threadFactoryProvider(ThreadFactoryProvider provider) {
this.threadFactoryProvider = provider;
return this;
}

@Override
public ThreadFactoryProvider threadFactoryProvider() {
return threadFactoryProvider;
}

public ExecutorServiceFactory executorServiceFactory() {
return executorServiceFactory;
}
Expand Down Expand Up @@ -237,7 +247,7 @@ private VertxImpl instantiateVertx(ClusterManager clusterManager, NodeSelector n
tr,
transportUnavailabilityCause,
fileResolver,
threadFactory,
threadFactoryProvider,
executorServiceFactory,
eventExecutorProvider,
enableShadowContext);
Expand Down Expand Up @@ -316,10 +326,10 @@ private void initFileResolver() {
}

private void initThreadFactory() {
if (threadFactory != null) {
if (threadFactoryProvider != null) {
return;
}
threadFactory = VertxThreadFactory.INSTANCE;
threadFactoryProvider = VertxThreadFactory.INSTANCE.provider();
}

private void initExecutorServiceFactory() {
Expand Down
35 changes: 20 additions & 15 deletions vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.vertx.core.net.impl.tcp.NetClientBuilder;
import io.vertx.core.net.impl.tcp.NetServerImpl;
import io.vertx.core.net.impl.tcp.NetServerInternal;
import io.vertx.core.spi.ThreadFactoryProvider;
import io.vertx.core.spi.context.executor.EventExecutorProvider;
import io.vertx.core.spi.context.storage.AccessMode;
import io.vertx.core.spi.context.storage.ContextLocal;
Expand Down Expand Up @@ -77,6 +78,7 @@
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -154,7 +156,7 @@ private static ThreadFactory virtualThreadFactory() {
final WorkerPool workerPool;
final WorkerPool internalWorkerPool;
final WorkerPool virtualThreadWorkerPool;
private final VertxThreadFactory threadFactory;
private final ThreadFactoryProvider threadFactoryProvider;
private final ExecutorServiceFactory executorServiceFactory;
private final ThreadFactory eventLoopThreadFactory;
private final EventLoopGroup eventLoopGroup;
Expand Down Expand Up @@ -183,7 +185,7 @@ private static ThreadFactory virtualThreadFactory() {

VertxImpl(VertxOptions options, ClusterManager clusterManager, NodeSelector nodeSelector, VertxMetrics metrics,
VertxTracer<?, ?> tracer, Transport transport, Throwable transportUnavailabilityCause,
FileResolver fileResolver, VertxThreadFactory threadFactory, ExecutorServiceFactory executorServiceFactory,
FileResolver fileResolver, ThreadFactoryProvider threadFactoryProvider, ExecutorServiceFactory executorServiceFactory,
EventExecutorProvider eventExecutorProvider, boolean enableShadowContext) {
// Sanity check
if (Vertx.currentContext() != null) {
Expand All @@ -196,14 +198,14 @@ private static ThreadFactory virtualThreadFactory() {
BlockedThreadChecker checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval(), options.getBlockedThreadCheckIntervalUnit(), options.getWarningExceptionTime(), options.getWarningExceptionTimeUnit());
long maxEventLoopExecuteTime = options.getMaxEventLoopExecuteTime();
TimeUnit maxEventLoopExecuteTimeUnit = options.getMaxEventLoopExecuteTimeUnit();
ThreadFactory acceptorEventLoopThreadFactory = createThreadFactory(threadFactory, checker, useDaemonThread, maxEventLoopExecuteTime, maxEventLoopExecuteTimeUnit, "vert.x-acceptor-thread-", false);
ThreadFactory acceptorEventLoopThreadFactory = createThreadFactory(threadFactoryProvider, checker, useDaemonThread, maxEventLoopExecuteTime, maxEventLoopExecuteTimeUnit, "vert.x-acceptor-thread-", false);
TimeUnit maxWorkerExecuteTimeUnit = options.getMaxWorkerExecuteTimeUnit();
long maxWorkerExecuteTime = options.getMaxWorkerExecuteTime();

ThreadFactory workerThreadFactory = createThreadFactory(threadFactory, checker, useDaemonThread, maxWorkerExecuteTime, maxWorkerExecuteTimeUnit, "vert.x-worker-thread-", true);
ThreadFactory workerThreadFactory = createThreadFactory(threadFactoryProvider, checker, useDaemonThread, maxWorkerExecuteTime, maxWorkerExecuteTimeUnit, "vert.x-worker-thread-", true);
ExecutorService workerExec = executorServiceFactory.createExecutor(workerThreadFactory, workerPoolSize, workerPoolSize);
PoolMetrics workerPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null;
ThreadFactory internalWorkerThreadFactory = createThreadFactory(threadFactory, checker, useDaemonThread, maxWorkerExecuteTime, maxWorkerExecuteTimeUnit, "vert.x-internal-blocking-", true);
ThreadFactory internalWorkerThreadFactory = createThreadFactory(threadFactoryProvider, checker, useDaemonThread, maxWorkerExecuteTime, maxWorkerExecuteTimeUnit, "vert.x-internal-blocking-", true);
ExecutorService internalWorkerExec = executorServiceFactory.createExecutor(internalWorkerThreadFactory, internalBlockingPoolSize, internalBlockingPoolSize);
PoolMetrics internalBlockingPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-internal-blocking", internalBlockingPoolSize) : null;

Expand All @@ -215,7 +217,7 @@ private static ThreadFactory virtualThreadFactory() {
closeFuture = new CloseFuture(log);
maxEventLoopExecTime = maxEventLoopExecuteTime;
maxEventLoopExecTimeUnit = maxEventLoopExecuteTimeUnit;
eventLoopThreadFactory = createThreadFactory(threadFactory, checker, useDaemonThread, maxEventLoopExecTime, maxEventLoopExecTimeUnit, "vert.x-eventloop-thread-", false);
eventLoopThreadFactory = createThreadFactory(threadFactoryProvider, checker, useDaemonThread, maxEventLoopExecTime, maxEventLoopExecTimeUnit, "vert.x-eventloop-thread-", false);
eventLoopGroup = transport.eventLoopGroup(Transport.IO_EVENT_LOOP_GROUP, options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);
// The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections
// under a lot of load
Expand All @@ -231,7 +233,7 @@ private static ThreadFactory virtualThreadFactory() {
this.checker = checker;
this.useDaemonThread = useDaemonThread;
this.executorServiceFactory = executorServiceFactory;
this.threadFactory = threadFactory;
this.threadFactoryProvider = threadFactoryProvider;
this.metrics = metrics;
this.transport = transport;
this.transportUnavailabilityCause = transportUnavailabilityCause;
Expand Down Expand Up @@ -1106,7 +1108,7 @@ private synchronized WorkerPool createSharedWorkerPool(CloseFuture closeFuture,
throw new IllegalArgumentException("maxExecuteTime must be > 0");
}
WorkerPool shared = createSharedResource("__vertx.shared.workerPools", name, closeFuture, cf -> {
ThreadFactory workerThreadFactory = createThreadFactory(threadFactory, checker, useDaemonThread, maxExecuteTime, maxExecuteTimeUnit, name + "-", true);
ThreadFactory workerThreadFactory = createThreadFactory(threadFactoryProvider, checker, useDaemonThread, maxExecuteTime, maxExecuteTimeUnit, name + "-", true);
ExecutorService workerExec = executorServiceFactory.createExecutor(workerThreadFactory, poolSize, poolSize);
PoolMetrics workerMetrics = metrics != null ? metrics.createPoolMetrics("worker", name, poolSize) : null;
WorkerPool pool = new WorkerPool(workerExec, workerMetrics);
Expand All @@ -1130,14 +1132,17 @@ public WorkerPool wrapWorkerPool(ExecutorService executor) {
return new WorkerPool(executor, workerMetrics);
}

private ThreadFactory createThreadFactory(VertxThreadFactory threadFactory, BlockedThreadChecker checker, Boolean useDaemonThread, long maxExecuteTime, TimeUnit maxExecuteTimeUnit, String prefix, boolean worker) {
AtomicInteger threadCount = new AtomicInteger(0);
private ThreadFactory createThreadFactory(ThreadFactoryProvider threadFactoryProvider, BlockedThreadChecker checker, Boolean useDaemonThread, long maxExecuteTime, TimeUnit maxExecuteTimeUnit, String prefix, boolean worker) {
ThreadFactory factory = threadFactoryProvider.threadFactory(prefix, worker, Duration.ofMillis(maxExecuteTimeUnit.toMillis(maxExecuteTime)));
return runnable -> {
VertxThread thread = threadFactory.newVertxThread(runnable, prefix + threadCount.getAndIncrement(), worker, maxExecuteTime, maxExecuteTimeUnit);
thread.owner = VertxImpl.this;
checker.registerThread(thread, thread.info);
if (useDaemonThread != null && thread.isDaemon() != useDaemonThread) {
thread.setDaemon(useDaemonThread);
Thread thread = factory.newThread(runnable);
if (thread instanceof VertxThread) {
VertxThread vertxThread = (VertxThread) thread;
vertxThread.owner = VertxImpl.this;
checker.registerThread(thread, vertxThread.info);
if (useDaemonThread != null && thread.isDaemon() != useDaemonThread) {
thread.setDaemon(useDaemonThread);
}
}
return thread;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.impl.VertxBootstrapImpl;
import io.vertx.core.spi.ExecutorServiceFactory;
import io.vertx.core.spi.VertxMetricsFactory;
import io.vertx.core.spi.VertxThreadFactory;
import io.vertx.core.spi.VertxTracerFactory;
import io.vertx.core.spi.*;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.context.executor.EventExecutorProvider;
import io.vertx.core.spi.file.FileResolver;
Expand Down Expand Up @@ -124,17 +121,17 @@ static VertxBootstrap create() {
VertxBootstrap executorServiceFactory(ExecutorServiceFactory factory);

/**
* @return the {@code VertxThreadFactory} to use
* Set the {@code ThreadFactoryProvider} instance to use.
*
* @param provider the provider
* @return this builder instance
*/
VertxThreadFactory threadFactory();
VertxBootstrap threadFactoryProvider(ThreadFactoryProvider provider);

/**
* Set the {@code VertxThreadFactory} instance to use.
*
* @param factory the metrics
* @return this builder instance
* @return the bootstrap thread factory provider
*/
VertxBootstrap threadFactory(VertxThreadFactory factory);
ThreadFactoryProvider threadFactoryProvider();

/**
* @return the transport to use
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.vertx.core.spi;

import java.time.Duration;
import java.util.concurrent.ThreadFactory;

public interface ThreadFactoryProvider {

ThreadFactory threadFactory(String prefix, boolean worker, Duration maxExecuteTime);

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,18 @@ public interface VertxThreadFactory extends VertxServiceProvider {

@Override
default void init(VertxBootstrap builder) {
if (builder.threadFactory() == null) {
builder.threadFactory(this);
if (builder.threadFactoryProvider() == null) {
builder.threadFactoryProvider(provider());
}
}

default VertxThread newVertxThread(Runnable target, String name, boolean worker, long maxExecTime, TimeUnit maxExecTimeUnit) {
return new VertxThread(target, name, worker, maxExecTime, maxExecTimeUnit);
}

default ThreadFactoryProvider provider() {
return (prefix, worker, maxExecuteTime) ->
target ->
newVertxThread(target, prefix, worker, maxExecuteTime.toMillis(), TimeUnit.MILLISECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2011-2025 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.tests.vertx;

import io.netty.channel.EventLoop;
import io.netty.util.concurrent.FastThreadLocalThread;
import io.vertx.core.Vertx;
import io.vertx.core.impl.VertxThread;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxBootstrap;
import io.vertx.core.internal.VertxInternal;
import io.vertx.test.core.AsyncTestBase;
import org.junit.Test;

import java.util.concurrent.ThreadFactory;

public class ThreadFactoryProviderTest extends AsyncTestBase {

@Test
public void testFastThreadLocalFactoryProvider() throws Exception {
VertxBootstrap factory = VertxBootstrap.create();
VertxInternal vertx = (VertxInternal) factory
.threadFactoryProvider((prefix, worker, maxExecuteTime) -> new ThreadFactory() {
@Override
public Thread newThread(Runnable target) {
return new FastThreadLocalThread(target, prefix);
}
})
.init()
.vertx();
ContextInternal context = vertx.createEventLoopContext();
context.runOnContext(v -> {
assertFalse(Thread.currentThread() instanceof VertxThread);
assertSame(context, Vertx.currentContext());
EventLoop eventLoop = context.nettyEventLoop();
assertTrue(eventLoop.inEventLoop());
testComplete();
});
await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public VertxThread newVertxThread(Runnable target, String name, boolean worker,
}
};
factory
.threadFactory(tf)
.threadFactoryProvider(tf.provider())
.executorServiceFactory(new CustomExecutorServiceFactory())
.init()
.vertx()
Expand Down
Loading