Skip to content

Commit 6065780

Browse files
authored
CaN USE_RAMPING_VERSION versioning behaviour (#2034)
1 parent b2ad2d3 commit 6065780

6 files changed

Lines changed: 127 additions & 2 deletions

File tree

packages/common/src/worker-deployments.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,25 @@ export type AutoUpgradeVersioningOverride = 'AUTO_UPGRADE';
7878
* behavior, the base version of the new run will be the Target Version as described above, but the
7979
* effective version will be whatever is specified by the Versioning Override until the override is removed.
8080
*
81+
* USE_RAMPING_VERSION - Use the Ramping Version of the workflow's task queue at start time, regardless of the workflow's
82+
* Target Version (according to f(workflow_id, ramp_percentage)). After the first workflow task completes,
83+
* the workflow will use whatever Versioning Behavior it is annotated with. If there is no Ramping
84+
* Version by the time that the first workflow task is dispatched, it will be sent to the Current Version.
85+
*
86+
* It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because
87+
* this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow
88+
* is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which
89+
* may be the Current Version instead of the Ramping Version.
90+
*
91+
* Note that if the workflow being continued has a Pinned override, that override will be inherited by the
92+
* new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new
93+
* command. Versioning Override always takes precedence until it's removed manually via UpdateWorkflowExecutionOptions.
94+
*
8195
* @experimental Versioning semantics with continue-as-new are experimental and may change in the future.
8296
*/
8397
export const InitialVersioningBehavior = {
8498
AUTO_UPGRADE: 'AUTO_UPGRADE',
99+
USE_RAMPING_VERSION: 'USE_RAMPING_VERSION',
85100
} as const;
86101
export type InitialVersioningBehavior = (typeof InitialVersioningBehavior)[keyof typeof InitialVersioningBehavior];
87102

@@ -94,6 +109,7 @@ export const [encodeInitialVersioningBehavior, decodeInitialVersioningBehavior]
94109
>(
95110
{
96111
[InitialVersioningBehavior.AUTO_UPGRADE]: 1,
112+
[InitialVersioningBehavior.USE_RAMPING_VERSION]: 2,
97113
UNSPECIFIED: 0,
98114
} as const,
99115
'CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_'

packages/core-bridge/sdk-core

Submodule sdk-core updated 44 files

packages/core-bridge/src/client.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,9 +264,13 @@ async fn client_invoke_workflow_service(
264264
"CountActivityExecutions" => rpc_call!(connection, call, count_activity_executions),
265265
"CountSchedules" => rpc_call!(connection, call, count_schedules),
266266
"CountWorkflowExecutions" => rpc_call!(connection, call, count_workflow_executions),
267+
"CountNexusOperationExecutions" => rpc_call!(connection, call, count_nexus_operation_executions),
267268
"CreateSchedule" => rpc_call!(connection, call, create_schedule),
269+
"CreateWorkerDeployment" => rpc_call!(connection, call, create_worker_deployment),
270+
"CreateWorkerDeploymentVersion" => rpc_call!(connection, call, create_worker_deployment_version),
268271
"CreateWorkflowRule" => rpc_call!(connection, call, create_workflow_rule),
269272
"DeleteActivityExecution" => rpc_call!(connection, call, delete_activity_execution),
273+
"DeleteNexusOperationExecution" => rpc_call!(connection, call, delete_nexus_operation_execution),
270274
"DeleteSchedule" => rpc_call!(connection, call, delete_schedule),
271275
"DeleteWorkerDeployment" => rpc_call!(connection, call, delete_worker_deployment),
272276
"DeleteWorkerDeploymentVersion" => {
@@ -277,6 +281,7 @@ async fn client_invoke_workflow_service(
277281
"DescribeBatchOperation" => rpc_call!(connection, call, describe_batch_operation),
278282
"DescribeActivityExecution" => rpc_call!(connection, call, describe_activity_execution),
279283
"DescribeDeployment" => rpc_call!(connection, call, describe_deployment),
284+
"DescribeNexusOperationExecution" => rpc_call!(connection, call, describe_nexus_operation_execution),
280285
"DescribeWorker" => rpc_call!(connection, call, describe_worker),
281286
"DeprecateNamespace" => rpc_call!(connection, call, deprecate_namespace),
282287
"DescribeNamespace" => rpc_call!(connection, call, describe_namespace),
@@ -316,6 +321,7 @@ async fn client_invoke_workflow_service(
316321
}
317322
"ListDeployments" => rpc_call!(connection, call, list_deployments),
318323
"ListNamespaces" => rpc_call!(connection, call, list_namespaces),
324+
"ListNexusOperationExecutions" => rpc_call!(connection, call, list_nexus_operation_executions),
319325
"ListOpenWorkflowExecutions" => rpc_call!(connection, call, list_open_workflow_executions),
320326
"ListScheduleMatchingTimes" => rpc_call!(connection, call, list_schedule_matching_times),
321327
"ListSchedules" => rpc_call!(connection, call, list_schedules),
@@ -329,6 +335,7 @@ async fn client_invoke_workflow_service(
329335
"PauseWorkflowExecution" => rpc_call!(connection, call, pause_workflow_execution),
330336
"PollActivityExecution" => rpc_call!(connection, call, poll_activity_execution),
331337
"PollActivityTaskQueue" => rpc_call!(connection, call, poll_activity_task_queue),
338+
"PollNexusOperationExecution" => rpc_call!(connection, call, poll_nexus_operation_execution),
332339
"PollNexusTaskQueue" => rpc_call!(connection, call, poll_nexus_task_queue),
333340
"PollWorkflowExecutionUpdate" => {
334341
rpc_call!(connection, call, poll_workflow_execution_update)
@@ -346,6 +353,9 @@ async fn client_invoke_workflow_service(
346353
"RequestCancelActivityExecution" => {
347354
rpc_call!(connection, call, request_cancel_activity_execution)
348355
}
356+
"RequestCancelNexusOperationExecution" => {
357+
rpc_call!(connection, call, request_cancel_nexus_operation_execution)
358+
}
349359
"RequestCancelWorkflowExecution" => {
350360
rpc_call!(connection, call, request_cancel_workflow_execution)
351361
}
@@ -392,8 +402,10 @@ async fn client_invoke_workflow_service(
392402
"StartActivityExecution" => rpc_call!(connection, call, start_activity_execution),
393403
"StartWorkflowExecution" => rpc_call!(connection, call, start_workflow_execution),
394404
"StartBatchOperation" => rpc_call!(connection, call, start_batch_operation),
405+
"StartNexusOperationExecution" => rpc_call!(connection, call, start_nexus_operation_execution),
395406
"StopBatchOperation" => rpc_call!(connection, call, stop_batch_operation),
396407
"TerminateActivityExecution" => rpc_call!(connection, call, terminate_activity_execution),
408+
"TerminateNexusOperationExecution" => rpc_call!(connection, call, terminate_nexus_operation_execution),
397409
"TerminateWorkflowExecution" => rpc_call!(connection, call, terminate_workflow_execution),
398410
"TriggerWorkflowRule" => rpc_call!(connection, call, trigger_workflow_rule),
399411
"UnpauseActivity" => rpc_call!(connection, call, unpause_activity),
@@ -402,6 +414,7 @@ async fn client_invoke_workflow_service(
402414
"UpdateNamespace" => rpc_call!(connection, call, update_namespace),
403415
"UpdateSchedule" => rpc_call!(connection, call, update_schedule),
404416
"UpdateWorkerConfig" => rpc_call!(connection, call, update_worker_config),
417+
"UpdateWorkerDeploymentVersionComputeConfig" => rpc_call!(connection, call, update_worker_deployment_version_compute_config),
405418
"UpdateWorkerDeploymentVersionMetadata" => {
406419
rpc_call!(connection, call, update_worker_deployment_version_metadata)
407420
}
@@ -416,6 +429,9 @@ async fn client_invoke_workflow_service(
416429
"UpdateWorkerVersioningRules" => {
417430
rpc_call!(connection, call, update_worker_versioning_rules)
418431
}
432+
"ValidateWorkerDeploymentVersionComputeConfig" => {
433+
rpc_call!(connection, call, validate_worker_deployment_version_compute_config)
434+
}
419435
_ => Err(BridgeError::TypeError {
420436
field: None,
421437
message: format!("Unknown RPC call {}", call.rpc),

packages/test/src/deployment-versioning-can-v1/index.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
1-
import { setWorkflowOptions, workflowInfo, sleep, makeContinueAsNewFunc } from '@temporalio/workflow';
1+
import {
2+
condition,
3+
setHandler,
4+
setWorkflowOptions,
5+
workflowInfo,
6+
sleep,
7+
makeContinueAsNewFunc,
8+
} from '@temporalio/workflow';
9+
import { unblockSignal } from '../workflows';
210

311
setWorkflowOptions({ versioningBehavior: 'PINNED' }, continueAsNewWithVersionUpgrade);
412
export async function continueAsNewWithVersionUpgrade(attempt: number): Promise<string> {
@@ -17,3 +25,19 @@ export async function continueAsNewWithVersionUpgrade(attempt: number): Promise<
1725
}
1826
}
1927
}
28+
29+
setWorkflowOptions({ versioningBehavior: 'PINNED' }, continueAsNewWithRampingVersion);
30+
export async function continueAsNewWithRampingVersion(attempt: number): Promise<string> {
31+
if (attempt > 0) {
32+
return 'v1.0';
33+
}
34+
35+
let shouldContinue = false;
36+
setHandler(unblockSignal, () => void (shouldContinue = true));
37+
await condition(() => shouldContinue);
38+
39+
const canWithRampingVersion = makeContinueAsNewFunc<typeof continueAsNewWithRampingVersion>({
40+
initialVersioningBehavior: 'USE_RAMPING_VERSION',
41+
});
42+
return await canWithRampingVersion(attempt + 1);
43+
}

packages/test/src/deployment-versioning-can-v2/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,9 @@ setWorkflowOptions({ versioningBehavior: 'PINNED' }, continueAsNewWithVersionUpg
55
export async function continueAsNewWithVersionUpgrade(attempt: number): Promise<string> {
66
return 'v2.0';
77
}
8+
9+
setWorkflowOptions({ versioningBehavior: 'PINNED' }, continueAsNewWithRampingVersion);
10+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
11+
export async function continueAsNewWithRampingVersion(attempt: number): Promise<string> {
12+
return 'v2.0';
13+
}

packages/test/src/test-worker-deployment-versioning.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,69 @@ test('ContinueAsNew with version upgrade', async (t) => {
604604
await worker2Promise;
605605
});
606606

607+
test('ContinueAsNew with ramping version', async (t) => {
608+
const taskQueue = 'can-ramping-version-' + randomUUID();
609+
const deploymentName = 'deployment-can-ramping-' + randomUUID();
610+
const { client, nativeConnection } = t.context.env;
611+
const { createNativeConnection } = helpers(t);
612+
613+
const v1 = { buildId: '1.0', deploymentName };
614+
const v2 = { buildId: '2.0', deploymentName };
615+
616+
const worker1 = await Worker.create({
617+
workflowsPath: require.resolve('./deployment-versioning-can-v1'),
618+
taskQueue,
619+
workerDeploymentOptions: {
620+
useWorkerVersioning: true,
621+
version: v1,
622+
defaultVersioningBehavior: 'PINNED',
623+
},
624+
connection: nativeConnection,
625+
});
626+
const worker1Promise = worker1.run();
627+
worker1Promise.catch((err) => t.fail('Worker 1.0 error: ' + err));
628+
629+
const worker2Connection = await createNativeConnection();
630+
t.teardown(() => worker2Connection.close());
631+
const worker2 = await Worker.create({
632+
workflowsPath: require.resolve('./deployment-versioning-can-v2'),
633+
taskQueue,
634+
workerDeploymentOptions: {
635+
useWorkerVersioning: true,
636+
version: v2,
637+
defaultVersioningBehavior: 'PINNED',
638+
},
639+
connection: worker2Connection,
640+
});
641+
const worker2Promise = worker2.run();
642+
worker2Promise.catch((err) => t.fail('Worker 2.0 error: ' + err));
643+
644+
const describeResp1 = await waitUntilWorkerDeploymentVisible(client, v1);
645+
const setResp1 = await setCurrentDeploymentVersion(client, describeResp1.conflictToken, v1);
646+
await waitForRoutingConfigPropagation(client, deploymentName, v1.buildId);
647+
648+
const handle = await client.workflow.start('continueAsNewWithRampingVersion', {
649+
args: [0],
650+
taskQueue,
651+
workflowId: 'can-ramping-version-' + randomUUID(),
652+
});
653+
await waitForWorkflowRunningOnVersion(client, handle, v1.buildId);
654+
655+
await waitUntilWorkerDeploymentVisible(client, v2);
656+
await setRampingVersion(client, setResp1.conflictToken, v2, 0);
657+
await waitForRoutingConfigPropagation(client, deploymentName, v1.buildId, v2.buildId);
658+
659+
await handle.signal(unblockSignal);
660+
661+
const result = await handle.result();
662+
t.is(result, 'v2.0');
663+
664+
worker1.shutdown();
665+
worker2.shutdown();
666+
await worker1Promise;
667+
await worker2Promise;
668+
});
669+
607670
///////////////////////////////
608671

609672
/**

0 commit comments

Comments
 (0)