Skip to content

Commit 2dee8ec

Browse files
authored
CaN USE_RAMPING_VERSION versioning behaviour (#429)
* CaN USE_RAMPING_VERSION versioning behaviour * fix lint
1 parent c136df6 commit 2dee8ec

6 files changed

Lines changed: 113 additions & 4 deletions

File tree

temporalio/lib/temporalio/common_enums.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,22 @@ module ContinueAsNewVersioningBehavior
5050
# workflow code.
5151
AUTO_UPGRADE =
5252
Api::Enums::V1::ContinueAsNewVersioningBehavior::CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE
53+
# Use the Ramping Version of the workflow's task queue at start time, regardless of the workflow's
54+
# Target Version (according to f(workflow_id, ramp_percentage)). After the first workflow task completes,
55+
# the workflow will use whatever Versioning Behavior it is annotated with. If there is no Ramping
56+
# Version by the time that the first workflow task is dispatched, it will be sent to the Current Version.
57+
#
58+
# It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because
59+
# this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow
60+
# is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which
61+
# may be the Current Version instead of the Ramping Version.
62+
#
63+
# Note that if the workflow being continued has a Pinned override, that override will be inherited by the
64+
# new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new
65+
# command. Versioning Override always takes precedence until it's removed manually via
66+
# UpdateWorkflowExecutionOptions.
67+
USE_RAMPING_VERSION =
68+
Api::Enums::V1::ContinueAsNewVersioningBehavior::CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_USE_RAMPING_VERSION
5369
end
5470

5571
# Specifies why the server suggests continue-as-new. This is currently experimental.

temporalio/lib/temporalio/workflow.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,8 @@ class ContinueAsNewError < Error
672672
# current workflow.
673673
# @param initial_versioning_behavior [ContinueAsNewVersioningBehavior::enum, nil] Versioning behavior for the
674674
# first task of the new run. Set to {ContinueAsNewVersioningBehavior::AUTO_UPGRADE} to upgrade a pinned workflow
675-
# to the latest version on continue-as-new. This is currently experimental.
675+
# to the latest version on continue-as-new or {ContinueAsNewVersioningBehavior::USE_RAMPING_VERSION} to start on
676+
# the task queue's Ramping Version. This is currently experimental.
676677
def initialize(
677678
*args,
678679
workflow: nil,

temporalio/sig/temporalio/common_enums.rbs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ module Temporalio
2222

2323
UNSPECIFIED: enum
2424
AUTO_UPGRADE: enum
25+
USE_RAMPING_VERSION: enum
2526
end
2627

2728
module SuggestContinueAsNewReason
@@ -40,4 +41,4 @@ module Temporalio
4041
PINNED: enum
4142
AUTO_UPGRADE: enum
4243
end
43-
end
44+
end

temporalio/test/sig/worker_workflow_versioning_test.rbs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@ class WorkerWorkflowVersioningTest < Test
44
def set_current_deployment_version: (untyped client, String task_queue, Temporalio::WorkerDeploymentVersion version) -> untyped
55
def set_ramping_version: (untyped client, String task_queue, Temporalio::WorkerDeploymentVersion version, Float rate) -> untyped
66
def wait_for_workflow_running_on_version: (untyped handle, String expected_build_id) -> void
7-
def wait_for_worker_deployment_routing_config_propagation: (untyped client, String deployment_name, String expected_current_build_id) -> void
7+
def wait_for_worker_deployment_routing_config_propagation: (untyped client, String deployment_name, String expected_current_build_id, ?String expected_ramping_build_id) -> void
88
end

temporalio/test/test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ def initialize
147147
if target_host.empty?
148148
@server = Temporalio::Testing::WorkflowEnvironment.start_local(
149149
logger: Logger.new($stdout),
150-
dev_server_download_version: 'v1.6.2-server-1.31.0-151.6',
150+
dev_server_download_version: 'v1.7.0',
151151
dev_server_extra_args: [
152152
# Allow continue as new to be immediate
153153
'--dynamic-config-value', 'history.workflowIdReuseMinimalInterval="0s"',

temporalio/test/worker_workflow_versioning_test.rb

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -650,6 +650,39 @@ def execute(_attempt)
650650
end
651651
end
652652

653+
class CanRampingVersionWorkflowV1 < Temporalio::Workflow::Definition
654+
workflow_name :ContinueAsNewWithRampingVersion
655+
workflow_versioning_behavior Temporalio::VersioningBehavior::PINNED
656+
657+
def initialize
658+
@continue_as_new = false
659+
end
660+
661+
def execute(attempt)
662+
return 'v1.0' if attempt.positive?
663+
664+
Temporalio::Workflow.wait_condition { @continue_as_new }
665+
raise Temporalio::Workflow::ContinueAsNewError.new(
666+
attempt + 1,
667+
initial_versioning_behavior: Temporalio::ContinueAsNewVersioningBehavior::USE_RAMPING_VERSION
668+
)
669+
end
670+
671+
workflow_signal
672+
def do_continue_as_new
673+
@continue_as_new = true
674+
end
675+
end
676+
677+
class CanRampingVersionWorkflowV2 < Temporalio::Workflow::Definition
678+
workflow_name :ContinueAsNewWithRampingVersion
679+
workflow_versioning_behavior Temporalio::VersioningBehavior::PINNED
680+
681+
def execute(_attempt)
682+
'v2.0'
683+
end
684+
end
685+
653686
def test_continue_as_new_with_version_upgrade
654687
deployment_name = "deployment-can-upgrade-#{SecureRandom.uuid}"
655688
worker_v1 = Temporalio::WorkerDeploymentVersion.new(
@@ -715,6 +748,64 @@ def test_continue_as_new_with_version_upgrade
715748
end
716749
end
717750

751+
def test_continue_as_new_with_ramping_version
752+
deployment_name = "deployment-can-ramping-#{SecureRandom.uuid}"
753+
worker_v1 = Temporalio::WorkerDeploymentVersion.new(
754+
deployment_name: deployment_name, build_id: '1.0'
755+
)
756+
worker_v2 = Temporalio::WorkerDeploymentVersion.new(
757+
deployment_name: deployment_name, build_id: '2.0'
758+
)
759+
760+
task_queue = "tq-#{SecureRandom.uuid}"
761+
762+
worker1 = Temporalio::Worker.new(
763+
client: env.client,
764+
task_queue: task_queue,
765+
workflows: [CanRampingVersionWorkflowV1],
766+
deployment_options: Temporalio::Worker::DeploymentOptions.new(
767+
version: worker_v1,
768+
use_worker_versioning: true
769+
)
770+
)
771+
772+
worker2 = Temporalio::Worker.new(
773+
client: env.client,
774+
task_queue: task_queue,
775+
workflows: [CanRampingVersionWorkflowV2],
776+
deployment_options: Temporalio::Worker::DeploymentOptions.new(
777+
version: worker_v2,
778+
use_worker_versioning: true
779+
)
780+
)
781+
782+
Temporalio::Worker.run_all(worker1, worker2) do
783+
describe_resp = wait_until_worker_deployment_visible(env.client, worker_v1)
784+
current_resp = set_current_deployment_version(env.client, describe_resp.conflict_token, worker_v1)
785+
wait_for_worker_deployment_routing_config_propagation(env.client, deployment_name, worker_v1.build_id)
786+
787+
handle = env.client.start_workflow(
788+
'ContinueAsNewWithRampingVersion',
789+
0,
790+
id: "test-can-ramping-version-#{SecureRandom.uuid}",
791+
task_queue: task_queue
792+
)
793+
wait_for_workflow_running_on_version(handle, worker_v1.build_id)
794+
795+
wait_until_worker_deployment_visible(env.client, worker_v2)
796+
set_ramping_version(env.client, current_resp.conflict_token, worker_v2, 0.0)
797+
wait_for_worker_deployment_routing_config_propagation(
798+
env.client,
799+
deployment_name,
800+
worker_v1.build_id,
801+
worker_v2.build_id
802+
)
803+
804+
handle.signal(CanRampingVersionWorkflowV1.do_continue_as_new)
805+
assert_equal 'v2.0', handle.result
806+
end
807+
end
808+
718809
def wait_for_workflow_running_on_version(handle, expected_build_id)
719810
assert_eventually do
720811
desc = handle.describe

0 commit comments

Comments
 (0)