2323import static io .temporal .internal .common .ProtobufTimeUtils .toJavaDuration ;
2424import static io .temporal .serviceclient .CheckedExceptionWrapper .wrap ;
2525
26+ import com .google .common .base .Preconditions ;
2627import com .google .common .base .Throwables ;
2728import com .google .protobuf .util .Durations ;
2829import com .google .protobuf .util .Timestamps ;
3637import io .temporal .api .query .v1 .WorkflowQuery ;
3738import io .temporal .api .query .v1 .WorkflowQueryResult ;
3839import io .temporal .api .workflowservice .v1 .PollWorkflowTaskQueueResponseOrBuilder ;
40+ import io .temporal .internal .Config ;
3941import io .temporal .internal .statemachines .StatesMachinesCallback ;
4042import io .temporal .internal .statemachines .WorkflowStateMachines ;
4143import io .temporal .internal .worker .*;
6163 */
6264class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
6365
64- /** Force new decision task after workflow task timeout multiplied by this coefficient. */
65- public static final double FORCED_DECISION_TIME_COEFFICIENT = 4d / 5d ;
66-
6766 private final WorkflowServiceStubs service ;
6867
6968 private final String namespace ;
@@ -86,6 +85,7 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
8685 private final WorkflowStateMachines workflowStateMachines ;
8786
8887 /** Number of non-completed local activity tasks */
88+ // TODO move and maintain this counter inside workflowStateMachines
8989 private int localActivityTaskCount ;
9090
9191 private final ReplayWorkflowExecutor replayWorkflowExecutor ;
@@ -252,54 +252,58 @@ private void processLocalActivityRequests(long startTimeNs) throws InterruptedEx
252252 long durationUntilWFTHeartbeatNs =
253253 (long )
254254 (Durations .toNanos (startedEvent .getWorkflowTaskTimeout ())
255- * FORCED_DECISION_TIME_COEFFICIENT );
255+ * Config . WORKFLOW_TAK_HEARTBEAT_COEFFICIENT );
256256
257257 long nextWFTHeartbeatTimeNs = startTimeNs + durationUntilWFTHeartbeatNs ;
258+
258259 while (true ) {
259260 List <ExecuteLocalActivityParameters > laRequests =
260261 workflowStateMachines .takeLocalActivityRequests ();
262+ localActivityTaskCount += laRequests .size ();
263+
261264 for (ExecuteLocalActivityParameters laRequest : laRequests ) {
262265 // TODO(maxim): In the presence of workflow task heartbeat this timeout doesn't make
263266 // much sense. I believe we should add ScheduleToStart timeout for the local activities
264267 // as well.
265- Duration maxWaitTime = Duration .ofNanos (nextWFTHeartbeatTimeNs - System .nanoTime ());
266- if (maxWaitTime .isNegative ()) {
267- maxWaitTime = Duration .ZERO ;
268- }
268+ long maxWaitTimeNs = Math .max (nextWFTHeartbeatTimeNs - System .nanoTime (), 0 );
269269 boolean accepted =
270270 localActivityTaskPoller .apply (
271- new LocalActivityTask (laRequest , localActivityCompletionSink ), maxWaitTime );
272- localActivityTaskCount ++ ;
273- if (! accepted ) {
274- throw new IllegalStateException (
275- "Unable to schedule local activity for execution, no more slots available and local activity task queue is full" );
276- }
271+ new LocalActivityTask (laRequest , localActivityCompletionSink ),
272+ Duration . ofNanos ( maxWaitTimeNs )) ;
273+ Preconditions . checkState (
274+ accepted ,
275+ "Unable to schedule local activity for execution, "
276+ + "no more slots available and local activity task queue is full" );
277277 }
278+
278279 if (localActivityTaskCount == 0 ) {
279280 // No outstanding local activity requests
280281 break ;
281282 }
282- waitAndProcessLocalActivityCompletion (nextWFTHeartbeatTimeNs );
283- if (nextWFTHeartbeatTimeNs <= System .nanoTime ()) {
283+
284+ long maxWaitTimeTillHeartbeatNs = Math .max (nextWFTHeartbeatTimeNs - System .nanoTime (), 0 );
285+ ActivityTaskHandler .Result laCompletion =
286+ localActivityCompletionQueue .poll (maxWaitTimeTillHeartbeatNs , TimeUnit .NANOSECONDS );
287+ if (laCompletion == null ) {
288+ // Need to force a new task as we are out of time
284289 break ;
285290 }
286- }
287- }
288291
289- private void waitAndProcessLocalActivityCompletion (long nextForcedDecisionTimeNs )
290- throws InterruptedException {
291- long maxWaitTimeNs = nextForcedDecisionTimeNs - System .nanoTime ();
292- if (maxWaitTimeNs <= 0 ) {
293- return ;
294- }
295- ActivityTaskHandler .Result laCompletion ;
296- laCompletion = localActivityCompletionQueue .poll (maxWaitTimeNs , TimeUnit .NANOSECONDS );
297- if (laCompletion == null ) {
298- // Need to force a new task as nextForcedDecisionTime has passed.
299- return ;
292+ localActivityTaskCount --;
293+ workflowStateMachines .handleLocalActivityCompletion (laCompletion );
294+ // handleLocalActivityCompletion triggers eventLoop.
295+ // After this call, there may be new local activity requests available in
296+ // workflowStateMachines.takeLocalActivityRequests().
297+ // These requests need to be processed and accounted for; otherwise we may end up completing
298+ // the workflow task instead of heartbeating. So we have to make another iteration.
300299 }
301- localActivityTaskCount --;
302- workflowStateMachines .handleLocalActivityCompletion (laCompletion );
300+
301+ // It's safe to call and discard the result of takeLocalActivityRequests() here: if it is
302+ // not empty, we are in trouble anyway and the state check below fails.
303+ Preconditions .checkState (
304+ workflowStateMachines .takeLocalActivityRequests ().isEmpty (),
305+ "[BUG] Local activities requests from the last event loop were not drained "
306+ + "and accounted in the outstanding local activities counter" );
303307 }
304308
305309 private class StatesMachinesCallbackImpl implements StatesMachinesCallback {
0 commit comments