Skip to content

Commit b457d0c

Browse files
KovaliovNAvancexu
authored andcommitted
Added supports contextPropagators for localActivity. (#515)
1 parent 2141e1d commit b457d0c

File tree

5 files changed

+312
-5
lines changed

5 files changed

+312
-5
lines changed

src/main/java/com/uber/cadence/activity/LocalActivityOptions.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121

2222
import com.uber.cadence.common.MethodRetry;
2323
import com.uber.cadence.common.RetryOptions;
24+
import com.uber.cadence.context.ContextPropagator;
2425
import java.time.Duration;
26+
import java.util.List;
2527
import java.util.Objects;
2628

2729
/** Options used to configure how an local activity is invoked. */
@@ -50,12 +52,14 @@ public static LocalActivityOptions merge(
5052
ActivityOptions.mergeDuration(
5153
a.scheduleToCloseTimeoutSeconds(), o.getScheduleToCloseTimeout()))
5254
.setRetryOptions(RetryOptions.merge(r, o.getRetryOptions()))
55+
.setContextPropagators(o.getContextPropagators())
5356
.validateAndBuildWithDefaults();
5457
}
5558

5659
public static final class Builder {
5760
private Duration scheduleToCloseTimeout;
5861
private RetryOptions retryOptions;
62+
private List<ContextPropagator> contextPropagators;
5963

6064
public Builder() {}
6165

@@ -83,25 +87,32 @@ public Builder setRetryOptions(RetryOptions retryOptions) {
8387
return this;
8488
}
8589

90+
public Builder setContextPropagators(List<ContextPropagator> contextPropagators) {
91+
this.contextPropagators = contextPropagators;
92+
return this;
93+
}
94+
8695
public LocalActivityOptions build() {
87-
return new LocalActivityOptions(scheduleToCloseTimeout, retryOptions);
96+
return new LocalActivityOptions(scheduleToCloseTimeout, retryOptions, contextPropagators);
8897
}
8998

9099
public LocalActivityOptions validateAndBuildWithDefaults() {
91100
RetryOptions ro = null;
92101
if (retryOptions != null) {
93102
ro = new RetryOptions.Builder(retryOptions).validateBuildWithDefaults();
94103
}
95-
return new LocalActivityOptions(roundUpToSeconds(scheduleToCloseTimeout), ro);
104+
return new LocalActivityOptions(roundUpToSeconds(scheduleToCloseTimeout), ro, contextPropagators);
96105
}
97106
}
98107

99108
private final Duration scheduleToCloseTimeout;
100109
private final RetryOptions retryOptions;
110+
private final List<ContextPropagator> contextPropagators;
101111

102-
private LocalActivityOptions(Duration scheduleToCloseTimeout, RetryOptions retryOptions) {
112+
private LocalActivityOptions(Duration scheduleToCloseTimeout, RetryOptions retryOptions, List<ContextPropagator> contextPropagators) {
103113
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
104114
this.retryOptions = retryOptions;
115+
this.contextPropagators = contextPropagators;
105116
}
106117

107118
public Duration getScheduleToCloseTimeout() {
@@ -112,6 +123,10 @@ public RetryOptions getRetryOptions() {
112123
return retryOptions;
113124
}
114125

126+
public List<ContextPropagator> getContextPropagators() {
127+
return contextPropagators;
128+
}
129+
115130
@Override
116131
public String toString() {
117132
return "LocalActivityOptions{"

src/main/java/com/uber/cadence/internal/replay/ExecuteLocalActivityParameters.java

+10
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.uber.cadence.WorkflowExecution;
2222
import com.uber.cadence.common.RetryOptions;
2323
import java.util.Arrays;
24+
import java.util.Map;
2425

2526
public class ExecuteLocalActivityParameters {
2627

@@ -33,6 +34,7 @@ public class ExecuteLocalActivityParameters {
3334
private RetryOptions retryOptions;
3435
private long elapsedTime;
3536
private int attempt;
37+
private Map<String, byte[]> context;
3638

3739
public ExecuteLocalActivityParameters() {}
3840

@@ -230,6 +232,14 @@ public void setWorkflowExecution(WorkflowExecution workflowExecution) {
230232
this.workflowExecution = workflowExecution;
231233
}
232234

235+
public Map<String, byte[]> getContext() {
236+
return context;
237+
}
238+
239+
public void setContext(Map<String, byte[]> context) {
240+
this.context = context;
241+
}
242+
233243
@Override
234244
public String toString() {
235245
return "ExecuteLocalActivityParameters{"

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -323,11 +323,11 @@ private ExecuteActivityParameters constructExecuteActivityParameters(
323323

324324
private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
325325
String name, LocalActivityOptions options, byte[] input, long elapsed, int attempt) {
326-
ExecuteLocalActivityParameters parameters = new ExecuteLocalActivityParameters();
327-
parameters
326+
ExecuteLocalActivityParameters parameters = new ExecuteLocalActivityParameters()
328327
.withActivityType(new ActivityType().setName(name))
329328
.withInput(input)
330329
.withScheduleToCloseTimeoutSeconds(options.getScheduleToCloseTimeout().getSeconds());
330+
331331
RetryOptions retryOptions = options.getRetryOptions();
332332
if (retryOptions != null) {
333333
parameters.setRetryOptions(retryOptions);
@@ -336,6 +336,11 @@ private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
336336
parameters.setElapsedTime(elapsed);
337337
parameters.setWorkflowDomain(this.context.getDomain());
338338
parameters.setWorkflowExecution(this.context.getWorkflowExecution());
339+
340+
List<ContextPropagator> propagators = Optional.ofNullable(options.getContextPropagators())
341+
.orElse(contextPropagators);
342+
parameters.setContext(extractContextsAndConvertToBytes(propagators));
343+
339344
return parameters;
340345
}
341346

src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java

+18
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.time.Duration;
3434
import java.util.Map;
3535
import java.util.Objects;
36+
import java.util.Optional;
3637
import java.util.concurrent.TimeUnit;
3738
import java.util.function.BiFunction;
3839
import java.util.function.Consumer;
@@ -163,6 +164,8 @@ private TaskHandlerImpl(ActivityTaskHandler handler) {
163164

164165
@Override
165166
public void handle(Task task) throws Exception {
167+
propagateContext(task.params);
168+
166169
task.taskStartTime = System.currentTimeMillis();
167170
ActivityTaskHandler.Result result = handleLocalActivity(task);
168171

@@ -256,4 +259,19 @@ private ActivityTaskHandler.Result handleLocalActivity(Task task) throws Interru
256259
}
257260
}
258261
}
262+
263+
private void propagateContext(ExecuteLocalActivityParameters params) {
264+
if (options.getContextPropagators() == null || options.getContextPropagators().isEmpty()) {
265+
return;
266+
}
267+
268+
Optional.ofNullable(params.getContext())
269+
.filter(context -> !context.isEmpty())
270+
.ifPresent(this::restoreContext);
271+
}
272+
273+
private void restoreContext(Map<String, byte[]> context) {
274+
options.getContextPropagators()
275+
.forEach(propagator -> propagator.setCurrentContext(propagator.deserializeContext(context)));
276+
}
259277
}

0 commit comments

Comments
 (0)