Skip to content

Commit eddc53e

Browse files
committed
Add overload to ActivityHandle.getResult with timeout.
Implemented this by adding a Deadline to RootActivityClientInvoker.getActivityResult. Also: In tests, use .start(...).get() instead of .executeAsync, for clarity. Added stopwatch check to testGetActivityResultAsyncTimeoutAbortsPolling to avoid possibility of false negative.
1 parent 2eaf28a commit eddc53e

15 files changed

Lines changed: 142 additions & 17 deletions

temporal-sdk/src/main/java/io/temporal/client/ActivityHandle.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.lang.reflect.Type;
55
import java.util.concurrent.CompletableFuture;
66
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.TimeoutException;
78
import javax.annotation.Nullable;
89

910
/**
@@ -27,6 +28,18 @@ public interface ActivityHandle<R> extends UntypedActivityHandle {
2728
*/
2829
R getResult();
2930

31+
/**
32+
* Blocks until the standalone activity completes and returns the typed result, or throws if the
33+
* client-side timeout expires first.
34+
*
35+
* @param timeout maximum time to wait
36+
* @param unit unit of {@code timeout}
37+
* @throws ActivityFailedException if the activity failed, timed out on the server, or was
38+
* cancelled
39+
* @throws TimeoutException if {@code timeout} expires before the activity completes
40+
*/
41+
R getResult(long timeout, TimeUnit unit) throws TimeoutException;
42+
3043
/**
3144
* Returns a future that completes when the activity completes and resolves to the typed result.
3245
*/

temporal-sdk/src/main/java/io/temporal/client/ActivityHandleImpl.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.lang.reflect.Type;
44
import java.util.concurrent.CompletableFuture;
55
import java.util.concurrent.TimeUnit;
6+
import java.util.concurrent.TimeoutException;
67
import javax.annotation.Nullable;
78

89
/**
@@ -29,6 +30,22 @@ public R getResult() {
2930
return delegate.getResult(resultClass, resultType);
3031
}
3132

33+
@Override
34+
public R getResult(long timeout, TimeUnit unit) throws TimeoutException {
35+
return delegate.getResult(timeout, unit, resultClass, resultType);
36+
}
37+
38+
@Override
39+
public <T> T getResult(long timeout, TimeUnit unit, Class<T> clazz) throws TimeoutException {
40+
return delegate.getResult(timeout, unit, clazz, null);
41+
}
42+
43+
@Override
44+
public <T> T getResult(long timeout, TimeUnit unit, Class<T> clazz, @Nullable Type type)
45+
throws TimeoutException {
46+
return delegate.getResult(timeout, unit, clazz, type);
47+
}
48+
3249
@Override
3350
public CompletableFuture<R> getResultAsync() {
3451
return delegate.getResultAsync(resultClass, resultType);

temporal-sdk/src/main/java/io/temporal/client/UntypedActivityHandle.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,36 @@ public interface UntypedActivityHandle {
5151
*/
5252
<R> R getResult(Class<R> resultClass, @Nullable Type resultType);
5353

54+
/**
55+
* Blocks until the standalone activity completes and returns the typed result, or throws if the
56+
* timeout expires before the activity completes.
57+
*
58+
* @param timeout maximum time to wait
59+
* @param unit unit of {@code timeout}
60+
* @param resultClass the class to deserialize the result into
61+
* @throws ActivityFailedException if the activity failed, timed out on the server, or was
62+
* cancelled
63+
* @throws TimeoutException if the client-side {@code timeout} expires before the activity
64+
* completes
65+
*/
66+
<R> R getResult(long timeout, TimeUnit unit, Class<R> resultClass) throws TimeoutException;
67+
68+
/**
69+
* Blocks until the standalone activity completes and returns the typed result, or throws if the
70+
* timeout expires. Use this overload for generic return types (e.g. {@code List<String>}).
71+
*
72+
* @param timeout maximum time to wait
73+
* @param unit unit of {@code timeout}
74+
* @param resultClass the class to deserialize the result into
75+
* @param resultType the generic type to use for deserialization; may be {@code null}
76+
* @throws ActivityFailedException if the activity failed, timed out on the server, or was
77+
* cancelled
78+
* @throws TimeoutException if the client-side {@code timeout} expires before the activity
79+
* completes
80+
*/
81+
<R> R getResult(long timeout, TimeUnit unit, Class<R> resultClass, @Nullable Type resultType)
82+
throws TimeoutException;
83+
5484
/**
5585
* Returns a future that completes when the activity completes and resolves to the typed result.
5686
*

temporal-sdk/src/main/java/io/temporal/common/interceptors/ActivityClientCallsInterceptor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.List;
1212
import java.util.concurrent.CompletableFuture;
1313
import java.util.concurrent.TimeUnit;
14+
import java.util.concurrent.TimeoutException;
1415
import java.util.stream.Stream;
1516
import javax.annotation.Nullable;
1617

@@ -47,7 +48,8 @@ public interface ActivityClientCallsInterceptor {
4748
* @return output wrapping the deserialized activity result
4849
* @throws ActivityFailedException if the activity failed or was cancelled
4950
*/
50-
<R> GetActivityResultOutput<R> getActivityResult(GetActivityResultInput<R> input);
51+
<R> GetActivityResultOutput<R> getActivityResult(GetActivityResultInput<R> input)
52+
throws TimeoutException;
5153

5254
/**
5355
* Returns the current execution description for a standalone activity. If a long-poll token from
@@ -117,7 +119,7 @@ default <R> CompletableFuture<GetActivityResultOutput<R>> getActivityResultAsync
117119
() -> {
118120
try {
119121
return getActivityResult(input);
120-
} catch (ActivityFailedException e) {
122+
} catch (ActivityFailedException | TimeoutException e) {
121123
throw new java.util.concurrent.CompletionException(e);
122124
}
123125
});

temporal-sdk/src/main/java/io/temporal/common/interceptors/ActivityClientCallsInterceptorBase.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.temporal.common.interceptors;
22

33
import java.util.concurrent.CompletableFuture;
4+
import java.util.concurrent.TimeoutException;
45

56
/** Convenience base class for {@link ActivityClientCallsInterceptor} implementations. */
67
public class ActivityClientCallsInterceptorBase implements ActivityClientCallsInterceptor {
@@ -17,7 +18,8 @@ public StartActivityOutput startActivity(StartActivityInput input) {
1718
}
1819

1920
@Override
20-
public <R> GetActivityResultOutput<R> getActivityResult(GetActivityResultInput<R> input) {
21+
public <R> GetActivityResultOutput<R> getActivityResult(GetActivityResultInput<R> input)
22+
throws TimeoutException {
2123
return next.getActivityResult(input);
2224
}
2325

temporal-sdk/src/main/java/io/temporal/internal/client/ActivityHandleImpl.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.lang.reflect.Type;
77
import java.util.concurrent.CompletableFuture;
88
import java.util.concurrent.TimeUnit;
9+
import java.util.concurrent.TimeoutException;
910
import java.util.concurrent.atomic.AtomicReference;
1011
import javax.annotation.Nullable;
1112

@@ -46,10 +47,32 @@ public <R> R getResult(Class<R> resultClass) {
4647

4748
@Override
4849
public <R> R getResult(Class<R> resultClass, @Nullable Type resultType) {
50+
try {
51+
return clientCallsInterceptor
52+
.getActivityResult(
53+
new ActivityClientCallsInterceptor.GetActivityResultInput<>(
54+
activityId, activityRunId, resultClass, resultType))
55+
.getResult();
56+
} catch (TimeoutException e) {
57+
// unreachable: no-timeout input uses Long.MAX_VALUE deadline
58+
throw new RuntimeException(e);
59+
}
60+
}
61+
62+
@Override
63+
public <R> R getResult(long timeout, TimeUnit unit, Class<R> resultClass)
64+
throws TimeoutException {
65+
return getResult(timeout, unit, resultClass, null);
66+
}
67+
68+
@Override
69+
public <R> R getResult(
70+
long timeout, TimeUnit unit, Class<R> resultClass, @Nullable Type resultType)
71+
throws TimeoutException {
4972
return clientCallsInterceptor
5073
.getActivityResult(
5174
new ActivityClientCallsInterceptor.GetActivityResultInput<>(
52-
activityId, activityRunId, resultClass, resultType))
75+
activityId, activityRunId, resultClass, resultType, timeout, unit))
5376
.getResult();
5477
}
5578

temporal-sdk/src/main/java/io/temporal/internal/client/RootActivityClientInvoker.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,11 @@ public StartActivityOutput startActivity(StartActivityInput input) {
122122
}
123123

124124
@Override
125-
public <R> GetActivityResultOutput<R> getActivityResult(GetActivityResultInput<R> input) {
125+
public <R> GetActivityResultOutput<R> getActivityResult(GetActivityResultInput<R> input)
126+
throws TimeoutException {
126127
String namespace = clientOptions.getNamespace();
127128
DataConverter dc = clientOptions.getDataConverter();
129+
Deadline deadline = Deadline.after(input.getTimeout(), input.getTimeoutUnit());
128130

129131
while (true) {
130132
PollActivityExecutionRequest.Builder pollRequest =
@@ -135,7 +137,18 @@ public <R> GetActivityResultOutput<R> getActivityResult(GetActivityResultInput<R
135137
pollRequest.setRunId(input.getRunId());
136138
}
137139

138-
PollActivityExecutionResponse pollResponse = genericClient.pollActivity(pollRequest.build());
140+
PollActivityExecutionResponse pollResponse;
141+
try {
142+
pollResponse = genericClient.pollActivity(pollRequest.build(), deadline);
143+
} catch (StatusRuntimeException e) {
144+
if (deadline.isExpired() && Status.Code.DEADLINE_EXCEEDED.equals(e.getStatus().getCode())) {
145+
throw new TimeoutException(
146+
"Activity did not complete within timeout: activityId='"
147+
+ input.getActivityId()
148+
+ "'");
149+
}
150+
throw e;
151+
}
139152

140153
if (!pollResponse.hasOutcome()) {
141154
if (Thread.currentThread().isInterrupted()) {

temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ ExecuteMultiOperationResponse executeMultiOperation(
7878
@Experimental
7979
PollActivityExecutionResponse pollActivity(PollActivityExecutionRequest request);
8080

81+
@Experimental
82+
PollActivityExecutionResponse pollActivity(
83+
PollActivityExecutionRequest request, @Nonnull Deadline deadline);
84+
8185
@Experimental
8286
CompletableFuture<PollActivityExecutionResponse> pollActivityAsync(
8387
PollActivityExecutionRequest request, @Nonnull Deadline deadline);

temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,20 @@ public PollActivityExecutionResponse pollActivity(PollActivityExecutionRequest r
452452
new GrpcRetryer.GrpcRetryerOptions(DefaultStubLongPollRpcRetryOptions.INSTANCE, null));
453453
}
454454

455+
@Override
456+
public PollActivityExecutionResponse pollActivity(
457+
PollActivityExecutionRequest request, @Nonnull Deadline deadline) {
458+
return grpcRetryer.retryWithResult(
459+
() ->
460+
service
461+
.blockingStub()
462+
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
463+
.withOption(HISTORY_LONG_POLL_CALL_OPTIONS_KEY, true)
464+
.withDeadline(deadline)
465+
.pollActivityExecution(request),
466+
new GrpcRetryer.GrpcRetryerOptions(DefaultStubLongPollRpcRetryOptions.INSTANCE, deadline));
467+
}
468+
455469
@Override
456470
public CompletableFuture<PollActivityExecutionResponse> pollActivityAsync(
457471
PollActivityExecutionRequest request, @Nonnull Deadline deadline) {

temporal-sdk/src/test/java/io/temporal/client/functional/GetActivityResultAsyncOverServerLongPollWaitTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ private StartActivityOptions slowOpts() {
6868
@Test(timeout = 2 * ACTIVITY_LONG_POLL_TIMEOUT_SECONDS * 1000)
6969
public void testGetResultAsync() throws ExecutionException, InterruptedException {
7070
assumeTrue(SDKTestWorkflowRule.useExternalService);
71-
newActivityClient().executeAsync(SlowActivity.class, SlowActivity::run, slowOpts()).get();
71+
newActivityClient()
72+
.start(SlowActivity.class, SlowActivity::run, slowOpts())
73+
.getResultAsync()
74+
.get();
7275
}
7376
}

0 commit comments

Comments
 (0)