Commit ebbf0ed

Rename the workflows for clarity, improve README
1 parent 6e687fc commit ebbf0ed

5 files changed

Lines changed: 79 additions & 63 deletions

resource_locking/README.md

Lines changed: 26 additions & 10 deletions
@@ -1,7 +1,8 @@
-# Semaphore Sample
+# Resource Locking Sample
 
-This sample shows how to use a long-lived `semaphore_workflow` to ensure that each `resource` is used by at most one
-`load_workflow` at a time. `load_workflow` runs several activities while it has ownership of a resource.
+This sample shows how to use a long-lived `LockManagerWorkflow` to ensure that each `resource` is used by at most one
+`ResourceLockingWorkflow` at a time. `ResourceLockingWorkflow` runs several activities while it has ownership of a
+resource.
 
 Run the following from this directory to start the worker:

@@ -11,21 +12,36 @@ This will start the worker. Then, in another terminal, run the following to exec
 
     poetry run python starter.py
 
-You should see output indicating that the semaphore workflow serialized access to each resource.
+You should see output indicating that the LockManagerWorkflow serialized access to each resource.
 
 You can query the set of current lock holders with:
 
-    tctl wf query -w semaphore --qt get_current_holders
+    tctl wf query -w lock_manager --qt get_current_holders
+
+# Other approaches
+
+There are simpler ways to manage concurrent access to resources. Consider using resource-specific workers/task queues,
+and limiting the number of activity slots on the workers. The golang SDK also has [sessions](https://docs.temporal.io/develop/go/sessions)
+that allow workflows to pin themselves to workers.
+
+The technique in this sample is capable of more complex resource locking than the options above, but it doesn't scale
+as well. Specifically, it can:
+- Manage access to a set of resources that is decoupled from the set of workers and task queues
+- Run arbitrary code to place workloads on resources as they become available
 
 # Caveats
 
 This sample uses true locking (not leasing!) to avoid complexity and scaling concerns associated with heartbeating via
-signals. Locking carries the risk of a "leak" (failure to unlock) permanently removing a resource from the pool. With
-Temporal's durabile execution guarantees, this can only happen if:
+signals. Locking carries the risk that a failure to unlock permanently removes a resource from the pool. However, with
+Temporal's durable execution guarantees, this can only happen if:
 
 - A LoadWorkflow times out (prohibited in the sample code)
-- You shut down your workers, and never restart them (unhandled, but probably irrelevant)
+- You shut down your workers and never restart them (unhandled, but irrelevant)
+
+If a leak were to happen, you could discover the identity of the leaker using the query above, then:
 
-If a leak were to happen in the wild, you could discover the identity of the leaker using the query above, then:
+    tctl wf signal -w lock_manager --name release_resource --input '{ "resource": "the resource", "workflow_id": "holder workflow id", "run_id": "holder run id" }'
 
-    tctl wf signal -w semaphore --name release_resource --input '{ "resource": "the resource", "workflow_id": "holder workflow id", "run_id": "holder run id" }'
+Performance: A single LockManagerWorkflow scales to tens, but not hundreds, of lock/unlock events per second. It is
+best suited for locking resources during long-running workflows. Actual performance will depend on your Temporal
+server's persistence layer.
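
The queueing behavior the README describes (each resource held by at most one workflow, with waiters served in order) can be modeled outside Temporal. Below is a minimal plain-Python sketch of that bookkeeping; the `acquire`/`release` helpers and the `wf-*` names are hypothetical stand-ins for the real signal handlers and workflow ids, not part of the sample:

```python
from typing import Optional


def acquire(queues: dict[str, list[str]], resource: str, requester: str) -> bool:
    """Enqueue a requester; return True if it immediately holds the lock."""
    queue = queues.setdefault(resource, [])
    queue.append(requester)
    return queue[0] == requester


def release(queues: dict[str, list[str]], resource: str, requester: str) -> Optional[str]:
    """Dequeue a requester; return the next holder if the lock changed hands."""
    queue = queues.get(resource, [])
    was_holder = bool(queue) and queue[0] == requester
    if requester in queue:
        queue.remove(requester)
    # If the holder released and someone is waiting, the head of the queue
    # becomes the new holder.
    return queue[0] if was_holder and queue else None


queues: dict[str, list[str]] = {"resource_a": []}
assert acquire(queues, "resource_a", "wf-1") is True    # wf-1 holds the lock
assert acquire(queues, "resource_a", "wf-2") is False   # wf-2 waits in line
assert release(queues, "resource_a", "wf-1") == "wf-2"  # lock hands off to wf-2
```

In the real sample this dictionary lives inside the long-lived lock manager workflow, so Temporal's durability is what keeps it from being lost between lock and unlock.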
Lines changed: 6 additions & 9 deletions
@@ -25,20 +25,17 @@ class HandoffRequest:
     old_run_id: str
     new_run_id: str
 
-SEMAPHORE_WORKFLOW_TYPE = "semaphore-wf"
-SEMAPHORE_WORKFLOW_ID = "semaphore"
+LOCK_MANAGER_WORKFLOW_ID = "lock_manager"
 
 @dataclass
-class SemaphoreWorkflowInput:
+class LockManagerWorkflowInput:
     # Key is resource, value is users of the resource. The first item in each list is the current holder of the lease
     # on that resource. A similar data structure could allow for multiple holders (perhaps the first n items are the
     # current holders).
     resource_queues: dict[str, list[AcquireRequest]]
 
-@workflow.defn(
-    name=SEMAPHORE_WORKFLOW_TYPE,
-)
-class SemaphoreWorkflow:
+@workflow.defn
+class LockManagerWorkflow:
     def __init__(self):
         self.resource_queues: dict[str, list[AcquireRequest]] = {}

@@ -119,7 +116,7 @@ def get_current_holders(self) -> dict[str, AcquireRequest]:
         return { k: v[0] if v else None for k, v in self.resource_queues.items() }
 
     @workflow.run
-    async def run(self, input: SemaphoreWorkflowInput) -> None:
+    async def run(self, input: LockManagerWorkflowInput) -> None:
         self.resource_queues = input.resource_queues
 
         # Continue as new either when temporal tells us to, or every 12 hours (so it occurs semi-frequently)

@@ -128,4 +125,4 @@ async def run(self, input: SemaphoreWorkflowInput) -> None:
             timeout=timedelta(hours=12),
         )
 
-        workflow.continue_as_new(SemaphoreWorkflowInput(self.resource_queues))
+        workflow.continue_as_new(LockManagerWorkflowInput(self.resource_queues))

resource_locking/load_workflow.py renamed to resource_locking/resource_locking_workflow.py

Lines changed: 25 additions & 23 deletions
@@ -5,24 +5,23 @@
 
 from temporalio import activity, workflow
 
-from resource_locking.sem_workflow import AssignedResource, SEMAPHORE_WORKFLOW_ID, \
+from resource_locking.lock_manager_workflow import AssignedResource, LOCK_MANAGER_WORKFLOW_ID, \
     ReleaseRequest, AcquireRequest, HandoffRequest
 
-
 @dataclass
-class LoadActivityInput:
+class UseResourceActivityInput:
     resource: str
     iteration: str
 
 @activity.defn
-async def load(input: LoadActivityInput) -> None:
-    workflow_id = activity.info().workflow_id
-    print(f"Workflow {workflow_id} starts using {input.resource} the {input.iteration} time")
-    await asyncio.sleep(5)
-    print(f"Workflow {workflow_id} finishes using {input.resource} the {input.iteration} time")
+async def use_resource(input: UseResourceActivityInput) -> None:
+    info = activity.info()
+    activity.logger.info(f"{info.workflow_id} starts using {input.resource} the {input.iteration} time")
+    await asyncio.sleep(3)
+    activity.logger.info(f"{info.workflow_id} done using {input.resource} the {input.iteration} time")
 
 @dataclass
-class LoadWorkflowInput:
+class ResourceLockingWorkflowInput:
     # If set, this workflow will fail after the "first", "second", or "third" activity.
     iteration_to_fail_after: Optional[str]

@@ -36,15 +35,13 @@ class LoadWorkflowInput:
 class FailWorkflowException(Exception):
     pass
 
+# Wait this long for a resource before giving up
 MAX_RESOURCE_WAIT_TIME = timedelta(minutes=5)
 
-def has_timeout(timeout: Optional[timedelta]) -> bool:
-    return timeout is not None and timeout > timedelta(0)
-
 @workflow.defn(
     failure_exception_types=[FailWorkflowException]
 )
-class LoadWorkflow:
+class ResourceLockingWorkflow:
 
     def __init__(self):
         self.assigned_resource: Optional[str] = None

@@ -54,15 +51,15 @@ def handle_assign_resource(self, input: AssignedResource):
         self.assigned_resource = input.resource
 
     @workflow.run
-    async def run(self, input: LoadWorkflowInput):
+    async def run(self, input: ResourceLockingWorkflowInput):
         workflow.info()
         if has_timeout(workflow.info().run_timeout):
             # See "locking" comment below for rationale
-            raise FailWorkflowException(f"LoadWorkflow cannot have a run_timeout (found {workflow.info().run_timeout})")
+            raise FailWorkflowException(f"ResourceLockingWorkflow cannot have a run_timeout (found {workflow.info().run_timeout})")
         if has_timeout(workflow.info().execution_timeout):
-            raise FailWorkflowException(f"LoadWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout})")
+            raise FailWorkflowException(f"ResourceLockingWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout})")
 
-        sem_handle = workflow.get_external_workflow_handle(SEMAPHORE_WORKFLOW_ID)
+        sem_handle = workflow.get_external_workflow_handle(LOCK_MANAGER_WORKFLOW_ID)
 
         info = workflow.info()
         if input.already_owned_resource is None:

@@ -78,13 +75,13 @@ async def run(self, input: LoadWorkflowInput):
             raise FailWorkflowException(f"No resource was assigned after {MAX_RESOURCE_WAIT_TIME}")
 
         # From this point forward, we own the resource. Note that this is a lock, not a lease! Our finally block will
-        # free up the resource if an activity fails. This is why we asserted the lack of workflow-level timeouts
+        # release the resource if an activity fails. This is why we asserted the lack of workflow-level timeouts
         # above - the finally block wouldn't run if there was a timeout.
         try:
             for iteration in ["first", "second", "third"]:
                 await workflow.execute_activity(
-                    load,
-                    LoadActivityInput(self.assigned_resource, iteration),
+                    use_resource,
+                    UseResourceActivityInput(self.assigned_resource, iteration),
                     start_to_close_timeout=timedelta(seconds=10),
                 )

@@ -93,15 +90,20 @@
                 raise FailWorkflowException()
 
             if input.should_continue_as_new:
-                next_input = LoadWorkflowInput(
+                next_input = ResourceLockingWorkflowInput(
                     iteration_to_fail_after=input.iteration_to_fail_after,
                     should_continue_as_new=False,
                     already_owned_resource=self.assigned_resource,
                 )
                 workflow.continue_as_new(next_input)
         finally:
             # Only release the resource if we didn't continue-as-new. workflow.continue_as_new raises to halt workflow
-            # execution, but the code in this finally block will still run. It wouldn't successfully send the signal...
+            # execution, but the code in this finally block will still run. It wouldn't successfully send the signal...
             # the if statement just avoids some warnings in the log.
             if not input.should_continue_as_new:
-                await sem_handle.signal("release_resource", ReleaseRequest(self.assigned_resource, info.workflow_id, info.run_id))
+                await sem_handle.signal("release_resource", ReleaseRequest(self.assigned_resource, info.workflow_id, info.run_id))
+
+def has_timeout(timeout: Optional[timedelta]) -> bool:
+    # After continue_as_new, timeouts are 0, even if they were None before continue_as_new (and were not set in the
+    # continue_as_new call).
+    return timeout is not None and timeout > timedelta(0)
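
The lock-not-lease design above hinges on the `finally` block always running, so the resource is released even when an activity fails. A minimal plain-Python sketch of that shape (no Temporal involved; `release_resource` and `use_with_lock` are hypothetical stand-ins for the signal and the workflow body):

```python
released = []


def release_resource(resource: str) -> None:
    # Stand-in for the release_resource signal sent to the lock manager.
    released.append(resource)


def use_with_lock(resource: str, fail: bool) -> None:
    try:
        if fail:
            raise RuntimeError("activity failed")
    finally:
        # Runs on success AND on failure, so the lock is not leaked. A
        # workflow-level timeout could kill execution before this block runs,
        # which is why the sample forbids run/execution timeouts.
        release_resource(resource)


use_with_lock("resource_a", fail=False)
try:
    use_with_lock("resource_b", fail=True)
except RuntimeError:
    pass
assert released == ["resource_a", "resource_b"]
```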

resource_locking/starter.py

Lines changed: 18 additions & 17 deletions
@@ -3,49 +3,50 @@
 
 from temporalio.client import Client, WorkflowHandle, WorkflowFailureError
 
-from resource_locking.load_workflow import LoadWorkflow, LoadWorkflowInput
-from resource_locking.sem_workflow import SemaphoreWorkflow, SemaphoreWorkflowInput, SEMAPHORE_WORKFLOW_ID
+from resource_locking.resource_locking_workflow import ResourceLockingWorkflow, ResourceLockingWorkflowInput
+from resource_locking.lock_manager_workflow import LockManagerWorkflow, LockManagerWorkflowInput, LOCK_MANAGER_WORKFLOW_ID
 
 
 async def main():
     # Connect client
     client = await Client.connect("localhost:7233")
 
-    # Run the semaphore workflow
-    sem_handle = await client.start_workflow(
-        workflow=SemaphoreWorkflow.run,
-        arg=SemaphoreWorkflowInput({
+    # Start the LockManagerWorkflow
+    lock_manager_handle = await client.start_workflow(
+        workflow=LockManagerWorkflow.run,
+        arg=LockManagerWorkflowInput({
             "resource_a": [],
             "resource_b": [],
         }),
-        id=SEMAPHORE_WORKFLOW_ID,
+        id=LOCK_MANAGER_WORKFLOW_ID,
         task_queue="default",
     )
 
-    load_handles: list[WorkflowHandle[Any, Any]] = []
+    # Start the ResourceLockingWorkflows
+    resource_locking_handles: list[WorkflowHandle[Any, Any]] = []
     for i in range(0, 4):
-        input = LoadWorkflowInput(iteration_to_fail_after=None, should_continue_as_new=False, already_owned_resource=None)
+        input = ResourceLockingWorkflowInput(iteration_to_fail_after=None, should_continue_as_new=False, already_owned_resource=None)
         if i == 0:
             input.should_continue_as_new = True
         if i == 1:
             input.iteration_to_fail_after = "first"
 
-        load_handle = await client.start_workflow(
-            workflow=LoadWorkflow.run,
+        resource_locking_handle = await client.start_workflow(
+            workflow=ResourceLockingWorkflow.run,
             arg=input,
-            id=f"load-workflow-{i}",
+            id=f"resource-locking-workflow-{i}",
             task_queue="default",
         )
-        load_handles.append(load_handle)
+        resource_locking_handles.append(resource_locking_handle)
 
-    for load_handle in load_handles:
+    for resource_locking_handle in resource_locking_handles:
         try:
-            await load_handle.result()
+            await resource_locking_handle.result()
         except WorkflowFailureError:
             pass
 
-    await sem_handle.terminate()
-
+    # Clean up after ourselves. In the real world, the lock manager workflow would run forever.
+    await lock_manager_handle.terminate()
 
 if __name__ == "__main__":
     asyncio.run(main())

resource_locking/worker.py

Lines changed: 4 additions & 4 deletions
@@ -4,8 +4,8 @@
 from temporalio.client import Client
 from temporalio.worker import Worker
 
-from resource_locking.load_workflow import LoadWorkflow, load
-from resource_locking.sem_workflow import SemaphoreWorkflow
+from resource_locking.resource_locking_workflow import ResourceLockingWorkflow, use_resource
+from resource_locking.lock_manager_workflow import LockManagerWorkflow
 
 async def main():
     # Uncomment the line below to see logging

@@ -18,8 +18,8 @@ async def main():
     worker = Worker(
         client,
         task_queue="default",
-        workflows=[SemaphoreWorkflow, LoadWorkflow],
-        activities=[load],
+        workflows=[LockManagerWorkflow, ResourceLockingWorkflow],
+        activities=[use_resource],
     )
 
     await worker.run()
