Description
Version
4.5.12
Context
Up to 4.5.11, duplicating a duplicated context always started with empty local context data, just as the Javadoc says.
However, in 4.5.12, due to the changes in #5441, the local context data is copied.
There are 2 concerns here:
- It may break existing code that relies on an empty local context when duplicating a DuplicatedContext

  In our use case, we put a map in the local context data (using getLocal()/putLocal(), which we adopted before ContextLocal was introduced) to track some data.
  We expected that duplicating a DuplicatedContext would also start with empty local context data, just like duplicating a ContextImpl, so that our map tracks only what runs on the current duplicated context.
  However, after the behavior change in 4.5.12, the local context data is copied to the new DuplicatedContext when duplicating from a DuplicatedContext, so we ended up with many duplicated contexts referencing the same map instance, which corrupted our tracking.

  One option for us is to clear the local context data everywhere a duplicated context may be produced (e.g., every message consumer), but that requires many changes and adds the unnecessary overhead of copying and then clearing the local context data.
  Another option is to move our map to a separate ContextLocal and customize its duplicator to always return null. (A minor problem: if such local data is used in only a small fraction of contexts, the overhead of reserving one more slot for every context and duplicated context may not be worth it.)

  Why change the behavior of existing getLocal()/putLocal() use cases?
  Can the duplicator of ContextInternal.LOCAL_MAP be changed to always return null?
  Or could there at least be a way to configure the default duplication behavior to not copy?

- Inconsistent behavior between duplicating ContextImpl and DuplicatedContext

  Before 4.5.12, the behavior was consistent: duplication always created a DuplicatedContext with no local context data.
  In 4.5.12, duplicating a ContextImpl does not copy the local context data of the ContextImpl into the DuplicatedContext, while duplicating a DuplicatedContext does copy the local context data to the new DuplicatedContext.

  This is really confusing to me. Is it by design that a customized ContextLocal duplicator is not applied when duplicating from a ContextImpl?
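To make the first concern concrete, here is a minimal sketch in plain Java. It is a toy model, not Vert.x code: `ToyContext` and its `duplicate` method are hypothetical names invented for illustration, and the model assumes only what is described above, namely that duplication passes each local value through a duplicator. It shows that a duplicator which copies the reference leaves both contexts pointing at the same map, while a duplicator that returns null gives the duplicate an empty start.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

public class DuplicationModel {

  /** Toy stand-in for a duplicated context (hypothetical, NOT the Vert.x implementation). */
  static final class ToyContext {
    final Map<String, Object> locals = new HashMap<>();

    /** Creates a duplicate, passing each local value through the given duplicator. */
    ToyContext duplicate(UnaryOperator<Object> duplicator) {
      ToyContext dup = new ToyContext();
      locals.forEach((key, value) -> {
        Object copied = duplicator.apply(value);
        if (copied != null) {
          // A null result means "do not carry this local over"
          dup.locals.put(key, copied);
        }
      });
      return dup;
    }
  }

  public static void main(String[] args) {
    ToyContext parent = new ToyContext();
    parent.locals.put("resource-tracker-key", new ConcurrentHashMap<String, Integer>());

    // Copying behavior, as described for 4.5.12: only the reference is copied
    ToyContext copying = parent.duplicate(UnaryOperator.identity());
    // Null-returning duplicator, as requested in this issue: the duplicate starts empty
    ToyContext empty = parent.duplicate(value -> null);

    Object parentMap = parent.locals.get("resource-tracker-key");
    System.out.println(copying.locals.get("resource-tracker-key") == parentMap); // prints "true"
    System.out.println(empty.locals.isEmpty());                                  // prints "true"
  }
}
```

In this model the copying duplicate and its parent mutate one shared map, which is the situation described in the first concern.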
Do you have a reproducer?
I wrote a unit test to illustrate one such use case:
@Test
public void testLocalsForDuplicatedContext() {
  Logger LOG = LogManager.getLogger(ContextTest.class);
  final String RESOURCE_TRACKER_KEY = "resource-tracker-key";
  final String COMMAND_ADDRESS = "command-address";
  final String EVENT_ADDRESS_PREFIX = "event-address-";
  final String RESOURCE_1 = "resource-1";
  final String RESOURCE_2 = "resource-2";
  final String COMMAND_ID = "command-id-1";
  // Lazily creates the tracker map stored in the current context's local data
  final Supplier<ConcurrentHashMap<String, Integer>> getResourceTracker = () -> {
    Context ctx = vertx.getOrCreateContext();
    ConcurrentHashMap<String, Integer> ret = ctx.getLocal(RESOURCE_TRACKER_KEY);
    if (ret == null) {
      LOG.info("Creating resource tracker, context: {}", ContextInternal.current());
      ret = new ConcurrentHashMap<>();
      ctx.putLocal(RESOURCE_TRACKER_KEY, ret);
    }
    return ret;
  };
  final AtomicReference<String> errorMessage = new AtomicReference<>();
  // Records an error if the resource is already tracked in the current context's map
  final Consumer<String> useResource = resource -> {
    ConcurrentHashMap<String, Integer> resourceTracker = getResourceTracker.get();
    LOG.info("Using {}, tracker: {}, context: {}", resource, resourceTracker, ContextInternal.current());
    if (resourceTracker.containsKey(resource)) {
      errorMessage.set("Resource " + resource + " is already in use");
    }
    resourceTracker.merge(resource, 1, Integer::sum);
  };
  // Decrements the usage count and removes the entry when it reaches zero
  final Consumer<String> releaseResource = resource -> {
    ConcurrentHashMap<String, Integer> resourceTracker = getResourceTracker.get();
    LOG.info("Releasing {}, tracker: {}, context: {}", resource, resourceTracker, ContextInternal.current());
    resourceTracker.computeIfPresent(resource, (k, v) -> v == 1 ? null : v - 1);
  };
  vertx.eventBus().<String>localConsumer(COMMAND_ADDRESS, msg -> {
    String id = msg.body();
    LOG.info("Received command {}, context: {}", id, ContextInternal.current());
    AtomicBoolean isCancelled = new AtomicBoolean();
    useResource.accept(RESOURCE_1);
    releaseResource.accept(RESOURCE_1);
    // The event handler uses RESOURCE_2 while the blocking task below may still hold it
    MessageConsumer<String> eventConsumer = vertx.eventBus().consumer(EVENT_ADDRESS_PREFIX + id, event -> {
      LOG.info("Received event {}, context: {}", event.body(), ContextInternal.current());
      isCancelled.set(true);
      useResource.accept(RESOURCE_2);
      releaseResource.accept(RESOURCE_2);
    });
    eventConsumer.completionHandler(ar -> {
      assertTrue(ar.succeeded());
      vertx.executeBlocking(() -> {
        useResource.accept(RESOURCE_2);
        try {
          // simulate some work, polling for cancellation
          for (int i = 0; i < 200; i++) {
            if (isCancelled.get()) {
              return "cancelled";
            }
            Thread.sleep(10);
          }
          return "done";
        } finally {
          releaseResource.accept(RESOURCE_2);
        }
      }).onComplete(ar2 -> {
        eventConsumer.unregister();
        if (ar2.succeeded()) {
          msg.reply(ar2.result());
        } else {
          msg.reply("failed: " + ar2.cause().getMessage());
        }
      });
    });
  }).completionHandler(ar -> {
    assertTrue(ar.succeeded());
    vertx.eventBus().request(COMMAND_ADDRESS, COMMAND_ID, ar1 -> {
      assertTrue(ar1.succeeded());
      assertEquals("cancelled", ar1.result().body());
      assertNull(errorMessage.get());
      testComplete();
    });
    // Publish the cancel event while the blocking task is still running
    vertx.setTimer(500, i -> vertx.eventBus().publish(EVENT_ADDRESS_PREFIX + COMMAND_ID, "cancel"));
  });
  await();
}

In 4.5.11 it passes, and in 4.5.12 it fails with:
java.lang.AssertionError: expected null, but was:<Resource resource-2 is already in use>
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.failNotNull(Assert.java:756)
at org.junit.Assert.assertNull(Assert.java:738)
at org.junit.Assert.assertNull(Assert.java:748)
at io.vertx.test.core.AsyncTestBase.assertNull(AsyncTestBase.java:250)
at io.vertx.core.ContextTest.lambda$testLocalsForDuplicatedContext$141(ContextTest.java:1290)
at io.vertx.core.impl.future.FutureImpl$4.onSuccess(FutureImpl.java:176)
at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:60)
at io.vertx.core.impl.ContextImpl.execute(ContextImpl.java:312)
at io.vertx.core.impl.ContextImpl.execute(ContextImpl.java:302)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:57)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:259)
at io.vertx.core.Promise.complete(Promise.java:66)
at io.vertx.core.eventbus.impl.ReplyHandler.dispatch(ReplyHandler.java:97)
at io.vertx.core.eventbus.impl.HandlerRegistration$InboundDeliveryContext.execute(HandlerRegistration.java:137)
at io.vertx.core.eventbus.impl.DeliveryContextBase.next(DeliveryContextBase.java:80)
at io.vertx.core.eventbus.impl.DeliveryContextBase.dispatch(DeliveryContextBase.java:43)
at io.vertx.core.eventbus.impl.HandlerRegistration.dispatch(HandlerRegistration.java:98)
at io.vertx.core.eventbus.impl.ReplyHandler.doReceive(ReplyHandler.java:81)
at io.vertx.core.eventbus.impl.HandlerRegistration.lambda$receive$0(HandlerRegistration.java:49)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
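
One plausible reading of this failure, offered as an assumption and not a confirmed trace of Vert.x internals, is that the context running the event handler ends up holding the same tracker map as the context running the blocking task. The sketch below replays that interleaving in plain Java. The `use` and `release` helpers mirror `useResource` and `releaseResource` from the test, while `TrackerReplay` and `replay` are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;

public class TrackerReplay {

  /** Mirrors useResource: reports an error if the resource is already tracked, then counts it. */
  static String use(ConcurrentHashMap<String, Integer> tracker, String resource) {
    String error = tracker.containsKey(resource)
        ? "Resource " + resource + " is already in use"
        : null;
    tracker.merge(resource, 1, Integer::sum);
    return error;
  }

  /** Mirrors releaseResource: decrements the count and removes the entry at zero. */
  static void release(ConcurrentHashMap<String, Integer> tracker, String resource) {
    tracker.computeIfPresent(resource, (k, v) -> v == 1 ? null : v - 1);
  }

  /** Replays the interleaving: the blocking task holds the resource when the event arrives. */
  static String replay(boolean sharedTracker) {
    ConcurrentHashMap<String, Integer> blockingTaskTracker = new ConcurrentHashMap<>();
    // Assumption: with copied locals, the event handler's context sees the same map instance
    ConcurrentHashMap<String, Integer> eventHandlerTracker =
        sharedTracker ? blockingTaskTracker : new ConcurrentHashMap<>();

    use(blockingTaskTracker, "resource-2");                 // blocking task starts
    String error = use(eventHandlerTracker, "resource-2");  // cancel event arrives meanwhile
    release(eventHandlerTracker, "resource-2");
    release(blockingTaskTracker, "resource-2");             // blocking task's finally block
    return error;
  }

  public static void main(String[] args) {
    System.out.println(replay(false)); // prints "null"
    System.out.println(replay(true));  // prints "Resource resource-2 is already in use"
  }
}
```

With separate trackers the replay reports no error, which matches the passing run on 4.5.11. With one shared tracker it produces the same message as the assertion failure above.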