Add a sample that uses a workflow to lock resources #172
There is a pattern for doing this with signal-with-start. You can see a sample in .NET, Go, and TypeScript. Would totally support such a sample in Python. Needs to be a simple |
c42ea19 to 2ab9dfa
        resource.autorelease = True
        yield resource
    finally:
        if resource.autorelease:
Blah. I want there to be a way to tell whether the workflow code is CAN'ing here, but I believe there isn't one.
I think you should always release it if a user is using `async with`. It will be very confusing if I as a user use `async with` and the resource remains when that exits. If we need to let callers keep the resource across continue as new, they should "detach" it (e.g. can offer a helper for this that returns some type) and "reattach" it on next workflow use. Granted I also think including continue as new from the caller side in this sample is a bit confusing, but maybe it's needed.
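A minimal sketch of the detach/reattach idea, in plain asyncio rather than Temporal code. `FakePool`, `DetachedResource`, the `detach()` helper and the `reattach=` parameter are hypothetical names chosen for illustration; the point is only that leaving `async with` always releases unless the caller explicitly detached first.

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncIterator, List, Optional, Tuple


@dataclass
class DetachedResource:
    """Hypothetical token a workflow could hand to its continue-as-new successor."""
    name: str


class AcquiredResource:
    def __init__(self, name: str) -> None:
        self.name = name
        self.detached = False

    def detach(self) -> DetachedResource:
        # After detaching, leaving the `async with` block no longer releases.
        self.detached = True
        return DetachedResource(self.name)


class FakePool:
    """In-memory stand-in for the pool (assumption, not the sample's API)."""

    def __init__(self) -> None:
        self.released: List[str] = []

    async def acquire(self) -> str:
        return "resource-a"

    async def release(self, name: str) -> None:
        self.released.append(name)


@asynccontextmanager
async def acquire_resource(
    pool: FakePool, *, reattach: Optional[DetachedResource] = None
) -> AsyncIterator[AcquiredResource]:
    # Reuse the predecessor's resource if one was handed over, else acquire.
    name = reattach.name if reattach is not None else await pool.acquire()
    resource = AcquiredResource(name)
    try:
        yield resource
    finally:
        # Always release on exit, unless the caller explicitly detached.
        if not resource.detached:
            await pool.release(resource.name)


async def demo() -> Tuple[List[str], List[str]]:
    pool = FakePool()
    async with acquire_resource(pool) as first:
        token = first.detach()  # e.g. just before continue-as-new
    released_after_first = list(pool.released)
    async with acquire_resource(pool, reattach=token):
        pass  # the successor uses the resource, then releases on exit
    return released_after_first, list(pool.released)
```

Calling `asyncio.run(demo())` shows that nothing is released when the first block exits after `detach()`, and that the resource is released when the reattached block exits.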
@cretz LMK what you think about the |
async def acquire_resource(
    cls,
    *,
    already_acquired_resource: Optional[AcquiredResource] = None,
I don't like this, but... (thought finished after ⭐)
@workflow.run
async def run(self, input: ResourceLockingWorkflowInput):
    async with ResourceAllocator.acquire_resource(
        already_acquired_resource=input.already_acquired_resource
⭐ ...I had to have some way of handling the case where my CAN-predecessor already locked the resource. It was either an optional param to `acquire_resource` or branching around the `async with`, which felt much more awkward.
Not sure this sample needs the complication of including continue as new support for this workflow using the resource manager. This workflow isn't large enough to ever need to continue as new, so including support for it is a bit confusing (people aren't going to understand why it is here, but will think they need to do the same).
However, if you do keep this, I would call this `reattach=` and have it accept a typed object that came from `detach` in the pre-CAN workflow.
Added a bunch of comments, can take em or leave em, some of which are self-contradictory. Now that I've seen the full use case, I would think of this as a `ResourcePool` (with a `ResourcePoolWorkflow` and a `ResourcePoolClient`)
start_signal="acquire_resource",
start_signal_args=[AcquireRequest(info.workflow_id)],
This would usually be a good opportunity to use our new update-with-start functionality, but if you must span continue-as-new on the lock manager workflow, you have to stay with signals for now
I do think it's important to keep CAN in. Without CAN, the sample can't demonstrate how to detach/reattach so it seems incomplete to me.
Thank you - I like the new names. I'll take most of the comments. I want to leave CAN in, because that's the only thing that makes this tricky - sample feels incomplete without it.
Looks great, only a few minor things
await handle.signal(
    "acquire_resource", AcquireRequest(workflow.info().workflow_id)
)
- await handle.signal(
-     "acquire_resource", AcquireRequest(workflow.info().workflow_id)
- )
+ await handle.signal(
+     ResourcePoolWorkflow.acquire_resource, AcquireRequest(workflow.info().workflow_id)
+ )
To be more type safe. Same everywhere
Almost everywhere - the pool workflow should signal the requester in an un-typesafe way (since it can't know what kind of workflow the requester is).
    f"assign_resource_{workflow.info().workflow_id}",
    AcquireResponse(release_key=release_signal, resource=resource),
)
There's no race here, right? Imagine the release signal arrives immediately. The fix is obvious if there is a race, but the code smells nicer this way.
Why I think there's no race: I believe that this handler must run to completion before the handler for the release signal can start.
Correct, there is no race here because this is only called in one place, serially, in the primary loop. However, this can show the dangers of overly extracting/modularizing single-use methods - it can be hard to see the constraints they expect of the callers. Can technically add a "Not safe for concurrent use" docstring if concerned.
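The reasoning above can be illustrated with a small plain-asyncio model (not Temporal code and not the sample's implementation). `PoolModel`, `on_assign` and the requester names are hypothetical. The model assumes that handlers are synchronous, that assignment happens only inside the single main loop, and that the holder is recorded before the requester is notified, so a release that arrives immediately still finds consistent state.

```python
import asyncio
from collections import deque
from typing import Callable, Deque, Dict, List, Optional


class PoolModel:
    """Plain-asyncio stand-in for a pool workflow (illustrative assumption)."""

    def __init__(self, resources: List[str]) -> None:
        self.free: Deque[str] = deque(resources)
        self.waiters: Deque[str] = deque()
        self.holders: Dict[str, str] = {}  # resource -> requester
        self.history: List[str] = []
        self.on_assign: Optional[Callable[[str, str], None]] = None
        self._changed = asyncio.Event()

    # "Signal handlers": synchronous, so each runs to completion.
    def acquire(self, requester: str) -> None:
        self.waiters.append(requester)
        self._changed.set()

    def release(self, resource: str) -> None:
        requester = self.holders.pop(resource)
        self.history.append(f"{requester} released {resource}")
        self.free.append(resource)
        self._changed.set()

    async def run(self, expected_assignments: int) -> None:
        done = 0
        while done < expected_assignments:
            # Wait until there is both a free resource and a waiter.
            while not (self.free and self.waiters):
                self._changed.clear()
                await self._changed.wait()
            resource = self.free.popleft()
            requester = self.waiters.popleft()
            # Record the holder first; assignment only happens in this loop.
            self.holders[resource] = requester
            self.history.append(f"{requester} acquired {resource}")
            done += 1
            if self.on_assign is not None:
                self.on_assign(resource, requester)  # notify the requester


async def demo() -> List[str]:
    pool = PoolModel(["r1"])
    # Worst case for the race question: every requester releases immediately.
    pool.on_assign = lambda resource, requester: pool.release(resource)
    pool.acquire("wf-a")
    pool.acquire("wf-b")
    await pool.run(expected_assignments=2)
    return pool.history
```

Even with an immediate release, the history alternates acquire and release for each requester in request order, because nothing else can assign the resource between the bookkeeping and the notification.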
44ce942 to 792276c
792276c to 7ac42cc
Only minor, non-blocking things, LGTM. Merge at will or let me know when done and I can.
What was changed
Adds a new sample that demonstrates how to use a workflow to mediate access to a small pool of resources.
Why?
A customer asked for an example of how to do this kind of thing. We ruled out worker-specific task queues and sessions - those work best when you can run the worker in/near the protected resource. They cannot, for security reasons, which leaves us with:
Checklist
How was this tested:
Ran the worker and starter per the readme. New unit test passes.
Any docs updates needed?
No - it's just a sample.