diff --git a/packages/common/src/worker-deployments.ts b/packages/common/src/worker-deployments.ts index e5317d02b..e3f70dc49 100644 --- a/packages/common/src/worker-deployments.ts +++ b/packages/common/src/worker-deployments.ts @@ -78,10 +78,25 @@ export type AutoUpgradeVersioningOverride = 'AUTO_UPGRADE'; * behavior, the base version of the new run will be the Target Version as described above, but the * effective version will be whatever is specified by the Versioning Override until the override is removed. * + * USE_RAMPING_VERSION - Use the Ramping Version of the workflow's task queue at start time, regardless of the workflow's + * Target Version (according to f(workflow_id, ramp_percentage)). After the first workflow task completes, + * the workflow will use whatever Versioning Behavior it is annotated with. If there is no Ramping + * Version by the time that the first workflow task is dispatched, it will be sent to the Current Version. + * + * It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because + * this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow + * is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which + * may be the Current Version instead of the Ramping Version. + * + * Note that if the workflow being continued has a Pinned override, that override will be inherited by the + * new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new + * command. Versioning Override always takes precedence until it's removed manually via UpdateWorkflowExecutionOptions. + * * @experimental Versioning semantics with continue-as-new are experimental and may change in the future. */ export const InitialVersioningBehavior = { AUTO_UPGRADE: 'AUTO_UPGRADE', + USE_RAMPING_VERSION: 'USE_RAMPING_VERSION', } as const; export type InitialVersioningBehavior = (typeof InitialVersioningBehavior)[keyof typeof InitialVersioningBehavior]; @@ -94,6 +109,7 @@ export const [encodeInitialVersioningBehavior, decodeInitialVersioningBehavior] >( { [InitialVersioningBehavior.AUTO_UPGRADE]: 1, + [InitialVersioningBehavior.USE_RAMPING_VERSION]: 2, UNSPECIFIED: 0, } as const, 'CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_' diff --git a/packages/core-bridge/sdk-core b/packages/core-bridge/sdk-core index 30c4a3135..2872b5363 160000 --- a/packages/core-bridge/sdk-core +++ b/packages/core-bridge/sdk-core @@ -1 +1 @@ -Subproject commit 30c4a3135f617d9d465a6478a8154fefd5466e87 +Subproject commit 2872b5363e1b745cfb90313ebc7a507c5d25c398 diff --git a/packages/core-bridge/src/client.rs b/packages/core-bridge/src/client.rs index b3f560c21..ecc7f949b 100644 --- a/packages/core-bridge/src/client.rs +++ b/packages/core-bridge/src/client.rs @@ -264,9 +264,13 @@ async fn client_invoke_workflow_service( "CountActivityExecutions" => rpc_call!(connection, call, count_activity_executions), "CountSchedules" => rpc_call!(connection, call, count_schedules), "CountWorkflowExecutions" => rpc_call!(connection, call, count_workflow_executions), + "CountNexusOperationExecutions" => rpc_call!(connection, call, count_nexus_operation_executions), "CreateSchedule" => rpc_call!(connection, call, create_schedule), + "CreateWorkerDeployment" => rpc_call!(connection, call, create_worker_deployment), + "CreateWorkerDeploymentVersion" => rpc_call!(connection, call, create_worker_deployment_version), "CreateWorkflowRule" => rpc_call!(connection, call, create_workflow_rule), "DeleteActivityExecution" => rpc_call!(connection, call, delete_activity_execution), + "DeleteNexusOperationExecution" => rpc_call!(connection, call, delete_nexus_operation_execution), "DeleteSchedule" => rpc_call!(connection, call, delete_schedule), "DeleteWorkerDeployment" => rpc_call!(connection, call, delete_worker_deployment), "DeleteWorkerDeploymentVersion" => { @@ -277,6 +281,7 @@ async fn client_invoke_workflow_service( "DescribeBatchOperation" => rpc_call!(connection, call, describe_batch_operation), "DescribeActivityExecution" => rpc_call!(connection, call, describe_activity_execution), "DescribeDeployment" => rpc_call!(connection, call, describe_deployment), + "DescribeNexusOperationExecution" => rpc_call!(connection, call, describe_nexus_operation_execution), "DescribeWorker" => rpc_call!(connection, call, describe_worker), "DeprecateNamespace" => rpc_call!(connection, call, deprecate_namespace), "DescribeNamespace" => rpc_call!(connection, call, describe_namespace), @@ -316,6 +321,7 @@ async fn client_invoke_workflow_service( } "ListDeployments" => rpc_call!(connection, call, list_deployments), "ListNamespaces" => rpc_call!(connection, call, list_namespaces), + "ListNexusOperationExecutions" => rpc_call!(connection, call, list_nexus_operation_executions), "ListOpenWorkflowExecutions" => rpc_call!(connection, call, list_open_workflow_executions), "ListScheduleMatchingTimes" => rpc_call!(connection, call, list_schedule_matching_times), "ListSchedules" => rpc_call!(connection, call, list_schedules), @@ -329,6 +335,7 @@ async fn client_invoke_workflow_service( "PauseWorkflowExecution" => rpc_call!(connection, call, pause_workflow_execution), "PollActivityExecution" => rpc_call!(connection, call, poll_activity_execution), "PollActivityTaskQueue" => rpc_call!(connection, call, poll_activity_task_queue), + "PollNexusOperationExecution" => rpc_call!(connection, call, poll_nexus_operation_execution), "PollNexusTaskQueue" => rpc_call!(connection, call, poll_nexus_task_queue), "PollWorkflowExecutionUpdate" => { rpc_call!(connection, call, poll_workflow_execution_update) @@ -346,6 +353,9 @@ async fn client_invoke_workflow_service( "RequestCancelActivityExecution" => { rpc_call!(connection, call, request_cancel_activity_execution) } + "RequestCancelNexusOperationExecution" => { + rpc_call!(connection, call, request_cancel_nexus_operation_execution) + } "RequestCancelWorkflowExecution" => { rpc_call!(connection, call, request_cancel_workflow_execution) } @@ -392,8 +402,10 @@ async fn client_invoke_workflow_service( "StartActivityExecution" => rpc_call!(connection, call, start_activity_execution), "StartWorkflowExecution" => rpc_call!(connection, call, start_workflow_execution), "StartBatchOperation" => rpc_call!(connection, call, start_batch_operation), + "StartNexusOperationExecution" => rpc_call!(connection, call, start_nexus_operation_execution), "StopBatchOperation" => rpc_call!(connection, call, stop_batch_operation), "TerminateActivityExecution" => rpc_call!(connection, call, terminate_activity_execution), + "TerminateNexusOperationExecution" => rpc_call!(connection, call, terminate_nexus_operation_execution), "TerminateWorkflowExecution" => rpc_call!(connection, call, terminate_workflow_execution), "TriggerWorkflowRule" => rpc_call!(connection, call, trigger_workflow_rule), "UnpauseActivity" => rpc_call!(connection, call, unpause_activity), @@ -402,6 +414,7 @@ async fn client_invoke_workflow_service( "UpdateNamespace" => rpc_call!(connection, call, update_namespace), "UpdateSchedule" => rpc_call!(connection, call, update_schedule), "UpdateWorkerConfig" => rpc_call!(connection, call, update_worker_config), + "UpdateWorkerDeploymentVersionComputeConfig" => rpc_call!(connection, call, update_worker_deployment_version_compute_config), "UpdateWorkerDeploymentVersionMetadata" => { rpc_call!(connection, call, update_worker_deployment_version_metadata) } @@ -416,6 +429,9 @@ async fn client_invoke_workflow_service( "UpdateWorkerVersioningRules" => { rpc_call!(connection, call, update_worker_versioning_rules) } + "ValidateWorkerDeploymentVersionComputeConfig" => { + rpc_call!(connection, call, validate_worker_deployment_version_compute_config) + } _ => Err(BridgeError::TypeError { field: None, message: format!("Unknown RPC call {}", call.rpc), diff --git a/packages/test/src/deployment-versioning-can-v1/index.ts b/packages/test/src/deployment-versioning-can-v1/index.ts index 6825d7b5e..543208521 100644 --- a/packages/test/src/deployment-versioning-can-v1/index.ts +++ b/packages/test/src/deployment-versioning-can-v1/index.ts @@ -1,4 +1,12 @@ -import { setWorkflowOptions, workflowInfo, sleep, makeContinueAsNewFunc } from '@temporalio/workflow'; +import { + condition, + setHandler, + setWorkflowOptions, + workflowInfo, + sleep, + makeContinueAsNewFunc, +} from '@temporalio/workflow'; +import { unblockSignal } from '../workflows'; setWorkflowOptions({ versioningBehavior: 'PINNED' }, continueAsNewWithVersionUpgrade); export async function continueAsNewWithVersionUpgrade(attempt: number): Promise { @@ -17,3 +25,19 @@ export async function continueAsNewWithVersionUpgrade(attempt: number): Promise< } } } + +setWorkflowOptions({ versioningBehavior: 'PINNED' }, continueAsNewWithRampingVersion); +export async function continueAsNewWithRampingVersion(attempt: number): Promise { + if (attempt > 0) { + return 'v1.0'; + } + + let shouldContinue = false; + setHandler(unblockSignal, () => void (shouldContinue = true)); + await condition(() => shouldContinue); + + const canWithRampingVersion = makeContinueAsNewFunc({ + initialVersioningBehavior: 'USE_RAMPING_VERSION', + }); + return await canWithRampingVersion(attempt + 1); +} diff --git a/packages/test/src/deployment-versioning-can-v2/index.ts b/packages/test/src/deployment-versioning-can-v2/index.ts index 1250af538..d92286dc0 100644 --- a/packages/test/src/deployment-versioning-can-v2/index.ts +++ b/packages/test/src/deployment-versioning-can-v2/index.ts @@ -5,3 +5,9 @@ setWorkflowOptions({ versioningBehavior: 'PINNED' }, continueAsNewWithVersionUpg export async function continueAsNewWithVersionUpgrade(attempt: number): Promise { return 'v2.0'; } + +setWorkflowOptions({ versioningBehavior: 'PINNED' }, continueAsNewWithRampingVersion); +// eslint-disable-next-line @typescript-eslint/no-unused-vars +export async function continueAsNewWithRampingVersion(attempt: number): Promise { + return 'v2.0'; +} diff --git a/packages/test/src/test-worker-deployment-versioning.ts b/packages/test/src/test-worker-deployment-versioning.ts index d010bcc92..698c755dd 100644 --- a/packages/test/src/test-worker-deployment-versioning.ts +++ b/packages/test/src/test-worker-deployment-versioning.ts @@ -604,6 +604,69 @@ test('ContinueAsNew with version upgrade', async (t) => { await worker2Promise; }); +test('ContinueAsNew with ramping version', async (t) => { + const taskQueue = 'can-ramping-version-' + randomUUID(); + const deploymentName = 'deployment-can-ramping-' + randomUUID(); + const { client, nativeConnection } = t.context.env; + const { createNativeConnection } = helpers(t); + + const v1 = { buildId: '1.0', deploymentName }; + const v2 = { buildId: '2.0', deploymentName }; + + const worker1 = await Worker.create({ + workflowsPath: require.resolve('./deployment-versioning-can-v1'), + taskQueue, + workerDeploymentOptions: { + useWorkerVersioning: true, + version: v1, + defaultVersioningBehavior: 'PINNED', + }, + connection: nativeConnection, + }); + const worker1Promise = worker1.run(); + worker1Promise.catch((err) => t.fail('Worker 1.0 error: ' + err)); + + const worker2Connection = await createNativeConnection(); + t.teardown(() => worker2Connection.close()); + const worker2 = await Worker.create({ + workflowsPath: require.resolve('./deployment-versioning-can-v2'), + taskQueue, + workerDeploymentOptions: { + useWorkerVersioning: true, + version: v2, + defaultVersioningBehavior: 'PINNED', + }, + connection: worker2Connection, + }); + const worker2Promise = worker2.run(); + worker2Promise.catch((err) => t.fail('Worker 2.0 error: ' + err)); + + const describeResp1 = await waitUntilWorkerDeploymentVisible(client, v1); + const setResp1 = await setCurrentDeploymentVersion(client, describeResp1.conflictToken, v1); + await waitForRoutingConfigPropagation(client, deploymentName, v1.buildId); + + const handle = await client.workflow.start('continueAsNewWithRampingVersion', { + args: [0], + taskQueue, + workflowId: 'can-ramping-version-' + randomUUID(), + }); + await waitForWorkflowRunningOnVersion(client, handle, v1.buildId); + + await waitUntilWorkerDeploymentVisible(client, v2); + await setRampingVersion(client, setResp1.conflictToken, v2, 0); + await waitForRoutingConfigPropagation(client, deploymentName, v1.buildId, v2.buildId); + + await handle.signal(unblockSignal); + + const result = await handle.result(); + t.is(result, 'v2.0'); + + worker1.shutdown(); + worker2.shutdown(); + await worker1Promise; + await worker2Promise; +}); + /////////////////////////////// /**