Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions temporalio/lib/temporalio/common_enums.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,22 @@ module ContinueAsNewVersioningBehavior
# workflow code.
AUTO_UPGRADE =
Api::Enums::V1::ContinueAsNewVersioningBehavior::CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE
# Use the Ramping Version of the workflow's task queue at start time, regardless of the workflow's
# Target Version (according to f(workflow_id, ramp_percentage)). After the first workflow task completes,
# the workflow will use whatever Versioning Behavior it is annotated with. If there is no Ramping
# Version by the time that the first workflow task is dispatched, it will be sent to the Current Version.
#
# It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because
# this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow
# is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which
# may be the Current Version instead of the Ramping Version.
#
# Note that if the workflow being continued has a Pinned override, that override will be inherited by the
# new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new
# command. Versioning Override always takes precedence until it's removed manually via
# UpdateWorkflowExecutionOptions.
USE_RAMPING_VERSION =
Api::Enums::V1::ContinueAsNewVersioningBehavior::CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_USE_RAMPING_VERSION
end

# Specifies why the server suggests continue-as-new. This is currently experimental.
Expand Down
3 changes: 2 additions & 1 deletion temporalio/lib/temporalio/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,8 @@ class ContinueAsNewError < Error
# current workflow.
# @param initial_versioning_behavior [ContinueAsNewVersioningBehavior::enum, nil] Versioning behavior for the
# first task of the new run. Set to {ContinueAsNewVersioningBehavior::AUTO_UPGRADE} to upgrade a pinned workflow
# to the latest version on continue-as-new. This is currently experimental.
# to the latest version on continue-as-new or {ContinueAsNewVersioningBehavior::USE_RAMPING_VERSION} to start on
# the task queue's Ramping Version. This is currently experimental.
def initialize(
*args,
workflow: nil,
Expand Down
3 changes: 2 additions & 1 deletion temporalio/sig/temporalio/common_enums.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ module Temporalio

UNSPECIFIED: enum
AUTO_UPGRADE: enum
USE_RAMPING_VERSION: enum
end

module SuggestContinueAsNewReason
Expand All @@ -40,4 +41,4 @@ module Temporalio
PINNED: enum
AUTO_UPGRADE: enum
end
end
end
2 changes: 1 addition & 1 deletion temporalio/test/sig/worker_workflow_versioning_test.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ class WorkerWorkflowVersioningTest < Test
def set_current_deployment_version: (untyped client, String task_queue, Temporalio::WorkerDeploymentVersion version) -> untyped
def set_ramping_version: (untyped client, String task_queue, Temporalio::WorkerDeploymentVersion version, Float rate) -> untyped
def wait_for_workflow_running_on_version: (untyped handle, String expected_build_id) -> void
def wait_for_worker_deployment_routing_config_propagation: (untyped client, String deployment_name, String expected_current_build_id) -> void
def wait_for_worker_deployment_routing_config_propagation: (untyped client, String deployment_name, String expected_current_build_id, ?String expected_ramping_build_id) -> void
end
2 changes: 1 addition & 1 deletion temporalio/test/test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def initialize
if target_host.empty?
@server = Temporalio::Testing::WorkflowEnvironment.start_local(
logger: Logger.new($stdout),
dev_server_download_version: 'v1.6.2-server-1.31.0-151.6',
dev_server_download_version: 'v1.7.0',
dev_server_extra_args: [
# Allow continue as new to be immediate
'--dynamic-config-value', 'history.workflowIdReuseMinimalInterval="0s"',
Expand Down
91 changes: 91 additions & 0 deletions temporalio/test/worker_workflow_versioning_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,39 @@ def execute(_attempt)
end
end

class CanRampingVersionWorkflowV1 < Temporalio::Workflow::Definition
workflow_name :ContinueAsNewWithRampingVersion
workflow_versioning_behavior Temporalio::VersioningBehavior::PINNED

def initialize
@continue_as_new = false
end

def execute(attempt)
return 'v1.0' if attempt.positive?

Temporalio::Workflow.wait_condition { @continue_as_new }
raise Temporalio::Workflow::ContinueAsNewError.new(
attempt + 1,
initial_versioning_behavior: Temporalio::ContinueAsNewVersioningBehavior::USE_RAMPING_VERSION
)
end

workflow_signal
def do_continue_as_new
@continue_as_new = true
end
end

class CanRampingVersionWorkflowV2 < Temporalio::Workflow::Definition
workflow_name :ContinueAsNewWithRampingVersion
workflow_versioning_behavior Temporalio::VersioningBehavior::PINNED

def execute(_attempt)
'v2.0'
end
end

def test_continue_as_new_with_version_upgrade
deployment_name = "deployment-can-upgrade-#{SecureRandom.uuid}"
worker_v1 = Temporalio::WorkerDeploymentVersion.new(
Expand Down Expand Up @@ -715,6 +748,64 @@ def test_continue_as_new_with_version_upgrade
end
end

def test_continue_as_new_with_ramping_version
deployment_name = "deployment-can-ramping-#{SecureRandom.uuid}"
worker_v1 = Temporalio::WorkerDeploymentVersion.new(
deployment_name: deployment_name, build_id: '1.0'
)
worker_v2 = Temporalio::WorkerDeploymentVersion.new(
deployment_name: deployment_name, build_id: '2.0'
)

task_queue = "tq-#{SecureRandom.uuid}"

worker1 = Temporalio::Worker.new(
client: env.client,
task_queue: task_queue,
workflows: [CanRampingVersionWorkflowV1],
deployment_options: Temporalio::Worker::DeploymentOptions.new(
version: worker_v1,
use_worker_versioning: true
)
)

worker2 = Temporalio::Worker.new(
client: env.client,
task_queue: task_queue,
workflows: [CanRampingVersionWorkflowV2],
deployment_options: Temporalio::Worker::DeploymentOptions.new(
version: worker_v2,
use_worker_versioning: true
)
)

Temporalio::Worker.run_all(worker1, worker2) do
describe_resp = wait_until_worker_deployment_visible(env.client, worker_v1)
current_resp = set_current_deployment_version(env.client, describe_resp.conflict_token, worker_v1)
wait_for_worker_deployment_routing_config_propagation(env.client, deployment_name, worker_v1.build_id)

handle = env.client.start_workflow(
'ContinueAsNewWithRampingVersion',
0,
id: "test-can-ramping-version-#{SecureRandom.uuid}",
task_queue: task_queue
)
wait_for_workflow_running_on_version(handle, worker_v1.build_id)

wait_until_worker_deployment_visible(env.client, worker_v2)
set_ramping_version(env.client, current_resp.conflict_token, worker_v2, 0.0)
wait_for_worker_deployment_routing_config_propagation(
env.client,
deployment_name,
worker_v1.build_id,
worker_v2.build_id
)

handle.signal(CanRampingVersionWorkflowV1.do_continue_as_new)
assert_equal 'v2.0', handle.result
end
end

def wait_for_workflow_running_on_version(handle, expected_build_id)
assert_eventually do
desc = handle.describe
Expand Down
Loading