
Feature/task concurrency control#148

Merged
chrisguidry merged 9 commits into chrisguidry:main from abrookins:feature/task-concurrency-control
Jul 30, 2025

Conversation

@abrookins (Collaborator) commented Jul 11, 2025

Adds a ConcurrencyLimit context manager. Closes #86.

NOTE: I haven't tested this!

@chrisguidry (Owner) left a comment

Thanks @abrookins! First off, the docs and tests are 🤌. The implementation looks good, but I think there's a way to weave this into the worker that's a little more natural and may need a little less code.

Here you're changing start_task and process_completed_tasks, but there's a natural spot for handling per-task things like this: the Worker._execute method. That would have a couple of advantages, because we already pull out the task's dependencies (see how the timeout is used there) and we already have some natural exception handling with retry logic (and the ability to put the silly log glyphs in). I think if you stitch in concurrency limiting there, it would come out a little simpler.

The only other question I had was about the inevitable orphaning of concurrency slots. Any thoughts there about how to expire them? What if it was a sorted set with timestamps and we used the worker's redelivery_timeout as the expiration? I think you could do a quick atomic ZREMRANGEBYSCORE in the lua script before checking the ZCARD. What do you think?

@abrookins (Collaborator, Author)

> Thanks @abrookins! First off, the docs and tests are 🤌. The implementation looks good, but I think there's a way to weave this into the worker that's a little more natural and may need a little less code.
>
> Here you're changing start_task and process_completed_tasks, but there's a natural spot for handling per-task things like this: the Worker._execute method. That would have a couple of advantages, because we already pull out the task's dependencies (see how the timeout is used there) and we already have some natural exception handling with retry logic (and the ability to put the silly log glyphs in). I think if you stitch in concurrency limiting there, it would come out a little simpler.
>
> The only other question I had was about the inevitable orphaning of concurrency slots. Any thoughts there about how to expire them? What if it was a sorted set with timestamps and we used the worker's redelivery_timeout as the expiration? I think you could do a quick atomic ZREMRANGEBYSCORE in the lua script before checking the ZCARD. What do you think?

Great points. I haven't thought much about expiring the slots. Let me explore some ideas -- I'm wondering if we can run an async task to refresh the timestamp while a task is running. It won't be perfect, but we can at least cover some bases for longer-running tasks. If I'm thinking about this correctly, that is.

@abrookins force-pushed the feature/task-concurrency-control branch from 2b10df4 to 9c0f13d on July 11, 2025 at 20:42

@abrookins (Collaborator, Author)

@chrisguidry Here's what I'm considering: https://github.com/chrisguidry/docket/pull/148/files#diff-faf9939804414c2603b6478851789a5f3ac874bbcb82a63a3af6d3ccfa780b0fR962-R986

So basically, each worker would start one coroutine that manages refreshing the timestamp on any active tasks. We don't attempt to spawn one coroutine per active task, which could be problematic, and we also don't try to solve the problem of tasks that are intentionally blocked on CPU (meh). I'm not attached to this idea, but what do you think?
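The single-refresher idea sketched above might look something like this; the names are hypothetical, not docket's actual implementation.

```python
import asyncio


async def refresh_active_slots(
    slots: dict[str, float], active: set[str], interval: float
) -> None:
    """One coroutine per worker: periodically re-stamp every active task's
    slot so it doesn't expire, rather than spawning a refresher per task."""
    loop = asyncio.get_running_loop()
    while True:
        now = loop.time()
        for task_key in active:
            # In Redis this would be a ZADD updating only the score
            slots[task_key] = now
        await asyncio.sleep(interval)
```

The worker would start this once at startup and cancel it on shutdown; a task that blocks the event loop on CPU would still starve the refresher, which is the acknowledged gap.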

@chrisguidry (Owner)

> @chrisguidry Here's what I'm considering: https://github.com/chrisguidry/docket/pull/148/files#diff-faf9939804414c2603b6478851789a5f3ac874bbcb82a63a3af6d3ccfa780b0fR962-R986
>
> So basically, each worker would start one coroutine that manages refreshing the timestamp on any active tasks. We don't attempt to spawn one coroutine per active task, which could be problematic, and we also don't try to solve the problem of tasks that are intentionally blocked on CPU (meh). I'm not attached to this idea, but what do you think?

My initial reaction was that your ZSET changes should already make this work pretty well and be resilient to worker restarts, so I was trying to figure out why you wanted the lease extension mechanism here. Then I realized you're exposing a potentially deeper problem with Docket, and one I may actually have seen in production without realizing it.

The redelivery timeout isn't currently paired with a corresponding hard-wired task timeout. Even if the user doesn't request a timeout, every task should probably be timed out at the redelivery timeout (or the minimum of the user's timeout and the redelivery timeout). I was only thinking of the redelivery timeout as ensuring tasks get processed when workers die ungracefully, but there's also a problem if a single task runs for longer than the redelivery timeout: it gets redelivered to another worker, which starts working on it, exceeds the redelivery timeout, gets redelivered to yet another worker, and so on.

Does it seem reasonable to always time tasks out at the redelivery timeout (or sooner if they request it)? Then you wouldn't need your lease-extending mechanism. It definitely still has problems if tasks are hogging the CPU and starving the event loop, but that's something we probably can't help with.

@abrookins (Collaborator, Author)

Let me ponder this while I mine for the 100% test coverage gold.

@abrookins (Collaborator, Author)

Ok, yes, after thinking about this, I think your proposal is right. I'll try to implement it in this PR, but I'll be on vacation next week. We'll see where I get before then!

@abrookins (Collaborator, Author)

I'm back and will be looking at this during the week. 🫡

@codecov-commenter commented Jul 29, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 100.00%. Comparing base (91faaa3) to head (0fada28).

Additional details and impacted files


@@            Coverage Diff             @@
##              main      #148    +/-   ##
==========================================
  Coverage   100.00%   100.00%            
==========================================
  Files           28        31     +3     
  Lines         3675      4382   +707     
  Branches       205       246    +41     
==========================================
+ Hits          3675      4382   +707     
Flag Coverage Δ
python-3.12 100.00% <100.00%> (ø)
python-3.13 100.00% <100.00%> (ø)

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
src/docket/__init__.py 100.00% <ø> (ø)
src/docket/dependencies.py 100.00% <100.00%> (ø)
src/docket/worker.py 100.00% <100.00%> (ø)
tests/cli/test_worker.py 100.00% <100.00%> (ø)
tests/conftest.py 100.00% <100.00%> (ø)
tests/test_concurrency_basic.py 100.00% <100.00%> (ø)
tests/test_concurrency_control.py 100.00% <100.00%> (ø)
tests/test_concurrency_refresh.py 100.00% <100.00%> (ø)
tests/test_worker.py 100.00% <100.00%> (ø)

@abrookins (Collaborator, Author)

@chrisguidry Ok, other than the docs failure, I think this is good to review again. 🫡

@abrookins abrookins marked this pull request as ready for review July 29, 2025 22:53
@chrisguidry (Owner) left a comment

Do you ever get tired of hearing how good your tests are?

@chrisguidry (Owner) commented on this change:

```diff
-    def start_task(message_id: RedisMessageID, message: RedisMessage) -> bool:
+    async def start_task(message_id: RedisMessageID, message: RedisMessage) -> bool:
```

This guy won't need to be async anymore 💪

@chrisguidry (Owner) commented on the new concurrency check:

```python
if not await self._can_start_task(redis, execution):
    # Task cannot start due to concurrency limits - reschedule
    logger.debug(
        "🔒 Task %s blocked by concurrency limit, rescheduling",
        execution.key,
        extra=log_context,
    )
    # Reschedule for a few milliseconds in the future
```

@chrisguidry (Owner): ❤️

@chrisguidry (Owner), on the rescheduling: Not for this PR, but @bunchesofdonald has cool algorithms for this we can stitch in later.

@chrisguidry (Owner)

I invited you as a contributor so you can merge and cut a release. All you would need to do is make a release in the GH UI or CLI and everything else should be automated

@chrisguidry chrisguidry merged commit cfe3345 into chrisguidry:main Jul 30, 2025
27 of 29 checks passed
@abrookins (Collaborator, Author)

Oh snap, thanks my dude!
