Replies: 5 comments 21 replies
-
Hello! I would start by looking at the code in the CLI used to run a worker:

Really it just sets up the log config and calls

If it's important to separate the work out from the worker code, you could try using threads to spawn workers (but not processes, since you'd want the worker to access the same task definitions in memory). Anyway, my instinct is that this is possible; give it a try and let me know how it goes.
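A minimal sketch of the thread-based idea, assuming nothing about streaq's internals: `TASKS`, `task`, `worker_main`, and `spawn_workers` are hypothetical stand-ins, and each thread simply runs its own asyncio event loop against the shared in-memory registry.

```python
import asyncio
import threading
from typing import Any, Awaitable, Callable, Dict

# Task definitions live in module memory, so every thread sees the same registry.
TASKS: Dict[str, Callable[..., Awaitable[Any]]] = {}


def task(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Register a coroutine function under its name (hypothetical helper)."""
    TASKS[fn.__name__] = fn
    return fn


@task
async def add(a: int, b: int) -> int:
    return a + b


async def worker_main(name: str, results: Dict[str, int]) -> None:
    """Stand-in for a real worker loop: look up a shared task and run it once."""
    results[name] = await TASKS["add"](1, 2)


def spawn_workers(count: int) -> Dict[str, int]:
    """Run `count` stand-in workers, each in its own thread and event loop."""
    results: Dict[str, int] = {}

    def run(name: str) -> None:
        # asyncio.run creates a fresh event loop for this thread.
        asyncio.run(worker_main(name, results))

    threads = [
        threading.Thread(target=run, args=(f"worker-{i}",)) for i in range(count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results
```

With processes instead of threads, each child would need to re-import or re-register the task definitions, which is the reason for preferring threads here.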
-
There are basically two ways this could be done. We could do something similar to FastAPI routers:

    group_a = TaskGroup("a")
    group_b = TaskGroup("b")
    group_a.add_subgroup(group_b)

    @group_a.task()
    async def barfoo() -> bool: ...

    @group_b.task(timeout=1)
    async def sleeper() -> None: ...

    worker = Worker()
    worker.add_group(group_a)

    @worker.task()
    async def foobar() -> int: ...

This works great. However, it wouldn't support enqueuing jobs unless you've already entered the worker's async context manager. The other option would be something like this:

    worker_a = Worker()
    worker_b = Worker()
    worker_a.include(worker_b)

    @worker_a.task()
    async def barfoo() -> bool: ...

    @worker_b.task(timeout=1)
    async def sleeper() -> None: ...

    worker = Worker()
    worker.include(worker_a)

    @worker.task()
    async def foobar() -> int: ...

This would allow for enqueuing tasks in a sub-worker's context manager without needing to worry about the root worker. You could also run sub-workers individually, so it's more flexible and powerful. I'll have to think about how I'd implement this, though. Thoughts, @espdev @wwarne @kuhnroyal @vikigenius?
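To make the second option concrete, here is a toy sketch of how `include` could resolve tasks lazily, so that a sub-worker can be included before its tasks are registered (as in the example above). `MiniWorker` and `all_tasks` are hypothetical names for illustration; this is not streaq's `Worker` or its actual implementation.

```python
from typing import Any, Awaitable, Callable, Dict, List

TaskFn = Callable[..., Awaitable[Any]]


class MiniWorker:
    """Toy stand-in for a worker; NOT streaq's Worker class."""

    def __init__(self) -> None:
        self.registry: Dict[str, TaskFn] = {}
        self.children: List["MiniWorker"] = []

    def task(self, **options: Any) -> Callable[[TaskFn], TaskFn]:
        """Decorator that registers a coroutine function on this worker."""
        def decorator(fn: TaskFn) -> TaskFn:
            self.registry[fn.__name__] = fn
            return fn
        return decorator

    def include(self, other: "MiniWorker") -> None:
        """Keep a reference only, so tasks added to `other` later are still seen."""
        self.children.append(other)

    def all_tasks(self) -> Dict[str, TaskFn]:
        """Collect tasks from included workers, then this worker's own tasks.

        On a name clash the worker's own task wins (an arbitrary choice here).
        """
        merged: Dict[str, TaskFn] = {}
        for child in self.children:
            merged.update(child.all_tasks())
        merged.update(self.registry)
        return merged


worker_a = MiniWorker()
worker_b = MiniWorker()
worker_a.include(worker_b)


@worker_a.task()
async def barfoo() -> bool: ...


@worker_b.task(timeout=1)
async def sleeper() -> None: ...


worker = MiniWorker()
worker.include(worker_a)


@worker.task()
async def foobar() -> int: ...
```

Because each sub-worker keeps its own registry, `worker_b` remains usable on its own, while the root worker sees every task in the tree.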
-
I have added a simple way to modularize workers to the master branch: https://streaq.readthedocs.io/en/latest/worker.html#modularizing-workers
-
Y'all might find #113 interesting.
-
Please see #125; I think this resolves all modularity concerns in a satisfactory way.
-
Hello,
streaq is a very interesting library. I'm thinking about using it in our project. Unfortunately, I don't quite understand how you can use the library without creating a module-level worker.
The documentation says:
And running the worker:
Suppose I would like to initialize a worker at run time rather than at import time (using a worker_factory function), for example, something like this:
And run the worker:
Then, somewhere in my code, I can access the worker (by dependency injection, for example) and enqueue tasks.
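A rough sketch of this pattern, using stand-in classes rather than streaq's real API, might look like the following. `StubWorker`, the module-level `task` decorator, `_PENDING`, and `run_task` are hypothetical names, and the body of `worker_factory` is only an assumption about what such a factory could do: tasks are merely recorded at import time and bound to a configured worker at run time.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List

TaskFn = Callable[..., Awaitable[Any]]

# Import-time side: only remembers functions and needs no configuration.
_PENDING: List[TaskFn] = []


def task(fn: TaskFn) -> TaskFn:
    """Record a task function without requiring a worker (hypothetical)."""
    _PENDING.append(fn)
    return fn


@dataclass
class StubWorker:
    """Stand-in for a configured worker; NOT streaq's Worker class."""

    redis_url: str
    tasks: Dict[str, TaskFn] = field(default_factory=dict)

    def register(self, fn: TaskFn) -> None:
        self.tasks[fn.__name__] = fn


@task
async def send_email(to: str) -> str:
    return f"sent to {to}"


def worker_factory(settings: Dict[str, str]) -> StubWorker:
    """Run-time side: build the worker from loaded settings, then bind tasks."""
    worker = StubWorker(redis_url=settings["redis_url"])
    for fn in _PENDING:
        worker.register(fn)
    return worker


def run_task(worker: StubWorker, name: str, *args: Any) -> Any:
    """Execute a bound task directly; a real worker would enqueue it instead."""
    return asyncio.run(worker.tasks[name](*args))
```

The point of the split is that importing the module that defines `send_email` has no side effects beyond appending to a list, so configuration can be loaded first and passed to `worker_factory` afterwards.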
I need to get rid of any import-time logic for more control, configurability, and flexibility.
The problem with import time is that I need an initialized, configured Worker already at the stage where task functions are defined. But in real applications, I can only configure the Worker after some initial loading and configuration of my application. I can't manage the application configuration at the time modules are imported.
For example, see taskiq dynamic envs (I can't use taskiq because there is no way to abort tasks).
Any thoughts on this?