Skip to content

Commit 6e687fc

Browse files
committed
Clean up imports, exercise continue_as_new behavior
1 parent a6b1b1f commit 6e687fc

3 files changed

Lines changed: 62 additions & 22 deletions

File tree

resource_locking/load_workflow.py

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44
from typing import Optional
55

66
from temporalio import activity, workflow
7-
from temporalio.client import Client
87

98
from resource_locking.sem_workflow import AssignedResource, SEMAPHORE_WORKFLOW_ID, \
10-
ReleaseRequest, AcquireRequest, SemaphoreWorkflowInput, SEMAPHORE_WORKFLOW_TYPE
9+
ReleaseRequest, AcquireRequest, HandoffRequest
1110

1211

1312
@dataclass
@@ -24,47 +23,63 @@ async def load(input: LoadActivityInput) -> None:
2423

2524
@dataclass
2625
class LoadWorkflowInput:
26+
# If set, this workflow will fail after the "first", "second", or "third" activity.
2727
iteration_to_fail_after: Optional[str]
2828

29+
# If True, this workflow will continue as new after the third activity. The next iteration will run three more
30+
# activities, but will not continue as new. This lets us exercise the handoff logic.
31+
should_continue_as_new: bool
32+
33+
# Used to transfer resource ownership between iterations during continue_as_new
34+
already_owned_resource: Optional[str]
35+
2936
class FailWorkflowException(Exception):
3037
pass
3138

3239
MAX_RESOURCE_WAIT_TIME = timedelta(minutes=5)
3340

41+
def has_timeout(timeout: Optional[timedelta]) -> bool:
42+
return timeout is not None and timeout > timedelta(0)
43+
3444
@workflow.defn(
3545
failure_exception_types=[FailWorkflowException]
3646
)
3747
class LoadWorkflow:
3848

3949
def __init__(self):
40-
self.assigned_resource = None
50+
self.assigned_resource: Optional[str] = None
4151

4252
@workflow.signal(name="assign_resource")
4353
def handle_assign_resource(self, input: AssignedResource):
4454
self.assigned_resource = input.resource
4555

4656
@workflow.run
4757
async def run(self, input: LoadWorkflowInput):
48-
if workflow.info().run_timeout is not None:
58+
workflow.info()
59+
if has_timeout(workflow.info().run_timeout):
4960
# See "locking" comment below for rationale
50-
raise FailWorkflowException(f"LoadWorkflow cannot have a run_timeout")
51-
if workflow.info().execution_timeout is not None:
52-
raise FailWorkflowException(f"LoadWorkflow cannot have an execution_timeout")
61+
raise FailWorkflowException(f"LoadWorkflow cannot have a run_timeout (found {workflow.info().run_timeout})")
62+
if has_timeout(workflow.info().execution_timeout):
63+
raise FailWorkflowException(f"LoadWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout})")
5364

5465
sem_handle = workflow.get_external_workflow_handle(SEMAPHORE_WORKFLOW_ID)
5566

56-
# Ask for a resource...
5767
info = workflow.info()
58-
await sem_handle.signal("acquire_resource", AcquireRequest(info.workflow_id, info.run_id))
59-
60-
# ...and wait for the answer
68+
if input.already_owned_resource is None:
69+
await sem_handle.signal("acquire_resource", AcquireRequest(info.workflow_id, info.run_id))
70+
else:
71+
# If we continued as new, we already have a resource. We need to transfer ownership from our predecessor to
72+
# ourselves.
73+
await sem_handle.signal("handoff_resource", HandoffRequest(input.already_owned_resource, info.workflow_id, info.continued_run_id, info.run_id))
74+
75+
# Both branches above should cause us to receive an "assign_resource" signal.
6176
await workflow.wait_condition(lambda: self.assigned_resource is not None, timeout=MAX_RESOURCE_WAIT_TIME)
6277
if self.assigned_resource is None:
6378
raise FailWorkflowException(f"No resource was assigned after {MAX_RESOURCE_WAIT_TIME}")
6479

65-
# From this point forward, we own the resource. Note that this is a lock, not a lease! Our finally block needs
66-
# to run to free up the resource if an activity fails. This is why we asserted the lack of workflow-level
67-
# timeouts above - they would prevent the finally block from running if there was a timeout.
80+
# From this point forward, we own the resource. Note that this is a lock, not a lease! Our finally block will
81+
# free up the resource if an activity fails. This is why we asserted the lack of workflow-level timeouts
82+
# above - the finally block wouldn't run if there was a timeout.
6883
try:
6984
for iteration in ["first", "second", "third"]:
7085
await workflow.execute_activity(
@@ -76,5 +91,17 @@ async def run(self, input: LoadWorkflowInput):
7691
if iteration == input.iteration_to_fail_after:
7792
workflow.logger.info(f"Failing after iteration {input.iteration_to_fail_after}")
7893
raise FailWorkflowException()
94+
95+
if input.should_continue_as_new:
96+
next_input = LoadWorkflowInput(
97+
iteration_to_fail_after=input.iteration_to_fail_after,
98+
should_continue_as_new=False,
99+
already_owned_resource=self.assigned_resource,
100+
)
101+
workflow.continue_as_new(next_input)
79102
finally:
80-
await sem_handle.signal("release_resource", ReleaseRequest(self.assigned_resource, info.workflow_id, info.run_id))
103+
# Only release the resource if we didn't continue-as-new. workflow.continue_as_new raises to halt workflow
104+
# execution, but the code in this finally block will still run. It wouldn't successfully send the signal...
105+
# the if statement just avoids some warnings in the log.
106+
if not input.should_continue_as_new:
107+
await sem_handle.signal("release_resource", ReleaseRequest(self.assigned_resource, info.workflow_id, info.run_id))

resource_locking/sem_workflow.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from dataclasses import dataclass
2-
from datetime import datetime, timedelta
2+
from datetime import timedelta
33

44
from temporalio import workflow
55

@@ -31,7 +31,8 @@ class HandoffRequest:
3131
@dataclass
3232
class SemaphoreWorkflowInput:
3333
# Key is resource, value is users of the resource. The first item in each list is the current holder of the lease
34-
# on that resource.
34+
# on that resource. A similar data structure could allow for multiple holders (perhaps the first n items are the
35+
# current holders).
3536
resource_queues: dict[str, list[AcquireRequest]]
3637

3738
@workflow.defn(
@@ -43,16 +44,21 @@ def __init__(self):
4344

4445
@workflow.signal
4546
async def acquire_resource(self, request: AcquireRequest):
47+
# A real-world version of this workflow probably wants to use more sophisticated load balancing strategies than
48+
# "first free" and "wait for a random one".
49+
4650
for resource in self.resource_queues:
4751
# Naively give out the first free resource, if we have one
4852
if len(self.resource_queues[resource]) == 0:
53+
workflow.logger.info(f"workflow_id={request.workflow_id} run_id={request.run_id} acquired resource {resource}")
4954
self.resource_queues[resource].append(request)
5055
requester = workflow.get_external_workflow_handle(request.workflow_id, run_id=request.run_id)
5156
await requester.signal("assign_resource", AssignedResource(resource))
5257
return
5358

54-
# Otherwise put this resource in a random queue
59+
# Otherwise, enqueue this request on a randomly chosen resource's queue.
5560
resource = workflow.random().choice(list(self.resource_queues.keys()))
61+
workflow.logger.info(f"workflow_id={request.workflow_id} run_id={request.run_id} is waiting for resource {resource}")
5662
self.resource_queues[resource].append(request)
5763

5864
@workflow.signal
@@ -74,12 +80,14 @@ async def release_resource(self, request: ReleaseRequest):
7480
return
7581

7682
# Remove the current holder from the head of the queue
83+
workflow.logger.info(f"workflow_id={request.workflow_id} run_id={request.run_id} released resource {request.resource}")
7784
queue = queue[1:]
7885
self.resource_queues[request.resource] = queue
7986

8087
# If there are queued requests, assign the resource to the next one
8188
if len(queue) > 0:
8289
next_holder = queue[0]
90+
workflow.logger.info(f"workflow_id={next_holder.workflow_id} run_id={next_holder.run_id} acquired resource {request.resource} after waiting")
8391
requester = workflow.get_external_workflow_handle(next_holder.workflow_id, run_id=next_holder.run_id)
8492
await requester.signal("assign_resource", AssignedResource(request.resource))
8593

@@ -101,7 +109,10 @@ async def handoff_resource(self, request: HandoffRequest):
101109
workflow.logger.warning(f"request was from wf_id={request.workflow_id} run_id={request.old_run_id}")
102110
return
103111

112+
workflow.logger.info(f"workflow_id={request.workflow_id} handed off resource {request.resource} from run_id={request.old_run_id} to run_id={request.new_run_id}")
104113
queue[0] = AcquireRequest(request.workflow_id, request.new_run_id)
114+
requester = workflow.get_external_workflow_handle(request.workflow_id, run_id=request.new_run_id)
115+
await requester.signal("assign_resource", AssignedResource(request.resource))
105116

106117
@workflow.query
107118
def get_current_holders(self) -> dict[str, AcquireRequest]:

resource_locking/starter.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import asyncio
2-
from typing import Optional, Any
2+
from typing import Any
33

44
from temporalio.client import Client, WorkflowHandle, WorkflowFailureError
55

6-
from resource_locking.load_workflow import LoadWorkflow, LoadWorkflowInput, FailWorkflowException
6+
from resource_locking.load_workflow import LoadWorkflow, LoadWorkflowInput
77
from resource_locking.sem_workflow import SemaphoreWorkflow, SemaphoreWorkflowInput, SEMAPHORE_WORKFLOW_ID
88

99

@@ -24,9 +24,11 @@ async def main():
2424

2525
load_handles: list[WorkflowHandle[Any, Any]] = []
2626
for i in range(0, 4):
27-
input = LoadWorkflowInput(iteration_to_fail_after=None)
27+
input = LoadWorkflowInput(iteration_to_fail_after=None, should_continue_as_new=False, already_owned_resource=None)
28+
if i == 0:
29+
input.should_continue_as_new = True
2830
if i == 1:
29-
input = LoadWorkflowInput(iteration_to_fail_after="first")
31+
input.iteration_to_fail_after = "first"
3032

3133
load_handle = await client.start_workflow(
3234
workflow=LoadWorkflow.run,

0 commit comments

Comments
 (0)