-
Notifications
You must be signed in to change notification settings - Fork 7
Open
Description
# broker.py
import asyncio
from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker
from taskiq_nats import NatsBroker
result_backend = RedisAsyncResultBackend(
redis_url="redis://default:P@[email protected]:6379",
)
# Or you can use PubSubBroker if you need broadcasting
# Or ListQueueBroker if you don't want acknowledges
broker = RedisStreamBroker(
url="redis://default:P@[email protected]:6379",
).with_result_backend(result_backend)
broker_nats = NatsBroker(
servers=["nats://nats:P@[email protected]:4222"], queue="my_queue"
).with_result_backend(result_backend)
@broker.task
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(0.5)
print("All problems are solved!")
@broker_nats.task
async def best_task_ever_nats() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(0.5)
print("All problems are solved with NATS!")
async def main():
await broker.startup()
await broker_nats.startup()
task = await best_task_ever.kiq()
print(await task.wait_result())
task_nats = await best_task_ever_nats.kiq()
print(await task_nats.wait_result())
if __name__ == "__main__":
asyncio.run(main())I've tried everything I understood before coming here.
- I started the broker with
taskiq broker:broker - I ran
python3 broker.py
OUTPUT:
Worker (taskiq broker:broker)
08:00:23 ((.env) ) root@MSI nats ±|main ✗|→ taskiq worker broker:broker
[2025-09-17 20:00:24,913][taskiq.worker][INFO ][MainProcess] Pid of a main process: 97868
[2025-09-17 20:00:24,913][taskiq.worker][INFO ][MainProcess] Starting 2 worker processes.
[2025-09-17 20:00:24,915][taskiq.process-manager][INFO ][MainProcess] Started process worker-0 with pid 97869
[2025-09-17 20:00:24,917][taskiq.process-manager][INFO ][MainProcess] Started process worker-1 with pid 97871
[2025-09-17 20:00:25,135][taskiq.receiver.receiver][INFO ][worker-0] Listening started.
[2025-09-17 20:00:25,136][taskiq.receiver.receiver][INFO ][worker-1] Listening started.
[2025-09-17 20:00:31,753][taskiq.receiver.receiver][INFO ][worker-1] Executing task broker:best_task_ever with ID: bf5154a9a08d4434a49e14c562d8fffc
All problems are solved!
Broker (python3 broker.py)
08:00:30 ((.env) ) root@MSI nats ±|main ✗|→ python3 broker.py
is_err=False log=None return_value=None execution_time=0.53 labels={} error=None
It's just indefinitely stuck after this.
Can you please help @s3rius
What am I getting wrong??
taskiq = "^0.11.17"
taskiq-fastapi = "^0.3.5"
taskiq-nats = "^0.5.1"
taskiq-redis = "^1.0.9"
Metadata
Metadata
Assignees
Labels
No labels