Skip to content
61 changes: 57 additions & 4 deletions vertx-core/src/main/java/io/vertx/core/Vertx.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import io.vertx.core.dns.impl.DnsAddressResolverProvider;
import io.vertx.core.internal.VertxBootstrap;
import io.vertx.core.metrics.Measured;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.*;
import io.vertx.core.shareddata.SharedData;
import io.vertx.core.spi.VerticleFactory;
import io.vertx.core.spi.VertxMetricsFactory;
Expand Down Expand Up @@ -220,6 +217,62 @@ default NetClient createNetClient() {
return createNetClient(new NetClientOptions());
}

/**
* Create a TCP/SSL server using the specified TCP config and the specified default SSL options
*
* @param tcpConfig the tcp config to use
* @param sslOptions the default ssl options to use
* @return the server
*/
TcpServer createTcpServer(TcpServerConfig tcpConfig, ServerSSLOptions sslOptions);

/**
* Create a TCP server using the specified TCP config
*
* @param tcpConfig the tcp config to use
* @return the server
*/
default TcpServer createTcpServer(TcpServerConfig tcpConfig) {
return createTcpServer(tcpConfig, null);
}

/**
* Create a TCP server using the default TCP config
*
* @return the server
*/
default TcpServer createTcpServer() {
return createTcpServer(new TcpServerConfig());
}

/**
* Create a TCP/SSL client using the specified TCP config and the specified default SSL options
*
* @param tcpConfig the tcp config to use
* @param sslOptions the default ssl options to use
* @return the client
*/
TcpClient createTcpClient(TcpClientConfig tcpConfig, ClientSSLOptions sslOptions);

/**
* Create a TCP/SSL client using the specified TCP config
*
* @param tcpConfig the tcp config to use
* @return the client
*/
default TcpClient createTcpClient(TcpClientConfig tcpConfig) {
return createTcpClient(tcpConfig, null);
}

/**
* Create a TCP/SSL client using default options
*
* @return the client
*/
default TcpClient createTcpClient() {
return createTcpClient(new TcpClientConfig());
}

/**
* Create an HTTP/HTTPS server using the specified options
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.internal.net.NetSocketInternal;
import io.vertx.core.net.*;
import io.vertx.core.net.impl.tcp.NetClientBuilder;
import io.vertx.core.net.impl.tcp.NetClientImpl;
import io.vertx.core.net.impl.tcp.TcpClientBuilder;
import io.vertx.core.internal.net.NetServerInternal;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeInfo;
import io.vertx.core.spi.cluster.RegistrationInfo;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;

import java.net.Inet6Address;
import java.net.InetAddress;
Expand Down Expand Up @@ -130,10 +130,10 @@ public static String defaultAddress() {

private NetClient createNetClient(VertxInternal vertx, NetClientOptions clientOptions) {
TcpClientConfig config = new TcpClientConfig(clientOptions);
NetClientBuilder builder = new NetClientBuilder(vertx, config)
TcpClientBuilder builder = new TcpClientBuilder(vertx, config)
.sslOptions(clientOptions.getSslOptions())
.registerWriteHandler(clientOptions.isRegisterWriteHandler());
return builder.build();
return new NetClientImpl(builder.build());
}

private NetServerOptions getServerOptions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
import io.vertx.core.internal.http.HttpClientTransport;
import io.vertx.core.internal.http.HttpClientInternal;
import io.vertx.core.internal.net.NetClientInternal;
import io.vertx.core.internal.net.TcpClientInternal;
import io.vertx.core.net.ProxyOptions;
import io.vertx.core.net.endpoint.LoadBalancer;
import io.vertx.core.net.AddressResolver;
import io.vertx.core.net.endpoint.impl.EndpointResolverImpl;
import io.vertx.core.net.endpoint.EndpointResolver;
import io.vertx.core.net.TcpClientConfig;
import io.vertx.core.net.impl.tcp.NetClientBuilder;
import io.vertx.core.net.impl.tcp.TcpClientBuilder;
import io.vertx.core.spi.metrics.HttpClientMetrics;

import java.time.Duration;
Expand Down Expand Up @@ -234,7 +235,7 @@ public HttpClientAgent build() {
shared = co.isShared() ? co.getName() : null;
TcpClientConfig clientConfig = netClientConfig(co)
.setProxyOptions(null);
NetClientInternal tcpClient = new NetClientBuilder(vertx, clientConfig)
TcpClientInternal tcpClient = new TcpClientBuilder(vertx, clientConfig)
.protocol("http")
.sslOptions(co.getSslOptions())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import io.vertx.core.internal.net.SslChannelProvider;
import io.vertx.core.net.ServerSSLOptions;
import io.vertx.core.net.impl.*;
import io.vertx.core.net.impl.tcp.NetServerImpl;
import io.vertx.core.net.impl.tcp.TcpServerImpl;
import io.vertx.core.spi.metrics.HttpServerMetrics;
import io.vertx.core.spi.metrics.TransportMetrics;
import io.vertx.core.tracing.TracingPolicy;
Expand Down Expand Up @@ -256,7 +256,7 @@ public void checkAccept(ChannelPipeline pipeline) {

/**
* Compute the name of the logical end of pipeline when adding handlers to a preconfigured NetSocket pipeline.
* See {@link NetServerImpl}
* See {@link TcpServerImpl}
* @param pipeline the channel pipeline
* @return the name of the handler to use
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.http.HttpClientTransport;
import io.vertx.core.internal.http.HttpHeadersInternal;
import io.vertx.core.internal.net.NetClientInternal;
import io.vertx.core.internal.net.TcpClientInternal;
import io.vertx.core.net.*;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.net.impl.tcp.NetSocketImpl;
Expand All @@ -61,7 +61,7 @@
*/
public class TcpHttpClientTransport implements HttpClientTransport {

public static TcpHttpClientTransport create(NetClientInternal netClient,
public static TcpHttpClientTransport create(TcpClientInternal netClient,
HttpClientConfig config,
HttpClientMetrics httpMetrics) {
return new TcpHttpClientTransport(netClient,
Expand Down Expand Up @@ -89,9 +89,9 @@ public static TcpHttpClientTransport create(NetClientInternal netClient,
private final Duration readIdleTimeout;
private final Duration writeIdleTimeout;
private final WebSocketMetrics<?> webSocketMetrics;
private final NetClientInternal client;
private final TcpClientInternal client;

public TcpHttpClientTransport(NetClientInternal netClient,
public TcpHttpClientTransport(TcpClientInternal netClient,
TracingPolicy tracingPolicy,
boolean useDecompression,
boolean logActivity,
Expand Down Expand Up @@ -121,7 +121,7 @@ public TcpHttpClientTransport(NetClientInternal netClient,
this.client = netClient;
}

public NetClientInternal client() {
public TcpClientInternal client() {
return client;
}

Expand All @@ -138,7 +138,7 @@ private Http2ClientChannelInitializer http2Initializer() {
}
}

private void connect(ContextInternal context, HttpConnectParams params, HostAndPort authority, SocketAddress server, Promise<NetSocket> promise) {
private void connect(ContextInternal context, HttpConnectParams params, HostAndPort authority, SocketAddress server, Promise<TcpSocket> promise) {
ConnectOptions connectOptions = new ConnectOptions();
connectOptions.setRemoteAddress(server);
if (authority != null) {
Expand Down Expand Up @@ -169,7 +169,7 @@ private void connect(ContextInternal context, HttpConnectParams params, HostAndP
client.connectInternal(connectOptions, promise, context);
}

public Future<HttpClientConnection> wrap(ContextInternal context, HttpConnectParams params, HostAndPort authority, ClientMetrics<?, ?, ?> clientMetrics, SocketAddress server, NetSocket so_) {
public Future<HttpClientConnection> wrap(ContextInternal context, HttpConnectParams params, HostAndPort authority, ClientMetrics<?, ?, ?> clientMetrics, SocketAddress server, TcpSocket so_) {
NetSocketImpl so = (NetSocketImpl) so_;
Object metric = so.metric();
TransportMetrics<?> transportMetrics = so.metrics();
Expand Down Expand Up @@ -240,8 +240,8 @@ public Future<HttpClientConnection> connect(ContextInternal context, SocketAddre
}
}

Promise<NetSocket> promise = context.promise();
Future<NetSocket> future = promise.future();
Promise<TcpSocket> promise = context.promise();
Future<TcpSocket> future = promise.future();
// We perform the compose operation before calling connect to be sure that the composition happens
// before the promise is completed by the connect operation
Future<HttpClientConnection> ret = future.compose(so -> wrap(context, params, authority, clientMetrics, server, so));
Expand Down Expand Up @@ -338,7 +338,7 @@ private void http1xConnected(HttpVersion version,

@Override
public Future<Void> shutdown(Duration timeout) {
return client.shutdown(timeout.toMillis(), TimeUnit.MILLISECONDS);
return client.shutdown(timeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.internal.net.NetServerInternal;
import io.vertx.core.internal.net.TcpServerInternal;
import io.vertx.core.net.*;
import io.vertx.core.net.impl.tcp.*;
import io.vertx.core.spi.metrics.HttpServerMetrics;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class TcpHttpServer implements HttpServerInternal {
private Handler<HttpServerRequest> invalidRequestHandler;
private Handler<HttpConnection> connectionHandler;
private Handler<Throwable> exceptionHandler;
private NetServerInternal tcpServer;
private TcpServerInternal tcpServer;
private Duration closeTimeout = Duration.ZERO;
private CloseSequence closeSequence;
private HttpServerMetrics<?, ?> httpMetrics;
Expand All @@ -68,7 +69,7 @@ public TcpHttpServer(VertxInternal vertx, HttpServerConfig config, HttpServerMet

@Override
public Future<Boolean> updateSSLOptions(ServerSSLOptions options, boolean force) {
NetServer s;
TcpServer s;
synchronized (this) {
s = tcpServer;
}
Expand All @@ -81,7 +82,7 @@ public Future<Boolean> updateSSLOptions(ServerSSLOptions options, boolean force)

@Override
public Future<Boolean> updateTrafficShapingOptions(TrafficShapingOptions options) {
NetServer s;
TcpServer s;
synchronized (this) {
s = tcpServer;
}
Expand All @@ -93,8 +94,13 @@ public Future<Boolean> updateTrafficShapingOptions(TrafficShapingOptions options

@Override
public synchronized int actualPort() {
NetServer s = tcpServer;
return s != null ? s.actualPort() : 0;
TcpServer s;
SocketAddress address;
if ((s = tcpServer) != null && (address = s.bindAddress()) != null) {
return address.port();
} else {
return 0;
}
}

@Override
Expand Down Expand Up @@ -199,7 +205,8 @@ public synchronized Future<HttpServer> listen(ContextInternal context, SocketAdd
.build();
}
HttpCompressionConfig compression = config.getCompression();
NetServerInternal server = new NetServerBuilder(vertx, config.getTcpConfig(), config.getSslOptions())
TcpServerInternal server = new TcpServerBuilder(vertx, config.getTcpConfig())
.sslOptions(config.getSslOptions())
.fileRegionEnabled(!compression.isCompressionEnabled())
.cleanable(false)
.protocol("http")
Expand Down Expand Up @@ -270,18 +277,18 @@ public synchronized Future<HttpServer> listen(ContextInternal context, SocketAdd
return result.future();
}

private void doShutdown(NetServer netServer, Completable<Void> p) {
netServer.shutdown(closeTimeout).onComplete(p);
private void doShutdown(TcpServer tcpServer, Completable<Void> p) {
tcpServer.shutdown(closeTimeout).onComplete(p);
}

private void doClose(NetServer netServer, Completable<Void> p) {
private void doClose(TcpServer tcpServer, Completable<Void> p) {
if (requestHandler instanceof Closeable) {
Closeable closeable = (Closeable) requestHandler;
closeable.close((res, err) -> {
netServer.close().onComplete(foo(p));
tcpServer.close().onComplete(foo(p));
});
} else {
netServer.close().onComplete(foo(p));
tcpServer.close().onComplete(foo(p));
}
}

Expand Down Expand Up @@ -319,7 +326,7 @@ private boolean isListening() {
}

public synchronized boolean isClosed() {
NetServerInternal s = tcpServer;
TcpServerInternal s = tcpServer;
return s == null || s.isClosed();
}

Expand All @@ -328,7 +335,7 @@ public boolean requestAccept() {
return true;
}

public NetServerInternal tcpServer() {
public TcpServerInternal tcpServer() {
return tcpServer;
}
}
Loading