Skip to content

Commit b0e6180

Browse files
committed
CaN USE_RAMPING_VERSION versioning behaviour
1 parent b2ad2d3 commit b0e6180

4 files changed

Lines changed: 110 additions & 1 deletion

File tree

packages/common/src/worker-deployments.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,25 @@ export type AutoUpgradeVersioningOverride = 'AUTO_UPGRADE';
7878
* behavior, the base version of the new run will be the Target Version as described above, but the
7979
* effective version will be whatever is specified by the Versioning Override until the override is removed.
8080
*
81+
* USE_RAMPING_VERSION - Use the Ramping Version of the workflow's task queue at start time, regardless of the workflow's
82+
* Target Version (according to f(workflow_id, ramp_percentage)). After the first workflow task completes,
83+
* the workflow will use whatever Versioning Behavior it is annotated with. If there is no Ramping
84+
* Version by the time that the first workflow task is dispatched, it will be sent to the Current Version.
85+
*
86+
* It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because
87+
* this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow
88+
* is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which
89+
* may be the Current Version instead of the Ramping Version.
90+
*
91+
* Note that if the workflow being continued has a Pinned override, that override will be inherited by the
92+
* new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new
93+
* command. Versioning Override always takes precedence until it's removed manually via UpdateWorkflowExecutionOptions.
94+
*
8195
* @experimental Versioning semantics with continue-as-new are experimental and may change in the future.
8296
*/
8397
export const InitialVersioningBehavior = {
8498
AUTO_UPGRADE: 'AUTO_UPGRADE',
99+
USE_RAMPING_VERSION: 'USE_RAMPING_VERSION',
85100
} as const;
86101
export type InitialVersioningBehavior = (typeof InitialVersioningBehavior)[keyof typeof InitialVersioningBehavior];
87102

@@ -94,6 +109,7 @@ export const [encodeInitialVersioningBehavior, decodeInitialVersioningBehavior]
94109
>(
95110
{
96111
[InitialVersioningBehavior.AUTO_UPGRADE]: 1,
112+
[InitialVersioningBehavior.USE_RAMPING_VERSION]: 2,
97113
UNSPECIFIED: 0,
98114
} as const,
99115
'CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_'

packages/test/src/deployment-versioning-can-v1/index.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
1-
import { setWorkflowOptions, workflowInfo, sleep, makeContinueAsNewFunc } from '@temporalio/workflow';
1+
import {
2+
condition,
3+
setHandler,
4+
setWorkflowOptions,
5+
workflowInfo,
6+
sleep,
7+
makeContinueAsNewFunc,
8+
} from '@temporalio/workflow';
9+
import { unblockSignal } from '../workflows';
210

311
setWorkflowOptions({ versioningBehavior: 'PINNED' }, continueAsNewWithVersionUpgrade);
412
export async function continueAsNewWithVersionUpgrade(attempt: number): Promise<string> {
@@ -17,3 +25,19 @@ export async function continueAsNewWithVersionUpgrade(attempt: number): Promise<
1725
}
1826
}
1927
}
28+
29+
setWorkflowOptions({ versioningBehavior: 'PINNED' }, continueAsNewWithRampingVersion);
30+
export async function continueAsNewWithRampingVersion(attempt: number): Promise<string> {
31+
if (attempt > 0) {
32+
return 'v1.0';
33+
}
34+
35+
let shouldContinue = false;
36+
setHandler(unblockSignal, () => void (shouldContinue = true));
37+
await condition(() => shouldContinue);
38+
39+
const canWithRampingVersion = makeContinueAsNewFunc<typeof continueAsNewWithRampingVersion>({
40+
initialVersioningBehavior: 'USE_RAMPING_VERSION',
41+
});
42+
return await canWithRampingVersion(attempt + 1);
43+
}

packages/test/src/deployment-versioning-can-v2/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,9 @@ setWorkflowOptions({ versioningBehavior: 'PINNED' }, continueAsNewWithVersionUpg
55
export async function continueAsNewWithVersionUpgrade(attempt: number): Promise<string> {
66
return 'v2.0';
77
}
8+
9+
setWorkflowOptions({ versioningBehavior: 'PINNED' }, continueAsNewWithRampingVersion);
10+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
11+
export async function continueAsNewWithRampingVersion(attempt: number): Promise<string> {
12+
return 'v2.0';
13+
}

packages/test/src/test-worker-deployment-versioning.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,69 @@ test('ContinueAsNew with version upgrade', async (t) => {
604604
await worker2Promise;
605605
});
606606

607+
test('ContinueAsNew with ramping version', async (t) => {
608+
const taskQueue = 'can-ramping-version-' + randomUUID();
609+
const deploymentName = 'deployment-can-ramping-' + randomUUID();
610+
const { client, nativeConnection } = t.context.env;
611+
const { createNativeConnection } = helpers(t);
612+
613+
const v1 = { buildId: '1.0', deploymentName };
614+
const v2 = { buildId: '2.0', deploymentName };
615+
616+
const worker1 = await Worker.create({
617+
workflowsPath: require.resolve('./deployment-versioning-can-v1'),
618+
taskQueue,
619+
workerDeploymentOptions: {
620+
useWorkerVersioning: true,
621+
version: v1,
622+
defaultVersioningBehavior: 'PINNED',
623+
},
624+
connection: nativeConnection,
625+
});
626+
const worker1Promise = worker1.run();
627+
worker1Promise.catch((err) => t.fail('Worker 1.0 error: ' + err));
628+
629+
const worker2Connection = await createNativeConnection();
630+
t.teardown(() => worker2Connection.close());
631+
const worker2 = await Worker.create({
632+
workflowsPath: require.resolve('./deployment-versioning-can-v2'),
633+
taskQueue,
634+
workerDeploymentOptions: {
635+
useWorkerVersioning: true,
636+
version: v2,
637+
defaultVersioningBehavior: 'PINNED',
638+
},
639+
connection: worker2Connection,
640+
});
641+
const worker2Promise = worker2.run();
642+
worker2Promise.catch((err) => t.fail('Worker 2.0 error: ' + err));
643+
644+
const describeResp1 = await waitUntilWorkerDeploymentVisible(client, v1);
645+
const setResp1 = await setCurrentDeploymentVersion(client, describeResp1.conflictToken, v1);
646+
await waitForRoutingConfigPropagation(client, deploymentName, v1.buildId);
647+
648+
const handle = await client.workflow.start('continueAsNewWithRampingVersion', {
649+
args: [0],
650+
taskQueue,
651+
workflowId: 'can-ramping-version-' + randomUUID(),
652+
});
653+
await waitForWorkflowRunningOnVersion(client, handle, v1.buildId);
654+
655+
await waitUntilWorkerDeploymentVisible(client, v2);
656+
await setRampingVersion(client, setResp1.conflictToken, v2, 0);
657+
await waitForRoutingConfigPropagation(client, deploymentName, v1.buildId, v2.buildId);
658+
659+
await handle.signal(unblockSignal);
660+
661+
const result = await handle.result();
662+
t.is(result, 'v2.0');
663+
664+
worker1.shutdown();
665+
worker2.shutdown();
666+
await worker1Promise;
667+
await worker2Promise;
668+
});
669+
607670
///////////////////////////////
608671

609672
/**

0 commit comments

Comments
 (0)