Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions resource_locking/resource_allocator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from contextlib import asynccontextmanager
from datetime import timedelta
from typing import Optional, AsyncGenerator

from temporalio.client import Client
from temporalio import workflow, activity
from temporalio.common import WorkflowIDConflictPolicy

from resource_locking.lock_manager_workflow import LockManagerWorkflowInput, LockManagerWorkflow
from resource_locking.shared import AcquireResponse, LOCK_MANAGER_WORKFLOW_ID, AcquireRequest, AcquiredResource

# Use this class in workflow code that that needs to run on locked resources.
class ResourceAllocator:
Comment thread
nagl-temporal marked this conversation as resolved.
Outdated
def __init__(self, client: Client):
self.client = client
Comment thread
nagl-temporal marked this conversation as resolved.
Outdated

@activity.defn
async def send_acquire_signal(self):
info = activity.info()

# This will start and signal the workflow if it isn't running, otherwise it will signal the current run.
await self.client.start_workflow(
Comment thread
nagl-temporal marked this conversation as resolved.
Outdated
workflow=LockManagerWorkflow.run,
arg=LockManagerWorkflowInput(
resources={},
waiters=[],
),
id=LOCK_MANAGER_WORKFLOW_ID,
task_queue="default",
id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
start_signal="acquire_resource",
start_signal_args=[AcquireRequest(info.workflow_id)]
)

@classmethod
@asynccontextmanager
async def acquire_resource(cls, *, already_acquired_resource: Optional[AcquiredResource] = None, max_wait_time: timedelta = timedelta(minutes=5)):
warn_when_workflow_has_timeouts()

resource = already_acquired_resource
if resource is None:
async def assign_resource(input: AcquireResponse):
workflow.set_signal_handler("assign_resource", None)
nonlocal resource
resource = AcquiredResource(
resource=input.resource,
release_signal_name=input.release_signal_name,
)

workflow.set_signal_handler("assign_resource", assign_resource)
Comment thread
nagl-temporal marked this conversation as resolved.
Outdated

await workflow.execute_activity(
ResourceAllocator.send_acquire_signal,
start_to_close_timeout=timedelta(seconds=10),
Comment thread
nagl-temporal marked this conversation as resolved.
Outdated
)

await workflow.wait_condition(lambda: resource is not None, timeout=max_wait_time)

# During the yield, the calling workflow owns the resource. Note that this is a lock, not a lease! Our
# finally block will release the resource if an activity fails. This is why we asserted the lack of
# workflow-level timeouts above - the finally block wouldn't run if there was a timeout.
try:
resource.autorelease = True
yield resource
finally:
if resource.autorelease:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blah. I want there to be a way to tell whether the workflow code is CAN'ing here, but I believe there isn't one.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should always release it if a user is using async with. It will be very confusing if I as a user use async with and the resource remains when that exits. If we need to let callers keep the resource across continue as new, they should "detach" it (e.g. can offer a helper for this that returns some type) and "reattach" it on next workflow use. Granted I also think including continue as new from the caller side in this sample is a bit confusing, but maybe it's needed.

handle = workflow.get_external_workflow_handle(LOCK_MANAGER_WORKFLOW_ID)
await handle.signal(resource.release_signal_name)

def warn_when_workflow_has_timeouts():
if has_timeout(workflow.info().run_timeout):
workflow.logger.warning(
f"ResourceLockingWorkflow cannot have a run_timeout (found {workflow.info().run_timeout}) - this will leak locks"
)
if has_timeout(workflow.info().execution_timeout):
workflow.logger.warning(
f"ResourceLockingWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout}) - this will leak locks"
)

def has_timeout(timeout: Optional[timedelta]) -> bool:
# After continue_as_new, timeouts are 0, even if they were None before continue_as_new (and were not set in the
# continue_as_new call).
return timeout is not None and timeout > timedelta(0)
71 changes: 13 additions & 58 deletions resource_locking/resource_locking_workflow.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import asyncio
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Optional
from typing import Optional, Callable

from temporalio import activity, workflow

from resource_locking.resource_allocator import ResourceAllocator
from resource_locking.shared import (
LOCK_MANAGER_WORKFLOW_ID,
AcquireRequest,
AcquireResponse,
AcquireResponse, AcquiredResource,
)

@dataclass
Expand Down Expand Up @@ -38,7 +39,7 @@ class ResourceLockingWorkflowInput:
should_continue_as_new: bool

# Used to transfer resource ownership between iterations during continue_as_new
already_assigned_resource: Optional[AcquireResponse]
already_acquired_resource: Optional[AcquiredResource] = field(default=None)


class FailWorkflowException(Exception):
Expand All @@ -51,53 +52,13 @@ class FailWorkflowException(Exception):

@workflow.defn(failure_exception_types=[FailWorkflowException])
class ResourceLockingWorkflow:
def __init__(self):
self.assigned_resource: Optional[AcquireResponse] = None

@workflow.signal(name="assign_resource")
def handle_assign_resource(self, input: AcquireResponse):
self.assigned_resource = input

@workflow.run
async def run(self, input: ResourceLockingWorkflowInput):
if has_timeout(workflow.info().run_timeout):
# See "locking" comment below for rationale
raise FailWorkflowException(
f"ResourceLockingWorkflow cannot have a run_timeout (found {workflow.info().run_timeout})"
)
if has_timeout(workflow.info().execution_timeout):
raise FailWorkflowException(
f"ResourceLockingWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout})"
)

sem_handle = workflow.get_external_workflow_handle(LOCK_MANAGER_WORKFLOW_ID)

info = workflow.info()
if input.already_assigned_resource is None:
await sem_handle.signal("acquire_resource", AcquireRequest(info.workflow_id))
elif info.continued_run_id:
self.assigned_resource = input.already_assigned_resource
else:
raise FailWorkflowException(
f"Only set 'already_assigned_resource' when using continue_as_new"
)

await workflow.wait_condition(
lambda: self.assigned_resource is not None, timeout=MAX_RESOURCE_WAIT_TIME
)
if self.assigned_resource is None:
raise FailWorkflowException(
f"No resource was assigned after {MAX_RESOURCE_WAIT_TIME}"
)

# From this point forward, we own the resource. Note that this is a lock, not a lease! Our finally block will
# release the resource if an activity fails. This is why we asserted the lack of workflow-level timeouts
# above - the finally block wouldn't run if there was a timeout.
try:
async with ResourceAllocator.acquire_resource(already_acquired_resource=input.already_acquired_resource) as resource:
for iteration in ["first", "second", "third"]:
await workflow.execute_activity(
use_resource,
UseResourceActivityInput(self.assigned_resource.resource, iteration),
UseResourceActivityInput(resource.resource, iteration),
start_to_close_timeout=timedelta(seconds=10),
)

Expand All @@ -111,17 +72,11 @@ async def run(self, input: ResourceLockingWorkflowInput):
next_input = ResourceLockingWorkflowInput(
iteration_to_fail_after=input.iteration_to_fail_after,
should_continue_as_new=False,
already_assigned_resource=self.assigned_resource,
already_acquired_resource=resource,
)

# By default, ResourceAllocator will release the resource when we return. We want to hold the resource
# across continue-as-new for the sake of demonstration.
resource.autorelease = False

workflow.continue_as_new(next_input)
finally:
# Only release the resource if we didn't continue-as-new. workflow.continue_as_new raises to halt workflow
# execution, but this code in this finally block will still run. It wouldn't successfully send the signal...
# the if statement just avoids some warnings in the log.
if not input.should_continue_as_new:
await sem_handle.signal(self.assigned_resource.release_signal_name)

def has_timeout(timeout: Optional[timedelta]) -> bool:
# After continue_as_new, timeouts are 0, even if they were None before continue_as_new (and were not set in the
# continue_as_new call).
return timeout is not None and timeout > timedelta(0)
7 changes: 6 additions & 1 deletion resource_locking/shared.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Optional

LOCK_MANAGER_WORKFLOW_ID = "lock_manager"
Comment thread
nagl-temporal marked this conversation as resolved.
Outdated

Expand All @@ -10,3 +11,7 @@ class AcquireRequest:
class AcquireResponse:
release_signal_name: str
resource: str

@dataclass
class AcquiredResource(AcquireResponse):
autorelease: bool = field(default=True)
34 changes: 17 additions & 17 deletions resource_locking/starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,25 @@

from temporalio.client import Client, WorkflowFailureError, WorkflowHandle

from resource_locking.shared import LOCK_MANAGER_WORKFLOW_ID
from resource_locking.lock_manager_workflow import (
LockManagerWorkflow,
LockManagerWorkflowInput,
)
from resource_locking.lock_manager_workflow import LockManagerWorkflow, LockManagerWorkflowInput
from resource_locking.resource_locking_workflow import (
ResourceLockingWorkflow,
ResourceLockingWorkflowInput,
)
from resource_locking.shared import LOCK_MANAGER_WORKFLOW_ID
from temporalio.common import WorkflowIDConflictPolicy


async def main():
# Connect client
client = await Client.connect("localhost:7233")

# Start the LockManagerWorkflow
lock_manager_handle = await client.start_workflow(
workflow=LockManagerWorkflow.run,
arg=LockManagerWorkflowInput(
resources={ "resource_a": None, "resource_b": None },
waiters=[],
),
id=LOCK_MANAGER_WORKFLOW_ID,
task_queue="default",
)

# Start the ResourceLockingWorkflows
resource_locking_handles: list[WorkflowHandle[Any, Any]] = []
for i in range(0, 4):
input = ResourceLockingWorkflowInput(
iteration_to_fail_after=None,
should_continue_as_new=False,
already_assigned_resource=None,
)
if i == 0:
input.should_continue_as_new = True
Expand All @@ -50,6 +36,20 @@ async def main():
)
resource_locking_handles.append(resource_locking_handle)

# Add some resources
lock_manager_handle = await client.start_workflow(
Comment thread
nagl-temporal marked this conversation as resolved.
Outdated
workflow=LockManagerWorkflow.run,
arg=LockManagerWorkflowInput(
resources={},
waiters=[],
),
id=LOCK_MANAGER_WORKFLOW_ID,
task_queue="default",
id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
start_signal="add_resources",
start_signal_args=[["resource_a", "resource_b"]],
Comment thread
nagl-temporal marked this conversation as resolved.
Outdated
)

for resource_locking_handle in resource_locking_handles:
try:
await resource_locking_handle.result()
Expand Down
8 changes: 7 additions & 1 deletion resource_locking/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from temporalio.client import Client
from temporalio.worker import Worker

from resource_locking.resource_allocator import ResourceAllocator
from resource_locking.lock_manager_workflow import LockManagerWorkflow
from resource_locking.resource_locking_workflow import (
ResourceLockingWorkflow,
Expand All @@ -17,12 +18,17 @@ async def main():
# Start client
client = await Client.connect("localhost:7233")

resource_allocator = ResourceAllocator(client)

# Run a worker for the workflow
worker = Worker(
client,
task_queue="default",
workflows=[LockManagerWorkflow, ResourceLockingWorkflow],
activities=[use_resource],
activities=[
use_resource,
resource_allocator.send_acquire_signal,
],
)

await worker.run()
Expand Down