Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
*/
public class DatagramSocketImpl implements DatagramSocket, MetricsProvider, Closeable {

public static DatagramSocketImpl create(VertxInternal vertx, CloseFuture closeFuture, DatagramSocketOptions options) {
public static DatagramSocketImpl create(ContextInternal vertx, CloseFuture closeFuture, DatagramSocketOptions options) {
DatagramSocketImpl socket = new DatagramSocketImpl(vertx, closeFuture, options);
// Make sure object is fully initiliased to avoid race with async registration
socket.init();
Expand All @@ -65,19 +65,18 @@ public static DatagramSocketImpl create(VertxInternal vertx, CloseFuture closeFu
private Handler<Throwable> exceptionHandler;
private final CloseFuture closeFuture;

private DatagramSocketImpl(VertxInternal vertx, CloseFuture closeFuture, DatagramSocketOptions options) {
Transport transport = vertx.transport();
private DatagramSocketImpl(ContextInternal context, CloseFuture closeFuture, DatagramSocketOptions options) {
Transport transport = context.owner().transport();
DatagramChannel channel = transport.datagramChannel(options.isIpV6() ? InternetProtocolFamily.IPv6 : InternetProtocolFamily.IPv4);
transport.configure(channel, new DatagramSocketOptions(options));
ContextInternal context = vertx.getOrCreateContext();
channel.config().setOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
MaxMessagesRecvByteBufAllocator bufAllocator = channel.config().getRecvByteBufAllocator();
bufAllocator.maxMessagesPerRead(1);
context.nettyEventLoop().register(channel);
if (options.getLogActivity()) {
channel.pipeline().addLast("logging", new LoggingHandler(options.getActivityLogDataFormat()));
}
VertxMetrics metrics = vertx.metrics();
VertxMetrics metrics = context.owner().metrics();
this.metrics = metrics != null ? metrics.createDatagramSocketMetrics(options) : null;
this.channel = channel;
this.context = context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ public DnsClientImpl(VertxInternal vertx, DnsClientOptions options) {
this.vertx = vertx;
}

private DnsNameResolver resolver(ContextInternal ctx, InternetProtocolFamily ipFamily) {
EventLoop el = ctx.nettyEventLoop();
private DnsNameResolver resolver(EventLoop el, InternetProtocolFamily ipFamily) {
DnsNameResolver resolver;
synchronized (this) {
if (closed) {
Expand Down Expand Up @@ -169,7 +168,7 @@ public Future<List<SrvRecord>> resolveSRV(String name) {
private Future<List<String>> resolveAll(String name, InternetProtocolFamily ipFamily) {
Objects.requireNonNull(name);
ContextInternal ctx = vertx.getOrCreateContext();
DnsNameResolver resolver = resolver(ctx, ipFamily);
DnsNameResolver resolver = resolver(ctx.nettyEventLoop(), ipFamily);
if (resolver == null) {
return ctx.failedFuture("DNS client is closed");
}
Expand Down Expand Up @@ -200,7 +199,7 @@ private Future<List<String>> resolveAll(String name, InternetProtocolFamily ipFa
private <T> Future<List<T>> queryAll(String name, DnsRecordType recordType, Function<DnsRecord, T> mapper) {
Objects.requireNonNull(name);
ContextInternal ctx = vertx.getOrCreateContext();
DnsNameResolver resolver = resolver(ctx, InternetProtocolFamily.IPv4);
DnsNameResolver resolver = resolver(ctx.nettyEventLoop(), InternetProtocolFamily.IPv4);
if (resolver == null) {
return ctx.failedFuture("DNS client is closed");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.vertx.core.eventbus.impl.DefaultSerializableChecker;
import io.vertx.core.metrics.Measured;

import java.util.Objects;
import java.util.function.Function;

import static io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE;
Expand Down Expand Up @@ -175,7 +176,12 @@ default <T> Future<Message<T>> request(String address, @Nullable Object message)
* @param handler the handler that will process the received messages
* @return the event bus message consumer
*/
<T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler);
default <T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler) {
Objects.requireNonNull(handler, "handler");
MessageConsumer<T> consumer = localConsumer(address);
consumer.handler(handler);
return consumer;
}

/**
* Create a message sender against the specified address.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public <T> MessageConsumer<T> consumer(MessageConsumerOptions options, Handler<M
public <T> MessageConsumer<T> consumer(String address) {
checkStarted();
Objects.requireNonNull(address, "address");
return new MessageConsumerImpl<>(vertx.getOrCreateContext(), this, address, false, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES);
return new MessageConsumerImpl<>(vertx.getOrCreateContext().unwrap(), this, address, false, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES);
}

@Override
Expand All @@ -200,17 +200,18 @@ public <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handl

@Override
public <T> MessageConsumer<T> localConsumer(String address) {
checkStarted();
Objects.requireNonNull(address, "address");
return new MessageConsumerImpl<>(vertx.getOrCreateContext(), this, address, true, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES);
return localConsumer(vertx.getOrCreateContext(), address);
}

@Override
public <T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler) {
Objects.requireNonNull(handler, "handler");
MessageConsumer<T> consumer = localConsumer(address);
consumer.handler(handler);
return consumer;
public <T> MessageConsumer<T> localConsumer(Context context, String address) {
checkStarted();
Objects.requireNonNull(context, "context");
Objects.requireNonNull(address, "address");
if (context.owner() != vertx) {
throw new IllegalArgumentException("Invalid context instance");
}
return new MessageConsumerImpl<>((ContextInternal) context, this, address, true, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@

package io.vertx.core.eventbus.impl;

import io.vertx.core.Context;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageConsumerOptions;

import java.util.Objects;

public interface EventBusInternal extends EventBus {

Expand All @@ -21,6 +26,8 @@ public interface EventBusInternal extends EventBus {
*/
void start(Promise<Void> promise);

<T> MessageConsumer<T> localConsumer(Context context, String address);

/**
* Close the event bus and release any resources held.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public class MessageConsumerImpl<T> extends HandlerRegistration<T> implements Me

MessageConsumerImpl(ContextInternal context, EventBusImpl eventBus, String address, boolean localOnly, int maxBufferedMessages) {
super(context, eventBus, address, false);

if (context.isDuplicate()) {
throw new IllegalArgumentException("Duplicated context are not allowed");
}

this.localOnly = localOnly;
this.result = context.promise();
this.maxBufferedMessages = maxBufferedMessages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class ClientWebSocketImpl implements ClientWebSocketInternal {

@Override
public Future<WebSocket> connect(WebSocketConnectOptions options) {
return connect(client.vertx().getOrCreateContext(), options);
return connect(client.vertx().getOrCreateContext().unwrap(), options);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ void createWebSocket(Http1xServerRequest request, PromiseInternal<ServerWebSocke
promise.fail(e);
return;
}
promise.complete(new ServerWebSocketHandshaker(request, handshaker, options));
promise.complete(new ServerWebSocketHandshaker(request, promise.context(), handshaker, options));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,22 +436,14 @@ public String getFormAttribute(String attributeName) {

@Override
public Future<ServerWebSocket> toWebSocket() {
return webSocketHandshake().compose(handshake -> handshake.accept());
}

/**
* @return a future of the un-accepted WebSocket
*/
Future<ServerWebSocketHandshake> webSocketHandshake() {
PromiseInternal<ServerWebSocketHandshake> promise = context.promise();
webSocketHandshake(promise);
return promise.future();
return webSocketHandshake(context.unwrap()).compose(handshake -> handshake.accept());
}

/**
* Handle the request when a WebSocket upgrade header is present.
*/
private void webSocketHandshake(PromiseInternal<ServerWebSocketHandshake> promise) {
Future<ServerWebSocketHandshake> webSocketHandshake(ContextInternal context) {
PromiseInternal<ServerWebSocketHandshake> promise = context.promise();
BufferInternal body = BufferInternal.buffer();
boolean[] failed = new boolean[1];
handler(buff -> {
Expand Down Expand Up @@ -482,6 +474,7 @@ private void webSocketHandshake(PromiseInternal<ServerWebSocketHandshake> promis
});
// In case we were paused
resume();
return promise.future();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void handle(HttpServerRequest req) {
if (wsHandler != null || wsHandshakeHandler != null) {
if (req.headers().contains(UPGRADE, WEBSOCKET, true) && handlers.server.wsAccept()) {
// Missing upgrade header + null request handler will be handled when creating the handshake by sending a 400 error
((Http1xServerRequest)req).webSocketHandshake().onComplete(ar -> {
((Http1xServerRequest)req).webSocketHandshake(((Http1xServerRequest)req).context.unwrap()).onComplete(ar -> {
if (ar.succeeded()) {
ServerWebSocketHandshaker handshake = (ServerWebSocketHandshaker) ar.result();
if (wsHandshakeHandler == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ public Future<HttpClientConnectionInternal> wrap(ContextInternal context, NetSoc
}

public Future<HttpClientConnectionInternal> httpConnect(ContextInternal context) {
if (context.isDuplicate()) {
throw new IllegalArgumentException("Cannot accept duplicate contexts");
}
Promise<NetSocket> promise = context.promise();
Future<NetSocket> future = promise.future();
// We perform the compose operation before calling connect to be sure that the composition happens
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public Future<HttpClientConnection> connect(HttpConnectOptions connect) {
server,
false,
0);
return (Future) connector.httpConnect(vertx.getOrCreateContext()).map(conn -> new UnpooledHttpClientConnection(conn).init());
return (Future) connector.httpConnect(vertx.getOrCreateContext().unwrap()).map(conn -> new UnpooledHttpClientConnection(conn).init());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,27 @@ public Future<HttpServer> listen() {
}

@Override
public synchronized Future<HttpServer> listen(SocketAddress address) {
public Future<HttpServer> listen(SocketAddress address) {
ContextInternal context = vertx.getOrCreateContext();
ContextInternal listenContext;
if (context.isEventLoopContext()) {
listenContext = context.unwrap();
} else {
listenContext = context.toBuilder()
.withThreadingModel(ThreadingModel.EVENT_LOOP)
.build();
}
Promise<HttpServer> promise = context.promise();
Supplier<ContextInternal> streamContextSupplier = context.unwrap()::duplicate;
listen(listenContext, context.threadingModel(), streamContextSupplier, address, promise);
return promise.future();
}

private synchronized void listen(ContextInternal listenContext,
ThreadingModel threadingModel,
Supplier<ContextInternal> streamContextSupplier,
SocketAddress address,
Promise<HttpServer> promise) {
if (requestHandler == null && webSocketHandler == null && webSocketHandhakeHandler == null) {
throw new IllegalStateException("Set request or WebSocket handler first");
}
Expand All @@ -181,23 +201,12 @@ public synchronized Future<HttpServer> listen(SocketAddress address) {
if (tcpOptions.getSslOptions() != null) {
configureApplicationLayerProtocols(tcpOptions.getSslOptions());
}
ContextInternal context = vertx.getOrCreateContext();
ContextInternal listenContext;
// Not sure of this
if (context.isEventLoopContext()) {
listenContext = context;
} else {
listenContext = context.toBuilder()
.withThreadingModel(ThreadingModel.EVENT_LOOP)
.build();
}
NetServerInternal server = vertx.createNetServer(tcpOptions);
Handler<Throwable> h = exceptionHandler;
Handler<Throwable> exceptionHandler = h != null ? h : DEFAULT_EXCEPTION_HANDLER;
server.exceptionHandler(exceptionHandler);
server.connectHandler(so -> {
NetSocketImpl soi = (NetSocketImpl) so;
Supplier<ContextInternal> streamContextSupplier = context::duplicate;
String host = address.isInetSocket() ? address.host() : "localhost";
int port = address.port();
String serverOrigin = (tcpOptions.isSsl() ? "https" : "http") + "://" + host + ":" + port;
Expand All @@ -211,7 +220,7 @@ public synchronized Future<HttpServer> listen(SocketAddress address) {
exceptionHandler);
HttpServerConnectionInitializer initializer = new HttpServerConnectionInitializer(
listenContext,
context.threadingModel(),
threadingModel,
streamContextSupplier,
this,
vertx,
Expand All @@ -224,15 +233,7 @@ public synchronized Future<HttpServer> listen(SocketAddress address) {
});
tcpServer = server;
closeSequence = new CloseSequence(p -> doClose(server, p), p -> doShutdown(server, p ));
Promise<HttpServer> result = context.promise();
tcpServer.listen(listenContext, address).onComplete(ar -> {
if (ar.succeeded()) {
result.complete(this);
} else {
result.fail(ar.cause());
}
});
return result.future();
tcpServer.listen(listenContext, address).map(this).onComplete(promise);
}

private void doShutdown(NetServer netServer, Completable<Void> p) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public class ServerWebSocketHandshaker extends FutureImpl<ServerWebSocket> imple
private final WebSocketServerHandshaker handshaker;
private boolean done;

public ServerWebSocketHandshaker(Http1xServerRequest request, WebSocketServerHandshaker handshaker, HttpServerOptions options) {
super(request.context);
public ServerWebSocketHandshaker(Http1xServerRequest request, ContextInternal context, WebSocketServerHandshaker handshaker, HttpServerOptions options) {
super(context);
this.request = request;
this.handshaker = handshaker;
this.options = options;
Expand Down Expand Up @@ -94,7 +94,7 @@ public Future<ServerWebSocket> accept() {
}
ServerWebSocket ws;
try {
ws = acceptHandshake();
ws = acceptHandshake(context);
} catch (Exception e) {
return rejectHandshake(BAD_REQUEST.code())
.transform(ar -> {
Expand Down Expand Up @@ -159,7 +159,7 @@ private Future<Void> rejectHandshake(int sc) {
return response.setStatusCode(sc).end(status.reasonPhrase());
}

private ServerWebSocket acceptHandshake() {
private ServerWebSocket acceptHandshake(ContextInternal context) {
Http1xServerConnection httpConn = (Http1xServerConnection) request.connection();
ChannelHandlerContext chctx = httpConn.channelHandlerContext();
Channel channel = chctx.channel();
Expand All @@ -175,9 +175,9 @@ private ServerWebSocket acceptHandshake() {
}
VertxHandler<WebSocketConnectionImpl> handler = VertxHandler.create(ctx -> {
long closingTimeoutMS = options.getWebSocketClosingTimeout() >= 0 ? options.getWebSocketClosingTimeout() * 1000L : 0L;
WebSocketConnectionImpl webSocketConn = new WebSocketConnectionImpl(request.context, ctx, true, closingTimeoutMS,httpConn.metrics);
WebSocketConnectionImpl webSocketConn = new WebSocketConnectionImpl(context, ctx, true, closingTimeoutMS,httpConn.metrics);
ServerWebSocketImpl webSocket = new ServerWebSocketImpl(
request.context(),
context,
webSocketConn,
handshaker.version() != WebSocketVersion.V00,
request,
Expand Down
Loading
Loading