Skip to content

Commit d4bf5b1

Browse files
committed
.start() pulls headers from context propagators.
1 parent 62e9c3b commit d4bf5b1

2 files changed

Lines changed: 62 additions & 2 deletions

File tree

temporal-sdk/src/main/java/io/temporal/client/ActivityClientImpl.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.temporal.client;
22

33
import com.uber.m3.tally.Scope;
4+
import io.temporal.api.common.v1.Payload;
5+
import io.temporal.common.context.ContextPropagator;
46
import io.temporal.common.interceptors.ActivityClientCallsInterceptor;
57
import io.temporal.common.interceptors.ActivityClientInterceptor;
68
import io.temporal.common.interceptors.Header;
@@ -15,6 +17,9 @@
1517
import java.lang.reflect.Method;
1618
import java.lang.reflect.Type;
1719
import java.util.Arrays;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
1823
import java.util.concurrent.CompletableFuture;
1924
import java.util.stream.Stream;
2025
import javax.annotation.Nullable;
@@ -304,7 +309,7 @@ public UntypedActivityHandle start(
304309
activityType,
305310
Arrays.asList(args != null ? args : new Object[0]),
306311
options,
307-
Header.empty()));
312+
propagatedHeader()));
308313
return new ActivityHandleImpl(output.getActivityId(), output.getActivityRunId(), invoker);
309314
}
310315

@@ -410,6 +415,18 @@ public <R> ActivityHandle<R> getHandle(
410415
return ActivityHandle.fromUntyped(untyped, resultClass, resultType);
411416
}
412417

418+
private Header propagatedHeader() {
419+
List<ContextPropagator> propagators = options.getContextPropagators();
420+
if (propagators.isEmpty()) {
421+
return Header.empty();
422+
}
423+
Map<String, Payload> result = new HashMap<>();
424+
for (ContextPropagator propagator : propagators) {
425+
result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
426+
}
427+
return new Header(result);
428+
}
429+
413430
// ---- List / count ----
414431

415432
@Override

temporal-sdk/src/test/java/io/temporal/client/ActivityClientImplTest.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,24 @@
11
package io.temporal.client;
22

3+
import static org.junit.Assert.*;
34
import static org.mockito.ArgumentMatchers.any;
45
import static org.mockito.Mockito.*;
56

67
import com.uber.m3.tally.Scope;
78
import io.temporal.activity.ActivityInterface;
89
import io.temporal.activity.ActivityMethod;
10+
import io.temporal.api.common.v1.Payload;
11+
import io.temporal.common.context.ContextPropagator;
12+
import io.temporal.common.interceptors.ActivityClientCallsInterceptor;
13+
import io.temporal.common.interceptors.ActivityClientCallsInterceptorBase;
14+
import io.temporal.common.interceptors.ActivityClientInterceptor;
15+
import io.temporal.common.interceptors.Header;
916
import io.temporal.serviceclient.WorkflowServiceStubs;
1017
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
1118
import io.temporal.workflow.Functions;
1219
import java.time.Duration;
20+
import java.util.Collections;
21+
import java.util.concurrent.atomic.AtomicReference;
1322
import org.junit.Before;
1423
import org.junit.Test;
1524

@@ -27,12 +36,13 @@ public interface WrongActivity {
2736
void doWrong();
2837
}
2938

39+
private WorkflowServiceStubs stubs;
3040
private ActivityClient client;
3141
private StartActivityOptions options;
3242

3343
@Before
3444
public void setUp() {
35-
WorkflowServiceStubs stubs = mock(WorkflowServiceStubs.class);
45+
stubs = mock(WorkflowServiceStubs.class);
3646
WorkflowServiceStubsOptions stubsOptions = mock(WorkflowServiceStubsOptions.class);
3747
Scope scope = mock(Scope.class);
3848
when(stubs.getOptions()).thenReturn(stubsOptions);
@@ -48,6 +58,39 @@ public void setUp() {
4858
.build();
4959
}
5060

61+
@Test
62+
public void testContextPropagatorHeaderIsIncluded() {
63+
Payload payload = Payload.newBuilder().build();
64+
ContextPropagator propagator = mock(ContextPropagator.class);
65+
when(propagator.getCurrentContext()).thenReturn("ctx");
66+
when(propagator.serializeContext("ctx"))
67+
.thenReturn(Collections.singletonMap("my-key", payload));
68+
69+
AtomicReference<Header> capturedHeader = new AtomicReference<>();
70+
ActivityClientInterceptor capturingInterceptor =
71+
next ->
72+
new ActivityClientCallsInterceptorBase(next) {
73+
@Override
74+
public ActivityClientCallsInterceptor.StartActivityOutput startActivity(
75+
ActivityClientCallsInterceptor.StartActivityInput input) {
76+
capturedHeader.set(input.getHeader());
77+
return new ActivityClientCallsInterceptor.StartActivityOutput("fake-id", null);
78+
}
79+
};
80+
81+
ActivityClientOptions opts =
82+
ActivityClientOptions.newBuilder()
83+
.setContextPropagators(Collections.singletonList(propagator))
84+
.setInterceptors(Collections.singletonList(capturingInterceptor))
85+
.build();
86+
87+
ActivityClient clientWithPropagators = ActivityClient.newInstance(stubs, opts);
88+
clientWithPropagators.start(ProbedActivity.class, ProbedActivity::doIt, options);
89+
90+
assertNotNull(capturedHeader.get());
91+
assertEquals(payload, capturedHeader.get().getValues().get("my-key"));
92+
}
93+
5194
@Test(expected = NoSuchMethodError.class)
5295
@SuppressWarnings("unchecked")
5396
public void testStartWithMethodFromWrongClass() {

0 commit comments

Comments
 (0)