Skip to content

Commit bf59439

Browse files
committed
More duplicated context unwrapping
1 parent 1187349 commit bf59439

28 files changed

+303
-115
lines changed

vertx-core/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
*/
5252
public class DatagramSocketImpl implements DatagramSocket, MetricsProvider, Closeable {
5353

54-
public static DatagramSocketImpl create(VertxInternal vertx, CloseFuture closeFuture, DatagramSocketOptions options) {
54+
public static DatagramSocketImpl create(ContextInternal vertx, CloseFuture closeFuture, DatagramSocketOptions options) {
5555
DatagramSocketImpl socket = new DatagramSocketImpl(vertx, closeFuture, options);
5656
// Make sure object is fully initiliased to avoid race with async registration
5757
socket.init();
@@ -65,19 +65,18 @@ public static DatagramSocketImpl create(VertxInternal vertx, CloseFuture closeFu
6565
private Handler<Throwable> exceptionHandler;
6666
private final CloseFuture closeFuture;
6767

68-
private DatagramSocketImpl(VertxInternal vertx, CloseFuture closeFuture, DatagramSocketOptions options) {
69-
Transport transport = vertx.transport();
68+
private DatagramSocketImpl(ContextInternal context, CloseFuture closeFuture, DatagramSocketOptions options) {
69+
Transport transport = context.owner().transport();
7070
DatagramChannel channel = transport.datagramChannel(options.isIpV6() ? InternetProtocolFamily.IPv6 : InternetProtocolFamily.IPv4);
7171
transport.configure(channel, new DatagramSocketOptions(options));
72-
ContextInternal context = vertx.getOrCreateContext();
7372
channel.config().setOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
7473
MaxMessagesRecvByteBufAllocator bufAllocator = channel.config().getRecvByteBufAllocator();
7574
bufAllocator.maxMessagesPerRead(1);
7675
context.nettyEventLoop().register(channel);
7776
if (options.getLogActivity()) {
7877
channel.pipeline().addLast("logging", new LoggingHandler(options.getActivityLogDataFormat()));
7978
}
80-
VertxMetrics metrics = vertx.metrics();
79+
VertxMetrics metrics = context.owner().metrics();
8180
this.metrics = metrics != null ? metrics.createDatagramSocketMetrics(options) : null;
8281
this.channel = channel;
8382
this.context = context;

vertx-core/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@ public DnsClientImpl(VertxInternal vertx, DnsClientOptions options) {
6464
this.vertx = vertx;
6565
}
6666

67-
private DnsNameResolver resolver(ContextInternal ctx, InternetProtocolFamily ipFamily) {
68-
EventLoop el = ctx.nettyEventLoop();
67+
private DnsNameResolver resolver(EventLoop el, InternetProtocolFamily ipFamily) {
6968
DnsNameResolver resolver;
7069
synchronized (this) {
7170
if (closed) {
@@ -169,7 +168,7 @@ public Future<List<SrvRecord>> resolveSRV(String name) {
169168
private Future<List<String>> resolveAll(String name, InternetProtocolFamily ipFamily) {
170169
Objects.requireNonNull(name);
171170
ContextInternal ctx = vertx.getOrCreateContext();
172-
DnsNameResolver resolver = resolver(ctx, ipFamily);
171+
DnsNameResolver resolver = resolver(ctx.nettyEventLoop(), ipFamily);
173172
if (resolver == null) {
174173
return ctx.failedFuture("DNS client is closed");
175174
}
@@ -200,7 +199,7 @@ private Future<List<String>> resolveAll(String name, InternetProtocolFamily ipFa
200199
private <T> Future<List<T>> queryAll(String name, DnsRecordType recordType, Function<DnsRecord, T> mapper) {
201200
Objects.requireNonNull(name);
202201
ContextInternal ctx = vertx.getOrCreateContext();
203-
DnsNameResolver resolver = resolver(ctx, InternetProtocolFamily.IPv4);
202+
DnsNameResolver resolver = resolver(ctx.nettyEventLoop(), InternetProtocolFamily.IPv4);
204203
if (resolver == null) {
205204
return ctx.failedFuture("DNS client is closed");
206205
}

vertx-core/src/main/java/io/vertx/core/eventbus/EventBus.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.vertx.core.eventbus.impl.DefaultSerializableChecker;
2222
import io.vertx.core.metrics.Measured;
2323

24+
import java.util.Objects;
2425
import java.util.function.Function;
2526

2627
import static io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE;
@@ -175,7 +176,12 @@ default <T> Future<Message<T>> request(String address, @Nullable Object message)
175176
* @param handler the handler that will process the received messages
176177
* @return the event bus message consumer
177178
*/
178-
<T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler);
179+
default <T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler) {
180+
Objects.requireNonNull(handler, "handler");
181+
MessageConsumer<T> consumer = localConsumer(address);
182+
consumer.handler(handler);
183+
return consumer;
184+
}
179185

180186
/**
181187
* Create a message sender against the specified address.

vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ public <T> MessageConsumer<T> consumer(MessageConsumerOptions options, Handler<M
187187
public <T> MessageConsumer<T> consumer(String address) {
188188
checkStarted();
189189
Objects.requireNonNull(address, "address");
190-
return new MessageConsumerImpl<>(vertx.getOrCreateContext(), this, address, false, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES);
190+
return new MessageConsumerImpl<>(vertx.getOrCreateContext().unwrap(), this, address, false, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES);
191191
}
192192

193193
@Override
@@ -200,17 +200,18 @@ public <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handl
200200

201201
@Override
202202
public <T> MessageConsumer<T> localConsumer(String address) {
203-
checkStarted();
204-
Objects.requireNonNull(address, "address");
205-
return new MessageConsumerImpl<>(vertx.getOrCreateContext(), this, address, true, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES);
203+
return localConsumer(vertx.getOrCreateContext(), address);
206204
}
207205

208206
@Override
209-
public <T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler) {
210-
Objects.requireNonNull(handler, "handler");
211-
MessageConsumer<T> consumer = localConsumer(address);
212-
consumer.handler(handler);
213-
return consumer;
207+
public <T> MessageConsumer<T> localConsumer(Context context, String address) {
208+
checkStarted();
209+
Objects.requireNonNull(context, "context");
210+
Objects.requireNonNull(address, "address");
211+
if (context.owner() != vertx) {
212+
throw new IllegalArgumentException("Invalid context instance");
213+
}
214+
return new MessageConsumerImpl<>((ContextInternal) context, this, address, true, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES);
214215
}
215216

216217
@Override

vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusInternal.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,13 @@
1111

1212
package io.vertx.core.eventbus.impl;
1313

14+
import io.vertx.core.Context;
1415
import io.vertx.core.Promise;
1516
import io.vertx.core.eventbus.EventBus;
17+
import io.vertx.core.eventbus.MessageConsumer;
18+
import io.vertx.core.eventbus.MessageConsumerOptions;
19+
20+
import java.util.Objects;
1621

1722
public interface EventBusInternal extends EventBus {
1823

@@ -21,6 +26,8 @@ public interface EventBusInternal extends EventBus {
2126
*/
2227
void start(Promise<Void> promise);
2328

29+
<T> MessageConsumer<T> localConsumer(Context context, String address);
30+
2431
/**
2532
* Close the event bus and release any resources held.
2633
*/

vertx-core/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ public class MessageConsumerImpl<T> extends HandlerRegistration<T> implements Me
3838

3939
MessageConsumerImpl(ContextInternal context, EventBusImpl eventBus, String address, boolean localOnly, int maxBufferedMessages) {
4040
super(context, eventBus, address, false);
41+
42+
if (context.isDuplicate()) {
43+
throw new IllegalArgumentException("Duplicated context are not allowed");
44+
}
45+
4146
this.localOnly = localOnly;
4247
this.result = context.promise();
4348
this.maxBufferedMessages = maxBufferedMessages;

vertx-core/src/main/java/io/vertx/core/http/impl/ClientWebSocketImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class ClientWebSocketImpl implements ClientWebSocketInternal {
5252

5353
@Override
5454
public Future<WebSocket> connect(WebSocketConnectOptions options) {
55-
return connect(client.vertx().getOrCreateContext(), options);
55+
return connect(client.vertx().getOrCreateContext().unwrap(), options);
5656
}
5757

5858
@Override

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ void createWebSocket(Http1xServerRequest request, PromiseInternal<ServerWebSocke
297297
promise.fail(e);
298298
return;
299299
}
300-
promise.complete(new ServerWebSocketHandshaker(request, handshaker, options));
300+
promise.complete(new ServerWebSocketHandshaker(request, promise.context(), handshaker, options));
301301
}
302302
});
303303
}

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -436,22 +436,14 @@ public String getFormAttribute(String attributeName) {
436436

437437
@Override
438438
public Future<ServerWebSocket> toWebSocket() {
439-
return webSocketHandshake().compose(handshake -> handshake.accept());
440-
}
441-
442-
/**
443-
* @return a future of the un-accepted WebSocket
444-
*/
445-
Future<ServerWebSocketHandshake> webSocketHandshake() {
446-
PromiseInternal<ServerWebSocketHandshake> promise = context.promise();
447-
webSocketHandshake(promise);
448-
return promise.future();
439+
return webSocketHandshake(context.unwrap()).compose(handshake -> handshake.accept());
449440
}
450441

451442
/**
452443
* Handle the request when a WebSocket upgrade header is present.
453444
*/
454-
private void webSocketHandshake(PromiseInternal<ServerWebSocketHandshake> promise) {
445+
Future<ServerWebSocketHandshake> webSocketHandshake(ContextInternal context) {
446+
PromiseInternal<ServerWebSocketHandshake> promise = context.promise();
455447
BufferInternal body = BufferInternal.buffer();
456448
boolean[] failed = new boolean[1];
457449
handler(buff -> {
@@ -482,6 +474,7 @@ private void webSocketHandshake(PromiseInternal<ServerWebSocketHandshake> promis
482474
});
483475
// In case we were paused
484476
resume();
477+
return promise.future();
485478
}
486479

487480
@Override

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerRequestHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void handle(HttpServerRequest req) {
4444
if (wsHandler != null || wsHandshakeHandler != null) {
4545
if (req.headers().contains(UPGRADE, WEBSOCKET, true) && handlers.server.wsAccept()) {
4646
// Missing upgrade header + null request handler will be handled when creating the handshake by sending a 400 error
47-
((Http1xServerRequest)req).webSocketHandshake().onComplete(ar -> {
47+
((Http1xServerRequest)req).webSocketHandshake(((Http1xServerRequest)req).context.unwrap()).onComplete(ar -> {
4848
if (ar.succeeded()) {
4949
ServerWebSocketHandshaker handshake = (ServerWebSocketHandshaker) ar.result();
5050
if (wsHandshakeHandler == null) {

0 commit comments

Comments
 (0)