Skip to content

Commit 95cb139

Browse files
authored
Add unsetCurrentContext to ContextPropagator interface (#898)
* Add unsetCurrentContext to ContextPropagator interface * add unit test for unset * fix order in unset context in workflow thread
1 parent 794dc95 commit 95cb139

File tree

7 files changed

+70
-17
lines changed

7 files changed

+70
-17
lines changed

src/main/java/com/uber/cadence/context/ContextPropagator.java

+3
Original file line numberDiff line numberDiff line change
@@ -136,4 +136,7 @@ public interface ContextPropagator {
136136

137137
/** Sets the current context */
138138
void setCurrentContext(Object context);
139+
140+
/** Unsets the current context. This is called when the context is no longer needed */
141+
void unsetCurrentContext();
139142
}

src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java

+6
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,10 @@ public static void propagateContextToCurrentThread(Map<String, Object> contextDa
6767
}
6868
}
6969
}
70+
71+
public static void unsetCurrentContext() {
72+
for (ContextPropagator propagator : contextPropagators.get()) {
73+
propagator.unsetCurrentContext();
74+
}
75+
}
7076
}

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

+1
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ public void run() {
132132
}
133133
threadContext.setUnhandledException(e);
134134
} finally {
135+
ContextThreadLocal.unsetCurrentContext();
135136
DeterministicRunnerImpl.setCurrentThreadInternal(null);
136137
threadContext.setStatus(Status.DONE);
137138
thread.setName(originalName);

src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java

+7
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ public void handle(PollForActivityTaskResponse task) throws Exception {
174174
MDC.remove(LoggerTag.ACTIVITY_TYPE);
175175
MDC.remove(LoggerTag.WORKFLOW_ID);
176176
MDC.remove(LoggerTag.RUN_ID);
177+
unsetCurrentContext();
177178
}
178179
}
179180

@@ -200,6 +201,12 @@ void propagateContext(PollForActivityTaskResponse response) {
200201
}
201202
}
202203

204+
void unsetCurrentContext() {
205+
for (ContextPropagator propagator : options.getContextPropagators()) {
206+
propagator.unsetCurrentContext();
207+
}
208+
}
209+
203210
@Override
204211
public Throwable wrapFailure(PollForActivityTaskResponse task, Throwable failure) {
205212
WorkflowExecution execution = task.getWorkflowExecution();

src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java

+6
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.uber.cadence.MarkerRecordedEventAttributes;
2323
import com.uber.cadence.PollForActivityTaskResponse;
2424
import com.uber.cadence.common.RetryOptions;
25+
import com.uber.cadence.context.ContextPropagator;
2526
import com.uber.cadence.internal.common.LocalActivityMarkerData;
2627
import com.uber.cadence.internal.metrics.MetricsTag;
2728
import com.uber.cadence.internal.metrics.MetricsType;
@@ -168,6 +169,7 @@ public void handle(Task task) throws Exception {
168169
task.eventConsumer.accept(event);
169170
} finally {
170171
span.finish();
172+
unsetCurrentContext();
171173
}
172174
}
173175

@@ -241,6 +243,10 @@ private void propagateContext(ExecuteLocalActivityParameters params) {
241243
.ifPresent(this::restoreContext);
242244
}
243245

246+
private void unsetCurrentContext() {
247+
options.getContextPropagators().forEach(ContextPropagator::unsetCurrentContext);
248+
}
249+
244250
private void restoreContext(Map<String, byte[]> context) {
245251
options
246252
.getContextPropagators()

src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,9 @@ public void setCurrentContext(Object context) {
218218
String propagatedContextName = ((Context) context).getContextName();
219219
wrapperContext.newContext(propagatedContextName);
220220
}
221+
222+
@Override
223+
public void unsetCurrentContext() {}
221224
}
222225

223226
private class WrapperContext {

src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java

+44-17
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717

1818
package com.uber.cadence.internal.testing;
1919

20-
import static org.junit.Assert.assertEquals;
21-
import static org.junit.Assert.assertTrue;
22-
import static org.junit.Assert.fail;
20+
import static org.junit.Assert.*;
2321
import static org.mockito.Matchers.anyString;
2422
import static org.mockito.Mockito.mock;
2523
import static org.mockito.Mockito.when;
@@ -45,21 +43,10 @@
4543
import com.uber.cadence.testing.TestEnvironmentOptions;
4644
import com.uber.cadence.testing.TestWorkflowEnvironment;
4745
import com.uber.cadence.worker.Worker;
48-
import com.uber.cadence.workflow.ActivityTimeoutException;
49-
import com.uber.cadence.workflow.Async;
50-
import com.uber.cadence.workflow.ChildWorkflowOptions;
51-
import com.uber.cadence.workflow.ChildWorkflowTimedOutException;
52-
import com.uber.cadence.workflow.Promise;
53-
import com.uber.cadence.workflow.SignalMethod;
54-
import com.uber.cadence.workflow.Workflow;
55-
import com.uber.cadence.workflow.WorkflowMethod;
46+
import com.uber.cadence.workflow.*;
5647
import java.nio.charset.StandardCharsets;
5748
import java.time.Duration;
58-
import java.util.Collections;
59-
import java.util.List;
60-
import java.util.Map;
61-
import java.util.Optional;
62-
import java.util.UUID;
49+
import java.util.*;
6350
import java.util.concurrent.CancellationException;
6451
import java.util.concurrent.CompletableFuture;
6552
import java.util.concurrent.ExecutionException;
@@ -742,6 +729,37 @@ public void testMockedChildSimulatedTimeout() {
742729

743730
public static class TestContextPropagator implements ContextPropagator {
744731

732+
// public static class TestContext {
733+
// private final String testKey;
734+
// private final Integer level;
735+
//
736+
// public TestContext(String testKey, Integer level) {
737+
// this.testKey = testKey;
738+
// this.level = level;
739+
// }
740+
//
741+
// public String getTestKey() {
742+
// return testKey;
743+
// }
744+
// public Integer getLevel() {
745+
// return level;
746+
// }
747+
//
748+
// public Map<String, byte[]> ToMap() {
749+
// return "TestContext{testKey=" + testKey + ", level=" + level + "}";
750+
// }
751+
// }
752+
753+
private int level;
754+
755+
TestContextPropagator() {
756+
level = 0;
757+
}
758+
759+
public int getLevel() {
760+
return level;
761+
}
762+
745763
@Override
746764
public String getName() {
747765
return this.getClass().getName();
@@ -774,6 +792,13 @@ public Object getCurrentContext() {
774792
@Override
775793
public void setCurrentContext(Object context) {
776794
MDC.put("test", String.valueOf(context));
795+
level++;
796+
}
797+
798+
@Override
799+
public void unsetCurrentContext() {
800+
MDC.remove("test");
801+
level--;
777802
}
778803
}
779804

@@ -793,13 +818,15 @@ public void testWorkflowContextPropagation() {
793818
testEnvironment.start();
794819
MDC.put("test", "testing123");
795820
WorkflowClient client = testEnvironment.newWorkflowClient();
821+
TestContextPropagator propagator = new TestContextPropagator();
796822
WorkflowOptions options =
797823
new WorkflowOptions.Builder()
798-
.setContextPropagators(Collections.singletonList(new TestContextPropagator()))
824+
.setContextPropagators(Collections.singletonList(propagator))
799825
.build();
800826
TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options);
801827
String result = workflow.workflow1("input1");
802828
assertEquals("testing123", result);
829+
assertEquals(0, propagator.getLevel());
803830
}
804831

805832
public static class ContextPropagationParentWorkflowImpl implements ParentWorkflow {

0 commit comments

Comments
 (0)