Skip to content

Commit 5e83557

Browse files
committed
CaN USE_RAMPING_VERSION versioning behaviour
1 parent d426e7c commit 5e83557

5 files changed

Lines changed: 135 additions & 5 deletions

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ jobs:
8484

8585
- name: Start containerized server and dependencies
8686
env:
87-
TEMPORAL_CLI_VERSION: 1.6.1-server-1.31.0-151.0
87+
TEMPORAL_CLI_VERSION: 1.7.0
8888
run: |
8989
wget -O temporal_cli.tar.gz https://github.com/temporalio/cli/releases/download/v${TEMPORAL_CLI_VERSION}/temporal_cli_${TEMPORAL_CLI_VERSION}_linux_amd64.tar.gz
9090
tar -xzf temporal_cli.tar.gz

temporal-sdk/src/main/java/io/temporal/common/InitialVersioningBehavior.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,26 @@ public enum InitialVersioningBehavior {
1111
* upgrading to the latest version. After the first workflow task completes, the workflow uses
1212
* whatever versioning behavior is specified in the workflow code.
1313
*/
14-
AUTO_UPGRADE
14+
AUTO_UPGRADE,
15+
16+
/**
17+
* Use the Ramping Version of the workflow's task queue at start time, regardless of the
18+
* workflow's Target Version.
19+
*
20+
* <p>After the first workflow task completes, the workflow uses whatever {@link
21+
* VersioningBehavior} is specified in the workflow code. If there is no Ramping Version by the
22+
* time that the first workflow task is dispatched, it is sent to the Current Version.
23+
*
24+
* <p>It is highly discouraged to use this if the workflow is annotated with {@link
25+
* VersioningBehavior#AUTO_UPGRADE} behavior, because this setting only applies to the first task
26+
* of the workflow. If, after the first task, the workflow is AutoUpgrade, it behaves like a
27+
* normal AutoUpgrade workflow and goes to the Target Version, which may be the Current Version
28+
* instead of the Ramping Version.
29+
*
30+
* <p>Note that if the workflow being continued has a Pinned override, that override is inherited
31+
* by the new workflow run regardless of the {@link InitialVersioningBehavior} specified in the
32+
* continue-as-new command. Versioning Override always takes precedence until it is removed
33+
* manually via UpdateWorkflowExecutionOptions.
34+
*/
35+
USE_RAMPING_VERSION
1536
}

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1422,6 +1422,11 @@ public void continueAsNew(ContinueAsNewInput input) {
14221422
io.temporal.api.enums.v1.ContinueAsNewVersioningBehavior
14231423
.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE);
14241424
break;
1425+
case USE_RAMPING_VERSION:
1426+
attributes.setInitialVersioningBehavior(
1427+
io.temporal.api.enums.v1.ContinueAsNewVersioningBehavior
1428+
.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_USE_RAMPING_VERSION);
1429+
break;
14251430
}
14261431
}
14271432
}

temporal-sdk/src/test/java/io/temporal/worker/WorkerVersioningTest.java

Lines changed: 106 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,96 @@ public String execute(int attempt) {
525525
}
526526
}
527527

528+
@WorkflowInterface
529+
public interface ContinueAsNewWithRampingVersionWorkflow {
530+
@WorkflowMethod
531+
String execute(int attempt);
532+
533+
@SignalMethod
534+
void continueAsNew();
535+
}
536+
537+
public static class TestWorkerVersioningCanUseRampingVersionV1
538+
implements ContinueAsNewWithRampingVersionWorkflow {
539+
private boolean continueAsNew;
540+
541+
@Override
542+
@WorkflowVersioningBehavior(VersioningBehavior.PINNED)
543+
public String execute(int attempt) {
544+
if (attempt > 0) {
545+
return "v1.0";
546+
}
547+
Workflow.await(() -> continueAsNew);
548+
ContinueAsNewOptions options =
549+
ContinueAsNewOptions.newBuilder()
550+
.setInitialVersioningBehavior(InitialVersioningBehavior.USE_RAMPING_VERSION)
551+
.build();
552+
ContinueAsNewWithRampingVersionWorkflow next =
553+
Workflow.newContinueAsNewStub(ContinueAsNewWithRampingVersionWorkflow.class, options);
554+
next.execute(attempt + 1);
555+
throw new RuntimeException("unreachable");
556+
}
557+
558+
@Override
559+
public void continueAsNew() {
560+
continueAsNew = true;
561+
}
562+
}
563+
564+
public static class TestWorkerVersioningCanUseRampingVersionV2
565+
implements ContinueAsNewWithRampingVersionWorkflow {
566+
@Override
567+
@WorkflowVersioningBehavior(VersioningBehavior.PINNED)
568+
public String execute(int attempt) {
569+
return "v2.0";
570+
}
571+
572+
@Override
573+
public void continueAsNew() {}
574+
}
575+
576+
@Test
577+
public void testContinueAsNewWithRampingVersion() {
578+
assumeTrue("Test Server doesn't support versioning", SDKTestWorkflowRule.useExternalService);
579+
580+
WorkerDeploymentVersion v1 =
581+
new WorkerDeploymentVersion(testWorkflowRule.getDeploymentName(), "1.0");
582+
WorkerDeploymentVersion v2 =
583+
new WorkerDeploymentVersion(testWorkflowRule.getDeploymentName(), "2.0");
584+
585+
Worker w1 = testWorkflowRule.newWorkerWithBuildID("1.0");
586+
w1.registerWorkflowImplementationTypes(TestWorkerVersioningCanUseRampingVersionV1.class);
587+
w1.start();
588+
589+
Worker w2 = testWorkflowRule.newWorkerWithBuildID("2.0");
590+
w2.registerWorkflowImplementationTypes(TestWorkerVersioningCanUseRampingVersionV2.class);
591+
w2.start();
592+
593+
waitUntilWorkerDeploymentVisible(v1);
594+
DescribeWorkerDeploymentResponse d1 = waitUntilWorkerDeploymentVisible(v2);
595+
SetWorkerDeploymentCurrentVersionResponse currentResp =
596+
setCurrentVersion(v1, d1.getConflictToken());
597+
waitForRoutingConfigPropagation(v1);
598+
599+
ContinueAsNewWithRampingVersionWorkflow wf =
600+
testWorkflowRule.newWorkflowStubTimeoutOptions(
601+
ContinueAsNewWithRampingVersionWorkflow.class, "can-use-ramping-version");
602+
WorkflowExecution we = WorkflowClient.start(wf::execute, 0);
603+
waitForWorkflowRunningOnVersion(we.getWorkflowId(), "1.0");
604+
605+
setRampingVersion(v2, 0, currentResp.getConflictToken());
606+
waitForRoutingConfigPropagation(v1, v2);
607+
608+
wf.continueAsNew();
609+
610+
String result =
611+
testWorkflowRule
612+
.getWorkflowClient()
613+
.newUntypedWorkflowStub(we.getWorkflowId())
614+
.getResult(String.class);
615+
Assert.assertEquals("v2.0", result);
616+
}
617+
528618
@Test
529619
public void testContinueAsNewWithVersionUpgrade() {
530620
assumeTrue("Test Server doesn't support versioning", SDKTestWorkflowRule.useExternalService);
@@ -641,6 +731,12 @@ private SetWorkerDeploymentRampingVersionResponse setRampingVersion(
641731

642732
@SuppressWarnings("deprecation")
643733
private void waitForRoutingConfigPropagation(WorkerDeploymentVersion v) {
734+
waitForRoutingConfigPropagation(v, null);
735+
}
736+
737+
@SuppressWarnings("deprecation")
738+
private void waitForRoutingConfigPropagation(
739+
WorkerDeploymentVersion currentVersion, WorkerDeploymentVersion rampingVersion) {
644740
Eventually.assertEventually(
645741
Duration.ofSeconds(15),
646742
() -> {
@@ -652,14 +748,22 @@ private void waitForRoutingConfigPropagation(WorkerDeploymentVersion v) {
652748
.describeWorkerDeployment(
653749
DescribeWorkerDeploymentRequest.newBuilder()
654750
.setNamespace(testWorkflowRule.getTestEnvironment().getNamespace())
655-
.setDeploymentName(v.getDeploymentName())
751+
.setDeploymentName(currentVersion.getDeploymentName())
656752
.build());
657753
Assert.assertEquals(
658-
v.getBuildId(),
754+
currentVersion.getBuildId(),
659755
resp.getWorkerDeploymentInfo()
660756
.getRoutingConfig()
661757
.getCurrentDeploymentVersion()
662758
.getBuildId());
759+
if (rampingVersion != null) {
760+
Assert.assertEquals(
761+
rampingVersion.getBuildId(),
762+
resp.getWorkerDeploymentInfo()
763+
.getRoutingConfig()
764+
.getRampingDeploymentVersion()
765+
.getBuildId());
766+
}
663767
// Check routing config update is not in progress
664768
int state = resp.getWorkerDeploymentInfo().getRoutingConfigUpdateStateValue();
665769
Assert.assertNotEquals(

0 commit comments

Comments
 (0)