Skip to content

Commit 95314a2

Browse files
authored
Versioning Override support (temporalio#2530)
1 parent e793db8 commit 95314a2

File tree

16 files changed

+200
-40
lines changed

16 files changed

+200
-40
lines changed

temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public static WorkflowOptions merge(
6666
.setLinks(o.getLinks())
6767
.setOnConflictOptions(o.getOnConflictOptions())
6868
.setPriority(o.getPriority())
69+
.setVersioningOverride(o.getVersioningOverride())
6970
.validateBuildWithDefaults();
7071
}
7172

@@ -115,6 +116,8 @@ public static final class Builder {
115116

116117
private Priority priority;
117118

119+
private VersioningOverride versioningOverride;
120+
118121
private Builder() {}
119122

120123
private Builder(WorkflowOptions options) {
@@ -143,6 +146,7 @@ private Builder(WorkflowOptions options) {
143146
this.links = options.links;
144147
this.onConflictOptions = options.onConflictOptions;
145148
this.priority = options.priority;
149+
this.versioningOverride = options.versioningOverride;
146150
}
147151

148152
/**
@@ -472,6 +476,13 @@ public Builder setPriority(Priority priority) {
472476
return this;
473477
}
474478

479+
/** Sets the versioning override to use when starting this workflow. */
480+
@Experimental
481+
public Builder setVersioningOverride(VersioningOverride versioningOverride) {
482+
this.versioningOverride = versioningOverride;
483+
return this;
484+
}
485+
475486
public WorkflowOptions build() {
476487
return new WorkflowOptions(
477488
workflowId,
@@ -495,7 +506,8 @@ public WorkflowOptions build() {
495506
completionCallbacks,
496507
links,
497508
onConflictOptions,
498-
priority);
509+
priority,
510+
versioningOverride);
499511
}
500512

501513
/**
@@ -524,7 +536,8 @@ public WorkflowOptions validateBuildWithDefaults() {
524536
completionCallbacks,
525537
links,
526538
onConflictOptions,
527-
priority);
539+
priority,
540+
versioningOverride);
528541
}
529542
}
530543

@@ -569,6 +582,7 @@ public WorkflowOptions validateBuildWithDefaults() {
569582
private final List<Link> links;
570583
private final OnConflictOptions onConflictOptions;
571584
private final Priority priority;
585+
private final VersioningOverride versioningOverride;
572586

573587
private WorkflowOptions(
574588
String workflowId,
@@ -592,7 +606,8 @@ private WorkflowOptions(
592606
List<Callback> completionCallbacks,
593607
List<Link> links,
594608
OnConflictOptions onConflictOptions,
595-
Priority priority) {
609+
Priority priority,
610+
VersioningOverride versioningOverride) {
596611
this.workflowId = workflowId;
597612
this.workflowIdReusePolicy = workflowIdReusePolicy;
598613
this.workflowRunTimeout = workflowRunTimeout;
@@ -615,6 +630,7 @@ private WorkflowOptions(
615630
this.links = links;
616631
this.onConflictOptions = onConflictOptions;
617632
this.priority = priority;
633+
this.versioningOverride = versioningOverride;
618634
}
619635

620636
public String getWorkflowId() {
@@ -721,6 +737,11 @@ public Priority getPriority() {
721737
return priority;
722738
}
723739

740+
@Experimental
741+
public VersioningOverride getVersioningOverride() {
742+
return versioningOverride;
743+
}
744+
724745
public Builder toBuilder() {
725746
return new Builder(this);
726747
}
@@ -751,7 +772,8 @@ public boolean equals(Object o) {
751772
&& Objects.equal(completionCallbacks, that.completionCallbacks)
752773
&& Objects.equal(links, that.links)
753774
&& Objects.equal(onConflictOptions, that.onConflictOptions)
754-
&& Objects.equal(priority, that.priority);
775+
&& Objects.equal(priority, that.priority)
776+
&& Objects.equal(versioningOverride, that.versioningOverride);
755777
}
756778

757779
@Override
@@ -778,7 +800,8 @@ public int hashCode() {
778800
completionCallbacks,
779801
links,
780802
onConflictOptions,
781-
priority);
803+
priority,
804+
versioningOverride);
782805
}
783806

784807
@Override
@@ -831,6 +854,8 @@ public String toString() {
831854
+ onConflictOptions
832855
+ ", priority="
833856
+ priority
857+
+ ", versioningOverride="
858+
+ versioningOverride
834859
+ '}';
835860
}
836861
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.temporal.common;
2+
3+
import javax.annotation.Nonnull;
4+
5+
/** Represents the override of a worker's versioning behavior for a workflow execution. */
6+
@Experimental
7+
public abstract class VersioningOverride {
8+
private VersioningOverride() {}
9+
10+
/** Workflow will be pinned to a specific deployment version. */
11+
public static final class PinnedVersioningOverride extends VersioningOverride {
12+
private final WorkerDeploymentVersion version;
13+
14+
/**
15+
* Creates a new PinnedVersioningOverride.
16+
*
17+
* @param version The worker deployment version to pin the workflow to.
18+
*/
19+
public PinnedVersioningOverride(@Nonnull WorkerDeploymentVersion version) {
20+
this.version = version;
21+
}
22+
23+
/**
24+
* @return The worker deployment version to pin the workflow to.
25+
*/
26+
public WorkerDeploymentVersion getVersion() {
27+
return version;
28+
}
29+
}
30+
31+
/** The workflow will auto-upgrade to the current deployment version on the next workflow task. */
32+
public static final class AutoUpgradeVersioningOverride extends VersioningOverride {
33+
public AutoUpgradeVersioningOverride() {}
34+
}
35+
}

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityInfoImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import io.temporal.api.common.v1.Payloads;
66
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder;
77
import io.temporal.common.Priority;
8-
import io.temporal.internal.common.PriorityUtils;
8+
import io.temporal.internal.common.ProtoConverters;
99
import io.temporal.internal.common.ProtobufTimeUtils;
1010
import io.temporal.workflow.Functions;
1111
import java.time.Duration;
@@ -138,7 +138,7 @@ public boolean isLocal() {
138138

139139
@Override
140140
public Priority getPriority() {
141-
return PriorityUtils.fromProto(response.getPriority());
141+
return ProtoConverters.fromProto(response.getPriority());
142142
}
143143

144144
@Override

temporal-sdk/src/main/java/io/temporal/internal/client/ScheduleProtoUtil.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import io.temporal.common.converter.DataConverter;
2525
import io.temporal.common.converter.EncodedValues;
2626
import io.temporal.internal.client.external.GenericWorkflowClient;
27-
import io.temporal.internal.common.PriorityUtils;
27+
import io.temporal.internal.common.ProtoConverters;
2828
import io.temporal.internal.common.ProtobufTimeUtils;
2929
import io.temporal.internal.common.RetryOptionsUtils;
3030
import io.temporal.internal.common.SearchAttributesUtil;
@@ -160,7 +160,11 @@ public ScheduleAction actionToProto(io.temporal.client.schedules.ScheduleAction
160160
workflowRequest.setHeader(grpcHeader);
161161

162162
if (wfOptions.getPriority() != null) {
163-
workflowRequest.setPriority(PriorityUtils.toProto(wfOptions.getPriority()));
163+
workflowRequest.setPriority(ProtoConverters.toProto(wfOptions.getPriority()));
164+
}
165+
if (wfOptions.getVersioningOverride() != null) {
166+
workflowRequest.setVersioningOverride(
167+
ProtoConverters.toProto(wfOptions.getVersioningOverride()));
164168
}
165169

166170
return ScheduleAction.newBuilder().setStartWorkflow(workflowRequest.build()).build();
@@ -466,7 +470,7 @@ public io.temporal.client.schedules.ScheduleAction protoToAction(@Nonnull Schedu
466470
}
467471

468472
if (startWfAction.hasPriority()) {
469-
wfOptionsBuilder.setPriority(PriorityUtils.fromProto(startWfAction.getPriority()));
473+
wfOptionsBuilder.setPriority(ProtoConverters.fromProto(startWfAction.getPriority()));
470474
}
471475

472476
builder.setOptions(wfOptionsBuilder.build());

temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.temporal.client.WorkflowOptions;
2020
import io.temporal.common.RetryOptions;
2121
import io.temporal.common.context.ContextPropagator;
22-
import io.temporal.internal.common.PriorityUtils;
22+
import io.temporal.internal.common.ProtoConverters;
2323
import io.temporal.internal.common.ProtobufTimeUtils;
2424
import io.temporal.internal.common.SearchAttributesUtil;
2525
import java.util.*;
@@ -119,7 +119,11 @@ StartWorkflowExecutionRequest.Builder newStartWorkflowExecutionRequest(
119119
}
120120

121121
if (options.getPriority() != null) {
122-
request.setPriority(PriorityUtils.toProto(options.getPriority()));
122+
request.setPriority(ProtoConverters.toProto(options.getPriority()));
123+
}
124+
125+
if (options.getVersioningOverride() != null) {
126+
request.setVersioningOverride(ProtoConverters.toProto(options.getVersioningOverride()));
123127
}
124128

125129
if (options.getSearchAttributes() != null && !options.getSearchAttributes().isEmpty()) {
@@ -202,6 +206,10 @@ SignalWithStartWorkflowExecutionRequest.Builder newSignalWithStartWorkflowExecut
202206
request.setUserMetadata(startParameters.getUserMetadata());
203207
}
204208

209+
if (startParameters.hasVersioningOverride()) {
210+
request.setVersioningOverride(startParameters.getVersioningOverride());
211+
}
212+
205213
return request;
206214
}
207215

temporal-sdk/src/main/java/io/temporal/internal/common/PriorityUtils.java

Lines changed: 0 additions & 22 deletions
This file was deleted.
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package io.temporal.internal.common;
2+
3+
import io.temporal.api.common.v1.Priority;
4+
import io.temporal.api.enums.v1.VersioningBehavior;
5+
import io.temporal.common.VersioningOverride;
6+
import io.temporal.common.WorkerDeploymentVersion;
7+
import javax.annotation.Nonnull;
8+
9+
public class ProtoConverters {
10+
public static Priority toProto(io.temporal.common.Priority priority) {
11+
return Priority.newBuilder().setPriorityKey(priority.getPriorityKey()).build();
12+
}
13+
14+
@Nonnull
15+
public static io.temporal.common.Priority fromProto(@Nonnull Priority priority) {
16+
return io.temporal.common.Priority.newBuilder()
17+
.setPriorityKey(priority.getPriorityKey())
18+
.build();
19+
}
20+
21+
public static io.temporal.api.deployment.v1.WorkerDeploymentVersion toProto(
22+
WorkerDeploymentVersion v) {
23+
return io.temporal.api.deployment.v1.WorkerDeploymentVersion.newBuilder()
24+
.setBuildId(v.getBuildId())
25+
.setDeploymentName(v.getDeploymentName())
26+
.build();
27+
}
28+
29+
@SuppressWarnings("deprecation")
30+
public static io.temporal.api.workflow.v1.VersioningOverride toProto(VersioningOverride v) {
31+
if (v instanceof VersioningOverride.PinnedVersioningOverride) {
32+
VersioningOverride.PinnedVersioningOverride pv =
33+
(VersioningOverride.PinnedVersioningOverride) v;
34+
io.temporal.api.workflow.v1.VersioningOverride.PinnedOverride.Builder pinnedBuilder =
35+
io.temporal.api.workflow.v1.VersioningOverride.PinnedOverride.newBuilder()
36+
.setVersion(toProto(pv.getVersion()));
37+
38+
pinnedBuilder.setBehavior(
39+
io.temporal.api.workflow.v1.VersioningOverride.PinnedOverrideBehavior
40+
.PINNED_OVERRIDE_BEHAVIOR_PINNED);
41+
42+
return io.temporal.api.workflow.v1.VersioningOverride.newBuilder()
43+
.setBehavior(VersioningBehavior.VERSIONING_BEHAVIOR_PINNED)
44+
.setPinnedVersion(pv.getVersion().toCanonicalString())
45+
.setPinned(pinnedBuilder.build())
46+
.build();
47+
} else {
48+
return io.temporal.api.workflow.v1.VersioningOverride.newBuilder()
49+
.setBehavior(VersioningBehavior.VERSIONING_BEHAVIOR_AUTO_UPGRADE)
50+
.setAutoUpgrade(true)
51+
.build();
52+
}
53+
}
54+
55+
private ProtoConverters() {}
56+
}

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,12 +260,12 @@ private Result failureToWFTResult(
260260
throws Exception {
261261
String workflowType = workflowTask.getWorkflowType().getName();
262262
if (e instanceof WorkflowExecutionException) {
263+
@SuppressWarnings("deprecation")
263264
RespondWorkflowTaskCompletedRequest response =
264265
RespondWorkflowTaskCompletedRequest.newBuilder()
265266
.setTaskToken(workflowTask.getTaskToken())
266267
.setIdentity(options.getIdentity())
267268
.setNamespace(namespace)
268-
// TODO: Set stamp or not based on capabilities
269269
.setBinaryChecksum(options.getBuildId())
270270
.addCommands(
271271
Command.newBuilder()

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ private void handleSingleEventLookahead(HistoryEvent event) {
357357
case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
358358
WorkflowTaskCompletedEventAttributes completedEvent =
359359
event.getWorkflowTaskCompletedEventAttributes();
360+
@SuppressWarnings("deprecation")
360361
String maybeBuildId = completedEvent.getWorkerVersion().getBuildId();
361362
if (!maybeBuildId.isEmpty()) {
362363
currentTaskBuildId = maybeBuildId;

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ private ExecuteActivityParameters constructExecuteActivityParameters(
608608
makeUserMetaData(options.getSummary(), null, dataConverterWithCurrentWorkflowContext);
609609

610610
if (options.getPriority() != null) {
611-
attributes.setPriority(PriorityUtils.toProto(options.getPriority()));
611+
attributes.setPriority(ProtoConverters.toProto(options.getPriority()));
612612
}
613613

614614
return new ExecuteActivityParameters(attributes, options.getCancellationType(), userMetadata);
@@ -935,7 +935,7 @@ private StartChildWorkflowExecutionParameters createChildWorkflowParameters(
935935
replayContext.getTaskQueue().equals(options.getTaskQueue())));
936936
}
937937
if (options.getPriority() != null) {
938-
attributes.setPriority(PriorityUtils.toProto(options.getPriority()));
938+
attributes.setPriority(ProtoConverters.toProto(options.getPriority()));
939939
}
940940
return new StartChildWorkflowExecutionParameters(
941941
attributes, options.getCancellationType(), metadata);

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import io.temporal.api.common.v1.WorkflowExecution;
55
import io.temporal.common.Priority;
66
import io.temporal.common.RetryOptions;
7-
import io.temporal.internal.common.PriorityUtils;
7+
import io.temporal.internal.common.ProtoConverters;
88
import io.temporal.internal.replay.ReplayWorkflowContext;
99
import io.temporal.workflow.WorkflowInfo;
1010
import java.time.Duration;
@@ -154,7 +154,7 @@ public Optional<String> getCurrentBuildId() {
154154

155155
@Override
156156
public Priority getPriority() {
157-
return PriorityUtils.fromProto(context.getPriority());
157+
return ProtoConverters.fromProto(context.getPriority());
158158
}
159159

160160
@Override

0 commit comments

Comments
 (0)