Skip to content

Commit a5b3d5d

Browse files
committed
Have TcpServerImpl implement TcpServer.
1 parent 04d25b2 commit a5b3d5d

File tree

4 files changed

+139
-29
lines changed

4 files changed

+139
-29
lines changed

vertx-core/src/main/java/io/vertx/core/net/impl/tcp/CleanableNetServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void close(Completable<Void> completion) {
4343
}
4444

4545
@Override
46-
public Future<NetServer> listen(ContextInternal context, SocketAddress localAddress) {
46+
public Future<SocketAddress> listen(ContextInternal context, SocketAddress localAddress) {
4747
synchronized (this) {
4848
if (listenContext != null) {
4949
return context.failedFuture(new IllegalStateException());

vertx-core/src/main/java/io/vertx/core/net/impl/tcp/NetServerBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public NetServerBuilder protocol(String protocol) {
6565
}
6666

6767
public NetServerInternal build() {
68-
NetServerInternal server;
68+
TcpServerImpl server;
6969
if (cleanable) {
7070
server = new CleanableNetServer(vertx,
7171
config,
@@ -83,6 +83,6 @@ public NetServerInternal build() {
8383
registerWriteHandler
8484
);
8585
}
86-
return server;
86+
return new NetServerImpl(server);
8787
}
8888
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright (c) 2011-2026 Contributors to the Eclipse Foundation
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
*/
11+
package io.vertx.core.net.impl.tcp;
12+
13+
import io.vertx.codegen.annotations.Nullable;
14+
import io.vertx.core.Future;
15+
import io.vertx.core.Handler;
16+
import io.vertx.core.internal.ContextInternal;
17+
import io.vertx.core.internal.net.NetServerInternal;
18+
import io.vertx.core.internal.tls.SslContextProvider;
19+
import io.vertx.core.net.*;
20+
import io.vertx.core.spi.metrics.Metrics;
21+
22+
import java.time.Duration;
23+
24+
public class NetServerImpl implements NetServerInternal {
25+
26+
private volatile SocketAddress bindAddress;
27+
private Handler<NetSocket> connectHandler;
28+
private TcpServerImpl delegate;
29+
30+
public NetServerImpl(TcpServerImpl delegate) {
31+
this.delegate = delegate;
32+
}
33+
34+
@Override
35+
public NetServerInternal connectHandler(@Nullable Handler<NetSocket> handler) {
36+
connectHandler = handler;
37+
delegate.connectHandler((Handler)handler);
38+
return this;
39+
}
40+
41+
@Override
42+
public NetServerInternal exceptionHandler(Handler<Throwable> handler) {
43+
delegate.exceptionHandler(handler);
44+
return this;
45+
}
46+
47+
@Override
48+
public SslContextProvider sslContextProvider() {
49+
return delegate.sslContextProvider();
50+
}
51+
52+
@Override
53+
public Future<NetServer> listen(ContextInternal context, SocketAddress localAddress) {
54+
return delegate.listen(context, localAddress)
55+
.map(addr -> {
56+
bindAddress = addr;
57+
return this;
58+
});
59+
}
60+
61+
@Override
62+
public int sniEntrySize() {
63+
return delegate.sniEntrySize();
64+
}
65+
66+
@Override
67+
public boolean isClosed() {
68+
return delegate.isClosed();
69+
}
70+
71+
@Override
72+
public Handler<NetSocket> connectHandler() {
73+
return connectHandler;
74+
}
75+
76+
@Override
77+
public Future<NetServer> listen() {
78+
return delegate.listen()
79+
.map(addr -> {
80+
bindAddress = addr;
81+
return this;
82+
});
83+
}
84+
85+
@Override
86+
public Future<NetServer> listen(SocketAddress localAddress) {
87+
return delegate.listen(localAddress)
88+
.map(addr -> {
89+
bindAddress = addr;
90+
return this;
91+
});
92+
}
93+
94+
@Override
95+
public Future<Void> shutdown(Duration timeout) {
96+
return delegate.shutdown(timeout);
97+
}
98+
99+
@Override
100+
public int actualPort() {
101+
SocketAddress addr = bindAddress;
102+
return addr != null ? addr.port() : -1;
103+
}
104+
105+
@Override
106+
public Future<Boolean> updateSSLOptions(ServerSSLOptions options, boolean force) {
107+
return delegate.updateSSLOptions(options, force);
108+
}
109+
110+
@Override
111+
public Future<Boolean> updateTrafficShapingOptions(TrafficShapingOptions options) {
112+
return delegate.updateTrafficShapingOptions(options);
113+
}
114+
115+
@Override
116+
public Metrics getMetrics() {
117+
return delegate.getMetrics();
118+
}
119+
}

vertx-core/src/main/java/io/vertx/core/net/impl/tcp/TcpServerImpl.java

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import io.vertx.core.internal.net.NetServerInternal;
3939
import io.vertx.core.internal.net.SslChannelProvider;
4040
import io.vertx.core.internal.net.SslHandshakeCompletionHandler;
41+
import io.vertx.core.internal.net.TcpServerInternal;
4142
import io.vertx.core.internal.resolver.NameResolver;
4243
import io.vertx.core.internal.tls.SslContextManager;
4344
import io.vertx.core.internal.tls.SslContextProvider;
@@ -59,7 +60,7 @@
5960
* @author <a href="http://tfox.org">Tim Fox</a>
6061
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
6162
*/
62-
public class TcpServerImpl implements NetServerInternal {
63+
public class TcpServerImpl implements TcpServerInternal {
6364

6465
private static final Logger log = LoggerFactory.getLogger(TcpServerImpl.class);
6566

@@ -69,7 +70,7 @@ public class TcpServerImpl implements NetServerInternal {
6970
private final boolean fileRegionEnabled;
7071
private final boolean registerWriteHandler;
7172
private final String protocol;
72-
private Handler<NetSocket> handler;
73+
private Handler<TcpSocket> handler;
7374
private Handler<Throwable> exceptionHandler;
7475

7576
// Per server
@@ -114,12 +115,7 @@ public SslContextProvider sslContextProvider() {
114115
}
115116

116117
@Override
117-
public synchronized Handler<NetSocket> connectHandler() {
118-
return handler;
119-
}
120-
121-
@Override
122-
public synchronized TcpServerImpl connectHandler(Handler<NetSocket> handler) {
118+
public synchronized TcpServerImpl connectHandler(Handler<TcpSocket> handler) {
123119
if (isListening()) {
124120
throw new IllegalStateException("Cannot set connectHandler when server is listening");
125121
}
@@ -136,11 +132,6 @@ public synchronized TcpServerImpl exceptionHandler(Handler<Throwable> handler) {
136132
return this;
137133
}
138134

139-
public int actualPort() {
140-
TcpServerImpl server = actualServer;
141-
return server != null ? server.actualPort : actualPort;
142-
}
143-
144135
@Override
145136
public Future<Void> shutdown(Duration timeout) {
146137
ConnectionGroup group = channelGroup;
@@ -151,22 +142,22 @@ public Future<Void> shutdown(Duration timeout) {
151142
}
152143

153144
@Override
154-
public Future<NetServer> listen(SocketAddress localAddress) {
145+
public Future<SocketAddress> listen(SocketAddress localAddress) {
155146
return listen(vertx.getOrCreateContext(), localAddress);
156147
}
157148

158-
public Future<NetServer> listen(ContextInternal context, SocketAddress localAddress) {
149+
public Future<SocketAddress> listen(ContextInternal context, SocketAddress localAddress) {
159150
if (localAddress == null) {
160151
throw new NullPointerException("No null bind local address");
161152
}
162153
if (handler == null) {
163154
throw new IllegalStateException("Set connect handler first");
164155
}
165-
return bind(context, localAddress).map(this);
156+
return bind(context, localAddress);
166157
}
167158

168159
@Override
169-
public Future<NetServer> listen() {
160+
public Future<SocketAddress> listen() {
170161
return listen(config.getPort(), config.getHost());
171162
}
172163

@@ -177,11 +168,11 @@ public boolean isClosed() {
177168
private class NetSocketInitializer {
178169

179170
private final ContextInternal context;
180-
private final Handler<NetSocket> connectionHandler;
171+
private final Handler<TcpSocket> connectionHandler;
181172
private final Handler<Throwable> exceptionHandler;
182173
private final GlobalTrafficShapingHandler trafficShapingHandler;
183174

184-
NetSocketInitializer(ContextInternal context, Handler<NetSocket> connectionHandler, Handler<Throwable> exceptionHandler, GlobalTrafficShapingHandler trafficShapingHandler) {
175+
NetSocketInitializer(ContextInternal context, Handler<TcpSocket> connectionHandler, Handler<Throwable> exceptionHandler, GlobalTrafficShapingHandler trafficShapingHandler) {
185176
this.context = context;
186177
this.connectionHandler = connectionHandler;
187178
this.exceptionHandler = exceptionHandler;
@@ -406,16 +397,17 @@ public void updateTrafficShapingOptions(TrafficShapingOptions options, Promise<B
406397
}
407398
}
408399

409-
private synchronized Future<Channel> bind(ContextInternal context, SocketAddress localAddress) {
400+
private synchronized Future<SocketAddress> bind(ContextInternal context, SocketAddress localAddress) {
410401
if (listening) {
411402
throw new IllegalStateException("Listen already called");
412403
}
413404

414405
this.listening = true;
415406
this.eventLoop = context.nettyEventLoop();
416407

408+
PromiseInternal<Channel> promise = context.promise();
417409
SocketAddress bindAddress;
418-
Map<ServerID, NetServerInternal> sharedNetServers = vertx.sharedTcpServers();
410+
Map<ServerID, TcpServerInternal> sharedNetServers = (Map)vertx.sharedTcpServers();
419411
synchronized (sharedNetServers) {
420412
actualPort = localAddress.port();
421413
String hostOrPath = localAddress.isInetSocket() ? localAddress.host() : localAddress.path();
@@ -451,7 +443,6 @@ protected void handleShutdown(Duration timeout, Completable<Void> completion) {
451443
}
452444
};
453445
channelGroup = group;
454-
PromiseInternal<Channel> promise = context.promise();
455446
if (main == null) {
456447

457448
SslContextManager helper;
@@ -508,8 +499,6 @@ protected void handleShutdown(Duration timeout, Completable<Void> completion) {
508499
}
509500
listening = false;
510501
});
511-
512-
return bindFuture;
513502
} else {
514503
// Server already exists with that host/port - we will use that
515504
actualServer = main;
@@ -523,9 +512,11 @@ protected void handleShutdown(Duration timeout, Completable<Void> completion) {
523512
};
524513
actualServer.channelBalancer.addWorker(eventLoop, worker);
525514
main.bindFuture.onComplete(promise);
526-
return promise.future();
527515
}
528516
}
517+
return promise
518+
.future()
519+
.map(ch -> vertx.transport().convert(ch.localAddress()));
529520
}
530521

531522
private void bind(
@@ -535,7 +526,7 @@ private void bind(
535526
SocketAddress localAddress,
536527
boolean shared,
537528
Promise<Channel> promise,
538-
Map<ServerID, NetServerInternal> sharedNetServers,
529+
Map<ServerID, TcpServerInternal> sharedNetServers,
539530
ServerID id) {
540531
// Socket bind
541532
channelBalancer.addWorker(eventLoop, worker);

0 commit comments

Comments
 (0)