Skip to content

Commit c38f803

Browse files
committed
handle substantive PR feedback
1 parent d75d2d3 commit c38f803

8 files changed

Lines changed: 227 additions & 215 deletions

File tree

resource_pool/resource_allocator.py

Lines changed: 0 additions & 108 deletions
This file was deleted.
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
from contextlib import asynccontextmanager
2+
from datetime import timedelta
3+
from typing import AsyncGenerator, Optional
4+
5+
from temporalio import workflow
6+
7+
from resource_pool.shared import (
8+
AcquiredResource,
9+
AcquireRequest,
10+
AcquireResponse,
11+
DetachedResource,
12+
)
13+
14+
15+
# Use this class in workflow code that that needs to run on locked resources.
16+
class ResourcePoolClient:
17+
def __init__(self, pool_workflow_id: str) -> None:
18+
self.pool_workflow_id = pool_workflow_id
19+
self.acquired_resources: list[AcquiredResource] = []
20+
21+
async def send_acquire_signal(self) -> None:
22+
handle = workflow.get_external_workflow_handle(self.pool_workflow_id)
23+
await handle.signal(
24+
"acquire_resource", AcquireRequest(workflow.info().workflow_id)
25+
)
26+
27+
async def send_release_signal(self, acquired_resource: AcquiredResource) -> None:
28+
handle = workflow.get_external_workflow_handle(self.pool_workflow_id)
29+
await handle.signal(
30+
"release_resource",
31+
AcquireResponse(
32+
resource=acquired_resource.resource,
33+
release_key=acquired_resource.release_key,
34+
),
35+
)
36+
37+
def lazy_register_signal_handler(self) -> None:
38+
if workflow.get_signal_handler("assign_resource") is None:
39+
workflow.set_signal_handler("assign_resource", self.assign_resource)
40+
41+
def assign_resource(self, response: AcquireResponse) -> None:
42+
self.acquired_resources.append(
43+
AcquiredResource(
44+
resource=response.resource, release_key=response.release_key
45+
)
46+
)
47+
48+
@asynccontextmanager
49+
async def acquire_resource(
50+
self,
51+
*,
52+
reattach: Optional[DetachedResource] = None,
53+
max_wait_time: timedelta = timedelta(minutes=5),
54+
) -> AsyncGenerator[AcquiredResource, None]:
55+
warn_when_workflow_has_timeouts()
56+
self.lazy_register_signal_handler()
57+
58+
if reattach is None:
59+
await self.send_acquire_signal()
60+
await workflow.wait_condition(
61+
lambda: len(self.acquired_resources) > 0, timeout=max_wait_time
62+
)
63+
resource = self.acquired_resources.pop(0)
64+
else:
65+
resource = AcquiredResource(
66+
resource=reattach.resource, release_key=reattach.release_key
67+
)
68+
69+
# Can't happen, but the typechecker doesn't know about workflow.wait_condition
70+
if resource is None:
71+
raise RuntimeError("resource was None when it can't be")
72+
73+
# During the yield, the calling workflow owns the resource. Note that this is a lock, not a lease! Our
74+
# finally block will release the resource if an activity fails. This is why we asserted the lack of
75+
# workflow-level timeouts above - the finally block wouldn't run if there was a timeout.
76+
try:
77+
yield resource
78+
finally:
79+
if not resource.detached:
80+
await self.send_release_signal(resource)
81+
82+
83+
def warn_when_workflow_has_timeouts() -> None:
84+
if has_timeout(workflow.info().run_timeout):
85+
workflow.logger.warning(
86+
f"ResourceLockingWorkflow cannot have a run_timeout (found {workflow.info().run_timeout}) - this will leak locks"
87+
)
88+
if has_timeout(workflow.info().execution_timeout):
89+
workflow.logger.warning(
90+
f"ResourceLockingWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout}) - this will leak locks"
91+
)
92+
93+
94+
def has_timeout(timeout: Optional[timedelta]) -> bool:
95+
# After continue_as_new, timeouts are 0, even if they were None before continue_as_new (and were not set in the
96+
# continue_as_new call).
97+
return timeout is not None and timeout > timedelta(0)

resource_pool/resource_pool_workflow.py

Lines changed: 68 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ class ResourcePoolWorkflow:
2626
def __init__(self, input: ResourcePoolWorkflowInput) -> None:
2727
self.resources = input.resources
2828
self.waiters = input.waiters
29-
self.release_signal_to_resource: dict[str, str] = {}
29+
self.release_key_to_resource: dict[str, str] = {}
30+
3031
for resource, holder in self.resources.items():
3132
if holder is not None and holder.release_signal is not None:
32-
self.release_signal_to_resource[holder.release_signal] = resource
33+
self.release_key_to_resource[holder.release_signal] = resource
3334

3435
@workflow.signal
3536
async def add_resources(self, resources: list[str]) -> None:
@@ -39,29 +40,43 @@ async def add_resources(self, resources: list[str]) -> None:
3940
f"Ignoring attempt to add already-existing resource: {resource}"
4041
)
4142
continue
42-
43-
self.resources[resource] = None
44-
if len(self.waiters) > 0:
45-
next_holder = self.waiters.pop(0)
46-
await self.assign_resource(resource, next_holder)
43+
else:
44+
self.resources[resource] = None
4745

4846
@workflow.signal
4947
async def acquire_resource(self, request: AcquireRequest) -> None:
50-
internal_request = InternalAcquireRequest(
51-
workflow_id=request.workflow_id, release_signal=None
48+
self.waiters.append(
49+
InternalAcquireRequest(workflow_id=request.workflow_id, release_signal=None)
50+
)
51+
workflow.logger.info(
52+
f"workflow_id={request.workflow_id} is waiting for a resource"
5253
)
5354

54-
for resource, holder in self.resources.items():
55-
# Naively give out the first free resource, if we have one
56-
if holder is None:
57-
await self.assign_resource(resource, internal_request)
58-
return
55+
@workflow.signal()
56+
async def release_resource(self, acquire_response: AcquireResponse) -> None:
57+
release_key = acquire_response.release_key
58+
resource = self.release_key_to_resource.get(release_key)
59+
if resource is None:
60+
workflow.logger.warning(f"Ignoring unknown release_key: {release_key}")
61+
return
62+
63+
holder = self.resources[resource]
64+
if holder is None:
65+
workflow.logger.warning(
66+
f"Ignoring request to release resource that is not held: {resource}"
67+
)
68+
return
5969

60-
# Otherwise queue the request
61-
self.waiters.append(internal_request)
70+
# Remove the current holder
6271
workflow.logger.info(
63-
f"workflow_id={request.workflow_id} is waiting for a resource"
72+
f"workflow_id={holder.workflow_id} released resource {resource}"
6473
)
74+
self.resources[resource] = None
75+
del self.release_key_to_resource[release_key]
76+
77+
@workflow.query
78+
def get_current_holders(self) -> dict[str, Optional[InternalAcquireRequest]]:
79+
return self.resources
6580

6681
async def assign_resource(
6782
self, resource: str, internal_request: InternalAcquireRequest
@@ -71,60 +86,57 @@ async def assign_resource(
7186
f"workflow_id={internal_request.workflow_id} acquired resource {resource}"
7287
)
7388
internal_request.release_signal = str(workflow.uuid4())
74-
self.release_signal_to_resource[internal_request.release_signal] = resource
89+
self.release_key_to_resource[internal_request.release_signal] = resource
7590

7691
requester = workflow.get_external_workflow_handle(internal_request.workflow_id)
7792
await requester.signal(
7893
"assign_resource",
7994
AcquireResponse(
80-
release_signal_name=internal_request.release_signal, resource=resource
95+
release_key=internal_request.release_signal, resource=resource
8196
),
8297
)
8398

84-
@workflow.signal(dynamic=True)
85-
async def release_resource(self, signal_name, *args) -> None:
86-
if not signal_name in self.release_signal_to_resource:
87-
workflow.logger.warning(
88-
f"Ignoring unknown signal: {signal_name} was not a valid release signal."
89-
)
90-
return
99+
async def assign_next_resource(self) -> bool:
100+
if len(self.waiters) == 0:
101+
return False
91102

92-
resource = self.release_signal_to_resource[signal_name]
103+
next_free_resource = self.get_free_resource()
104+
if next_free_resource is None:
105+
return False
93106

94-
holder = self.resources[resource]
95-
if holder is None:
96-
workflow.logger.warning(
97-
f"Ignoring request to release resource that is not held: {resource}"
98-
)
99-
return
107+
next_waiter = self.waiters.pop(0)
108+
await self.assign_resource(next_free_resource, next_waiter)
109+
return True
100110

101-
# Remove the current holder
102-
workflow.logger.info(
103-
f"workflow_id={holder.workflow_id} released resource {resource}"
111+
def get_free_resource(self) -> Optional[str]:
112+
return next(
113+
(resource for resource, holder in self.resources.items() if holder is None),
114+
None,
104115
)
105-
self.resources[resource] = None
106-
del self.release_signal_to_resource[signal_name]
107116

108-
# If there are queued requests, assign the resource to the next one
109-
if len(self.waiters) > 0:
110-
next_holder = self.waiters.pop(0)
111-
await self.assign_resource(resource, next_holder)
117+
def can_assign_resource(self) -> bool:
118+
return len(self.waiters) > 0 and self.get_free_resource() is not None
112119

113-
@workflow.query
114-
def get_current_holders(self) -> dict[str, Optional[InternalAcquireRequest]]:
115-
return {k: v if v else None for k, v in self.resources.items()}
120+
def should_continue_as_new(self) -> bool:
121+
return (
122+
workflow.info().is_continue_as_new_suggested()
123+
and workflow.all_handlers_finished()
124+
)
116125

117126
@workflow.run
118127
async def run(self, _: ResourcePoolWorkflowInput) -> None:
119-
# Continue as new either when temporal tells us to, or every 12 hours (so it occurs semi-frequently)
120-
await workflow.wait_condition(
121-
lambda: workflow.info().is_continue_as_new_suggested(),
122-
timeout=timedelta(hours=12),
123-
)
124-
125-
workflow.continue_as_new(
126-
ResourcePoolWorkflowInput(
127-
resources=self.resources,
128-
waiters=self.waiters,
128+
while True:
129+
await workflow.wait_condition(
130+
lambda: self.can_assign_resource() or self.should_continue_as_new()
129131
)
130-
)
132+
133+
if await self.assign_next_resource():
134+
continue
135+
136+
if self.should_continue_as_new():
137+
workflow.continue_as_new(
138+
ResourcePoolWorkflowInput(
139+
resources=self.resources,
140+
waiters=self.waiters,
141+
)
142+
)

0 commit comments

Comments
 (0)