Skip to content

Commit e6772d4

Browse files
committed
poe format
1 parent f5eedc5 commit e6772d4

4 files changed

Lines changed: 157 additions & 49 deletions

File tree

resource_locking/lock_manager_workflow.py

Lines changed: 74 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,37 +3,44 @@
33

44
from temporalio import workflow
55

6+
67
@dataclass
class AssignedResource:
    """Payload of the "assign_resource" signal: names the resource granted to a requester."""

    # Name of the resource that was assigned.
    resource: str
910

11+
1012
@dataclass
class AcquireRequest:
    """A request to acquire a resource, identifying the requesting workflow run.

    Also used as the queue-entry type inside the lock manager's resource queues.
    """

    # Workflow id of the requester; target of the "assign_resource" signal.
    workflow_id: str
    # Run id of the requester.
    run_id: str
1416

17+
1518
@dataclass
class ReleaseRequest:
    """A request to release a held resource, identifying the releasing workflow run."""

    # Name of the resource being released.
    resource: str
    # Workflow id of the (claimed) current holder.
    workflow_id: str
    # Run id of the (claimed) current holder.
    run_id: str
2023

24+
2125
@dataclass
class HandoffRequest:
    """A request to transfer a held resource from one run of a workflow to its successor.

    Used across continue_as_new: the old run hands its lock to the new run of
    the same workflow id.
    """

    # Name of the resource being handed off.
    resource: str
    # Workflow id common to both runs.
    workflow_id: str
    # Run id currently holding the resource.
    old_run_id: str
    # Run id that should hold the resource after the handoff.
    new_run_id: str
2731

32+
2833
LOCK_MANAGER_WORKFLOW_ID = "lock_manager"
2934

35+
3036
@dataclass
class LockManagerWorkflowInput:
    """Input to LockManagerWorkflow; also carried across continue_as_new to preserve state."""

    # Key is resource, value is users of the resource. The first item in each list is the current holder of the lease
    # on that resource. A similar data structure could allow for multiple holders (perhaps the first n items are the
    # current holders).
    resource_queues: dict[str, list[AcquireRequest]]
3642

43+
3744
@workflow.defn
3845
class LockManagerWorkflow:
3946
def __init__(self):
@workflow.signal
async def acquire_resource(self, request: AcquireRequest):
    """Grant a free resource to the requester, or queue the request on one.

    If any resource queue is empty, the requester becomes its holder and is
    notified via an "assign_resource" signal; otherwise the request is parked
    on a randomly chosen resource's queue and notified later on release.
    """
    # Hand out the first currently-free resource, if one exists.
    for resource, queue in self.resource_queues.items():
        if not queue:
            workflow.logger.info(
                f"workflow_id={request.workflow_id} run_id={request.run_id} acquired resource {resource}"
            )
            queue.append(request)
            requester = workflow.get_external_workflow_handle(
                request.workflow_id, run_id=request.run_id
            )
            await requester.signal("assign_resource", AssignedResource(resource))
            return

    # Nothing is free: park the request on a randomly chosen resource's queue.
    # workflow.random() keeps the choice deterministic on replay.
    resource = workflow.random().choice(list(self.resource_queues.keys()))
    workflow.logger.info(
        f"workflow_id={request.workflow_id} run_id={request.run_id} is waiting for resource {resource}"
    )
    self.resource_queues[resource].append(request)
6073

6174
@workflow.signal
async def release_resource(self, request: ReleaseRequest):
    """Release a held resource and, if anyone is waiting, grant it to the next requester.

    Ignores (with a warning) requests that name an unknown resource, a
    resource nobody holds, or that come from a workflow/run that is not the
    current holder.
    """
    # BUGFIX: the original used self.resource_queues[request.resource], which
    # raises KeyError for an unknown resource, making the None guard below
    # unreachable (and failing the workflow task). .get() makes it effective.
    queue = self.resource_queues.get(request.resource)
    if queue is None:
        workflow.logger.warning(
            f"Ignoring request from {request.workflow_id} to release non-existent resource: {request.resource}"
        )
        return

    if len(queue) == 0:
        workflow.logger.warning(
            f"Ignoring request from {request.workflow_id} to release resource that is not held: {request.resource}"
        )
        return

    # Only the current holder (head of the queue) may release.
    holder = queue[0]
    if not (
        holder.workflow_id == request.workflow_id
        and holder.run_id == request.run_id
    ):
        workflow.logger.warning(
            f"Ignoring request from non-holder to release resource {request.resource}"
        )
        workflow.logger.warning(
            f"resource is currently held by wf_id={holder.workflow_id} run_id={holder.run_id}"
        )
        workflow.logger.warning(
            f"request was from wf_id={request.workflow_id} run_id={request.run_id}"
        )
        return

    # Remove the current holder from the head of the queue
    workflow.logger.info(
        f"workflow_id={request.workflow_id} run_id={request.run_id} released resource {request.resource}"
    )
    queue = queue[1:]
    self.resource_queues[request.resource] = queue

    # If there are queued requests, assign the resource to the next one
    if len(queue) > 0:
        next_holder = queue[0]
        workflow.logger.info(
            f"workflow_id={next_holder.workflow_id} run_id={next_holder.run_id} acquired resource {request.resource} after waiting"
        )
        requester = workflow.get_external_workflow_handle(
            next_holder.workflow_id, run_id=next_holder.run_id
        )
        await requester.signal(
            "assign_resource", AssignedResource(request.resource)
        )
90124

91125
@workflow.signal
async def handoff_resource(self, request: HandoffRequest):
    """Transfer a held resource from a workflow's old run to its new run.

    Used across continue_as_new so the successor run keeps the lock. The new
    run is notified via "assign_resource". Ignores (with a warning) requests
    for unknown/unheld resources or from a non-holder.
    """
    # BUGFIX: the original used self.resource_queues[request.resource], which
    # raises KeyError for an unknown resource, making the None guard below
    # unreachable (and failing the workflow task). .get() makes it effective.
    queue = self.resource_queues.get(request.resource)
    if queue is None:
        workflow.logger.warning(
            f"Ignoring request from {request.workflow_id} to hand off non-existent resource: {request.resource}"
        )
        return

    if len(queue) == 0:
        workflow.logger.warning(
            f"Ignoring request from {request.workflow_id} to hand off resource that is not held: {request.resource}"
        )
        return

    # Only the current holder (same workflow id, old run id) may hand off.
    holder = queue[0]
    if not (
        holder.workflow_id == request.workflow_id
        and holder.run_id == request.old_run_id
    ):
        workflow.logger.warning(
            f"Ignoring request from non-holder to hand off resource {request.resource}"
        )
        workflow.logger.warning(
            f"resource is currently held by wf_id={holder.workflow_id} run_id={holder.run_id}"
        )
        workflow.logger.warning(
            f"request was from wf_id={request.workflow_id} run_id={request.old_run_id}"
        )
        return

    workflow.logger.info(
        f"workflow_id={request.workflow_id} handed off resource {request.resource} from run_id={request.old_run_id} to run_id={request.new_run_id}"
    )
    # Replace the head of the queue in place: same workflow, new run id.
    queue[0] = AcquireRequest(request.workflow_id, request.new_run_id)
    requester = workflow.get_external_workflow_handle(
        request.workflow_id, run_id=request.new_run_id
    )
    await requester.signal("assign_resource", AssignedResource(request.resource))
113164

114165
@workflow.query
def get_current_holders(self) -> dict[str, AcquireRequest]:
    """Map each resource to the request currently holding it.

    NOTE(review): free resources map to None, although the return annotation
    says AcquireRequest — consider Optional[AcquireRequest]; confirm callers.
    """
    holders = {}
    for resource, queue in self.resource_queues.items():
        holders[resource] = queue[0] if queue else None
    return holders
117168

118169
@workflow.run
119170
async def run(self, input: LockManagerWorkflowInput) -> None:
@@ -125,4 +176,4 @@ async def run(self, input: LockManagerWorkflowInput) -> None:
125176
timeout=timedelta(hours=12),
126177
)
127178

128-
workflow.continue_as_new(LockManagerWorkflowInput(self.resource_queues))
179+
workflow.continue_as_new(LockManagerWorkflowInput(self.resource_queues))

resource_locking/resource_locking_workflow.py

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,32 @@
55

66
from temporalio import activity, workflow
77

8-
from resource_locking.lock_manager_workflow import AssignedResource, LOCK_MANAGER_WORKFLOW_ID, \
9-
ReleaseRequest, AcquireRequest, HandoffRequest
8+
from resource_locking.lock_manager_workflow import (
9+
LOCK_MANAGER_WORKFLOW_ID,
10+
AcquireRequest,
11+
AssignedResource,
12+
HandoffRequest,
13+
ReleaseRequest,
14+
)
15+
1016

1117
@dataclass
1218
class UseResourceActivityInput:
1319
resource: str
1420
iteration: str
1521

22+
1623
@activity.defn
async def use_resource(input: UseResourceActivityInput) -> None:
    """Simulate a few seconds of work against input.resource, logging start and end."""
    wf_id = activity.info().workflow_id
    activity.logger.info(
        f"{wf_id} starts using {input.resource} the {input.iteration} time"
    )
    # Stand-in for real work performed while holding the resource.
    await asyncio.sleep(3)
    activity.logger.info(
        f"{wf_id} done using {input.resource} the {input.iteration} time"
    )
33+
2234

2335
@dataclass
2436
class ResourceLockingWorkflowInput:
@@ -32,17 +44,17 @@ class ResourceLockingWorkflowInput:
3244
# Used to transfer resource ownership between iterations during continue_as_new
3345
already_owned_resource: Optional[str]
3446

47+
3548
class FailWorkflowException(Exception):
    """Raised to fail the workflow outright; listed in failure_exception_types on the workflow."""

    pass
3750

51+
3852
# Wait this long for a resource before giving up
3953
MAX_RESOURCE_WAIT_TIME = timedelta(minutes=5)
4054

41-
@workflow.defn(
42-
failure_exception_types=[FailWorkflowException]
43-
)
44-
class ResourceLockingWorkflow:
4555

56+
@workflow.defn(failure_exception_types=[FailWorkflowException])
57+
class ResourceLockingWorkflow:
4658
def __init__(self):
4759
self.assigned_resource: Optional[str] = None
4860

@@ -55,24 +67,42 @@ async def run(self, input: ResourceLockingWorkflowInput):
5567
workflow.info()
5668
if has_timeout(workflow.info().run_timeout):
5769
# See "locking" comment below for rationale
58-
raise FailWorkflowException(f"ResourceLockingWorkflow cannot have a run_timeout (found {workflow.info().run_timeout})")
70+
raise FailWorkflowException(
71+
f"ResourceLockingWorkflow cannot have a run_timeout (found {workflow.info().run_timeout})"
72+
)
5973
if has_timeout(workflow.info().execution_timeout):
60-
raise FailWorkflowException(f"ResourceLockingWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout})")
74+
raise FailWorkflowException(
75+
f"ResourceLockingWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout})"
76+
)
6177

6278
sem_handle = workflow.get_external_workflow_handle(LOCK_MANAGER_WORKFLOW_ID)
6379

6480
info = workflow.info()
6581
if input.already_owned_resource is None:
66-
await sem_handle.signal("acquire_resource", AcquireRequest(info.workflow_id, info.run_id))
82+
await sem_handle.signal(
83+
"acquire_resource", AcquireRequest(info.workflow_id, info.run_id)
84+
)
6785
else:
6886
# If we continued as new, we already have a resource. We need to transfer ownership from our predecessor to
6987
# ourselves.
70-
await sem_handle.signal("handoff_resource", HandoffRequest(input.already_owned_resource, info.workflow_id, info.continued_run_id, info.run_id))
88+
await sem_handle.signal(
89+
"handoff_resource",
90+
HandoffRequest(
91+
input.already_owned_resource,
92+
info.workflow_id,
93+
info.continued_run_id,
94+
info.run_id,
95+
),
96+
)
7197

7298
# Both branches above should cause us to receive an "assign_resource" signal.
73-
await workflow.wait_condition(lambda: self.assigned_resource is not None, timeout=MAX_RESOURCE_WAIT_TIME)
99+
await workflow.wait_condition(
100+
lambda: self.assigned_resource is not None, timeout=MAX_RESOURCE_WAIT_TIME
101+
)
74102
if self.assigned_resource is None:
75-
raise FailWorkflowException(f"No resource was assigned after {MAX_RESOURCE_WAIT_TIME}")
103+
raise FailWorkflowException(
104+
f"No resource was assigned after {MAX_RESOURCE_WAIT_TIME}"
105+
)
76106

77107
# From this point forward, we own the resource. Note that this is a lock, not a lease! Our finally block will
78108
# release the resource if an activity fails. This is why we asserted the lack of workflow-level timeouts
@@ -86,7 +116,9 @@ async def run(self, input: ResourceLockingWorkflowInput):
86116
)
87117

88118
if iteration == input.iteration_to_fail_after:
89-
workflow.logger.info(f"Failing after iteration {input.iteration_to_fail_after}")
119+
workflow.logger.info(
120+
f"Failing after iteration {input.iteration_to_fail_after}"
121+
)
90122
raise FailWorkflowException()
91123

92124
if input.should_continue_as_new:
@@ -101,9 +133,15 @@ async def run(self, input: ResourceLockingWorkflowInput):
101133
# execution, but this code in this finally block will still run. It wouldn't successfully send the signal...
102134
# the if statement just avoids some warnings in the log.
103135
if not input.should_continue_as_new:
104-
await sem_handle.signal("release_resource", ReleaseRequest(self.assigned_resource, info.workflow_id, info.run_id))
136+
await sem_handle.signal(
137+
"release_resource",
138+
ReleaseRequest(
139+
self.assigned_resource, info.workflow_id, info.run_id
140+
),
141+
)
142+
105143

106144
def has_timeout(timeout: Optional[timedelta]) -> bool:
    """Return True if *timeout* represents a real (positive) timeout.

    After continue_as_new, timeouts are 0, even if they were None before
    continue_as_new (and were not set in the continue_as_new call), so both
    None and zero count as "no timeout".
    """
    if timeout is None:
        return False
    return timeout > timedelta(0)

resource_locking/starter.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
import asyncio
22
from typing import Any
33

4-
from temporalio.client import Client, WorkflowHandle, WorkflowFailureError
4+
from temporalio.client import Client, WorkflowFailureError, WorkflowHandle
55

6-
from resource_locking.resource_locking_workflow import ResourceLockingWorkflow, ResourceLockingWorkflowInput
7-
from resource_locking.lock_manager_workflow import LockManagerWorkflow, LockManagerWorkflowInput, LOCK_MANAGER_WORKFLOW_ID
6+
from resource_locking.lock_manager_workflow import (
7+
LOCK_MANAGER_WORKFLOW_ID,
8+
LockManagerWorkflow,
9+
LockManagerWorkflowInput,
10+
)
11+
from resource_locking.resource_locking_workflow import (
12+
ResourceLockingWorkflow,
13+
ResourceLockingWorkflowInput,
14+
)
815

916

1017
async def main():
@@ -14,18 +21,24 @@ async def main():
1421
# Start the LockManagerWorkflow
1522
lock_manager_handle = await client.start_workflow(
1623
workflow=LockManagerWorkflow.run,
17-
arg=LockManagerWorkflowInput({
18-
"resource_a": [],
19-
"resource_b": [],
20-
}),
24+
arg=LockManagerWorkflowInput(
25+
{
26+
"resource_a": [],
27+
"resource_b": [],
28+
}
29+
),
2130
id=LOCK_MANAGER_WORKFLOW_ID,
2231
task_queue="default",
2332
)
2433

2534
# Start the ResourceLockingWorkflows
2635
resource_locking_handles: list[WorkflowHandle[Any, Any]] = []
2736
for i in range(0, 4):
28-
input = ResourceLockingWorkflowInput(iteration_to_fail_after=None, should_continue_as_new=False, already_owned_resource=None)
37+
input = ResourceLockingWorkflowInput(
38+
iteration_to_fail_after=None,
39+
should_continue_as_new=False,
40+
already_owned_resource=None,
41+
)
2942
if i == 0:
3043
input.should_continue_as_new = True
3144
if i == 1:
@@ -48,5 +61,6 @@ async def main():
4861
# Clean up after ourselves. In the real world, the lock manager workflow would run forever.
4962
await lock_manager_handle.terminate()
5063

64+
5165
if __name__ == "__main__":
5266
asyncio.run(main())

0 commit comments

Comments
 (0)