Skip to content

Commit d2fcda6

Browse files
authored
- allows for local only codec lookup when in clustered mode (#4952)
* - allows for local only codec lookup when in clustered mode * - is local decided by caller
1 parent a3aaa0a commit d2fcda6

File tree

4 files changed

+26
-19
lines changed

4 files changed

+26
-19
lines changed

src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -119,14 +119,14 @@ public EventBus send(String address, Object message) {
119119

120120
@Override
121121
public EventBus send(String address, Object message, DeliveryOptions options) {
122-
MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName());
122+
MessageImpl msg = createMessage(true, isLocalOnly(options), address, options.getHeaders(), message, options.getCodecName());
123123
sendOrPubInternal(msg, options, null);
124124
return this;
125125
}
126126

127127
@Override
128128
public <T> Future<Message<T>> request(String address, Object message, DeliveryOptions options) {
129-
MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName());
129+
MessageImpl msg = createMessage(true, isLocalOnly(options), address, options.getHeaders(), message, options.getCodecName());
130130
ReplyHandler<T> handler = createReplyHandler(msg, true, options);
131131
sendOrPubInternal(msg, options, handler);
132132
return handler.result();
@@ -165,15 +165,15 @@ public EventBus publish(String address, Object message) {
165165

166166
@Override
167167
public EventBus publish(String address, Object message, DeliveryOptions options) {
168-
sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null);
168+
sendOrPubInternal(createMessage(false, isLocalOnly(options), address, options.getHeaders(), message, options.getCodecName()), options, null);
169169
return this;
170170
}
171171

172172
@Override
173173
public <T> MessageConsumer<T> consumer(String address) {
174174
checkStarted();
175175
Objects.requireNonNull(address, "address");
176-
return new MessageConsumerImpl<>(vertx.getOrCreateContext(), this, address, false);
176+
return new MessageConsumerImpl<>(vertx.getOrCreateContext(), this, address, false);
177177
}
178178

179179
@Override
@@ -188,7 +188,7 @@ public <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handl
188188
public <T> MessageConsumer<T> localConsumer(String address) {
189189
checkStarted();
190190
Objects.requireNonNull(address, "address");
191-
return new MessageConsumerImpl<>(vertx.getOrCreateContext(), this, address, true);
191+
return new MessageConsumerImpl<>(vertx.getOrCreateContext(), this, address, true);
192192
}
193193

194194
@Override
@@ -253,9 +253,9 @@ public EventBusMetrics<?> getMetrics() {
253253
return metrics;
254254
}
255255

256-
public MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
256+
public MessageImpl createMessage(boolean send, boolean localOnly, String address, MultiMap headers, Object body, String codecName) {
257257
Objects.requireNonNull(address, "no null address accepted");
258-
MessageCodec codec = codecManager.lookupCodec(body, codecName, true);
258+
MessageCodec codec = codecManager.lookupCodec(body, codecName, localOnly);
259259
@SuppressWarnings("unchecked")
260260
MessageImpl msg = new MessageImpl(address, headers, body, codec, send, this);
261261
return msg;
@@ -382,7 +382,7 @@ protected ReplyException deliverMessageLocally(MessageImpl msg) {
382382
if (metrics != null) {
383383
metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, handlers.size());
384384
}
385-
for (HandlerHolder holder: handlers) {
385+
for (HandlerHolder holder : handlers) {
386386
if (messageLocal || !holder.isLocalOnly()) {
387387
holder.handler.receive(msg.copyBeforeReceive());
388388
}
@@ -412,8 +412,8 @@ protected String generateReplyAddress() {
412412
}
413413

414414
<T> ReplyHandler<T> createReplyHandler(MessageImpl message,
415-
boolean src,
416-
DeliveryOptions options) {
415+
boolean src,
416+
DeliveryOptions options) {
417417
long timeout = options.getSendTimeout();
418418
String replyAddress = generateReplyAddress();
419419
message.setReplyAddress(replyAddress);
@@ -423,7 +423,7 @@ <T> ReplyHandler<T> createReplyHandler(MessageImpl message,
423423
}
424424

425425
public <T> OutboundDeliveryContext<T> newSendContext(MessageImpl message, DeliveryOptions options,
426-
ReplyHandler<T> handler) {
426+
ReplyHandler<T> handler) {
427427
return new OutboundDeliveryContext<>(vertx.getOrCreateContext(), message, options, handler);
428428
}
429429

@@ -435,7 +435,7 @@ public <T> void sendOrPubInternal(OutboundDeliveryContext<T> senderCtx) {
435435
}
436436

437437
public <T> Future<Void> sendOrPubInternal(MessageImpl message, DeliveryOptions options,
438-
ReplyHandler<T> handler) {
438+
ReplyHandler<T> handler) {
439439
checkStarted();
440440
OutboundDeliveryContext<T> ctx = newSendContext(message, options, handler);
441441
sendOrPubInternal(ctx);
@@ -478,7 +478,7 @@ private void removeInterceptor(AtomicReferenceFieldUpdater<EventBusImpl, Handler
478478
while (true) {
479479
Handler[] interceptors = updater.get(this);
480480
int idx = -1;
481-
for (int i = 0;i < interceptors.length;i++) {
481+
for (int i = 0; i < interceptors.length; i++) {
482482
if (interceptors[i].equals(interceptor)) {
483483
idx = i;
484484
break;
@@ -495,5 +495,12 @@ private void removeInterceptor(AtomicReferenceFieldUpdater<EventBusImpl, Handler
495495
}
496496
}
497497
}
498+
499+
private boolean isLocalOnly(DeliveryOptions options) {
500+
if (vertx.isClustered()) {
501+
return options.isLocalOnly();
502+
}
503+
return true;
504+
}
498505
}
499506

src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public <R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions op
119119
}
120120

121121
protected MessageImpl createReply(Object message, DeliveryOptions options) {
122-
MessageImpl reply = bus.createMessage(true, replyAddress, options.getHeaders(), message, options.getCodecName());
122+
MessageImpl reply = bus.createMessage(true, isLocal(), replyAddress, options.getHeaders(), message, options.getCodecName());
123123
reply.trace = trace;
124124
return reply;
125125
}

src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,9 @@
1212
package io.vertx.core.eventbus.impl;
1313

1414
import io.vertx.core.Future;
15-
import io.vertx.core.Promise;
1615
import io.vertx.core.Vertx;
1716
import io.vertx.core.eventbus.*;
1817
import io.vertx.core.impl.ContextInternal;
19-
import io.vertx.core.impl.VertxInternal;
2018

2119
/**
2220
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
@@ -27,6 +25,7 @@ public class MessageProducerImpl<T> implements MessageProducer<T> {
2725
private final EventBusImpl bus;
2826
private final boolean send;
2927
private final String address;
28+
private final boolean localOnly;
3029
private DeliveryOptions options;
3130

3231
public MessageProducerImpl(Vertx vertx, String address, boolean send, DeliveryOptions options) {
@@ -35,6 +34,7 @@ public MessageProducerImpl(Vertx vertx, String address, boolean send, DeliveryOp
3534
this.address = address;
3635
this.send = send;
3736
this.options = options;
37+
this.localOnly = vertx.isClustered() ? options.isLocalOnly() : true;
3838
}
3939

4040
@Override
@@ -45,7 +45,7 @@ public synchronized MessageProducer<T> deliveryOptions(DeliveryOptions options)
4545

4646
@Override
4747
public Future<Void> write(T body) {
48-
MessageImpl msg = bus.createMessage(send, address, options.getHeaders(), body, options.getCodecName());
48+
MessageImpl msg = bus.createMessage(send, localOnly, address, options.getHeaders(), body, options.getCodecName());
4949
return bus.sendOrPubInternal(msg, options, null);
5050
}
5151

src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,9 @@ public void close(Promise<Void> promise) {
145145
}
146146

147147
@Override
148-
public MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
148+
public MessageImpl createMessage(boolean send, boolean local, String address, MultiMap headers, Object body, String codecName) {
149149
Objects.requireNonNull(address, "no null address accepted");
150-
MessageCodec codec = codecManager.lookupCodec(body, codecName, false);
150+
MessageCodec codec = codecManager.lookupCodec(body, codecName, local);
151151
@SuppressWarnings("unchecked")
152152
ClusteredMessage msg = new ClusteredMessage(nodeId, address, headers, body, codec, send, this);
153153
return msg;

0 commit comments

Comments
 (0)