Skip to content

Commit a0f0b1b

Browse files
committed
CaN USE_RAMPING_VERSION versioning behaviour
1 parent 85dde16 commit a0f0b1b

3 files changed

Lines changed: 112 additions & 1 deletion

File tree

temporalio/workflow.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5426,6 +5426,25 @@ class ContinueAsNewVersioningBehavior(IntEnum):
54265426
effective version will be whatever is specified by the Versioning Override until the override is removed.
54275427
"""
54285428

5429+
USE_RAMPING_VERSION = int(
5430+
temporalio.api.enums.v1.ContinueAsNewVersioningBehavior.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_USE_RAMPING_VERSION
5431+
)
5432+
"""Use the Ramping Version of the workflow's task queue at start time, regardless of the workflow's
5433+
Target Version. After the first workflow task completes, the workflow will use whatever Versioning
5434+
Behavior it is annotated with. If there is no Ramping Version by the time that the first workflow task
5435+
is dispatched, it will be sent to the Current Version.
5436+
5437+
It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because
5438+
this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow
5439+
is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which
5440+
may be the Current Version instead of the Ramping Version.
5441+
5442+
Note that if the workflow being continued has a Pinned override, that override will be inherited by the
5443+
new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new
5444+
command. Versioning Override always takes precedence until it's removed manually via
5445+
UpdateWorkflowExecutionOptions.
5446+
"""
5447+
54295448

54305449
ServiceT = TypeVar("ServiceT")
54315450

tests/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
DEV_SERVER_DOWNLOAD_VERSION = "v1.6.1-server-1.31.0-151.0"
1+
DEV_SERVER_DOWNLOAD_VERSION = "v1.7.0"

tests/worker/test_worker.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1456,6 +1456,40 @@ async def run(self, attempt: int) -> str: # type:ignore[reportUnusedParameter]
14561456
return "v2.0"
14571457

14581458

1459+
@workflow.defn(
1460+
name="ContinueAsNewWithRampingVersion",
1461+
versioning_behavior=VersioningBehavior.PINNED,
1462+
)
1463+
class ContinueAsNewWithRampingVersionV1:
1464+
def __init__(self) -> None:
1465+
self._should_continue_as_new = False
1466+
1467+
@workflow.run
1468+
async def run(self, attempt: int) -> str:
1469+
if attempt > 0:
1470+
return "v1.0"
1471+
1472+
await workflow.wait_condition(lambda: self._should_continue_as_new)
1473+
workflow.continue_as_new(
1474+
arg=attempt + 1,
1475+
initial_versioning_behavior=workflow.ContinueAsNewVersioningBehavior.USE_RAMPING_VERSION,
1476+
)
1477+
1478+
@workflow.signal
1479+
def do_continue_as_new(self) -> None:
1480+
self._should_continue_as_new = True
1481+
1482+
1483+
@workflow.defn(
1484+
name="ContinueAsNewWithRampingVersion",
1485+
versioning_behavior=VersioningBehavior.PINNED,
1486+
)
1487+
class ContinueAsNewWithRampingVersionV2:
1488+
@workflow.run
1489+
async def run(self, attempt: int) -> str: # type:ignore[reportUnusedParameter]
1490+
return "v2.0"
1491+
1492+
14591493
async def wait_for_workflow_running_on_version(
14601494
handle: WorkflowHandle[Any, Any], expected_build_id: str
14611495
) -> None:
@@ -1545,6 +1579,64 @@ async def test_continue_as_new_with_version_upgrade(
15451579
assert result == "v2.0"
15461580

15471581

1582+
async def test_continue_as_new_with_ramping_version(
1583+
client: Client, env: WorkflowEnvironment
1584+
):
1585+
if env.supports_time_skipping:
1586+
pytest.skip("Test Server doesn't support worker deployments")
1587+
1588+
deployment_name = f"deployment-can-ramping-{uuid.uuid4()}"
1589+
v1 = WorkerDeploymentVersion(deployment_name=deployment_name, build_id="1.0")
1590+
v2 = WorkerDeploymentVersion(deployment_name=deployment_name, build_id="2.0")
1591+
1592+
async with (
1593+
new_worker(
1594+
client,
1595+
ContinueAsNewWithRampingVersionV1,
1596+
deployment_config=WorkerDeploymentConfig(
1597+
version=v1,
1598+
use_worker_versioning=True,
1599+
),
1600+
) as w1,
1601+
new_worker(
1602+
client,
1603+
ContinueAsNewWithRampingVersionV2,
1604+
deployment_config=WorkerDeploymentConfig(
1605+
version=v2,
1606+
use_worker_versioning=True,
1607+
),
1608+
task_queue=w1.task_queue,
1609+
),
1610+
):
1611+
describe_resp = await wait_until_worker_deployment_visible(client, v1)
1612+
1613+
resp2 = await set_current_deployment_version(
1614+
client, describe_resp.conflict_token, v1
1615+
)
1616+
await wait_for_worker_deployment_routing_config_propagation(
1617+
client, deployment_name, v1.build_id
1618+
)
1619+
1620+
handle = await client.start_workflow(
1621+
"ContinueAsNewWithRampingVersion",
1622+
0,
1623+
id=f"test-can-ramping-version-{uuid.uuid4()}",
1624+
task_queue=w1.task_queue,
1625+
)
1626+
await wait_for_workflow_running_on_version(handle, v1.build_id)
1627+
1628+
await wait_until_worker_deployment_visible(client, v2)
1629+
await set_ramping_version(client, resp2.conflict_token, v2, 0)
1630+
await wait_for_worker_deployment_routing_config_propagation(
1631+
client, deployment_name, v1.build_id, v2.build_id
1632+
)
1633+
1634+
await handle.signal(ContinueAsNewWithRampingVersionV1.do_continue_as_new)
1635+
1636+
result = await handle.result()
1637+
assert result == "v2.0"
1638+
1639+
15481640
def test_worker_config_matches_init_params():
15491641
"""WorkerConfig TypedDict keys must match Worker.__init__ kwargs."""
15501642
import inspect

0 commit comments

Comments
 (0)