diff --git a/proxy/src/main/java/com/velocitypowered/proxy/config/VelocityConfiguration.java b/proxy/src/main/java/com/velocitypowered/proxy/config/VelocityConfiguration.java index 9c2e1e8280..bbd49f5540 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/config/VelocityConfiguration.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/config/VelocityConfiguration.java @@ -407,6 +407,10 @@ public boolean isForceKeyAuthentication() { return forceKeyAuthentication; } + public boolean isEnableReusePort() { + return advanced.isEnableReusePort(); + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -716,6 +720,8 @@ private static class Advanced { private boolean logPlayerConnections = true; @Expose private boolean acceptTransfers = false; + @Expose + private boolean enableReusePort = false; private Advanced() { } @@ -741,6 +747,7 @@ private Advanced(CommentedConfig config) { this.logCommandExecutions = config.getOrElse("log-command-executions", false); this.logPlayerConnections = config.getOrElse("log-player-connections", true); this.acceptTransfers = config.getOrElse("accepts-transfers", false); + this.enableReusePort = config.getOrElse("enable-reuse-port", false); } } @@ -804,6 +811,10 @@ public boolean isAcceptTransfers() { return this.acceptTransfers; } + public boolean isEnableReusePort() { + return enableReusePort; + } + @Override public String toString() { return "Advanced{" @@ -821,6 +832,7 @@ public String toString() { + ", logCommandExecutions=" + logCommandExecutions + ", logPlayerConnections=" + logPlayerConnections + ", acceptTransfers=" + acceptTransfers + + ", enableReusePort=" + enableReusePort + '}'; } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/network/ConnectionManager.java b/proxy/src/main/java/com/velocitypowered/proxy/network/ConnectionManager.java index 7af894f466..7b724f6130 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/network/ConnectionManager.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/network/ConnectionManager.java @@ -18,6 +18,8 @@ package com.velocitypowered.proxy.network; import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; import com.velocitypowered.api.event.proxy.ListenerBoundEvent; import com.velocitypowered.api.event.proxy.ListenerCloseEvent; import com.velocitypowered.api.network.ListenerType; @@ -28,14 +30,17 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.unix.UnixChannelOption; import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.MultithreadEventExecutorGroup; import java.net.InetSocketAddress; import java.net.http.HttpClient; -import java.util.HashMap; +import java.util.Collection; import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -49,7 +54,7 @@ public final class ConnectionManager { private static final WriteBufferWaterMark SERVER_WRITE_MARK = new WriteBufferWaterMark(1 << 20, 1 << 21); private static final Logger LOGGER = LogManager.getLogger(ConnectionManager.class); - private final Map endpoints = new HashMap<>(); + private final Multimap endpoints = HashMultimap.create(); private final TransportType transportType; private final EventLoopGroup bossGroup; private final EventLoopGroup workerGroup; @@ -93,7 +98,6 @@ public void logChannelInformation() { public void bind(final InetSocketAddress address) { final ServerBootstrap bootstrap = new ServerBootstrap() .channelFactory(this.transportType.serverSocketChannelFactory) - .group(this.bossGroup, this.workerGroup) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, SERVER_WRITE_MARK) .childHandler(this.serverChannelInitializer.get()) .childOption(ChannelOption.TCP_NODELAY, true) @@ -104,26 +108,50 @@ public void bind(final InetSocketAddress address) { bootstrap.option(ChannelOption.TCP_FASTOPEN, 3); } - bootstrap.bind() - .addListener((ChannelFutureListener) future -> { - final Channel channel = future.channel(); - if (future.isSuccess()) { - this.endpoints.put(address, new Endpoint(channel, ListenerType.MINECRAFT)); - - // Warn people with console access that HAProxy is in use, see PR: #1436 - if (this.server.getConfiguration().isProxyProtocol()) { - LOGGER.warn("Using HAProxy and listening on {}, please ensure this listener is adequately firewalled.", channel.localAddress()); - } + if (server.getConfiguration().isEnableReusePort()) { + // We don't need a boss group, since each worker will bind to the socket + bootstrap.option(UnixChannelOption.SO_REUSEPORT, true) + .group(this.workerGroup); + } else { + bootstrap.group(this.bossGroup, this.workerGroup); + } - LOGGER.info("Listening on {}", channel.localAddress()); + final int binds = server.getConfiguration().isEnableReusePort() + ? ((MultithreadEventExecutorGroup) this.workerGroup).executorCount() : 1; - // Fire the proxy bound event after the socket is bound - server.getEventManager().fireAndForget( - new ListenerBoundEvent(address, ListenerType.MINECRAFT)); - } else { - LOGGER.error("Can't bind to {}", address, future.cause()); - } - }); + for (int bind = 0; bind < binds; bind++) { + // Wait for each bind to open. If we encounter any errors, don't try to bind again. + int finalBind = bind; + ChannelFuture f = bootstrap.bind() + .addListener((ChannelFutureListener) future -> { + final Channel channel = future.channel(); + if (future.isSuccess()) { + this.endpoints.put(address, new Endpoint(channel, ListenerType.MINECRAFT)); + + LOGGER.info("Listening on {}", channel.localAddress()); + + if (finalBind == 0) { + // Warn people with console access that HAProxy is in use, see PR: #1436 + if (this.server.getConfiguration().isProxyProtocol()) { + LOGGER.warn( + "Using HAProxy and listening on {}, please ensure this listener is adequately firewalled.", + channel.localAddress()); + } + + // Fire the proxy bound event after the socket is bound + server.getEventManager().fireAndForget( + new ListenerBoundEvent(address, ListenerType.MINECRAFT)); + } + } else { + LOGGER.error("Can't bind to {}", address, future.cause()); + } + }); + f.syncUninterruptibly(); + + if (!f.isSuccess()) { + break; + } + } } /** @@ -181,17 +209,20 @@ public Bootstrap createWorker(@Nullable EventLoopGroup group) { * @param oldBind the endpoint to close */ public void close(InetSocketAddress oldBind) { - Endpoint endpoint = endpoints.remove(oldBind); + Collection endpoints = this.endpoints.removeAll(oldBind); + Preconditions.checkState(!endpoints.isEmpty(), "Endpoint was not registered"); + + ListenerType type = endpoints.iterator().next().getType(); // Fire proxy close event to notify plugins of socket close. We block since plugins // should have a chance to be notified before the server stops accepting connections. - server.getEventManager().fire(new ListenerCloseEvent(oldBind, endpoint.getType())).join(); - - Channel serverChannel = endpoint.getChannel(); + server.getEventManager().fire(new ListenerCloseEvent(oldBind, type)).join(); - Preconditions.checkState(serverChannel != null, "Endpoint %s not registered", oldBind); - LOGGER.info("Closing endpoint {}", serverChannel.localAddress()); - serverChannel.close().syncUninterruptibly(); + for (Endpoint endpoint : endpoints) { + Channel serverChannel = endpoint.getChannel(); + LOGGER.info("Closing endpoint {}", serverChannel.localAddress()); + serverChannel.close().syncUninterruptibly(); + } } /** @@ -200,24 +231,28 @@ public void close(InetSocketAddress oldBind) { * @param interrupt should closing forward interruptions */ public void closeEndpoints(boolean interrupt) { - for (final Map.Entry entry : this.endpoints.entrySet()) { + for (final Map.Entry> entry : this.endpoints.asMap() + .entrySet()) { final InetSocketAddress address = entry.getKey(); - final Endpoint endpoint = entry.getValue(); + final Collection endpoints = entry.getValue(); + ListenerType type = endpoints.iterator().next().getType(); // Fire proxy close event to notify plugins of socket close. We block since plugins // should have a chance to be notified before the server stops accepting connections. - server.getEventManager().fire(new ListenerCloseEvent(address, endpoint.getType())).join(); - - LOGGER.info("Closing endpoint {}", address); - if (interrupt) { - try { - endpoint.getChannel().close().sync(); - } catch (final InterruptedException e) { - LOGGER.info("Interrupted whilst closing endpoint", e); - Thread.currentThread().interrupt(); + server.getEventManager().fire(new ListenerCloseEvent(address, type)).join(); + + for (Endpoint endpoint : endpoints) { + LOGGER.info("Closing endpoint {}", address); + if (interrupt) { + try { + endpoint.getChannel().close().sync(); + } catch (final InterruptedException e) { + LOGGER.info("Interrupted whilst closing endpoint", e); + Thread.currentThread().interrupt(); + } + } else { + endpoint.getChannel().close().syncUninterruptibly(); } - } else { - endpoint.getChannel().close().syncUninterruptibly(); } } this.endpoints.clear(); diff --git a/proxy/src/main/resources/default-velocity.toml b/proxy/src/main/resources/default-velocity.toml index e402305cc1..5b0a1e276b 100644 --- a/proxy/src/main/resources/default-velocity.toml +++ b/proxy/src/main/resources/default-velocity.toml @@ -145,6 +145,12 @@ log-player-connections = true # Transfer packet (Minecraft 1.20.5) to be received. accepts-transfers = false +# Enables support for SO_REUSEPORT. This may help the proxy scale better on multicore systems +# with a lot of incoming connections, and provide better CPU utilization than the existing +# strategy of having a single thread accepting connections and distributing them to worker +# threads. Disabled by default. Requires Linux or macOS. +enable-reuse-port = false + [query] # Whether to enable responding to GameSpy 4 query responses or not. enabled = false