Skip to content

Commit 99d9583

Browse files
committed
More duplicated context unwrapping
1 parent 1187349 commit 99d9583

22 files changed

+229
-65
lines changed

vertx-core/src/main/java/io/vertx/core/eventbus/EventBus.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.vertx.core.eventbus.impl.DefaultSerializableChecker;
2222
import io.vertx.core.metrics.Measured;
2323

24+
import java.util.Objects;
2425
import java.util.function.Function;
2526

2627
import static io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE;
@@ -175,7 +176,12 @@ default <T> Future<Message<T>> request(String address, @Nullable Object message)
175176
* @param handler the handler that will process the received messages
176177
* @return the event bus message consumer
177178
*/
178-
<T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler);
179+
default <T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler) {
180+
Objects.requireNonNull(handler, "handler");
181+
MessageConsumer<T> consumer = localConsumer(address);
182+
consumer.handler(handler);
183+
return consumer;
184+
}
179185

180186
/**
181187
* Create a message sender against the specified address.

vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ public <T> MessageConsumer<T> consumer(MessageConsumerOptions options, Handler<M
187187
public <T> MessageConsumer<T> consumer(String address) {
188188
checkStarted();
189189
Objects.requireNonNull(address, "address");
190-
return new MessageConsumerImpl<>(vertx.getOrCreateContext(), this, address, false, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES);
190+
return new MessageConsumerImpl<>(vertx.getOrCreateContext().unwrap(), this, address, false, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES);
191191
}
192192

193193
@Override
@@ -200,17 +200,18 @@ public <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handl
200200

201201
@Override
202202
public <T> MessageConsumer<T> localConsumer(String address) {
203-
checkStarted();
204-
Objects.requireNonNull(address, "address");
205-
return new MessageConsumerImpl<>(vertx.getOrCreateContext(), this, address, true, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES);
203+
return localConsumer(vertx.getOrCreateContext(), address);
206204
}
207205

208206
@Override
209-
public <T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler) {
210-
Objects.requireNonNull(handler, "handler");
211-
MessageConsumer<T> consumer = localConsumer(address);
212-
consumer.handler(handler);
213-
return consumer;
207+
public <T> MessageConsumer<T> localConsumer(Context context, String address) {
208+
checkStarted();
209+
Objects.requireNonNull(context, "context");
210+
Objects.requireNonNull(address, "address");
211+
if (context.owner() != vertx) {
212+
throw new IllegalArgumentException("Invalid context instance");
213+
}
214+
return new MessageConsumerImpl<>((ContextInternal) context, this, address, true, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES);
214215
}
215216

216217
@Override

vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusInternal.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,13 @@
1111

1212
package io.vertx.core.eventbus.impl;
1313

14+
import io.vertx.core.Context;
1415
import io.vertx.core.Promise;
1516
import io.vertx.core.eventbus.EventBus;
17+
import io.vertx.core.eventbus.MessageConsumer;
18+
import io.vertx.core.eventbus.MessageConsumerOptions;
19+
20+
import java.util.Objects;
1621

1722
public interface EventBusInternal extends EventBus {
1823

@@ -21,6 +26,8 @@ public interface EventBusInternal extends EventBus {
2126
*/
2227
void start(Promise<Void> promise);
2328

29+
<T> MessageConsumer<T> localConsumer(Context context, String address);
30+
2431
/**
2532
* Close the event bus and release any resources held.
2633
*/

vertx-core/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ public class MessageConsumerImpl<T> extends HandlerRegistration<T> implements Me
3838

3939
MessageConsumerImpl(ContextInternal context, EventBusImpl eventBus, String address, boolean localOnly, int maxBufferedMessages) {
4040
super(context, eventBus, address, false);
41+
42+
if (context.isDuplicate()) {
43+
throw new IllegalArgumentException("Duplicated context are not allowed");
44+
}
45+
4146
this.localOnly = localOnly;
4247
this.result = context.promise();
4348
this.maxBufferedMessages = maxBufferedMessages;

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ void createWebSocket(Http1xServerRequest request, PromiseInternal<ServerWebSocke
297297
promise.fail(e);
298298
return;
299299
}
300-
promise.complete(new ServerWebSocketHandshaker(request, handshaker, options));
300+
promise.complete(new ServerWebSocketHandshaker(request, promise.context(), handshaker, options));
301301
}
302302
});
303303
}

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -436,22 +436,14 @@ public String getFormAttribute(String attributeName) {
436436

437437
@Override
438438
public Future<ServerWebSocket> toWebSocket() {
439-
return webSocketHandshake().compose(handshake -> handshake.accept());
440-
}
441-
442-
/**
443-
* @return a future of the un-accepted WebSocket
444-
*/
445-
Future<ServerWebSocketHandshake> webSocketHandshake() {
446-
PromiseInternal<ServerWebSocketHandshake> promise = context.promise();
447-
webSocketHandshake(promise);
448-
return promise.future();
439+
return webSocketHandshake(context.unwrap()).compose(handshake -> handshake.accept());
449440
}
450441

451442
/**
452443
* Handle the request when a WebSocket upgrade header is present.
453444
*/
454-
private void webSocketHandshake(PromiseInternal<ServerWebSocketHandshake> promise) {
445+
Future<ServerWebSocketHandshake> webSocketHandshake(ContextInternal context) {
446+
PromiseInternal<ServerWebSocketHandshake> promise = context.promise();
455447
BufferInternal body = BufferInternal.buffer();
456448
boolean[] failed = new boolean[1];
457449
handler(buff -> {
@@ -482,6 +474,7 @@ private void webSocketHandshake(PromiseInternal<ServerWebSocketHandshake> promis
482474
});
483475
// In case we were paused
484476
resume();
477+
return promise.future();
485478
}
486479

487480
@Override

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerRequestHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void handle(HttpServerRequest req) {
4444
if (wsHandler != null || wsHandshakeHandler != null) {
4545
if (req.headers().contains(UPGRADE, WEBSOCKET, true) && handlers.server.wsAccept()) {
4646
// Missing upgrade header + null request handler will be handled when creating the handshake by sending a 400 error
47-
((Http1xServerRequest)req).webSocketHandshake().onComplete(ar -> {
47+
((Http1xServerRequest)req).webSocketHandshake(((Http1xServerRequest)req).context.unwrap()).onComplete(ar -> {
4848
if (ar.succeeded()) {
4949
ServerWebSocketHandshaker handshake = (ServerWebSocketHandshaker) ar.result();
5050
if (wsHandshakeHandler == null) {

vertx-core/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,9 @@ public Future<HttpClientConnectionInternal> wrap(ContextInternal context, NetSoc
166166
}
167167

168168
public Future<HttpClientConnectionInternal> httpConnect(ContextInternal context) {
169+
if (context.isDuplicate()) {
170+
throw new IllegalArgumentException("Cannot accept duplicate contexts");
171+
}
169172
Promise<NetSocket> promise = context.promise();
170173
Future<NetSocket> future = promise.future();
171174
// We perform the compose operation before calling connect to be sure that the composition happens

vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ public Future<HttpClientConnection> connect(HttpConnectOptions connect) {
242242
server,
243243
false,
244244
0);
245-
return (Future) connector.httpConnect(vertx.getOrCreateContext()).map(conn -> new UnpooledHttpClientConnection(conn).init());
245+
return (Future) connector.httpConnect(vertx.getOrCreateContext().unwrap()).map(conn -> new UnpooledHttpClientConnection(conn).init());
246246
}
247247

248248
@Override

vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,27 @@ public Future<HttpServer> listen() {
169169
}
170170

171171
@Override
172-
public synchronized Future<HttpServer> listen(SocketAddress address) {
172+
public Future<HttpServer> listen(SocketAddress address) {
173+
ContextInternal context = vertx.getOrCreateContext();
174+
ContextInternal listenContext;
175+
if (context.isEventLoopContext()) {
176+
listenContext = context.unwrap();
177+
} else {
178+
listenContext = context.toBuilder()
179+
.withThreadingModel(ThreadingModel.EVENT_LOOP)
180+
.build();
181+
}
182+
Promise<HttpServer> promise = context.promise();
183+
Supplier<ContextInternal> streamContextSupplier = context.unwrap()::duplicate;
184+
listen(listenContext, context.threadingModel(), streamContextSupplier, address, promise);
185+
return promise.future();
186+
}
187+
188+
private synchronized void listen(ContextInternal listenContext,
189+
ThreadingModel threadingModel,
190+
Supplier<ContextInternal> streamContextSupplier,
191+
SocketAddress address,
192+
Promise<HttpServer> promise) {
173193
if (requestHandler == null && webSocketHandler == null && webSocketHandhakeHandler == null) {
174194
throw new IllegalStateException("Set request or WebSocket handler first");
175195
}
@@ -181,23 +201,12 @@ public synchronized Future<HttpServer> listen(SocketAddress address) {
181201
if (tcpOptions.getSslOptions() != null) {
182202
configureApplicationLayerProtocols(tcpOptions.getSslOptions());
183203
}
184-
ContextInternal context = vertx.getOrCreateContext();
185-
ContextInternal listenContext;
186-
// Not sure of this
187-
if (context.isEventLoopContext()) {
188-
listenContext = context.unwrap();
189-
} else {
190-
listenContext = context.toBuilder()
191-
.withThreadingModel(ThreadingModel.EVENT_LOOP)
192-
.build();
193-
}
194204
NetServerInternal server = vertx.createNetServer(tcpOptions);
195205
Handler<Throwable> h = exceptionHandler;
196206
Handler<Throwable> exceptionHandler = h != null ? h : DEFAULT_EXCEPTION_HANDLER;
197207
server.exceptionHandler(exceptionHandler);
198208
server.connectHandler(so -> {
199209
NetSocketImpl soi = (NetSocketImpl) so;
200-
Supplier<ContextInternal> streamContextSupplier = context.unwrap()::duplicate;
201210
String host = address.isInetSocket() ? address.host() : "localhost";
202211
int port = address.port();
203212
String serverOrigin = (tcpOptions.isSsl() ? "https" : "http") + "://" + host + ":" + port;
@@ -211,7 +220,7 @@ public synchronized Future<HttpServer> listen(SocketAddress address) {
211220
exceptionHandler);
212221
HttpServerConnectionInitializer initializer = new HttpServerConnectionInitializer(
213222
listenContext,
214-
context.threadingModel(),
223+
threadingModel,
215224
streamContextSupplier,
216225
this,
217226
vertx,
@@ -224,15 +233,7 @@ public synchronized Future<HttpServer> listen(SocketAddress address) {
224233
});
225234
tcpServer = server;
226235
closeSequence = new CloseSequence(p -> doClose(server, p), p -> doShutdown(server, p ));
227-
Promise<HttpServer> result = context.promise();
228-
tcpServer.listen(listenContext, address).onComplete(ar -> {
229-
if (ar.succeeded()) {
230-
result.complete(this);
231-
} else {
232-
result.fail(ar.cause());
233-
}
234-
});
235-
return result.future();
236+
tcpServer.listen(listenContext, address).map(this).onComplete(promise);
236237
}
237238

238239
private void doShutdown(NetServer netServer, Completable<Void> p) {

0 commit comments

Comments
 (0)