Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions packages/common/src/worker-deployments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,25 @@ export type AutoUpgradeVersioningOverride = 'AUTO_UPGRADE';
* behavior, the base version of the new run will be the Target Version as described above, but the
* effective version will be whatever is specified by the Versioning Override until the override is removed.
*
* USE_RAMPING_VERSION - Use the Ramping Version of the workflow's task queue at start time, regardless of the workflow's
* Target Version (according to f(workflow_id, ramp_percentage)). After the first workflow task completes,
* the workflow will use whatever Versioning Behavior it is annotated with. If there is no Ramping
* Version by the time that the first workflow task is dispatched, it will be sent to the Current Version.
*
* It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because
* this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow
* is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which
* may be the Current Version instead of the Ramping Version.
*
* Note that if the workflow being continued has a Pinned override, that override will be inherited by the
* new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new
* command. Versioning Override always takes precedence until it's removed manually via UpdateWorkflowExecutionOptions.
*
* @experimental Versioning semantics with continue-as-new are experimental and may change in the future.
*/
export const InitialVersioningBehavior = {
AUTO_UPGRADE: 'AUTO_UPGRADE',
USE_RAMPING_VERSION: 'USE_RAMPING_VERSION',
} as const;
export type InitialVersioningBehavior = (typeof InitialVersioningBehavior)[keyof typeof InitialVersioningBehavior];

Expand All @@ -94,6 +109,7 @@ export const [encodeInitialVersioningBehavior, decodeInitialVersioningBehavior]
>(
{
[InitialVersioningBehavior.AUTO_UPGRADE]: 1,
[InitialVersioningBehavior.USE_RAMPING_VERSION]: 2,
UNSPECIFIED: 0,
} as const,
'CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_'
Expand Down
2 changes: 1 addition & 1 deletion packages/core-bridge/sdk-core
Submodule sdk-core updated 44 files
+108 −0 crates/client/src/grpc.rs
+2 −0 crates/common/build.rs
+0 −2 crates/common/protos/api_upstream/.github/PULL_REQUEST_TEMPLATE.md
+1 −1 crates/common/protos/api_upstream/.github/workflows/trigger-api-go-update.yml
+3 −1 crates/common/protos/api_upstream/.gitignore
+32 −0 crates/common/protos/api_upstream/nexus-rpc/temporal-proto-models-nexusrpc.yaml
+2,965 −791 crates/common/protos/api_upstream/openapi/openapiv2.json
+2,201 −186 crates/common/protos/api_upstream/openapi/openapiv3.yaml
+24 −0 crates/common/protos/api_upstream/temporal/api/activity/v1/message.proto
+37 −0 crates/common/protos/api_upstream/temporal/api/callback/v1/message.proto
+6 −3 crates/common/protos/api_upstream/temporal/api/command/v1/message.proto
+36 −0 crates/common/protos/api_upstream/temporal/api/common/v1/message.proto
+68 −0 crates/common/protos/api_upstream/temporal/api/compute/v1/config.proto
+35 −0 crates/common/protos/api_upstream/temporal/api/compute/v1/provider.proto
+28 −0 crates/common/protos/api_upstream/temporal/api/compute/v1/scaler.proto
+40 −17 crates/common/protos/api_upstream/temporal/api/deployment/v1/message.proto
+3 −3 crates/common/protos/api_upstream/temporal/api/enums/v1/deployment.proto
+2 −0 crates/common/protos/api_upstream/temporal/api/enums/v1/event_type.proto
+60 −0 crates/common/protos/api_upstream/temporal/api/enums/v1/nexus.proto
+5 −0 crates/common/protos/api_upstream/temporal/api/enums/v1/task_queue.proto
+19 −5 crates/common/protos/api_upstream/temporal/api/enums/v1/workflow.proto
+8 −0 crates/common/protos/api_upstream/temporal/api/errordetails/v1/message.proto
+4 −0 crates/common/protos/api_upstream/temporal/api/failure/v1/message.proto
+51 −4 crates/common/protos/api_upstream/temporal/api/history/v1/message.proto
+150 −1 crates/common/protos/api_upstream/temporal/api/nexus/v1/message.proto
+27 −0 crates/common/protos/api_upstream/temporal/api/nexusservices/workerservice/v1/request_response.proto
+20 −0 crates/common/protos/api_upstream/temporal/api/sdk/v1/external_storage.proto
+5 −1 crates/common/protos/api_upstream/temporal/api/taskqueue/v1/message.proto
+24 −0 crates/common/protos/api_upstream/temporal/api/worker/v1/message.proto
+61 −3 crates/common/protos/api_upstream/temporal/api/workflow/v1/message.proto
+438 −5 crates/common/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto
+207 −7 crates/common/protos/api_upstream/temporal/api/workflowservice/v1/service.proto
+4 −1 crates/common/src/protos/history_builder.rs
+14 −0 crates/common/src/protos/mod.rs
+91 −0 crates/sdk-core-c-bridge/src/client.rs
+4 −0 crates/sdk-core/src/core_tests/workers.rs
+1 −0 crates/sdk-core/src/worker/activities.rs
+10 −0 crates/sdk-core/src/worker/client.rs
+1 −0 crates/sdk-core/src/worker/client/mocks.rs
+1 −0 crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs
+1 −0 crates/sdk-core/src/worker/workflow/machines/cancel_external_state_machine.rs
+4 −1 crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs
+5 −1 crates/sdk-core/src/worker/workflow/machines/signal_external_state_machine.rs
+1 −0 crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs
16 changes: 16 additions & 0 deletions packages/core-bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,13 @@ async fn client_invoke_workflow_service(
"CountActivityExecutions" => rpc_call!(connection, call, count_activity_executions),
"CountSchedules" => rpc_call!(connection, call, count_schedules),
"CountWorkflowExecutions" => rpc_call!(connection, call, count_workflow_executions),
"CountNexusOperationExecutions" => rpc_call!(connection, call, count_nexus_operation_executions),
"CreateSchedule" => rpc_call!(connection, call, create_schedule),
"CreateWorkerDeployment" => rpc_call!(connection, call, create_worker_deployment),
"CreateWorkerDeploymentVersion" => rpc_call!(connection, call, create_worker_deployment_version),
"CreateWorkflowRule" => rpc_call!(connection, call, create_workflow_rule),
"DeleteActivityExecution" => rpc_call!(connection, call, delete_activity_execution),
"DeleteNexusOperationExecution" => rpc_call!(connection, call, delete_nexus_operation_execution),
"DeleteSchedule" => rpc_call!(connection, call, delete_schedule),
"DeleteWorkerDeployment" => rpc_call!(connection, call, delete_worker_deployment),
"DeleteWorkerDeploymentVersion" => {
Expand All @@ -277,6 +281,7 @@ async fn client_invoke_workflow_service(
"DescribeBatchOperation" => rpc_call!(connection, call, describe_batch_operation),
"DescribeActivityExecution" => rpc_call!(connection, call, describe_activity_execution),
"DescribeDeployment" => rpc_call!(connection, call, describe_deployment),
"DescribeNexusOperationExecution" => rpc_call!(connection, call, describe_nexus_operation_execution),
"DescribeWorker" => rpc_call!(connection, call, describe_worker),
"DeprecateNamespace" => rpc_call!(connection, call, deprecate_namespace),
"DescribeNamespace" => rpc_call!(connection, call, describe_namespace),
Expand Down Expand Up @@ -316,6 +321,7 @@ async fn client_invoke_workflow_service(
}
"ListDeployments" => rpc_call!(connection, call, list_deployments),
"ListNamespaces" => rpc_call!(connection, call, list_namespaces),
"ListNexusOperationExecutions" => rpc_call!(connection, call, list_nexus_operation_executions),
"ListOpenWorkflowExecutions" => rpc_call!(connection, call, list_open_workflow_executions),
"ListScheduleMatchingTimes" => rpc_call!(connection, call, list_schedule_matching_times),
"ListSchedules" => rpc_call!(connection, call, list_schedules),
Expand All @@ -329,6 +335,7 @@ async fn client_invoke_workflow_service(
"PauseWorkflowExecution" => rpc_call!(connection, call, pause_workflow_execution),
"PollActivityExecution" => rpc_call!(connection, call, poll_activity_execution),
"PollActivityTaskQueue" => rpc_call!(connection, call, poll_activity_task_queue),
"PollNexusOperationExecution" => rpc_call!(connection, call, poll_nexus_operation_execution),
"PollNexusTaskQueue" => rpc_call!(connection, call, poll_nexus_task_queue),
"PollWorkflowExecutionUpdate" => {
rpc_call!(connection, call, poll_workflow_execution_update)
Expand All @@ -346,6 +353,9 @@ async fn client_invoke_workflow_service(
"RequestCancelActivityExecution" => {
rpc_call!(connection, call, request_cancel_activity_execution)
}
"RequestCancelNexusOperationExecution" => {
rpc_call!(connection, call, request_cancel_nexus_operation_execution)
}
"RequestCancelWorkflowExecution" => {
rpc_call!(connection, call, request_cancel_workflow_execution)
}
Expand Down Expand Up @@ -392,8 +402,10 @@ async fn client_invoke_workflow_service(
"StartActivityExecution" => rpc_call!(connection, call, start_activity_execution),
"StartWorkflowExecution" => rpc_call!(connection, call, start_workflow_execution),
"StartBatchOperation" => rpc_call!(connection, call, start_batch_operation),
"StartNexusOperationExecution" => rpc_call!(connection, call, start_nexus_operation_execution),
"StopBatchOperation" => rpc_call!(connection, call, stop_batch_operation),
"TerminateActivityExecution" => rpc_call!(connection, call, terminate_activity_execution),
"TerminateNexusOperationExecution" => rpc_call!(connection, call, terminate_nexus_operation_execution),
"TerminateWorkflowExecution" => rpc_call!(connection, call, terminate_workflow_execution),
"TriggerWorkflowRule" => rpc_call!(connection, call, trigger_workflow_rule),
"UnpauseActivity" => rpc_call!(connection, call, unpause_activity),
Expand All @@ -402,6 +414,7 @@ async fn client_invoke_workflow_service(
"UpdateNamespace" => rpc_call!(connection, call, update_namespace),
"UpdateSchedule" => rpc_call!(connection, call, update_schedule),
"UpdateWorkerConfig" => rpc_call!(connection, call, update_worker_config),
"UpdateWorkerDeploymentVersionComputeConfig" => rpc_call!(connection, call, update_worker_deployment_version_compute_config),
"UpdateWorkerDeploymentVersionMetadata" => {
rpc_call!(connection, call, update_worker_deployment_version_metadata)
}
Expand All @@ -416,6 +429,9 @@ async fn client_invoke_workflow_service(
"UpdateWorkerVersioningRules" => {
rpc_call!(connection, call, update_worker_versioning_rules)
}
"ValidateWorkerDeploymentVersionComputeConfig" => {
rpc_call!(connection, call, validate_worker_deployment_version_compute_config)
}
_ => Err(BridgeError::TypeError {
field: None,
message: format!("Unknown RPC call {}", call.rpc),
Expand Down
26 changes: 25 additions & 1 deletion packages/test/src/deployment-versioning-can-v1/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
import { setWorkflowOptions, workflowInfo, sleep, makeContinueAsNewFunc } from '@temporalio/workflow';
import {
condition,
setHandler,
setWorkflowOptions,
workflowInfo,
sleep,
makeContinueAsNewFunc,
} from '@temporalio/workflow';
import { unblockSignal } from '../workflows';

setWorkflowOptions({ versioningBehavior: 'PINNED' }, continueAsNewWithVersionUpgrade);
export async function continueAsNewWithVersionUpgrade(attempt: number): Promise<string> {
Expand All @@ -17,3 +25,19 @@ export async function continueAsNewWithVersionUpgrade(attempt: number): Promise<
}
}
}

setWorkflowOptions({ versioningBehavior: 'PINNED' }, continueAsNewWithRampingVersion);
export async function continueAsNewWithRampingVersion(attempt: number): Promise<string> {
if (attempt > 0) {
return 'v1.0';
}

let shouldContinue = false;
setHandler(unblockSignal, () => void (shouldContinue = true));
await condition(() => shouldContinue);

const canWithRampingVersion = makeContinueAsNewFunc<typeof continueAsNewWithRampingVersion>({
initialVersioningBehavior: 'USE_RAMPING_VERSION',
});
return await canWithRampingVersion(attempt + 1);
}
6 changes: 6 additions & 0 deletions packages/test/src/deployment-versioning-can-v2/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,9 @@ setWorkflowOptions({ versioningBehavior: 'PINNED' }, continueAsNewWithVersionUpg
export async function continueAsNewWithVersionUpgrade(attempt: number): Promise<string> {
return 'v2.0';
}

setWorkflowOptions({ versioningBehavior: 'PINNED' }, continueAsNewWithRampingVersion);
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export async function continueAsNewWithRampingVersion(attempt: number): Promise<string> {
return 'v2.0';
}
63 changes: 63 additions & 0 deletions packages/test/src/test-worker-deployment-versioning.ts
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,69 @@ test('ContinueAsNew with version upgrade', async (t) => {
await worker2Promise;
});

test('ContinueAsNew with ramping version', async (t) => {
const taskQueue = 'can-ramping-version-' + randomUUID();
const deploymentName = 'deployment-can-ramping-' + randomUUID();
const { client, nativeConnection } = t.context.env;
const { createNativeConnection } = helpers(t);

const v1 = { buildId: '1.0', deploymentName };
const v2 = { buildId: '2.0', deploymentName };

const worker1 = await Worker.create({
workflowsPath: require.resolve('./deployment-versioning-can-v1'),
taskQueue,
workerDeploymentOptions: {
useWorkerVersioning: true,
version: v1,
defaultVersioningBehavior: 'PINNED',
},
connection: nativeConnection,
});
const worker1Promise = worker1.run();
worker1Promise.catch((err) => t.fail('Worker 1.0 error: ' + err));

const worker2Connection = await createNativeConnection();
t.teardown(() => worker2Connection.close());
const worker2 = await Worker.create({
workflowsPath: require.resolve('./deployment-versioning-can-v2'),
taskQueue,
workerDeploymentOptions: {
useWorkerVersioning: true,
version: v2,
defaultVersioningBehavior: 'PINNED',
},
connection: worker2Connection,
});
const worker2Promise = worker2.run();
worker2Promise.catch((err) => t.fail('Worker 2.0 error: ' + err));

const describeResp1 = await waitUntilWorkerDeploymentVisible(client, v1);
const setResp1 = await setCurrentDeploymentVersion(client, describeResp1.conflictToken, v1);
await waitForRoutingConfigPropagation(client, deploymentName, v1.buildId);

const handle = await client.workflow.start('continueAsNewWithRampingVersion', {
args: [0],
taskQueue,
workflowId: 'can-ramping-version-' + randomUUID(),
});
await waitForWorkflowRunningOnVersion(client, handle, v1.buildId);

await waitUntilWorkerDeploymentVisible(client, v2);
await setRampingVersion(client, setResp1.conflictToken, v2, 0);
await waitForRoutingConfigPropagation(client, deploymentName, v1.buildId, v2.buildId);

await handle.signal(unblockSignal);

const result = await handle.result();
t.is(result, 'v2.0');

worker1.shutdown();
worker2.shutdown();
await worker1Promise;
await worker2Promise;
});

///////////////////////////////

/**
Expand Down
Loading