
Use virtual threads for I/O operations #25404

Open · wants to merge 8 commits into master
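
The diff applies one pattern throughout: executors that back blocking I/O (HTTP client and server, filesystem and metastore access, connector metadata and split loading) are switched from pools of platform daemon threads, typically `newCachedThreadPool(daemonThreadsNamed(...))`, to thread-per-task executors backed by virtual threads via `newThreadPerTaskExecutor(virtualThreadsNamed(...))`. Below is a minimal sketch of the same swap using plain JDK 21 APIs, with `Thread.ofVirtual()` standing in for airlift's `virtualThreadsNamed` helper and an illustrative thread-name prefix:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class VirtualThreadExecutorSketch
{
    public static void main(String[] args)
            throws Exception
    {
        // Before: newCachedThreadPool(daemonThreadsNamed("s3-upload-%s")), a pool of platform daemon threads.
        // After: one cheap virtual thread per task, so blocking I/O no longer ties up a platform thread.
        ThreadFactory factory = Thread.ofVirtual().name("s3-upload-", 0).factory();
        try (ExecutorService executor = Executors.newThreadPerTaskExecutor(factory)) {
            executor.submit(() -> System.out.println(Thread.currentThread())).get();
        }
    }
}
```
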
@@ -72,6 +72,7 @@ protected void setup(Binder binder)
static void configureClient(HttpClientConfig httpConfig, InternalCommunicationConfig internalCommunicationConfig)
{
httpConfig.setHttp2Enabled(internalCommunicationConfig.isHttp2Enabled());
httpConfig.setUseVirtualThreads(true);

if (internalCommunicationConfig.isHttpsRequired() && internalCommunicationConfig.getKeyStorePath() == null && internalCommunicationConfig.getTrustStorePath() == null) {
configureClientForAutomaticHttps(httpConfig, internalCommunicationConfig);
@@ -22,6 +22,7 @@
import com.google.inject.multibindings.ProvidesIntoSet;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.airlift.http.server.EnableVirtualThreads;
import io.airlift.http.server.HttpServerConfig;
import io.airlift.slice.Slice;
import io.airlift.stats.GcMonitor;
@@ -167,6 +168,7 @@
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.concurrent.Threads.virtualThreadsNamed;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.discovery.client.DiscoveryBinder.discoveryBinder;
@@ -182,6 +184,7 @@
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.Executors.newThreadPerTaskExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

@@ -213,6 +216,8 @@ protected void setup(Binder binder)
httpServerConfig.setHttp2MaxConcurrentStreams(32 * 1024); // from the default 16K
});

newOptionalBinder(binder, Key.get(boolean.class, EnableVirtualThreads.class))
.setBinding().toInstance(true);
binder.bind(PreparedStatementEncoder.class).in(Scopes.SINGLETON);
binder.bind(HttpRequestSessionContextFactory.class).in(Scopes.SINGLETON);
install(new InternalCommunicationModule());
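
The `EnableVirtualThreads` optional binding above is how the HTTP server itself is switched over: the flag appears to be declared as an optional Guice binding on the airlift side, and this module overrides it to `true`. A self-contained sketch of that Guice pattern, with a hypothetical local annotation standing in for `io.airlift.http.server.EnableVirtualThreads`:

```java
import com.google.inject.AbstractModule;
import com.google.inject.BindingAnnotation;
import com.google.inject.Guice;
import com.google.inject.Key;
import com.google.inject.multibindings.OptionalBinder;

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

public class OptionalFlagSketch
{
    // Hypothetical stand-in for airlift's @EnableVirtualThreads binding annotation
    @BindingAnnotation
    @Retention(RetentionPolicy.RUNTIME)
    public @interface EnableVirtualThreads {}

    public static void main(String[] args)
    {
        Key<Boolean> key = Key.get(boolean.class, EnableVirtualThreads.class);
        boolean enabled = Guice.createInjector(new AbstractModule()
        {
            @Override
            protected void configure()
            {
                // The library side declares the flag with a default...
                OptionalBinder.newOptionalBinder(binder(), key).setDefault().toInstance(false);
                // ...and the application overrides it, as the server module above does.
                OptionalBinder.newOptionalBinder(binder(), key).setBinding().toInstance(true);
            }
        }).getInstance(key);
        System.out.println(enabled); // true
    }
}
```
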
@@ -567,7 +572,7 @@ public static Executor createStartupExecutor(ServerConfig config)
return directExecutor();
}
return new BoundedExecutor(
newCachedThreadPool(daemonThreadsNamed("startup-%s")),
newThreadPerTaskExecutor(virtualThreadsNamed("startup-%s")),
Runtime.getRuntime().availableProcessors());
}

@@ -21,10 +21,10 @@
import jakarta.annotation.PreDestroy;

import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.concurrent.Threads.virtualThreadsNamed;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newThreadPerTaskExecutor;

public class GcsFileSystemFactory
implements TrinoFileSystemFactory
@@ -44,7 +44,7 @@ public GcsFileSystemFactory(GcsFileSystemConfig config, GcsStorageFactory storag
this.pageSize = config.getPageSize();
this.batchSize = config.getBatchSize();
this.storageFactory = requireNonNull(storageFactory, "storageFactory is null");
this.executorService = listeningDecorator(newCachedThreadPool(daemonThreadsNamed("trino-filesystem-gcs-%S")));
this.executorService = listeningDecorator(newThreadPerTaskExecutor(virtualThreadsNamed("trino-filesystem-gcs-%d")));
}

@PreDestroy
@@ -48,11 +48,11 @@
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkState;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.concurrent.Threads.virtualThreadsNamed;
import static io.trino.filesystem.s3.S3FileSystemConfig.RetryMode.getRetryStrategy;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newThreadPerTaskExecutor;
import static software.amazon.awssdk.core.checksums.ResponseChecksumValidation.WHEN_REQUIRED;

final class S3FileSystemLoader
@@ -63,7 +63,7 @@ final class S3FileSystemLoader
private final S3ClientFactory clientFactory;
private final S3Presigner preSigner;
private final S3Context context;
private final ExecutorService uploadExecutor = newCachedThreadPool(daemonThreadsNamed("s3-upload-%s"));
private final ExecutorService uploadExecutor = newThreadPerTaskExecutor(virtualThreadsNamed("s3-upload-%d"));
private final Map<Optional<S3SecurityMappingResult>, S3Client> clients = new ConcurrentHashMap<>();

@Inject
@@ -43,10 +43,10 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.concurrent.Threads.virtualThreadsNamed;
import static io.trino.cache.SafeCaches.buildNonEvictableCache;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newThreadPerTaskExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class SharedHiveMetastoreCache
@@ -112,7 +112,7 @@ public SharedHiveMetastoreCache(
public void start()
{
if (enabled) {
executorService = newCachedThreadPool(daemonThreadsNamed("hive-metastore-" + catalogName + "-%s"));
executorService = newThreadPerTaskExecutor(virtualThreadsNamed("hive-metastore-" + catalogName + "-%d"));
}
}

@@ -13,7 +13,6 @@
*/
package io.trino.plugin.jdbc;

import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
@@ -43,9 +42,11 @@
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static com.google.inject.multibindings.ProvidesIntoOptional.Type.DEFAULT;
import static io.airlift.concurrent.Threads.virtualThreadsNamed;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.plugin.base.ClosingBinder.closingBinder;
import static java.util.concurrent.Executors.newThreadPerTaskExecutor;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class JdbcModule
@@ -111,8 +112,7 @@ public void setup(Binder binder)

newOptionalBinder(binder, Key.get(ExecutorService.class, ForRecordCursor.class))
.setDefault()
.toProvider(MoreExecutors::newDirectExecutorService)
.in(Scopes.SINGLETON);
.toInstance(newThreadPerTaskExecutor(virtualThreadsNamed("jdbc-record-cursor-%s")));

newSetBinder(binder, JdbcQueryEventListener.class);

@@ -25,11 +25,10 @@
import java.util.concurrent.ExecutorService;

import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.concurrent.Threads.virtualThreadsNamed;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newThreadPerTaskExecutor;

public class RemoteQueryCancellationModule
extends AbstractConfigurationAwareModule
@@ -67,7 +66,7 @@ public RecordCursorExecutorServiceProvider(CatalogName catalogName)
@Override
public ExecutorService get()
{
return newCachedThreadPool(daemonThreadsNamed(format("%s-record-cursor-%%d", catalogName)));
return newThreadPerTaskExecutor(virtualThreadsNamed(catalogName + "-record-cursor-%d"));
}
}
}
@@ -36,17 +36,19 @@
import io.trino.spi.procedure.Procedure;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.concurrent.Threads.virtualThreadsNamed;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.plugin.base.ClosingBinder.closingBinder;
import static io.trino.plugin.base.JdkCompatibilityChecks.verifyConnectorAccessOpened;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newThreadPerTaskExecutor;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class BigQueryConnectorModule
@@ -127,17 +129,23 @@ public static BigQueryLabelFactory labelFactory(BigQueryConfig config)

@Provides
@Singleton
public ListeningExecutorService provideListeningExecutor(BigQueryConfig config)
public ListeningExecutorService provideListeningExecutor(CatalogName catalogName, BigQueryConfig config)
{
return listeningDecorator(newFixedThreadPool(config.getMetadataParallelism(), daemonThreadsNamed("big-query-%s"))); // limit parallelism
return listeningDecorator(new ThreadPoolExecutor(
config.getMetadataParallelism(),
config.getMetadataParallelism(),
0,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(),
virtualThreadsNamed("bigquery-" + catalogName + "-metadata-%d")));
}

@Provides
@Singleton
@ForBigQueryPageSource
public ExecutorService provideExecutor(CatalogName catalogName)
{
return newCachedThreadPool(daemonThreadsNamed("bigquery-" + catalogName + "-%s"));
return newThreadPerTaskExecutor(virtualThreadsNamed("bigquery-" + catalogName + "-%d"));
}
}
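
Unlike the unbounded per-task executors elsewhere in this PR, the BigQuery metadata executor still has to cap parallelism, so the fixed-size `ThreadPoolExecutor` is kept and only its `ThreadFactory` is swapped for a virtual-thread one. A standalone sketch of that shape (pool size, queue choice, and name prefix are illustrative, with `Thread.ofVirtual()` in place of `virtualThreadsNamed`):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedVirtualExecutorSketch
{
    public static ExecutorService boundedVirtualExecutor(int parallelism, String namePrefix)
    {
        ThreadFactory factory = Thread.ofVirtual().name(namePrefix, 0).factory();
        // core == max caps concurrent tasks at 'parallelism'; excess work queues up
        // instead of fanning out onto an unbounded number of virtual threads.
        return new ThreadPoolExecutor(
                parallelism,
                parallelism,
                0,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                factory);
    }

    public static void main(String[] args)
    {
        try (ExecutorService executor = boundedVirtualExecutor(4, "bigquery-metadata-")) {
            executor.submit(() -> System.out.println(Thread.currentThread()));
        }
    }
}
```
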

@@ -205,7 +205,6 @@ private List<String> listRemoteSchemaNames(ConnectorSession session)
.distinct();

// filter out all the ambiguous schemas to prevent failures if anyone tries to access the listed schemas

return remoteSchemaNames.map(remoteSchema -> client.toRemoteDataset(projectId, remoteSchema.toLowerCase(ENGLISH), () -> datasetIds))
.filter(Optional::isPresent)
.map(Optional::get)
@@ -488,10 +487,10 @@ protected <T, R> Stream<R> processInParallel(List<T> list, Function<T, R> functi
return Stream.of(function.apply(list.getFirst()));
}

List<ListenableFuture<R>> futures = list.stream()
.map(element -> executorService.submit(() -> function.apply(element)))
.collect(toImmutableList());
try {
List<ListenableFuture<R>> futures = list.stream()
.map(element -> executorService.submit(() -> function.apply(element)))
.collect(toImmutableList());
return allAsList(futures).get().stream();
}
catch (InterruptedException e) {
@@ -22,9 +22,9 @@

import java.util.concurrent.ExecutorService;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.concurrent.Threads.virtualThreadsNamed;
import static io.trino.plugin.base.ClosingBinder.closingBinder;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newThreadPerTaskExecutor;

public class DeltaLakeExecutorModule
implements Module
@@ -41,14 +41,14 @@ public void configure(Binder binder)
@ForDeltaLakeMetadata
public ExecutorService createMetadataExecutor(CatalogName catalogName)
{
return newCachedThreadPool(daemonThreadsNamed("delta-metadata-" + catalogName + "-%s"));
return newThreadPerTaskExecutor(virtualThreadsNamed("delta-metadata-" + catalogName + "-%d"));
}

@Provides
@Singleton
@ForDeltaLakeSplitManager
public ExecutorService createSplitSourceExecutor(CatalogName catalogName)
{
return newCachedThreadPool(daemonThreadsNamed("delta-split-source-" + catalogName + "-%s"));
return newThreadPerTaskExecutor(virtualThreadsNamed("delta-split-source-" + catalogName + "-%d"));
}
}
@@ -16,8 +16,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
import io.airlift.concurrent.MoreFutures;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.metastore.Table;
@@ -41,23 +41,25 @@
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static io.airlift.concurrent.AsyncSemaphore.processAll;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.concurrent.Threads.threadsNamed;
import static io.airlift.concurrent.Threads.virtualThreadsNamed;
import static io.trino.metastore.Table.TABLE_COMMENT;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isStoreTableMetadataInMetastoreEnabled;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode.NONE;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMetadata;
import static java.lang.Math.max;
import static java.util.Comparator.comparing;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.Executors.newThreadPerTaskExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.function.BinaryOperator.maxBy;

@@ -77,7 +79,7 @@ public class DeltaLakeTableMetadataScheduler
private final boolean enabled;
private final Duration scheduleInterval;

private ExecutorService executor;
private ListeningExecutorService executor;
private ScheduledExecutorService scheduler;
private final AtomicInteger failedCounts = new AtomicInteger();

@@ -117,7 +119,7 @@ public void putAll(Map<SchemaTableName, TableUpdateInfo> tableParameters)
public void start()
{
if (enabled) {
executor = storeTableMetadataThreads == 0 ? newDirectExecutorService() : newFixedThreadPool(storeTableMetadataThreads, threadsNamed("store-table-metadata-%s"));
executor = listeningDecorator(newThreadPerTaskExecutor(virtualThreadsNamed("store-table-metadata-%d")));
scheduler = newSingleThreadScheduledExecutor(daemonThreadsNamed("store-table-metadata"));

scheduler.scheduleWithFixedDelay(() -> {
@@ -161,7 +163,10 @@ public void process()
}

try {
executor.invokeAll(tasks).forEach(MoreFutures::getDone);
processAll(tasks, executor::submit, max(1, storeTableMetadataThreads), executor).get();
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
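
With the fixed pool gone, the scheduler above throttles work explicitly: all tasks go to the unbounded virtual-thread executor, and airlift's `processAll` appears to bound how many run at once to `storeTableMetadataThreads`. A rough plain-JDK equivalent of that idea using a `Semaphore` (not the airlift API; error handling via the returned futures is omitted):

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class BoundedConcurrencySketch
{
    // Run every task on a virtual-thread executor while at most
    // maxConcurrency of them execute at the same time.
    public static <T> void runBounded(List<Callable<T>> tasks, int maxConcurrency)
            throws InterruptedException
    {
        Semaphore permits = new Semaphore(maxConcurrency);
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (Callable<T> task : tasks) {
                permits.acquire(); // block submission until a slot frees up
                executor.submit(() -> {
                    try {
                        return task.call();
                    }
                    finally {
                        permits.release();
                    }
                });
            }
        } // close() waits for all submitted tasks to finish
    }
}
```
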
@@ -24,9 +24,10 @@
import java.util.concurrent.ScheduledExecutorService;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.concurrent.Threads.virtualThreadsNamed;
import static io.trino.plugin.base.ClosingBinder.closingBinder;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.Executors.newThreadPerTaskExecutor;

public class HiveExecutorModule
implements Module
@@ -44,15 +45,15 @@ public void configure(Binder binder)
@ForHiveMetadata
public ExecutorService createMetadataExecutor(CatalogName catalogName)
{
return newCachedThreadPool(daemonThreadsNamed("hive-metadata-" + catalogName + "-%s"));
return newThreadPerTaskExecutor(virtualThreadsNamed("hive-metadata-" + catalogName + "-%d"));
}

@Provides
@Singleton
@ForHiveSplitManager
public ExecutorService createSplitSourceExecutor(CatalogName catalogName)
{
return newCachedThreadPool(daemonThreadsNamed("hive-split-source-" + catalogName + "-%s"));
return newThreadPerTaskExecutor(virtualThreadsNamed("hive-split-source-" + catalogName + "-%d"));
}

@Provides
@@ -55,9 +55,9 @@
import java.util.function.Supplier;

import static com.google.common.cache.CacheLoader.asyncReloading;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.concurrent.Threads.virtualThreadsNamed;
import static io.trino.cache.CacheUtils.invalidateAllIf;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newThreadPerTaskExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

class InMemoryGlueCache
@@ -99,7 +99,7 @@ public InMemoryGlueCache(
int maxMetastoreRefreshThreads,
long maximumSize)
{
this.refreshExecutor = newCachedThreadPool(daemonThreadsNamed("hive-metastore-" + catalogName + "-%s"));
this.refreshExecutor = newThreadPerTaskExecutor(virtualThreadsNamed("hive-metastore-" + catalogName + "-%d"));
Executor boundedRefreshExecutor = new ReentrantBoundedExecutor(refreshExecutor, maxMetastoreRefreshThreads);

OptionalLong refreshMillis = refreshInterval.stream().mapToLong(Duration::toMillis).findAny();