diff --git a/vertx-core/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java b/vertx-core/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java index a6cdf58a9b2..0ec16a87dd7 100644 --- a/vertx-core/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java @@ -51,7 +51,7 @@ */ public class DatagramSocketImpl implements DatagramSocket, MetricsProvider, Closeable { - public static DatagramSocketImpl create(VertxInternal vertx, CloseFuture closeFuture, DatagramSocketOptions options) { + public static DatagramSocketImpl create(ContextInternal vertx, CloseFuture closeFuture, DatagramSocketOptions options) { DatagramSocketImpl socket = new DatagramSocketImpl(vertx, closeFuture, options); // Make sure object is fully initiliased to avoid race with async registration socket.init(); @@ -65,11 +65,10 @@ public static DatagramSocketImpl create(VertxInternal vertx, CloseFuture closeFu private Handler exceptionHandler; private final CloseFuture closeFuture; - private DatagramSocketImpl(VertxInternal vertx, CloseFuture closeFuture, DatagramSocketOptions options) { - Transport transport = vertx.transport(); + private DatagramSocketImpl(ContextInternal context, CloseFuture closeFuture, DatagramSocketOptions options) { + Transport transport = context.owner().transport(); DatagramChannel channel = transport.datagramChannel(options.isIpV6() ? InternetProtocolFamily.IPv6 : InternetProtocolFamily.IPv4); transport.configure(channel, new DatagramSocketOptions(options)); - ContextInternal context = vertx.getOrCreateContext(); channel.config().setOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true); MaxMessagesRecvByteBufAllocator bufAllocator = channel.config().getRecvByteBufAllocator(); bufAllocator.maxMessagesPerRead(1); @@ -77,7 +76,7 @@ private DatagramSocketImpl(VertxInternal vertx, CloseFuture closeFuture, Datagra if (options.getLogActivity()) { channel.pipeline().addLast("logging", new LoggingHandler(options.getActivityLogDataFormat())); } - VertxMetrics metrics = vertx.metrics(); + VertxMetrics metrics = context.owner().metrics(); this.metrics = metrics != null ? metrics.createDatagramSocketMetrics(options) : null; this.channel = channel; this.context = context; diff --git a/vertx-core/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java b/vertx-core/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java index fb499889e83..3543411903f 100644 --- a/vertx-core/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java @@ -64,8 +64,7 @@ public DnsClientImpl(VertxInternal vertx, DnsClientOptions options) { this.vertx = vertx; } - private DnsNameResolver resolver(ContextInternal ctx, InternetProtocolFamily ipFamily) { - EventLoop el = ctx.nettyEventLoop(); + private DnsNameResolver resolver(EventLoop el, InternetProtocolFamily ipFamily) { DnsNameResolver resolver; synchronized (this) { if (closed) { @@ -169,7 +168,7 @@ public Future> resolveSRV(String name) { private Future> resolveAll(String name, InternetProtocolFamily ipFamily) { Objects.requireNonNull(name); ContextInternal ctx = vertx.getOrCreateContext(); - DnsNameResolver resolver = resolver(ctx, ipFamily); + DnsNameResolver resolver = resolver(ctx.nettyEventLoop(), ipFamily); if (resolver == null) { return ctx.failedFuture("DNS client is closed"); } @@ -200,7 +199,7 @@ private Future> resolveAll(String name, InternetProtocolFamily ipFa private Future> queryAll(String name, DnsRecordType recordType, Function mapper) { Objects.requireNonNull(name); ContextInternal ctx = vertx.getOrCreateContext(); - DnsNameResolver resolver = resolver(ctx, InternetProtocolFamily.IPv4); + DnsNameResolver resolver = resolver(ctx.nettyEventLoop(), InternetProtocolFamily.IPv4); if (resolver == null) { return ctx.failedFuture("DNS client is closed"); } diff --git a/vertx-core/src/main/java/io/vertx/core/eventbus/EventBus.java b/vertx-core/src/main/java/io/vertx/core/eventbus/EventBus.java index cb2e0976556..2aa13da0bb5 100644 --- a/vertx-core/src/main/java/io/vertx/core/eventbus/EventBus.java +++ b/vertx-core/src/main/java/io/vertx/core/eventbus/EventBus.java @@ -21,6 +21,7 @@ import io.vertx.core.eventbus.impl.DefaultSerializableChecker; import io.vertx.core.metrics.Measured; +import java.util.Objects; import java.util.function.Function; import static io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE; @@ -175,7 +176,12 @@ default Future> request(String address, @Nullable Object message) * @param handler the handler that will process the received messages * @return the event bus message consumer */ - MessageConsumer localConsumer(String address, Handler> handler); + default MessageConsumer localConsumer(String address, Handler> handler) { + Objects.requireNonNull(handler, "handler"); + MessageConsumer consumer = localConsumer(address); + consumer.handler(handler); + return consumer; + } /** * Create a message sender against the specified address. diff --git a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index f5acb39438b..e6151c9556b 100644 --- a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -187,7 +187,7 @@ public MessageConsumer consumer(MessageConsumerOptions options, Handler MessageConsumer consumer(String address) { checkStarted(); Objects.requireNonNull(address, "address"); - return new MessageConsumerImpl<>(vertx.getOrCreateContext(), this, address, false, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES); + return new MessageConsumerImpl<>(vertx.getOrCreateContext().unwrap(), this, address, false, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES); } @Override @@ -200,17 +200,18 @@ public MessageConsumer consumer(String address, Handler> handl @Override public MessageConsumer localConsumer(String address) { - checkStarted(); - Objects.requireNonNull(address, "address"); - return new MessageConsumerImpl<>(vertx.getOrCreateContext(), this, address, true, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES); + return localConsumer(vertx.getOrCreateContext(), address); } @Override - public MessageConsumer localConsumer(String address, Handler> handler) { - Objects.requireNonNull(handler, "handler"); - MessageConsumer consumer = localConsumer(address); - consumer.handler(handler); - return consumer; + public MessageConsumer localConsumer(Context context, String address) { + checkStarted(); + Objects.requireNonNull(context, "context"); + Objects.requireNonNull(address, "address"); + if (context.owner() != vertx) { + throw new IllegalArgumentException("Invalid context instance"); + } + return new MessageConsumerImpl<>((ContextInternal) context, this, address, true, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES); } @Override diff --git a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusInternal.java b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusInternal.java index e4b2aaea452..b3ac19968f9 100644 --- a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusInternal.java +++ b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusInternal.java @@ -11,8 +11,13 @@ package io.vertx.core.eventbus.impl; +import io.vertx.core.Context; import io.vertx.core.Promise; import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.MessageConsumer; +import io.vertx.core.eventbus.MessageConsumerOptions; + +import java.util.Objects; public interface EventBusInternal extends EventBus { @@ -21,6 +26,8 @@ public interface EventBusInternal extends EventBus { */ void start(Promise promise); + MessageConsumer localConsumer(Context context, String address); + /** * Close the event bus and release any resources held. */ diff --git a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java index f9dc75654d3..3e4c976273f 100644 --- a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java @@ -38,6 +38,11 @@ public class MessageConsumerImpl extends HandlerRegistration implements Me MessageConsumerImpl(ContextInternal context, EventBusImpl eventBus, String address, boolean localOnly, int maxBufferedMessages) { super(context, eventBus, address, false); + + if (context.isDuplicate()) { + throw new IllegalArgumentException("Duplicated context are not allowed"); + } + this.localOnly = localOnly; this.result = context.promise(); this.maxBufferedMessages = maxBufferedMessages; diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/ClientWebSocketImpl.java b/vertx-core/src/main/java/io/vertx/core/http/impl/ClientWebSocketImpl.java index 8d9a3a17b2d..3a6ced12ddc 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/ClientWebSocketImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/ClientWebSocketImpl.java @@ -52,7 +52,7 @@ public class ClientWebSocketImpl implements ClientWebSocketInternal { @Override public Future connect(WebSocketConnectOptions options) { - return connect(client.vertx().getOrCreateContext(), options); + return connect(client.vertx().getOrCreateContext().unwrap(), options); } @Override diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java b/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java index ad95de480e1..bfd218a93d2 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java @@ -297,7 +297,7 @@ void createWebSocket(Http1xServerRequest request, PromiseInternal toWebSocket() { - return webSocketHandshake().compose(handshake -> handshake.accept()); - } - - /** - * @return a future of the un-accepted WebSocket - */ - Future webSocketHandshake() { - PromiseInternal promise = context.promise(); - webSocketHandshake(promise); - return promise.future(); + return webSocketHandshake(context.unwrap()).compose(handshake -> handshake.accept()); } /** * Handle the request when a WebSocket upgrade header is present. */ - private void webSocketHandshake(PromiseInternal promise) { + Future webSocketHandshake(ContextInternal context) { + PromiseInternal promise = context.promise(); BufferInternal body = BufferInternal.buffer(); boolean[] failed = new boolean[1]; handler(buff -> { @@ -482,6 +474,7 @@ private void webSocketHandshake(PromiseInternal promis }); // In case we were paused resume(); + return promise.future(); } @Override diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerRequestHandler.java b/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerRequestHandler.java index 30aea6b8018..05ba849a6a0 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerRequestHandler.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerRequestHandler.java @@ -44,7 +44,7 @@ public void handle(HttpServerRequest req) { if (wsHandler != null || wsHandshakeHandler != null) { if (req.headers().contains(UPGRADE, WEBSOCKET, true) && handlers.server.wsAccept()) { // Missing upgrade header + null request handler will be handled when creating the handshake by sending a 400 error - ((Http1xServerRequest)req).webSocketHandshake().onComplete(ar -> { + ((Http1xServerRequest)req).webSocketHandshake(((Http1xServerRequest)req).context.unwrap()).onComplete(ar -> { if (ar.succeeded()) { ServerWebSocketHandshaker handshake = (ServerWebSocketHandshaker) ar.result(); if (wsHandshakeHandler == null) { diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java index 16c5e76b930..caac92d5c91 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java @@ -166,6 +166,9 @@ public Future wrap(ContextInternal context, NetSoc } public Future httpConnect(ContextInternal context) { + if (context.isDuplicate()) { + throw new IllegalArgumentException("Cannot accept duplicate contexts"); + } Promise promise = context.promise(); Future future = promise.future(); // We perform the compose operation before calling connect to be sure that the composition happens diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java index 34b4a17fdea..622b9fd22df 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java @@ -242,7 +242,7 @@ public Future connect(HttpConnectOptions connect) { server, false, 0); - return (Future) connector.httpConnect(vertx.getOrCreateContext()).map(conn -> new UnpooledHttpClientConnection(conn).init()); + return (Future) connector.httpConnect(vertx.getOrCreateContext().unwrap()).map(conn -> new UnpooledHttpClientConnection(conn).init()); } @Override diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java index a899b7a4ba4..8ba636d7014 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java @@ -169,7 +169,27 @@ public Future listen() { } @Override - public synchronized Future listen(SocketAddress address) { + public Future listen(SocketAddress address) { + ContextInternal context = vertx.getOrCreateContext(); + ContextInternal listenContext; + if (context.isEventLoopContext()) { + listenContext = context.unwrap(); + } else { + listenContext = context.toBuilder() + .withThreadingModel(ThreadingModel.EVENT_LOOP) + .build(); + } + Promise promise = context.promise(); + Supplier streamContextSupplier = context.unwrap()::duplicate; + listen(listenContext, context.threadingModel(), streamContextSupplier, address, promise); + return promise.future(); + } + + private synchronized void listen(ContextInternal listenContext, + ThreadingModel threadingModel, + Supplier streamContextSupplier, + SocketAddress address, + Promise promise) { if (requestHandler == null && webSocketHandler == null && webSocketHandhakeHandler == null) { throw new IllegalStateException("Set request or WebSocket handler first"); } @@ -181,23 +201,12 @@ public synchronized Future listen(SocketAddress address) { if (tcpOptions.getSslOptions() != null) { configureApplicationLayerProtocols(tcpOptions.getSslOptions()); } - ContextInternal context = vertx.getOrCreateContext(); - ContextInternal listenContext; - // Not sure of this - if (context.isEventLoopContext()) { - listenContext = context; - } else { - listenContext = context.toBuilder() - .withThreadingModel(ThreadingModel.EVENT_LOOP) - .build(); - } NetServerInternal server = vertx.createNetServer(tcpOptions); Handler h = exceptionHandler; Handler exceptionHandler = h != null ? h : DEFAULT_EXCEPTION_HANDLER; server.exceptionHandler(exceptionHandler); server.connectHandler(so -> { NetSocketImpl soi = (NetSocketImpl) so; - Supplier streamContextSupplier = context::duplicate; String host = address.isInetSocket() ? address.host() : "localhost"; int port = address.port(); String serverOrigin = (tcpOptions.isSsl() ? "https" : "http") + "://" + host + ":" + port; @@ -211,7 +220,7 @@ public synchronized Future listen(SocketAddress address) { exceptionHandler); HttpServerConnectionInitializer initializer = new HttpServerConnectionInitializer( listenContext, - context.threadingModel(), + threadingModel, streamContextSupplier, this, vertx, @@ -224,15 +233,7 @@ public synchronized Future listen(SocketAddress address) { }); tcpServer = server; closeSequence = new CloseSequence(p -> doClose(server, p), p -> doShutdown(server, p )); - Promise result = context.promise(); - tcpServer.listen(listenContext, address).onComplete(ar -> { - if (ar.succeeded()) { - result.complete(this); - } else { - result.fail(ar.cause()); - } - }); - return result.future(); + tcpServer.listen(listenContext, address).map(this).onComplete(promise); } private void doShutdown(NetServer netServer, Completable p) { diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/ServerWebSocketHandshaker.java b/vertx-core/src/main/java/io/vertx/core/http/impl/ServerWebSocketHandshaker.java index e0903459b45..9fc552f43d4 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/ServerWebSocketHandshaker.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/ServerWebSocketHandshaker.java @@ -52,8 +52,8 @@ public class ServerWebSocketHandshaker extends FutureImpl imple private final WebSocketServerHandshaker handshaker; private boolean done; - public ServerWebSocketHandshaker(Http1xServerRequest request, WebSocketServerHandshaker handshaker, HttpServerOptions options) { - super(request.context); + public ServerWebSocketHandshaker(Http1xServerRequest request, ContextInternal context, WebSocketServerHandshaker handshaker, HttpServerOptions options) { + super(context); this.request = request; this.handshaker = handshaker; this.options = options; @@ -94,7 +94,7 @@ public Future accept() { } ServerWebSocket ws; try { - ws = acceptHandshake(); + ws = acceptHandshake(context); } catch (Exception e) { return rejectHandshake(BAD_REQUEST.code()) .transform(ar -> { @@ -159,7 +159,7 @@ private Future rejectHandshake(int sc) { return response.setStatusCode(sc).end(status.reasonPhrase()); } - private ServerWebSocket acceptHandshake() { + private ServerWebSocket acceptHandshake(ContextInternal context) { Http1xServerConnection httpConn = (Http1xServerConnection) request.connection(); ChannelHandlerContext chctx = httpConn.channelHandlerContext(); Channel channel = chctx.channel(); @@ -175,9 +175,9 @@ private ServerWebSocket acceptHandshake() { } VertxHandler handler = VertxHandler.create(ctx -> { long closingTimeoutMS = options.getWebSocketClosingTimeout() >= 0 ? options.getWebSocketClosingTimeout() * 1000L : 0L; - WebSocketConnectionImpl webSocketConn = new WebSocketConnectionImpl(request.context, ctx, true, closingTimeoutMS,httpConn.metrics); + WebSocketConnectionImpl webSocketConn = new WebSocketConnectionImpl(context, ctx, true, closingTimeoutMS,httpConn.metrics); ServerWebSocketImpl webSocket = new ServerWebSocketImpl( - request.context(), + context, webSocketConn, handshaker.version() != WebSocketVersion.V00, request, diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketClientImpl.java b/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketClientImpl.java index e1cf3db9792..25cd877237b 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketClientImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketClientImpl.java @@ -59,6 +59,9 @@ public Future connect(WebSocketConnectOptions options) { } void webSocket(ContextInternal ctx, WebSocketConnectOptions connectOptions, Promise promise) { + if (ctx.isDuplicate()) { + throw new IllegalArgumentException(); + } int port = getPort(connectOptions); String host = getHost(connectOptions); SocketAddress addr = SocketAddress.inetSocketAddress(port, host); @@ -91,47 +94,10 @@ public Future webSocket(int port, String host, String requestURI) { } public Future webSocket(WebSocketConnectOptions options) { - return webSocket(vertx.getOrCreateContext(), options); - } - - static WebSocketConnectOptions webSocketConnectOptionsAbs(String url, MultiMap headers, WebSocketVersion version, List subProtocols) { - URI uri; - try { - uri = new URI(url); - } catch (URISyntaxException e) { - throw new IllegalArgumentException(e); - } - String scheme = uri.getScheme(); - if (!"ws".equals(scheme) && !"wss".equals(scheme)) { - throw new IllegalArgumentException("Scheme: " + scheme); - } - boolean ssl = scheme.length() == 3; - int port = uri.getPort(); - if (port == -1) port = ssl ? 443 : 80; - StringBuilder relativeUri = new StringBuilder(); - if (uri.getRawPath() != null) { - relativeUri.append(uri.getRawPath()); - } - if (uri.getRawQuery() != null) { - relativeUri.append('?').append(uri.getRawQuery()); - } - if (uri.getRawFragment() != null) { - relativeUri.append('#').append(uri.getRawFragment()); - } - return new WebSocketConnectOptions() - .setHost(uri.getHost()) - .setPort(port).setSsl(ssl) - .setURI(relativeUri.toString()) - .setHeaders(headers) - .setVersion(version) - .setSubProtocols(subProtocols); - } - - public Future webSocketAbs(String url, MultiMap headers, WebSocketVersion version, List subProtocols) { - return webSocket(webSocketConnectOptionsAbs(url, headers, version, subProtocols)); + return webSocket(vertx.getOrCreateContext().unwrap(), options); } - Future webSocket(ContextInternal ctx, WebSocketConnectOptions connectOptions) { + private Future webSocket(ContextInternal ctx, WebSocketConnectOptions connectOptions) { PromiseInternal promise = ctx.promise(); webSocket(ctx, connectOptions, promise); return promise.andThen(ar -> { diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java b/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java index 35bf96ce98f..42f502d2ee9 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java @@ -25,6 +25,7 @@ import io.vertx.core.MultiMap; import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.impl.EventBusInternal; import io.vertx.core.internal.buffer.BufferInternal; import io.vertx.core.eventbus.EventBus; import io.vertx.core.eventbus.Message; @@ -88,6 +89,9 @@ public abstract class WebSocketImplBase implements WebSocke int maxWebSocketFrameSize, int maxWebSocketMessageSize, boolean registerWebSocketWriteHandlers) { + if (context.isDuplicate()) { + throw new IllegalArgumentException(); + } this.supportsContinuation = supportsContinuation; if (registerWebSocketWriteHandlers) { textHandlerID = "__vertx.ws." + UUID.randomUUID(); @@ -117,12 +121,12 @@ protected void handleMessage(WebSocketFrameInternal msg) { this.headers = headers; } - void registerHandler(EventBus eventBus) { + void registerHandler(EventBusInternal eventBus) { if (binaryHandlerID != null) { Handler> binaryHandler = msg -> writeBinaryFrameInternal(msg.body()); Handler> textHandler = msg -> writeTextFrameInternal(msg.body()); - binaryHandlerRegistration = eventBus.localConsumer(binaryHandlerID).handler(binaryHandler); - textHandlerRegistration = eventBus.localConsumer(textHandlerID).handler(textHandler); + binaryHandlerRegistration = eventBus.localConsumer(context, binaryHandlerID).handler(binaryHandler); + textHandlerRegistration = eventBus.localConsumer(context, textHandlerID).handler(textHandler); } } diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java index 7e9f3397bd1..6b662696494 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -338,7 +338,8 @@ public TimeUnit maxEventLoopExecTimeUnit() { @Override public DatagramSocket createDatagramSocket(DatagramSocketOptions options) { CloseFuture closeFuture = new CloseFuture(log); - DatagramSocketImpl so = DatagramSocketImpl.create(this, closeFuture, options); + ContextInternal context = getOrCreateContext().unwrap(); + DatagramSocketImpl so = DatagramSocketImpl.create(context, closeFuture, options); closeFuture.add(so); CloseFuture fut = resolveCloseFuture(); fut.add(closeFuture); @@ -426,7 +427,7 @@ public HttpClientBuilder httpClientBuilder() { return new HttpClientBuilderInternal(this); } - public EventBus eventBus() { + public EventBusInternal eventBus() { return eventBus; } diff --git a/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java b/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java index c432f963398..ac94c7d07fa 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java @@ -14,6 +14,8 @@ import io.netty.channel.EventLoopGroup; import io.vertx.core.*; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.impl.EventBusInternal; import io.vertx.core.impl.*; import io.vertx.core.internal.deployment.DeploymentManager; import io.vertx.core.internal.resolver.NameResolver; @@ -102,6 +104,9 @@ default NetServerInternal createNetServer() { @Override ContextInternal getOrCreateContext(); + @Override + EventBusInternal eventBus(); + EventLoopGroup eventLoopGroup(); EventLoopGroup acceptorEventLoopGroup(); diff --git a/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java b/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java index 8f93268ad00..b0a55cdf099 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java @@ -17,6 +17,7 @@ import io.vertx.core.dns.DnsClient; import io.vertx.core.dns.DnsClientOptions; import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.impl.EventBusInternal; import io.vertx.core.file.FileSystem; import io.vertx.core.http.*; import io.vertx.core.internal.deployment.DeploymentManager; @@ -106,7 +107,7 @@ public FileSystem fileSystem() { } @Override - public EventBus eventBus() { + public EventBusInternal eventBus() { return delegate.eventBus(); } diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/ConnectionBase.java b/vertx-core/src/main/java/io/vertx/core/net/impl/ConnectionBase.java index 14de56b5322..00eec0fd921 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/ConnectionBase.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/ConnectionBase.java @@ -81,6 +81,10 @@ public abstract class ConnectionBase { protected ConnectionBase(ContextInternal context, ChannelHandlerContext chctx) { + if (context.isDuplicate()) { + throw new IllegalArgumentException("Cannot accept duplicate contexts"); + } + PromiseInternal f = context.promise(); chctx .channel() diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/NetClientImpl.java b/vertx-core/src/main/java/io/vertx/core/net/impl/NetClientImpl.java index f0036c8bdac..d6af716128a 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/NetClientImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/NetClientImpl.java @@ -139,7 +139,7 @@ public Future connect(SocketAddress remoteAddress, String serverName) @Override public Future connect(ConnectOptions connectOptions) { - ContextInternal context = vertx.getOrCreateContext(); + ContextInternal context = vertx.getOrCreateContext().unwrap(); Promise promise = context.promise(); connectInternal(connectOptions, options.isRegisterWriteHandler(), promise, context, options.getReconnectAttempts()); return promise.future(); @@ -237,6 +237,9 @@ private void connectInternal(ConnectOptions connectOptions, Promise connectHandler, ContextInternal context, int remainingAttempts) { + if (context.isDuplicate()) { + throw new IllegalArgumentException("Cannot accept duplicate contexts"); + } if (closeSequence.started()) { connectHandler.fail(new IllegalStateException("Client is closed")); } else { diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java b/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java index dbb9c511ebc..150c76dcdac 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java @@ -141,7 +141,8 @@ public Future shutdown(long timeout, TimeUnit unit) { @Override public Future listen(SocketAddress localAddress) { - return listen(vertx.getOrCreateContext(), localAddress); + ContextInternal context = vertx.getOrCreateContext(); + return listen(context.unwrap(), localAddress); } @Override @@ -152,6 +153,9 @@ public Future listen(ContextInternal context, SocketAddress localAddr if (handler == null) { throw new IllegalStateException("Set connect handler first"); } + if (context.isDuplicate()) { + throw new IllegalArgumentException("Duplicate context are not allowed"); + } return bind(context, localAddress).map(this); } diff --git a/vertx-core/src/test/java/io/vertx/tests/datagram/DatagramTest.java b/vertx-core/src/test/java/io/vertx/tests/datagram/DatagramTest.java index 181069ce249..b89cde14951 100644 --- a/vertx-core/src/test/java/io/vertx/tests/datagram/DatagramTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/datagram/DatagramTest.java @@ -16,6 +16,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.datagram.DatagramSocket; import io.vertx.core.datagram.DatagramSocketOptions; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.buffer.BufferInternal; import io.vertx.core.impl.Utils; import io.vertx.core.json.JsonObject; @@ -31,6 +32,7 @@ import java.net.InetAddress; import java.net.NetworkInterface; import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -571,4 +573,28 @@ public void start(Promise startPromise) { })); await(); } + + @Test + public void testSocketWithDuplicatedContext() { + waitFor(2); + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = context.duplicate(); + duplicated.getLocal(ContextInternal.LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar"); + Promise cont = Promise.promise(); + duplicated.runOnContext(v -> { + peer2 = vertx.createDatagramSocket(new DatagramSocketOptions()); + peer2.exceptionHandler(t -> fail(t.getMessage())); + peer2.handler(packet -> { + assertSame(context, Vertx.currentContext()); + complete(); + }); + peer2.listen(1234, "127.0.0.1").onComplete(cont); + }); + cont.future().await(); + peer1 = vertx.createDatagramSocket(new DatagramSocketOptions()); + peer1 + .send(TestUtils.randomBuffer(128), 1234, "127.0.0.1") + .onComplete(onSuccess(s -> complete())); + await(); + } } diff --git a/vertx-core/src/test/java/io/vertx/tests/eventbus/EventBusTestBase.java b/vertx-core/src/test/java/io/vertx/tests/eventbus/EventBusTestBase.java index 4f5bbf3576c..df8a6de974f 100644 --- a/vertx-core/src/test/java/io/vertx/tests/eventbus/EventBusTestBase.java +++ b/vertx-core/src/test/java/io/vertx/tests/eventbus/EventBusTestBase.java @@ -15,6 +15,7 @@ import io.vertx.core.*; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.*; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.tests.shareddata.AsyncMapTest.SomeClusterSerializableObject; @@ -28,6 +29,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -976,4 +978,29 @@ public void testConsumerUnregistrationContextCallback() throws Exception { }); await(); } + + @Test + public void testConsumerWithDuplicatedContext() { + Vertx[] vertices = vertices(2); + + ContextInternal context = (ContextInternal) vertices[0].getOrCreateContext(); + ContextInternal duplicated = context.duplicate(); + duplicated.getLocal(ContextInternal.LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar"); + + duplicated.runOnContext(v1 -> { + vertices[0] + .eventBus() + .consumer(ADDRESS1, msg -> { + ContextInternal current = ContextInternal.current(); + assertTrue("Not a duplicated context", current.isDuplicate()); + assertNull("Local map shouldn't have an entry for the key 'foo'", current.getLocal(ContextInternal.LOCAL_MAP)); + testComplete(); + }).completion() + .onComplete(onSuccess(v2 -> { + vertices[1].eventBus().send(ADDRESS1, "ping"); + })); + }); + + await(); + } } diff --git a/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java b/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java index dcb1866a77a..47892116307 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java @@ -7049,4 +7049,76 @@ public void testHttpServerResponseWriteHead() throws Exception { await(); } + @Test + public void testServerWithDuplicatedContext() throws Exception { + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = context.duplicate(); + duplicated.getLocal(ContextInternal.LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar"); + + server.requestHandler(req -> { + ContextInternal current = ContextInternal.current(); + assertTrue("Not a duplicated context", current.isDuplicate()); + assertNotSame("Request should be handled on a different duplicated context", duplicated, current); + assertNull("Local map shouldn't have an entry for the key 'foo'", current.getLocal(ContextInternal.LOCAL_MAP)); + req.response().end(); + }); + startServer(testAddress, duplicated); + + client.request(requestOptions) + .compose(HttpClientRequest::send) + .expecting(HttpResponseExpectation.SC_OK) + .await(); + } + + @Test + public void testClientPoolWithDuplicatedContext() throws Exception { + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = context.duplicate(); + duplicated.getLocal(ContextInternal.LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar"); + + server.requestHandler(req -> { + req.response().end(); + }); + startServer(testAddress); + + duplicated.runOnContext(v -> { + client.request(requestOptions) + .compose(req -> { + HttpConnection conn = req.connection(); + conn.closeHandler(v2 -> { + assertFalse("A duplicated context", context.isDuplicate()); + testComplete(); + }); + return req + .send() + .expecting(HttpResponseExpectation.SC_OK) + .onComplete(onSuccess(v2 -> { + conn.close(); + })); + }); + }); + + await(); + } + + @Test + public void testClientConnectWithDuplicatedContext() throws Exception { + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = context.duplicate(); + duplicated.getLocal(ContextInternal.LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar"); + + server.requestHandler(req -> { + req.response().end(); + }); + startServer(testAddress); + + duplicated.runOnContext(v -> { + client.connect(requestOptions).onComplete(conn -> { + assertSame(duplicated.unwrap(), Vertx.currentContext()); + testComplete(); + }); + }); + + await(); + } } diff --git a/vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java b/vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java index 914b1c5e96d..57acb7054da 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java @@ -36,6 +36,7 @@ import io.vertx.core.http.WebSocketVersion; import io.vertx.core.http.impl.Http1xClientConnection; import io.vertx.core.http.impl.Http1xServerConnection; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.http.WebSocketInternal; import io.vertx.core.http.impl.ws.WebSocketFrameImpl; import io.vertx.core.internal.VertxInternal; @@ -85,6 +86,7 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -4058,4 +4060,35 @@ public void testPoolShouldNotStarveOnConnectError() throws Exception { awaitLatch(latch, 10, TimeUnit.SECONDS); } + + @Test + public void testClientWebSocketWithDuplicatedContext1() { + testClientWebSocketWithDuplicatedContext(() -> client.connect(DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, "/")); + } + + @Test + public void testClientWebSocketWithDuplicatedContext2() { + testClientWebSocketWithDuplicatedContext(() -> client.webSocket().connect(DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, "/")); + } + + private void testClientWebSocketWithDuplicatedContext(Supplier> sup) { + server = vertx.createHttpServer(new HttpServerOptions().setPort(DEFAULT_HTTP_PORT)).webSocketHandler(ws -> { + ws.write(Buffer.buffer("ping")); + }); + server.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST).await(); + client = vertx.createWebSocketClient(); + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = context.duplicate(); + duplicated.runOnContext(v -> { + sup.get() + .onComplete(onSuccess(ws -> { + assertSame(context, Vertx.currentContext()); + ws.handler(data -> { + assertSame(context, Vertx.currentContext()); + testComplete(); + }); + })); + }); + await(); + } } diff --git a/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java b/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java index ac5b1e3e12b..cdb39a331c9 100755 --- a/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java @@ -30,6 +30,7 @@ import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.http.*; import io.vertx.core.impl.Utils; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.VertxInternal; import io.vertx.core.internal.buffer.BufferInternal; import io.vertx.core.internal.net.NetClientInternal; @@ -4659,4 +4660,22 @@ public void testSNIServerSSLEnginePeerHost() throws Exception { assertEquals("host2.com", test.indicatedServerName); assertTrue("X509ExtendedKeyManager.chooseEngineServerAlias is not called", called.get()); } + + @Test + public void testServerWithDuplicatedContext() throws Exception { + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = context.duplicate(); + duplicated.getLocal(ContextInternal.LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar"); + + server.connectHandler(so -> { + ContextInternal current = ContextInternal.current(); + assertFalse("A duplicated context", current.isDuplicate()); + assertNull("Local map shouldn't have an entry for the key 'foo'", current.getLocal(ContextInternal.LOCAL_MAP)); + so.end(); + }); + startServer(testAddress, duplicated); + + NetSocket so = client.connect(testAddress).await(); + so.end().await(); + } } diff --git a/vertx-core/src/test/java/io/vertx/tests/timer/TimerTest.java b/vertx-core/src/test/java/io/vertx/tests/timer/TimerTest.java index 5b4328e898e..fb64ed4d908 100644 --- a/vertx-core/src/test/java/io/vertx/tests/timer/TimerTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/timer/TimerTest.java @@ -19,7 +19,9 @@ import io.vertx.test.core.VertxTestBase; import org.junit.Test; +import java.util.Collections; import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -320,6 +322,36 @@ public void handle(Long l) { await(); } + @Test + public void testPeriodicWithDuplicateContext() { + + waitFor(2); + + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = context.duplicate(); + duplicated.getLocal(ContextInternal.LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar"); + + duplicated.runOnContext(v -> { + vertx.setPeriodic(10, id -> { + ContextInternal current = (ContextInternal) Vertx.currentContext(); + assertTrue("Not a duplicated context", current.isDuplicate()); + assertNull("Local map shouldn't have an entry for the key 'foo'", current.getLocal(ContextInternal.LOCAL_MAP)); + vertx.cancelTimer(id); + complete(); + }); + }); + + duplicated.setPeriodic(10, id -> { + ContextInternal current = (ContextInternal) Vertx.currentContext(); + assertTrue("Not a duplicated context", current.isDuplicate()); + assertNull("Local map shouldn't have an entry for the key 'foo'", current.getLocal(ContextInternal.LOCAL_MAP)); + vertx.cancelTimer(id); + complete(); + }); + + await(); + } + @Repeat(times = 100) @Test public void testRaceWhenTimerCreatedOutsideEventLoop() {