Skip to content

Commit 7351471

Browse files
committed
Have TcpServerImpl implement TcpServer.
1 parent 04d25b2 commit 7351471

File tree

9 files changed

+157
-36
lines changed

9 files changed

+157
-36
lines changed

vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import io.vertx.core.internal.*;
4646
import io.vertx.core.internal.net.NetClientInternal;
4747
import io.vertx.core.internal.net.NetServerInternal;
48+
import io.vertx.core.internal.net.TcpServerInternal;
4849
import io.vertx.core.internal.resolver.NameResolver;
4950
import io.vertx.core.internal.threadchecker.BlockedThreadChecker;
5051
import io.vertx.core.net.*;
@@ -149,7 +150,7 @@ private static ThreadFactory virtualThreadFactory() {
149150
private final VerticleManager verticleManager;
150151
private final FileResolver fileResolver;
151152
private final EventExecutorProvider eventExecutorProvider;
152-
private final Map<ServerID, NetServerInternal> sharedNetServers = new HashMap<>();
153+
private final Map<ServerID, TcpServerInternal> sharedNetServers = new HashMap<>();
153154
private final ContextLocal<?>[] contextLocals;
154155
private final List<ContextLocal<?>> contextLocalsList;
155156
final WorkerPool workerPool;
@@ -595,7 +596,7 @@ private EventLoop stickyEventLoop() {
595596
return eventLoop;
596597
}
597598

598-
public Map<ServerID, NetServerInternal> sharedTcpServers() {
599+
public Map<ServerID, TcpServerInternal> sharedTcpServers() {
599600
return sharedNetServers;
600601
}
601602

vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.vertx.core.http.impl.HttpClientBuilderInternal;
1818
import io.vertx.core.impl.*;
1919
import io.vertx.core.internal.deployment.DeploymentManager;
20+
import io.vertx.core.internal.net.TcpServerInternal;
2021
import io.vertx.core.internal.resolver.NameResolver;
2122
import io.vertx.core.internal.threadchecker.BlockedThreadChecker;
2223
import io.vertx.core.net.NetServerOptions;
@@ -130,7 +131,7 @@ default NetServerInternal createNetServer() {
130131

131132
WorkerPool internalWorkerPool();
132133

133-
Map<ServerID, NetServerInternal> sharedTcpServers();
134+
Map<ServerID, TcpServerInternal> sharedTcpServers();
134135

135136
VertxMetrics metrics();
136137

vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.vertx.core.http.*;
2222
import io.vertx.core.http.impl.HttpClientBuilderInternal;
2323
import io.vertx.core.internal.deployment.DeploymentManager;
24+
import io.vertx.core.internal.net.TcpServerInternal;
2425
import io.vertx.core.internal.resolver.NameResolver;
2526
import io.vertx.core.internal.threadchecker.BlockedThreadChecker;
2627
import io.vertx.core.net.NetClient;
@@ -257,7 +258,7 @@ public WorkerPool internalWorkerPool() {
257258
}
258259

259260
@Override
260-
public Map<ServerID, NetServerInternal> sharedTcpServers() {
261+
public Map<ServerID, TcpServerInternal> sharedTcpServers() {
261262
return delegate.sharedTcpServers();
262263
}
263264

vertx-core/src/main/java/io/vertx/core/net/TcpServer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,14 @@ default Future<Boolean> updateSSLOptions(ServerSSLOptions options) {
146146
return updateSSLOptions(options, false);
147147
}
148148

149+
/**
150+
* The socket address the server is listening on. This is useful if you bound the server specifying 0 as port number
151+
* signifying an ephemeral port
152+
*
153+
* @return the server bind address the server is listening on or {@code null}
154+
*/
155+
SocketAddress bindAddress();
156+
149157
/**
150158
* <p>Update the server with new SSL {@code options}, the update happens if the options object is valid and different
151159
* from the existing options object.

vertx-core/src/main/java/io/vertx/core/net/impl/tcp/CleanableNetServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void close(Completable<Void> completion) {
4343
}
4444

4545
@Override
46-
public Future<NetServer> listen(ContextInternal context, SocketAddress localAddress) {
46+
public Future<SocketAddress> listen(ContextInternal context, SocketAddress localAddress) {
4747
synchronized (this) {
4848
if (listenContext != null) {
4949
return context.failedFuture(new IllegalStateException());

vertx-core/src/main/java/io/vertx/core/net/impl/tcp/NetServerBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public NetServerBuilder protocol(String protocol) {
6565
}
6666

6767
public NetServerInternal build() {
68-
NetServerInternal server;
68+
TcpServerImpl server;
6969
if (cleanable) {
7070
server = new CleanableNetServer(vertx,
7171
config,
@@ -83,6 +83,6 @@ public NetServerInternal build() {
8383
registerWriteHandler
8484
);
8585
}
86-
return server;
86+
return new NetServerImpl(server);
8787
}
8888
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright (c) 2011-2026 Contributors to the Eclipse Foundation
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
*/
11+
package io.vertx.core.net.impl.tcp;
12+
13+
import io.vertx.codegen.annotations.Nullable;
14+
import io.vertx.core.Future;
15+
import io.vertx.core.Handler;
16+
import io.vertx.core.internal.ContextInternal;
17+
import io.vertx.core.internal.net.NetServerInternal;
18+
import io.vertx.core.internal.tls.SslContextProvider;
19+
import io.vertx.core.net.*;
20+
import io.vertx.core.spi.metrics.Metrics;
21+
22+
import java.time.Duration;
23+
24+
public class NetServerImpl implements NetServerInternal {
25+
26+
private Handler<NetSocket> connectHandler;
27+
private TcpServerImpl delegate;
28+
29+
public NetServerImpl(TcpServerImpl delegate) {
30+
this.delegate = delegate;
31+
}
32+
33+
@Override
34+
public NetServerInternal connectHandler(@Nullable Handler<NetSocket> handler) {
35+
connectHandler = handler;
36+
delegate.connectHandler((Handler)handler);
37+
return this;
38+
}
39+
40+
@Override
41+
public NetServerInternal exceptionHandler(Handler<Throwable> handler) {
42+
delegate.exceptionHandler(handler);
43+
return this;
44+
}
45+
46+
@Override
47+
public SslContextProvider sslContextProvider() {
48+
return delegate.sslContextProvider();
49+
}
50+
51+
@Override
52+
public Future<NetServer> listen(ContextInternal context, SocketAddress localAddress) {
53+
return delegate.listen(context, localAddress).map(this);
54+
}
55+
56+
@Override
57+
public int sniEntrySize() {
58+
return delegate.sniEntrySize();
59+
}
60+
61+
@Override
62+
public boolean isClosed() {
63+
return delegate.isClosed();
64+
}
65+
66+
@Override
67+
public Handler<NetSocket> connectHandler() {
68+
return connectHandler;
69+
}
70+
71+
@Override
72+
public Future<NetServer> listen() {
73+
return delegate.listen().map(this);
74+
}
75+
76+
@Override
77+
public Future<NetServer> listen(SocketAddress localAddress) {
78+
return delegate.listen(localAddress).map(this);
79+
}
80+
81+
@Override
82+
public Future<Void> shutdown(Duration timeout) {
83+
return delegate.shutdown(timeout);
84+
}
85+
86+
@Override
87+
public int actualPort() {
88+
SocketAddress addr = delegate.bindAddress();
89+
return addr != null ? addr.port() : -1;
90+
}
91+
92+
@Override
93+
public Future<Boolean> updateSSLOptions(ServerSSLOptions options, boolean force) {
94+
return delegate.updateSSLOptions(options, force);
95+
}
96+
97+
@Override
98+
public Future<Boolean> updateTrafficShapingOptions(TrafficShapingOptions options) {
99+
return delegate.updateTrafficShapingOptions(options);
100+
}
101+
102+
@Override
103+
public Metrics getMetrics() {
104+
return delegate.getMetrics();
105+
}
106+
}

vertx-core/src/main/java/io/vertx/core/net/impl/tcp/TcpServerImpl.java

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import io.vertx.core.internal.net.NetServerInternal;
3939
import io.vertx.core.internal.net.SslChannelProvider;
4040
import io.vertx.core.internal.net.SslHandshakeCompletionHandler;
41+
import io.vertx.core.internal.net.TcpServerInternal;
4142
import io.vertx.core.internal.resolver.NameResolver;
4243
import io.vertx.core.internal.tls.SslContextManager;
4344
import io.vertx.core.internal.tls.SslContextProvider;
@@ -59,7 +60,7 @@
5960
* @author <a href="http://tfox.org">Tim Fox</a>
6061
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
6162
*/
62-
public class TcpServerImpl implements NetServerInternal {
63+
public class TcpServerImpl implements TcpServerInternal {
6364

6465
private static final Logger log = LoggerFactory.getLogger(TcpServerImpl.class);
6566

@@ -69,7 +70,7 @@ public class TcpServerImpl implements NetServerInternal {
6970
private final boolean fileRegionEnabled;
7071
private final boolean registerWriteHandler;
7172
private final String protocol;
72-
private Handler<NetSocket> handler;
73+
private Handler<TcpSocket> handler;
7374
private Handler<Throwable> exceptionHandler;
7475

7576
// Per server
@@ -114,12 +115,7 @@ public SslContextProvider sslContextProvider() {
114115
}
115116

116117
@Override
117-
public synchronized Handler<NetSocket> connectHandler() {
118-
return handler;
119-
}
120-
121-
@Override
122-
public synchronized TcpServerImpl connectHandler(Handler<NetSocket> handler) {
118+
public synchronized TcpServerImpl connectHandler(Handler<TcpSocket> handler) {
123119
if (isListening()) {
124120
throw new IllegalStateException("Cannot set connectHandler when server is listening");
125121
}
@@ -136,11 +132,6 @@ public synchronized TcpServerImpl exceptionHandler(Handler<Throwable> handler) {
136132
return this;
137133
}
138134

139-
public int actualPort() {
140-
TcpServerImpl server = actualServer;
141-
return server != null ? server.actualPort : actualPort;
142-
}
143-
144135
@Override
145136
public Future<Void> shutdown(Duration timeout) {
146137
ConnectionGroup group = channelGroup;
@@ -151,22 +142,22 @@ public Future<Void> shutdown(Duration timeout) {
151142
}
152143

153144
@Override
154-
public Future<NetServer> listen(SocketAddress localAddress) {
145+
public Future<SocketAddress> listen(SocketAddress localAddress) {
155146
return listen(vertx.getOrCreateContext(), localAddress);
156147
}
157148

158-
public Future<NetServer> listen(ContextInternal context, SocketAddress localAddress) {
149+
public Future<SocketAddress> listen(ContextInternal context, SocketAddress localAddress) {
159150
if (localAddress == null) {
160151
throw new NullPointerException("No null bind local address");
161152
}
162153
if (handler == null) {
163154
throw new IllegalStateException("Set connect handler first");
164155
}
165-
return bind(context, localAddress).map(this);
156+
return bind(context, localAddress);
166157
}
167158

168159
@Override
169-
public Future<NetServer> listen() {
160+
public Future<SocketAddress> listen() {
170161
return listen(config.getPort(), config.getHost());
171162
}
172163

@@ -177,11 +168,11 @@ public boolean isClosed() {
177168
private class NetSocketInitializer {
178169

179170
private final ContextInternal context;
180-
private final Handler<NetSocket> connectionHandler;
171+
private final Handler<TcpSocket> connectionHandler;
181172
private final Handler<Throwable> exceptionHandler;
182173
private final GlobalTrafficShapingHandler trafficShapingHandler;
183174

184-
NetSocketInitializer(ContextInternal context, Handler<NetSocket> connectionHandler, Handler<Throwable> exceptionHandler, GlobalTrafficShapingHandler trafficShapingHandler) {
175+
NetSocketInitializer(ContextInternal context, Handler<TcpSocket> connectionHandler, Handler<Throwable> exceptionHandler, GlobalTrafficShapingHandler trafficShapingHandler) {
185176
this.context = context;
186177
this.connectionHandler = connectionHandler;
187178
this.exceptionHandler = exceptionHandler;
@@ -320,6 +311,19 @@ public int sniEntrySize() {
320311
return sslContextManager.sniEntrySize();
321312
}
322313

314+
@Override
315+
public SocketAddress bindAddress() {
316+
Future<Channel> f = bindFuture;
317+
if (f == null) {
318+
return null;
319+
}
320+
Channel ch = f.result();
321+
if (ch == null) {
322+
return null;
323+
}
324+
return vertx.transport().convert(ch.localAddress());
325+
}
326+
323327
public Future<Boolean> updateSSLOptions(ServerSSLOptions options, boolean force) {
324328
TcpServerImpl server = actualServer;
325329
if (server != null && server != this) {
@@ -406,16 +410,17 @@ public void updateTrafficShapingOptions(TrafficShapingOptions options, Promise<B
406410
}
407411
}
408412

409-
private synchronized Future<Channel> bind(ContextInternal context, SocketAddress localAddress) {
413+
private synchronized Future<SocketAddress> bind(ContextInternal context, SocketAddress localAddress) {
410414
if (listening) {
411415
throw new IllegalStateException("Listen already called");
412416
}
413417

414418
this.listening = true;
415419
this.eventLoop = context.nettyEventLoop();
416420

421+
PromiseInternal<Channel> promise = context.promise();
417422
SocketAddress bindAddress;
418-
Map<ServerID, NetServerInternal> sharedNetServers = vertx.sharedTcpServers();
423+
Map<ServerID, TcpServerInternal> sharedNetServers = (Map)vertx.sharedTcpServers();
419424
synchronized (sharedNetServers) {
420425
actualPort = localAddress.port();
421426
String hostOrPath = localAddress.isInetSocket() ? localAddress.host() : localAddress.path();
@@ -451,7 +456,6 @@ protected void handleShutdown(Duration timeout, Completable<Void> completion) {
451456
}
452457
};
453458
channelGroup = group;
454-
PromiseInternal<Channel> promise = context.promise();
455459
if (main == null) {
456460

457461
SslContextManager helper;
@@ -508,8 +512,6 @@ protected void handleShutdown(Duration timeout, Completable<Void> completion) {
508512
}
509513
listening = false;
510514
});
511-
512-
return bindFuture;
513515
} else {
514516
// Server already exists with that host/port - we will use that
515517
actualServer = main;
@@ -523,9 +525,10 @@ protected void handleShutdown(Duration timeout, Completable<Void> completion) {
523525
};
524526
actualServer.channelBalancer.addWorker(eventLoop, worker);
525527
main.bindFuture.onComplete(promise);
526-
return promise.future();
527528
}
528529
}
530+
return promise.future()
531+
.map(ch -> vertx.transport().convert(ch.localAddress()));
529532
}
530533

531534
private void bind(
@@ -535,7 +538,7 @@ private void bind(
535538
SocketAddress localAddress,
536539
boolean shared,
537540
Promise<Channel> promise,
538-
Map<ServerID, NetServerInternal> sharedNetServers,
541+
Map<ServerID, TcpServerInternal> sharedNetServers,
539542
ServerID id) {
540543
// Socket bind
541544
channelBalancer.addWorker(eventLoop, worker);
@@ -629,7 +632,7 @@ private void handleShutdown(Completable<Void> completion) {
629632
completion.succeed();
630633
return;
631634
}
632-
Map<ServerID, NetServerInternal> servers = vertx.sharedTcpServers();
635+
Map<ServerID, TcpServerInternal> servers = vertx.sharedTcpServers();
633636
boolean hasHandlers;
634637
synchronized (servers) {
635638
ServerChannelLoadBalancer balancer = actualServer.channelBalancer;

0 commit comments

Comments
 (0)