Skip to content

Commit 98ff39f

Browse files
committed
Honor user timeout for get workflow result (#504)
1 parent a029ae2 commit 98ff39f

File tree

7 files changed

+184
-24
lines changed

7 files changed

+184
-24
lines changed

src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java

+20-6
Original file line numberDiff line numberDiff line change
@@ -213,9 +213,12 @@ private static HistoryEvent getInstanceCloseEvent(
213213
r.setNextPageToken(pageToken);
214214
r.setWaitForNewEvent(true);
215215
r.setSkipArchival(true);
216+
RetryOptions retryOptions = getRetryOptionWithTimeout(timeout, unit);
216217
try {
217218
response =
218-
Retryer.retryWithResult(retryParameters, () -> service.GetWorkflowExecutionHistory(r));
219+
Retryer.retryWithResult(
220+
retryOptions,
221+
() -> service.GetWorkflowExecutionHistoryWithTimeout(r, unit.toMillis(timeout)));
219222
} catch (EntityNotExistsError e) {
220223
if (e.activeCluster != null
221224
&& e.currentCluster != null
@@ -292,7 +295,7 @@ private static CompletableFuture<HistoryEvent> getInstanceCloseEventAsync(
292295
request.setWaitForNewEvent(true);
293296
request.setNextPageToken(pageToken);
294297
CompletableFuture<GetWorkflowExecutionHistoryResponse> response =
295-
getWorkflowExecutionHistoryAsync(service, request);
298+
getWorkflowExecutionHistoryAsync(service, request, timeout, unit);
296299
return response.thenComposeAsync(
297300
(r) -> {
298301
long elapsedTime = System.currentTimeMillis() - start;
@@ -339,15 +342,25 @@ private static CompletableFuture<HistoryEvent> getInstanceCloseEventAsync(
339342
});
340343
}
341344

345+
private static RetryOptions getRetryOptionWithTimeout(long timeout, TimeUnit unit) {
346+
return new RetryOptions.Builder(retryParameters)
347+
.setExpiration(Duration.ofSeconds(unit.toSeconds(timeout)))
348+
.build();
349+
}
350+
342351
private static CompletableFuture<GetWorkflowExecutionHistoryResponse>
343352
getWorkflowExecutionHistoryAsync(
344-
IWorkflowService service, GetWorkflowExecutionHistoryRequest r) {
353+
IWorkflowService service,
354+
GetWorkflowExecutionHistoryRequest r,
355+
long timeout,
356+
TimeUnit unit) {
357+
RetryOptions retryOptions = getRetryOptionWithTimeout(timeout, unit);
345358
return Retryer.retryWithResultAsync(
346-
retryParameters,
359+
retryOptions,
347360
() -> {
348361
CompletableFuture<GetWorkflowExecutionHistoryResponse> result = new CompletableFuture<>();
349362
try {
350-
service.GetWorkflowExecutionHistory(
363+
service.GetWorkflowExecutionHistoryWithTimeout(
351364
r,
352365
new AsyncMethodCallback<GetWorkflowExecutionHistoryResponse>() {
353366
@Override
@@ -359,7 +372,8 @@ public void onComplete(GetWorkflowExecutionHistoryResponse response) {
359372
public void onError(Exception exception) {
360373
result.completeExceptionally(exception);
361374
}
362-
});
375+
},
376+
unit.toMillis(timeout));
363377
} catch (TException e) {
364378
result.completeExceptionally(e);
365379
}

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

+17
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,15 @@ public void GetWorkflowExecutionHistory(
583583
impl.GetWorkflowExecutionHistory(getRequest, resultHandler);
584584
}
585585

586+
@Override
587+
public void GetWorkflowExecutionHistoryWithTimeout(
588+
GetWorkflowExecutionHistoryRequest getRequest,
589+
AsyncMethodCallback resultHandler,
590+
Long timeoutInMillis)
591+
throws TException {
592+
impl.GetWorkflowExecutionHistoryWithTimeout(getRequest, resultHandler, timeoutInMillis);
593+
}
594+
586595
@Override
587596
public void PollForDecisionTask(
588597
PollForDecisionTaskRequest pollRequest, AsyncMethodCallback resultHandler)
@@ -838,6 +847,14 @@ public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistory(
838847
return impl.GetWorkflowExecutionHistory(getRequest);
839848
}
840849

850+
@Override
851+
public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistoryWithTimeout(
852+
GetWorkflowExecutionHistoryRequest getRequest, Long timeoutInMillis)
853+
throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError,
854+
TException {
855+
return impl.GetWorkflowExecutionHistoryWithTimeout(getRequest, timeoutInMillis);
856+
}
857+
841858
@Override
842859
public PollForDecisionTaskResponse PollForDecisionTask(PollForDecisionTaskRequest pollRequest)
843860
throws BadRequestError, InternalServiceError, ServiceBusyError, TException {

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

+17
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,15 @@ public void GetWorkflowExecutionHistory(
434434
impl.GetWorkflowExecutionHistory(getRequest, resultHandler);
435435
}
436436

437+
@Override
438+
public void GetWorkflowExecutionHistoryWithTimeout(
439+
GetWorkflowExecutionHistoryRequest getRequest,
440+
AsyncMethodCallback resultHandler,
441+
Long timeoutInMillis)
442+
throws TException {
443+
impl.GetWorkflowExecutionHistoryWithTimeout(getRequest, resultHandler, timeoutInMillis);
444+
}
445+
437446
@Override
438447
public void PollForDecisionTask(
439448
PollForDecisionTaskRequest pollRequest, AsyncMethodCallback resultHandler)
@@ -689,6 +698,14 @@ public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistory(
689698
return impl.GetWorkflowExecutionHistory(getRequest);
690699
}
691700

701+
@Override
702+
public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistoryWithTimeout(
703+
GetWorkflowExecutionHistoryRequest getRequest, Long timeoutInMillis)
704+
throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError,
705+
TException {
706+
return impl.GetWorkflowExecutionHistoryWithTimeout(getRequest, timeoutInMillis);
707+
}
708+
692709
@Override
693710
public PollForDecisionTaskResponse PollForDecisionTask(PollForDecisionTaskRequest pollRequest)
694711
throws BadRequestError, InternalServiceError, ServiceBusyError, TException {

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java

+19
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,15 @@ public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistory(
328328
return store.getWorkflowExecutionHistory(mutableState.getExecutionId(), getRequest);
329329
}
330330

331+
@Override
332+
public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistoryWithTimeout(
333+
GetWorkflowExecutionHistoryRequest getRequest, Long timeoutInMillis)
334+
throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError,
335+
TException {
336+
337+
return GetWorkflowExecutionHistory(getRequest);
338+
}
339+
331340
@Override
332341
public PollForDecisionTaskResponse PollForDecisionTask(PollForDecisionTaskRequest pollRequest)
333342
throws BadRequestError, InternalServiceError, ServiceBusyError, TException {
@@ -807,6 +816,16 @@ public void GetWorkflowExecutionHistory(
807816
});
808817
}
809818

819+
@SuppressWarnings("unchecked") // Generator ignores that AsyncMethodCallback is generic
820+
@Override
821+
public void GetWorkflowExecutionHistoryWithTimeout(
822+
GetWorkflowExecutionHistoryRequest getRequest,
823+
AsyncMethodCallback resultHandler,
824+
Long timeoutInMillis)
825+
throws TException {
826+
GetWorkflowExecutionHistory(getRequest, resultHandler);
827+
}
828+
810829
@Override
811830
public void PollForDecisionTask(
812831
PollForDecisionTaskRequest pollRequest, AsyncMethodCallback resultHandler) throws TException {

src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java

+31
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,40 @@
1717

1818
package com.uber.cadence.serviceclient;
1919

20+
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
21+
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
2022
import com.uber.cadence.WorkflowService.AsyncIface;
2123
import com.uber.cadence.WorkflowService.Iface;
24+
import org.apache.thrift.TException;
25+
import org.apache.thrift.async.AsyncMethodCallback;
2226

2327
public interface IWorkflowService extends Iface, AsyncIface {
2428
void close();
29+
30+
/**
31+
* GetWorkflowExecutionHistoryWithTimeout get workflow history same as GetWorkflowExecutionHistory
32+
* but with timeout.
33+
*
34+
* @param getRequest
35+
* @param timeoutInMillis
36+
* @return GetWorkflowExecutionHistoryResponse
37+
* @throws TException
38+
*/
39+
GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistoryWithTimeout(
40+
GetWorkflowExecutionHistoryRequest getRequest, Long timeoutInMillis) throws TException;
41+
42+
/**
43+
* GetWorkflowExecutionHistoryWithTimeout get workflow history asynchronously same as
44+
* GetWorkflowExecutionHistory but with timeout.
45+
*
46+
* @param getRequest
47+
* @param resultHandler
48+
* @param timeoutInMillis
49+
* @throws org.apache.thrift.TException
50+
*/
51+
void GetWorkflowExecutionHistoryWithTimeout(
52+
GetWorkflowExecutionHistoryRequest getRequest,
53+
AsyncMethodCallback resultHandler,
54+
Long timeoutInMillis)
55+
throws TException;
2556
}

src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java

+54-18
Original file line numberDiff line numberDiff line change
@@ -823,19 +823,28 @@ private StartWorkflowExecutionResponse startWorkflowExecution(
823823
}
824824
}
825825

826+
@Override
827+
public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistoryWithTimeout(
828+
GetWorkflowExecutionHistoryRequest request, Long timeoutInMillis) throws TException {
829+
return measureRemoteCall(
830+
ServiceMethod.GET_WORKFLOW_EXECUTION_HISTORY,
831+
() -> getWorkflowExecutionHistory(request, timeoutInMillis));
832+
}
833+
826834
@Override
827835
public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistory(
828836
GetWorkflowExecutionHistoryRequest request) throws TException {
829837
return measureRemoteCall(
830-
ServiceMethod.GET_WORKFLOW_EXECUTION_HISTORY, () -> getWorkflowExecutionHistory(request));
838+
ServiceMethod.GET_WORKFLOW_EXECUTION_HISTORY,
839+
() -> getWorkflowExecutionHistory(request, null));
831840
}
832841

833842
private GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
834-
GetWorkflowExecutionHistoryRequest getRequest) throws TException {
843+
GetWorkflowExecutionHistoryRequest getRequest, Long timeoutInMillis) throws TException {
835844
ThriftResponse<WorkflowService.GetWorkflowExecutionHistory_result> response = null;
836845
try {
837846
ThriftRequest<WorkflowService.GetWorkflowExecutionHistory_args> request =
838-
buildGetWorkflowExecutionHistoryThriftRequest(getRequest);
847+
buildGetWorkflowExecutionHistoryThriftRequest(getRequest, timeoutInMillis);
839848
response = doRemoteCall(request);
840849
WorkflowService.GetWorkflowExecutionHistory_result result =
841850
response.getBody(WorkflowService.GetWorkflowExecutionHistory_result.class);
@@ -870,21 +879,20 @@ private GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
870879
}
871880

872881
private ThriftRequest<WorkflowService.GetWorkflowExecutionHistory_args>
873-
buildGetWorkflowExecutionHistoryThriftRequest(GetWorkflowExecutionHistoryRequest getRequest) {
874-
ThriftRequest<WorkflowService.GetWorkflowExecutionHistory_args> request;
882+
buildGetWorkflowExecutionHistoryThriftRequest(
883+
GetWorkflowExecutionHistoryRequest getRequest, Long timeoutInMillis) {
884+
875885
if (getRequest.isWaitForNewEvent()) {
876-
request =
877-
buildThriftRequest(
878-
"GetWorkflowExecutionHistory",
879-
new WorkflowService.GetWorkflowExecutionHistory_args(getRequest),
880-
options.getRpcLongPollTimeoutMillis());
886+
timeoutInMillis =
887+
validateAndUpdateTimeout(timeoutInMillis, options.getRpcLongPollTimeoutMillis());
881888
} else {
882-
request =
883-
buildThriftRequest(
884-
"GetWorkflowExecutionHistory",
885-
new WorkflowService.GetWorkflowExecutionHistory_args(getRequest));
889+
timeoutInMillis = validateAndUpdateTimeout(timeoutInMillis, options.getRpcTimeoutMillis());
886890
}
887-
return request;
891+
892+
return buildThriftRequest(
893+
"GetWorkflowExecutionHistory",
894+
new WorkflowService.GetWorkflowExecutionHistory_args(getRequest),
895+
timeoutInMillis);
888896
}
889897

890898
@Override
@@ -2297,15 +2305,43 @@ public void StartWorkflowExecution(
22972305
throw new UnsupportedOperationException("not implemented");
22982306
}
22992307

2308+
private Long validateAndUpdateTimeout(Long timeoutInMillis, Long defaultTimeoutInMillis) {
2309+
if (timeoutInMillis == null || timeoutInMillis <= 0 || timeoutInMillis == Long.MAX_VALUE) {
2310+
timeoutInMillis = defaultTimeoutInMillis;
2311+
} else {
2312+
timeoutInMillis = Math.min(timeoutInMillis, defaultTimeoutInMillis);
2313+
}
2314+
return timeoutInMillis;
2315+
}
2316+
2317+
@SuppressWarnings({"unchecked", "FutureReturnValueIgnored"})
2318+
@Override
2319+
public void GetWorkflowExecutionHistoryWithTimeout(
2320+
GetWorkflowExecutionHistoryRequest getRequest,
2321+
AsyncMethodCallback resultHandler,
2322+
Long timeoutInMillis) {
2323+
2324+
getWorkflowExecutionHistory(getRequest, resultHandler, timeoutInMillis);
2325+
}
2326+
23002327
@SuppressWarnings({"unchecked", "FutureReturnValueIgnored"})
23012328
@Override
23022329
public void GetWorkflowExecutionHistory(
23032330
GetWorkflowExecutionHistoryRequest getRequest, AsyncMethodCallback resultHandler) {
2304-
CompletableFuture<ThriftResponse<GetWorkflowExecutionHistory_result>> response = null;
2331+
2332+
getWorkflowExecutionHistory(getRequest, resultHandler, null);
2333+
}
2334+
2335+
private void getWorkflowExecutionHistory(
2336+
GetWorkflowExecutionHistoryRequest getRequest,
2337+
AsyncMethodCallback resultHandler,
2338+
Long timeoutInMillis) {
2339+
23052340
ThriftRequest<WorkflowService.GetWorkflowExecutionHistory_args> request =
2306-
buildGetWorkflowExecutionHistoryThriftRequest(getRequest);
2307-
response = doRemoteCallAsync(request);
2341+
buildGetWorkflowExecutionHistoryThriftRequest(getRequest, timeoutInMillis);
23082342

2343+
CompletableFuture<ThriftResponse<GetWorkflowExecutionHistory_result>> response =
2344+
doRemoteCallAsync(request);
23092345
response
23102346
.whenComplete(
23112347
(r, e) -> {

src/test/java/com/uber/cadence/internal/common/RetryerTest.java

+26
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public void testExpiration() throws InterruptedException {
3636
.setInitialInterval(Duration.ofMillis(10))
3737
.setMaximumInterval(Duration.ofMillis(100))
3838
.setExpiration(Duration.ofMillis(500))
39+
.setMaximumAttempts(20)
3940
.validateBuildWithDefaults();
4041
long start = System.currentTimeMillis();
4142
try {
@@ -105,4 +106,29 @@ public void testInterruptedException() throws InterruptedException {
105106
}
106107
assertTrue(System.currentTimeMillis() - start < 100000);
107108
}
109+
110+
@Test
111+
public void testMaxAttempt() throws InterruptedException {
112+
RetryOptions options =
113+
new RetryOptions.Builder()
114+
.setInitialInterval(Duration.ofMillis(10))
115+
.setMaximumInterval(Duration.ofMillis(100))
116+
.setExpiration(Duration.ofMillis(500))
117+
.setMaximumAttempts(1)
118+
.validateBuildWithDefaults();
119+
long start = System.currentTimeMillis();
120+
try {
121+
Retryer.retryWithResultAsync(
122+
options,
123+
() -> {
124+
throw new IllegalArgumentException("simulated");
125+
})
126+
.get();
127+
fail("unreachable");
128+
} catch (ExecutionException e) {
129+
assertTrue(e.getCause() instanceof IllegalArgumentException);
130+
assertEquals("simulated", e.getCause().getMessage());
131+
}
132+
assertTrue(System.currentTimeMillis() - start < 500);
133+
}
108134
}

0 commit comments

Comments
 (0)