@@ -125,7 +125,7 @@ public static byte[] getWorkflowExecutionResult(
125
125
TimeUnit unit )
126
126
throws TimeoutException , CancellationException , WorkflowExecutionFailedException ,
127
127
WorkflowTerminatedException , WorkflowTimedOutException , EntityNotExistsError {
128
- // getIntanceCloseEvent waits for workflow completion including new runs.
128
+ // getInstanceCloseEvent waits for workflow completion including new runs.
129
129
HistoryEvent closeEvent =
130
130
getInstanceCloseEvent (service , domain , workflowExecution , timeout , unit );
131
131
return getResultFromCloseEvent (workflowExecution , workflowType , closeEvent );
@@ -213,9 +213,12 @@ private static HistoryEvent getInstanceCloseEvent(
213
213
r .setNextPageToken (pageToken );
214
214
r .setWaitForNewEvent (true );
215
215
r .setSkipArchival (true );
216
+ RetryOptions retryOptions = getRetryOptionWithTimeout (timeout , unit );
216
217
try {
217
218
response =
218
- Retryer .retryWithResult (retryParameters , () -> service .GetWorkflowExecutionHistory (r ));
219
+ Retryer .retryWithResult (
220
+ retryOptions ,
221
+ () -> service .GetWorkflowExecutionHistoryWithTimeout (r , unit .toMillis (timeout )));
219
222
} catch (EntityNotExistsError e ) {
220
223
if (e .activeCluster != null
221
224
&& e .currentCluster != null
@@ -289,12 +292,14 @@ private static CompletableFuture<HistoryEvent> getInstanceCloseEventAsync(
289
292
request .setDomain (domain );
290
293
request .setExecution (workflowExecution );
291
294
request .setHistoryEventFilterType (HistoryEventFilterType .CLOSE_EVENT );
295
+ request .setWaitForNewEvent (true );
292
296
request .setNextPageToken (pageToken );
293
297
CompletableFuture <GetWorkflowExecutionHistoryResponse > response =
294
- getWorkflowExecutionHistoryAsync (service , request );
298
+ getWorkflowExecutionHistoryAsync (service , request , timeout , unit );
295
299
return response .thenComposeAsync (
296
300
(r ) -> {
297
- if (timeout != 0 && System .currentTimeMillis () - start > unit .toMillis (timeout )) {
301
+ long elapsedTime = System .currentTimeMillis () - start ;
302
+ if (timeout != 0 && elapsedTime > unit .toMillis (timeout )) {
298
303
throw CheckedExceptionWrapper .wrap (
299
304
new TimeoutException (
300
305
"WorkflowId="
@@ -310,7 +315,7 @@ private static CompletableFuture<HistoryEvent> getInstanceCloseEventAsync(
310
315
if (history == null || history .getEvents ().size () == 0 ) {
311
316
// Empty poll returned
312
317
return getInstanceCloseEventAsync (
313
- service , domain , workflowExecution , pageToken , timeout , unit );
318
+ service , domain , workflowExecution , pageToken , timeout - elapsedTime , unit );
314
319
}
315
320
HistoryEvent event = history .getEvents ().get (0 );
316
321
if (!isWorkflowExecutionCompletedEvent (event )) {
@@ -326,21 +331,36 @@ private static CompletableFuture<HistoryEvent> getInstanceCloseEventAsync(
326
331
.getWorkflowExecutionContinuedAsNewEventAttributes ()
327
332
.getNewExecutionRunId ());
328
333
return getInstanceCloseEventAsync (
329
- service , domain , nextWorkflowExecution , r .getNextPageToken (), timeout , unit );
334
+ service ,
335
+ domain ,
336
+ nextWorkflowExecution ,
337
+ r .getNextPageToken (),
338
+ timeout - elapsedTime ,
339
+ unit );
330
340
}
331
341
return CompletableFuture .completedFuture (event );
332
342
});
333
343
}
334
344
345
+ private static RetryOptions getRetryOptionWithTimeout (long timeout , TimeUnit unit ) {
346
+ return new RetryOptions .Builder (retryParameters )
347
+ .setExpiration (Duration .ofSeconds (unit .toSeconds (timeout )))
348
+ .build ();
349
+ }
350
+
335
351
private static CompletableFuture <GetWorkflowExecutionHistoryResponse >
336
352
getWorkflowExecutionHistoryAsync (
337
- IWorkflowService service , GetWorkflowExecutionHistoryRequest r ) {
353
+ IWorkflowService service ,
354
+ GetWorkflowExecutionHistoryRequest r ,
355
+ long timeout ,
356
+ TimeUnit unit ) {
357
+ RetryOptions retryOptions = getRetryOptionWithTimeout (timeout , unit );
338
358
return Retryer .retryWithResultAsync (
339
- retryParameters ,
359
+ retryOptions ,
340
360
() -> {
341
361
CompletableFuture <GetWorkflowExecutionHistoryResponse > result = new CompletableFuture <>();
342
362
try {
343
- service .GetWorkflowExecutionHistory (
363
+ service .GetWorkflowExecutionHistoryWithTimeout (
344
364
r ,
345
365
new AsyncMethodCallback <GetWorkflowExecutionHistoryResponse >() {
346
366
@ Override
@@ -352,7 +372,8 @@ public void onComplete(GetWorkflowExecutionHistoryResponse response) {
352
372
public void onError (Exception exception ) {
353
373
result .completeExceptionally (exception );
354
374
}
355
- });
375
+ },
376
+ unit .toMillis (timeout ));
356
377
} catch (TException e ) {
357
378
result .completeExceptionally (e );
358
379
}
0 commit comments