Skip to content

Commit 076073f

Browse files
committed
Completion handled at gPRC layer.
1 parent 84d5a4d commit 076073f

10 files changed

Lines changed: 226 additions & 5 deletions

File tree

temporal-sdk/src/main/java/io/temporal/client/ActivityHandle.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.temporal.common.Experimental;
44
import java.lang.reflect.Type;
55
import java.util.concurrent.CompletableFuture;
6+
import java.util.concurrent.TimeUnit;
67
import javax.annotation.Nullable;
78

89
/**
@@ -31,6 +32,16 @@ public interface ActivityHandle<R> extends UntypedActivityHandle {
3132
*/
3233
CompletableFuture<R> getResultAsync();
3334

35+
/**
36+
* Returns a future that completes when the activity completes, or fails with {@link
37+
* java.util.concurrent.TimeoutException} if the activity does not complete within the specified
38+
* timeout.
39+
*
40+
* @param timeout maximum time to wait
41+
* @param unit unit of {@code timeout}
42+
*/
43+
CompletableFuture<R> getResultAsync(long timeout, TimeUnit unit);
44+
3445
/**
3546
* Wraps an {@link UntypedActivityHandle} with a known result type.
3647
*

temporal-sdk/src/main/java/io/temporal/client/ActivityHandleWrapper.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.lang.reflect.Type;
44
import java.util.concurrent.CompletableFuture;
5+
import java.util.concurrent.TimeUnit;
56
import javax.annotation.Nullable;
67

78
/**
@@ -33,6 +34,22 @@ public CompletableFuture<R> getResultAsync() {
3334
return delegate.getResultAsync(resultClass, resultType);
3435
}
3536

37+
@Override
38+
public CompletableFuture<R> getResultAsync(long timeout, TimeUnit unit) {
39+
return delegate.getResultAsync(timeout, unit, resultClass, resultType);
40+
}
41+
42+
@Override
43+
public <T> CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit, Class<T> clazz) {
44+
return delegate.getResultAsync(timeout, unit, clazz, null);
45+
}
46+
47+
@Override
48+
public <T> CompletableFuture<T> getResultAsync(
49+
long timeout, TimeUnit unit, Class<T> clazz, @Nullable Type type) {
50+
return delegate.getResultAsync(timeout, unit, clazz, type);
51+
}
52+
3653
@Override
3754
public String getActivityId() {
3855
return delegate.getActivityId();

temporal-sdk/src/main/java/io/temporal/client/UntypedActivityHandle.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import io.temporal.common.Experimental;
44
import java.lang.reflect.Type;
55
import java.util.concurrent.CompletableFuture;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.TimeoutException;
68
import javax.annotation.Nullable;
79

810
/**
@@ -64,6 +66,27 @@ public interface UntypedActivityHandle {
6466
*/
6567
<R> CompletableFuture<R> getResultAsync(Class<R> resultClass, @Nullable Type resultType);
6668

69+
/**
70+
* Returns a future that completes when the activity completes, or fails with {@link
71+
* TimeoutException} if the activity does not complete within the specified timeout.
72+
*
73+
* @param timeout maximum time to wait
74+
* @param unit unit of {@code timeout}
75+
* @param resultClass the class to deserialize the result into
76+
*/
77+
<R> CompletableFuture<R> getResultAsync(long timeout, TimeUnit unit, Class<R> resultClass);
78+
79+
/**
80+
* Returns a future for generic return types with a timeout.
81+
*
82+
* @param timeout maximum time to wait
83+
* @param unit unit of {@code timeout}
84+
* @param resultClass the class to deserialize the result into
85+
* @param resultType the generic type to use for deserialization; may be {@code null}
86+
*/
87+
<R> CompletableFuture<R> getResultAsync(
88+
long timeout, TimeUnit unit, Class<R> resultClass, @Nullable Type resultType);
89+
6790
/**
6891
* Describes the current state of the activity execution.
6992
*

temporal-sdk/src/main/java/io/temporal/common/interceptors/ActivityClientCallsInterceptor.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import io.temporal.common.Experimental;
1414
import java.lang.reflect.Type;
1515
import java.util.List;
16+
import java.util.concurrent.CompletableFuture;
17+
import java.util.concurrent.TimeUnit;
1618
import java.util.stream.Stream;
1719
import javax.annotation.Nullable;
1820

@@ -34,6 +36,18 @@ StartActivityOutput startActivity(StartActivityInput input)
3436
<R> GetActivityResultOutput<R> getActivityResult(GetActivityResultInput<R> input)
3537
throws ActivityFailedException;
3638

39+
default <R> CompletableFuture<GetActivityResultOutput<R>> getActivityResultAsync(
40+
GetActivityResultInput<R> input) {
41+
return CompletableFuture.supplyAsync(
42+
() -> {
43+
try {
44+
return getActivityResult(input);
45+
} catch (ActivityFailedException e) {
46+
throw new java.util.concurrent.CompletionException(e);
47+
}
48+
});
49+
}
50+
3751
DescribeActivityOutput describeActivity(DescribeActivityInput input);
3852

3953
CancelActivityOutput cancelActivity(CancelActivityInput input);
@@ -104,16 +118,31 @@ final class GetActivityResultInput<R> {
104118
private final @Nullable String runId;
105119
private final Class<R> resultClass;
106120
private final @Nullable Type resultType;
121+
private final long timeout;
122+
private final TimeUnit timeoutUnit;
107123

108124
public GetActivityResultInput(
109125
String activityId,
110126
@Nullable String runId,
111127
Class<R> resultClass,
112-
@Nullable Type resultType) {
128+
@Nullable Type resultType,
129+
long timeout,
130+
TimeUnit timeoutUnit) {
113131
this.activityId = activityId;
114132
this.runId = runId;
115133
this.resultClass = resultClass;
116134
this.resultType = resultType;
135+
this.timeout = timeout;
136+
this.timeoutUnit = timeoutUnit;
137+
}
138+
139+
/** No-timeout constructor: waits indefinitely. */
140+
public GetActivityResultInput(
141+
String activityId,
142+
@Nullable String runId,
143+
Class<R> resultClass,
144+
@Nullable Type resultType) {
145+
this(activityId, runId, resultClass, resultType, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
117146
}
118147

119148
/** Backward-compatible constructor that passes {@code null} for {@code resultType}. */
@@ -138,6 +167,14 @@ public Class<R> getResultClass() {
138167
public Type getResultType() {
139168
return resultType;
140169
}
170+
171+
public long getTimeout() {
172+
return timeout;
173+
}
174+
175+
public TimeUnit getTimeoutUnit() {
176+
return timeoutUnit;
177+
}
141178
}
142179

143180
@Experimental

temporal-sdk/src/main/java/io/temporal/common/interceptors/ActivityClientCallsInterceptorBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.temporal.client.ActivityAlreadyStartedException;
44
import io.temporal.client.ActivityFailedException;
5+
import java.util.concurrent.CompletableFuture;
56

67
/** Convenience base class for {@link ActivityClientCallsInterceptor} implementations. */
78
public class ActivityClientCallsInterceptorBase implements ActivityClientCallsInterceptor {
@@ -24,6 +25,12 @@ public <R> GetActivityResultOutput<R> getActivityResult(GetActivityResultInput<R
2425
return next.getActivityResult(input);
2526
}
2627

28+
@Override
29+
public <R> CompletableFuture<GetActivityResultOutput<R>> getActivityResultAsync(
30+
GetActivityResultInput<R> input) {
31+
return next.getActivityResultAsync(input);
32+
}
33+
2734
@Override
2835
public DescribeActivityOutput describeActivity(DescribeActivityInput input) {
2936
return next.describeActivity(input);

temporal-sdk/src/main/java/io/temporal/internal/client/ActivityHandleImpl.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.temporal.common.interceptors.ActivityClientCallsInterceptor;
77
import java.lang.reflect.Type;
88
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.TimeUnit;
910
import javax.annotation.Nullable;
1011

1112
/**
@@ -59,7 +60,23 @@ public <R> CompletableFuture<R> getResultAsync(Class<R> resultClass) {
5960

6061
@Override
6162
public <R> CompletableFuture<R> getResultAsync(Class<R> resultClass, @Nullable Type resultType) {
62-
return CompletableFuture.supplyAsync(() -> getResult(resultClass, resultType));
63+
return getResultAsync(Long.MAX_VALUE, TimeUnit.MILLISECONDS, resultClass, resultType);
64+
}
65+
66+
@Override
67+
public <R> CompletableFuture<R> getResultAsync(
68+
long timeout, TimeUnit unit, Class<R> resultClass) {
69+
return getResultAsync(timeout, unit, resultClass, null);
70+
}
71+
72+
@Override
73+
public <R> CompletableFuture<R> getResultAsync(
74+
long timeout, TimeUnit unit, Class<R> resultClass, @Nullable Type resultType) {
75+
return clientCallsInterceptor
76+
.getActivityResultAsync(
77+
new ActivityClientCallsInterceptor.GetActivityResultInput<>(
78+
activityId, activityRunId, resultClass, resultType, timeout, unit))
79+
.thenApply(ActivityClientCallsInterceptor.GetActivityResultOutput::getResult);
6380
}
6481

6582
@Override

temporal-sdk/src/main/java/io/temporal/internal/client/RootActivityClientInvoker.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import com.google.common.collect.Iterators;
77
import com.google.protobuf.ByteString;
8+
import io.grpc.Deadline;
89
import io.grpc.Status;
910
import io.grpc.StatusRuntimeException;
1011
import io.temporal.api.activity.v1.ActivityExecutionOutcome;
@@ -25,6 +26,9 @@
2526
import io.temporal.serviceclient.StatusUtils;
2627
import java.lang.reflect.Type;
2728
import java.util.*;
29+
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.CompletionException;
31+
import java.util.concurrent.TimeoutException;
2832
import java.util.stream.StreamSupport;
2933

3034
/**
@@ -177,6 +181,87 @@ public <R> GetActivityResultOutput<R> getActivityResult(GetActivityResultInput<R
177181
}
178182
}
179183

184+
@Override
185+
public <R> CompletableFuture<GetActivityResultOutput<R>> getActivityResultAsync(
186+
GetActivityResultInput<R> input) {
187+
DataConverter dc = clientOptions.getDataConverter();
188+
Deadline deadline = Deadline.after(input.getTimeout(), input.getTimeoutUnit());
189+
return pollActivityUntilOutcome(input, deadline)
190+
.handle(
191+
(outcome, e) -> {
192+
if (e == null) {
193+
return decodeOutcome(outcome, input, dc);
194+
}
195+
throw handleAsyncException(e, deadline, input.getActivityId());
196+
});
197+
}
198+
199+
private CompletableFuture<ActivityExecutionOutcome> pollActivityUntilOutcome(
200+
GetActivityResultInput<?> input, Deadline deadline) {
201+
PollActivityExecutionRequest.Builder pollRequest =
202+
PollActivityExecutionRequest.newBuilder()
203+
.setNamespace(clientOptions.getNamespace())
204+
.setActivityId(input.getActivityId());
205+
if (input.getRunId() != null) {
206+
pollRequest.setRunId(input.getRunId());
207+
}
208+
return genericClient
209+
.pollActivityAsync(pollRequest.build(), deadline)
210+
.thenComposeAsync(
211+
response -> {
212+
if (!response.hasOutcome()) {
213+
return pollActivityUntilOutcome(input, deadline);
214+
}
215+
return CompletableFuture.completedFuture(response.getOutcome());
216+
});
217+
}
218+
219+
private static CompletionException handleAsyncException(
220+
Throwable e, Deadline deadline, String activityId) {
221+
Throwable cause = e instanceof CompletionException ? e.getCause() : e;
222+
if (deadline.isExpired()
223+
&& cause instanceof StatusRuntimeException
224+
&& Status.Code.DEADLINE_EXCEEDED.equals(
225+
((StatusRuntimeException) cause).getStatus().getCode())) {
226+
return new CompletionException(
227+
new TimeoutException(
228+
"Activity did not complete within timeout: activityId='" + activityId + "'"));
229+
}
230+
return e instanceof CompletionException ? (CompletionException) e : new CompletionException(e);
231+
}
232+
233+
private <R> GetActivityResultOutput<R> decodeOutcome(
234+
ActivityExecutionOutcome outcome, GetActivityResultInput<R> input, DataConverter dc) {
235+
switch (outcome.getValueCase()) {
236+
case RESULT:
237+
Type resultType =
238+
input.getResultType() != null ? input.getResultType() : input.getResultClass();
239+
@SuppressWarnings("unchecked")
240+
R result =
241+
(R)
242+
dc.fromPayloads(
243+
0,
244+
outcome.hasResult() ? Optional.of(outcome.getResult()) : Optional.empty(),
245+
input.getResultClass(),
246+
resultType);
247+
return new GetActivityResultOutput<>(result);
248+
case FAILURE:
249+
throw new java.util.concurrent.CompletionException(
250+
new ActivityFailedException(
251+
"Activity failed: activityId='" + input.getActivityId() + "'",
252+
dc.failureToException(outcome.getFailure())));
253+
default:
254+
throw new java.util.concurrent.CompletionException(
255+
new ActivityFailedException(
256+
"Activity completed with unexpected outcome '"
257+
+ outcome.getValueCase()
258+
+ "' for activityId='"
259+
+ input.getActivityId()
260+
+ "'",
261+
null));
262+
}
263+
}
264+
180265
@Override
181266
public DescribeActivityOutput describeActivity(DescribeActivityInput input) {
182267
DescribeActivityExecutionRequest.Builder req =

temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ ExecuteMultiOperationResponse executeMultiOperation(
7878
@Experimental
7979
PollActivityExecutionResponse pollActivity(PollActivityExecutionRequest request);
8080

81+
@Experimental
82+
CompletableFuture<PollActivityExecutionResponse> pollActivityAsync(
83+
PollActivityExecutionRequest request, @Nonnull Deadline deadline);
84+
8185
@Experimental
8286
DescribeActivityExecutionResponse describeActivity(DescribeActivityExecutionRequest request);
8387

temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,22 @@ public PollActivityExecutionResponse pollActivity(PollActivityExecutionRequest r
452452
new GrpcRetryer.GrpcRetryerOptions(DefaultStubLongPollRpcRetryOptions.INSTANCE, null));
453453
}
454454

455+
@Override
456+
public CompletableFuture<PollActivityExecutionResponse> pollActivityAsync(
457+
PollActivityExecutionRequest request, @Nonnull Deadline deadline) {
458+
return grpcRetryer.retryWithResultAsync(
459+
asyncThrottlerExecutor,
460+
() ->
461+
toCompletableFuture(
462+
service
463+
.futureStub()
464+
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
465+
.withOption(HISTORY_LONG_POLL_CALL_OPTIONS_KEY, true)
466+
.withDeadline(deadline)
467+
.pollActivityExecution(request)),
468+
new GrpcRetryer.GrpcRetryerOptions(DefaultStubLongPollRpcRetryOptions.INSTANCE, deadline));
469+
}
470+
455471
@Override
456472
public DescribeActivityExecutionResponse describeActivity(
457473
DescribeActivityExecutionRequest request) {

temporal-sdk/src/test/java/io/temporal/internal/client/ActivityHandleImplTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ public void testGetResultTyped() throws ActivityFailedException {
4545
public void testGetResultAsync() throws Exception {
4646
GetActivityResultOutput<String> output = mock(GetActivityResultOutput.class);
4747
when(output.getResult()).thenReturn("async-result");
48-
when(interceptor.getActivityResult(any(GetActivityResultInput.class))).thenReturn(output);
48+
when(interceptor.getActivityResultAsync(any(GetActivityResultInput.class)))
49+
.thenReturn(CompletableFuture.completedFuture(output));
4950

5051
UntypedActivityHandle handle = new ActivityHandleImpl("id", null, interceptor);
5152
CompletableFuture<String> future = handle.getResultAsync(String.class);
@@ -141,7 +142,8 @@ public void testFromUntypedWithExplicitTypePassesTypeToInterceptor()
141142
public void testFromUntypedGetResultAsyncNoArg() throws Exception {
142143
GetActivityResultOutput<String> output = mock(GetActivityResultOutput.class);
143144
when(output.getResult()).thenReturn("async-typed");
144-
when(interceptor.getActivityResult(any(GetActivityResultInput.class))).thenReturn(output);
145+
when(interceptor.getActivityResultAsync(any(GetActivityResultInput.class)))
146+
.thenReturn(CompletableFuture.completedFuture(output));
145147

146148
UntypedActivityHandle untyped = new ActivityHandleImpl("id", "run", interceptor);
147149
ActivityHandle<String> typed = ActivityHandle.fromUntyped(untyped, String.class);
@@ -154,7 +156,9 @@ public void testFromUntypedGetResultAsyncNoArg() throws Exception {
154156
public void testGetResultAsyncWrapsActivityFailedExceptionInRuntimeException() throws Exception {
155157
ActivityFailedException failure =
156158
new ActivityFailedException("activity failed", new RuntimeException("root cause"));
157-
when(interceptor.getActivityResult(any(GetActivityResultInput.class))).thenThrow(failure);
159+
CompletableFuture<GetActivityResultOutput<String>> failed = new CompletableFuture<>();
160+
failed.completeExceptionally(failure);
161+
when(interceptor.getActivityResultAsync(any(GetActivityResultInput.class))).thenReturn(failed);
158162

159163
UntypedActivityHandle handle = new ActivityHandleImpl("id", "run", interceptor);
160164
CompletableFuture<String> future = handle.getResultAsync(String.class);

0 commit comments

Comments
 (0)