Skip to content

Commit 9d9e294

Browse files
authored
Merge branch 'master' into worker-heartbeat-final
2 parents 4578791 + fb87ce6 commit 9d9e294

24 files changed

Lines changed: 1398 additions & 24 deletions

File tree

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.temporal.common;
2+
3+
/**
4+
* Specifies the versioning behavior for the first task of a new workflow run started via
5+
* continue-as-new.
6+
*/
7+
@Experimental
8+
public enum InitialVersioningBehavior {
9+
/**
10+
* Start the new run with {@link VersioningBehavior#AUTO_UPGRADE} behavior for the first task,
11+
* upgrading to the latest version. After the first workflow task completes, the workflow uses
12+
* whatever versioning behavior is specified in the workflow code.
13+
*/
14+
AUTO_UPGRADE
15+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.temporal.common;
2+
3+
/**
4+
* Reason(s) why the server suggests a workflow should continue-as-new. Multiple reasons can be true
5+
* at the same time.
6+
*/
7+
@Experimental
8+
public enum SuggestContinueAsNewReason {
9+
/** Workflow history size is getting too large. */
10+
HISTORY_SIZE_TOO_LARGE,
11+
/** Workflow history has too many events. */
12+
TOO_MANY_HISTORY_EVENTS,
13+
/** Workflow's count of completed plus in-flight updates is too large. */
14+
TOO_MANY_UPDATES
15+
}

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88
import io.temporal.api.sdk.v1.UserMetadata;
99
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
1010
import io.temporal.common.RetryOptions;
11+
import io.temporal.common.SuggestContinueAsNewReason;
1112
import io.temporal.internal.common.SdkFlag;
1213
import io.temporal.internal.statemachines.*;
1314
import io.temporal.workflow.Functions;
1415
import io.temporal.workflow.Functions.Func;
1516
import io.temporal.workflow.Functions.Func1;
1617
import java.time.Duration;
18+
import java.util.List;
1719
import java.util.Map;
1820
import java.util.Optional;
1921
import java.util.Random;
@@ -357,6 +359,17 @@ Integer getVersion(
357359
*/
358360
boolean isContinueAsNewSuggested();
359361

362+
/**
363+
* @return the reasons why continue-as-new is suggested, or an empty list if not suggested. This
364+
* value changes during the lifetime of a Workflow Execution.
365+
*/
366+
List<SuggestContinueAsNewReason> getSuggestContinueAsNewReasons();
367+
368+
/**
369+
* @return true if the target worker deployment version has changed for this workflow.
370+
*/
371+
boolean isTargetWorkerDeploymentVersionChanged();
372+
360373
/**
361374
* @return true if cancellation of the workflow is requested.
362375
*/

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
1111
import io.temporal.api.sdk.v1.UserMetadata;
1212
import io.temporal.common.RetryOptions;
13+
import io.temporal.common.SuggestContinueAsNewReason;
1314
import io.temporal.failure.CanceledFailure;
1415
import io.temporal.internal.common.ProtobufTimeUtils;
1516
import io.temporal.internal.common.SdkFlag;
@@ -416,6 +417,16 @@ public boolean isContinueAsNewSuggested() {
416417
return workflowStateMachines.isContinueAsNewSuggested();
417418
}
418419

420+
@Override
421+
public List<SuggestContinueAsNewReason> getSuggestContinueAsNewReasons() {
422+
return workflowStateMachines.getSuggestContinueAsNewReasons();
423+
}
424+
425+
@Override
426+
public boolean isTargetWorkerDeploymentVersionChanged() {
427+
return workflowStateMachines.isTargetWorkerDeploymentVersionChanged();
428+
}
429+
419430
/*
420431
* MUTABLE STATE OPERATIONS
421432
*/

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.temporal.api.protocol.v1.Message;
2121
import io.temporal.api.sdk.v1.UserMetadata;
2222
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
23+
import io.temporal.common.SuggestContinueAsNewReason;
2324
import io.temporal.failure.CanceledFailure;
2425
import io.temporal.internal.common.*;
2526
import io.temporal.internal.history.LocalActivityMarkerUtils;
@@ -52,7 +53,7 @@ enum HandleEventStatus {
5253
/** Initial set of SDK flags that will be set on all new workflow executions. */
5354
@VisibleForTesting
5455
public static List<SdkFlag> initialFlags =
55-
Collections.unmodifiableList(Arrays.asList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));
56+
Collections.singletonList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION);
5657

5758
/**
5859
* Keep track of the change versions that have been seen by the SDK. This is used to generate the
@@ -88,6 +89,10 @@ enum HandleEventStatus {
8889

8990
private boolean isContinueAsNewSuggested;
9091

92+
private List<SuggestContinueAsNewReason> suggestContinueAsNewReasons = Collections.emptyList();
93+
94+
private boolean isTargetWorkerDeploymentVersionChanged;
95+
9196
/**
9297
* EventId of the last event seen by these state machines. Events earlier than this one will be
9398
* discarded.
@@ -276,6 +281,14 @@ public boolean isContinueAsNewSuggested() {
276281
return isContinueAsNewSuggested;
277282
}
278283

284+
public List<SuggestContinueAsNewReason> getSuggestContinueAsNewReasons() {
285+
return suggestContinueAsNewReasons;
286+
}
287+
288+
public boolean isTargetWorkerDeploymentVersionChanged() {
289+
return isTargetWorkerDeploymentVersionChanged;
290+
}
291+
279292
public void setReplaying(boolean replaying) {
280293
this.replaying = replaying;
281294
}
@@ -1493,7 +1506,9 @@ public void workflowTaskStarted(
14931506
long currentTimeMillis,
14941507
boolean nonProcessedWorkflowTask,
14951508
long historySize,
1496-
boolean isContinueAsNewSuggested) {
1509+
boolean isContinueAsNewSuggested,
1510+
List<SuggestContinueAsNewReason> suggestContinueAsNewReasons,
1511+
boolean isTargetWorkerDeploymentVersionChanged) {
14971512
setCurrentTimeMillis(currentTimeMillis);
14981513
for (CancellableCommand cancellableCommand : commands) {
14991514
cancellableCommand.handleWorkflowTaskStarted();
@@ -1509,6 +1524,9 @@ public void workflowTaskStarted(
15091524
WorkflowStateMachines.this.lastWFTStartedEventId = startedEventId;
15101525
WorkflowStateMachines.this.historySize = historySize;
15111526
WorkflowStateMachines.this.isContinueAsNewSuggested = isContinueAsNewSuggested;
1527+
WorkflowStateMachines.this.suggestContinueAsNewReasons = suggestContinueAsNewReasons;
1528+
WorkflowStateMachines.this.isTargetWorkerDeploymentVersionChanged =
1529+
isTargetWorkerDeploymentVersionChanged;
15121530

15131531
eventLoop();
15141532
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowTaskStateMachine.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
import io.temporal.api.enums.v1.EventType;
55
import io.temporal.api.enums.v1.WorkflowTaskFailedCause;
66
import io.temporal.api.history.v1.WorkflowTaskFailedEventAttributes;
7+
import io.temporal.common.SuggestContinueAsNewReason;
8+
import java.util.ArrayList;
9+
import java.util.Collections;
10+
import java.util.List;
711
import java.util.Objects;
812

913
final class WorkflowTaskStateMachine
@@ -32,7 +36,9 @@ void workflowTaskStarted(
3236
long currentTimeMillis,
3337
boolean nonProcessedWorkflowTask,
3438
long historySize,
35-
boolean isContinueAsNewSuggested);
39+
boolean isContinueAsNewSuggested,
40+
List<SuggestContinueAsNewReason> suggestContinueAsNewReasons,
41+
boolean isTargetWorkerDeploymentVersionChanged);
3642

3743
void updateRunId(String currentRunId);
3844
}
@@ -46,6 +52,8 @@ void workflowTaskStarted(
4652
private long startedEventId;
4753
private long historySize;
4854
private boolean isContinueAsNewSuggested;
55+
private List<SuggestContinueAsNewReason> suggestContinueAsNewReasons = Collections.emptyList();
56+
private boolean isTargetWorkerDeploymentVersionChanged;
4957

5058
public static WorkflowTaskStateMachine newInstance(
5159
long workflowTaskStartedEventId, Listener listener) {
@@ -103,6 +111,15 @@ private void handleStarted() {
103111
historySize = currentEvent.getWorkflowTaskStartedEventAttributes().getHistorySizeBytes();
104112
isContinueAsNewSuggested =
105113
currentEvent.getWorkflowTaskStartedEventAttributes().getSuggestContinueAsNew();
114+
suggestContinueAsNewReasons =
115+
convertSuggestContinueAsNewReasons(
116+
currentEvent
117+
.getWorkflowTaskStartedEventAttributes()
118+
.getSuggestContinueAsNewReasonsList());
119+
isTargetWorkerDeploymentVersionChanged =
120+
currentEvent
121+
.getWorkflowTaskStartedEventAttributes()
122+
.getTargetWorkerDeploymentVersionChanged();
106123

107124
// The last started event in the history. So no completed is expected.
108125
if (currentEvent.getEventId() >= workflowTaskStartedEventId && !hasNextEvent) {
@@ -121,7 +138,33 @@ private void handleCompleted() {
121138
eventTimeOfTheLastWorkflowStartTask,
122139
lastTaskInHistory,
123140
historySize,
124-
isContinueAsNewSuggested);
141+
isContinueAsNewSuggested,
142+
suggestContinueAsNewReasons,
143+
isTargetWorkerDeploymentVersionChanged);
144+
}
145+
146+
private static List<SuggestContinueAsNewReason> convertSuggestContinueAsNewReasons(
147+
List<io.temporal.api.enums.v1.SuggestContinueAsNewReason> protoReasons) {
148+
if (protoReasons.isEmpty()) {
149+
return Collections.emptyList();
150+
}
151+
List<SuggestContinueAsNewReason> reasons = new ArrayList<>(protoReasons.size());
152+
for (io.temporal.api.enums.v1.SuggestContinueAsNewReason proto : protoReasons) {
153+
switch (proto) {
154+
case SUGGEST_CONTINUE_AS_NEW_REASON_HISTORY_SIZE_TOO_LARGE:
155+
reasons.add(SuggestContinueAsNewReason.HISTORY_SIZE_TOO_LARGE);
156+
break;
157+
case SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_HISTORY_EVENTS:
158+
reasons.add(SuggestContinueAsNewReason.TOO_MANY_HISTORY_EVENTS);
159+
break;
160+
case SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_UPDATES:
161+
reasons.add(SuggestContinueAsNewReason.TOO_MANY_UPDATES);
162+
break;
163+
default:
164+
break;
165+
}
166+
}
167+
return Collections.unmodifiableList(reasons);
125168
}
126169

127170
private void handleFailed() {

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1174,7 +1174,7 @@ public int getVersion(String changeId, int minSupported, int maxSupported) {
11741174
* Previously the SDK would yield on the getVersion call to the scheduler. This is not ideal because it can lead to non-deterministic
11751175
* scheduling if the getVersion call was removed.
11761176
* */
1177-
if (replayContext.checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION)) {
1177+
if (replayContext.tryUseSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION)) {
11781178
// This can happen if we are replaying a workflow and encounter a getVersion call that did not
11791179
// exist on the original execution and the range does not include the default version.
11801180
if (versionToUse == null) {
@@ -1415,6 +1415,15 @@ public void continueAsNew(ContinueAsNewInput input) {
14151415
.determineUseCompatibleFlag(
14161416
replayContext.getTaskQueue().equals(options.getTaskQueue())));
14171417
}
1418+
if (options.getInitialVersioningBehavior() != null) {
1419+
switch (options.getInitialVersioningBehavior()) {
1420+
case AUTO_UPGRADE:
1421+
attributes.setInitialVersioningBehavior(
1422+
io.temporal.api.enums.v1.ContinueAsNewVersioningBehavior
1423+
.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE);
1424+
break;
1425+
}
1426+
}
14181427
}
14191428

14201429
if (options == null && replayContext.getRetryOptions() != null) {

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
import io.temporal.api.common.v1.WorkflowExecution;
55
import io.temporal.common.Priority;
66
import io.temporal.common.RetryOptions;
7+
import io.temporal.common.SuggestContinueAsNewReason;
78
import io.temporal.internal.common.ProtoConverters;
89
import io.temporal.internal.replay.ReplayWorkflowContext;
910
import io.temporal.workflow.WorkflowInfo;
1011
import java.time.Duration;
12+
import java.util.List;
1113
import java.util.Optional;
1214
import javax.annotation.Nonnull;
1315
import javax.annotation.Nullable;
@@ -147,6 +149,16 @@ public boolean isContinueAsNewSuggested() {
147149
return context.isContinueAsNewSuggested();
148150
}
149151

152+
@Override
153+
public List<SuggestContinueAsNewReason> getSuggestContinueAsNewReasons() {
154+
return context.getSuggestContinueAsNewReasons();
155+
}
156+
157+
@Override
158+
public boolean isTargetWorkerDeploymentVersionChanged() {
159+
return context.isTargetWorkerDeploymentVersionChanged();
160+
}
161+
150162
@Override
151163
public Optional<String> getCurrentBuildId() {
152164
return context.getCurrentBuildId();

temporal-sdk/src/main/java/io/temporal/internal/worker/ExecutorThreadFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,23 @@ class ExecutorThreadFactory implements ThreadFactory {
77
private final String threadPrefix;
88

99
private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
10+
private final ClassLoader contextClassLoader;
1011
private final AtomicInteger threadIndex = new AtomicInteger();
1112

1213
public ExecutorThreadFactory(String threadPrefix, Thread.UncaughtExceptionHandler eh) {
1314
this.threadPrefix = threadPrefix;
1415
this.uncaughtExceptionHandler = eh;
16+
this.contextClassLoader = Thread.currentThread().getContextClassLoader();
1517
}
1618

1719
@Override
1820
public Thread newThread(Runnable r) {
1921
Thread result = new Thread(r);
2022
result.setName(threadPrefix + ": " + threadIndex.incrementAndGet());
2123
result.setUncaughtExceptionHandler(uncaughtExceptionHandler);
24+
if (contextClassLoader != null) {
25+
result.setContextClassLoader(contextClassLoader);
26+
}
2227
return result;
2328
}
2429
}

temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -315,10 +315,7 @@ public Throwable wrapFailure(NexusTask task, Throwable failure) {
315315
"Failure processing nexus response: " + response.getRequest().toString(), failure);
316316
}
317317

318-
/**
319-
* @return true if the handler reported a failure or error
320-
*/
321-
@SuppressWarnings("deprecation") // Uses deprecated operationError
318+
@SuppressWarnings("deprecation") // Uses hasOperationError()/getOperationError() for compat
322319
private boolean handleNexusTask(NexusTask task, Scope metricsScope) {
323320
PollNexusTaskQueueResponseOrBuilder pollResponse = task.getResponse();
324321
ByteString taskToken = pollResponse.getTaskToken();

0 commit comments

Comments
 (0)