Error on startup PushBasedJetStreamBroker - "consumer name already in use" #23

@Ruslan-Droid

I'm trying to run the PushBasedJetStreamBroker as shown in the example, but when I run the main Python file I get the error "consumer name already in use".

My code:

# broker_example.py
import asyncio
from taskiq_nats import PushBasedJetStreamBroker

broker = PushBasedJetStreamBroker(
    servers='localhost',
    queue='broker_example_queue',
)


@broker.task
async def my_lovely_task():
    print("I love taskiq")


async def main():
    await broker.startup()

    await my_lovely_task.kiq()

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

My dependencies:

python = "^3.13"
taskiq = "^0.11.20"
taskiq-nats = "^0.5.1"

My startup steps:

  1. I started the broker with taskiq worker broker_example:broker
(testtaskiq-py3.13) PS C:\Users\Inquisitor\PycharmProjects\TestTaskiq> taskiq worker broker_example:broker -fsd
[2025-11-17 20:00:08,690][taskiq.worker][INFO   ][MainProcess] Pid of a main process: 18400
[2025-11-17 20:00:08,690][taskiq.worker][INFO   ][MainProcess] Starting 2 worker processes.
[2025-11-17 20:00:08,694][taskiq.process-manager][INFO   ][MainProcess] Started process worker-0 with pid 18844 
[2025-11-17 20:00:08,697][taskiq.process-manager][INFO   ][MainProcess] Started process worker-1 with pid 8316 
[2025-11-17 20:00:09,215][taskiq.receiver.receiver][INFO   ][worker-1] Listening started.
[2025-11-17 20:00:09,216][taskiq.receiver.receiver][INFO   ][worker-0] Listening started.
  2. I ran python broker_example.py
Traceback (most recent call last):
  File "C:\Users\Inquisitor\PycharmProjects\TestTaskiq\broker_example.py", line 25, in <module>
    asyncio.run(main())
    ~~~~~~~~~~~^^^^^^^^
  File "C:\Users\Inquisitor\AppData\Local\Programs\Python\Python313\Lib\asyncio\runners.py", line 195, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "C:\Users\Inquisitor\AppData\Local\Programs\Python\Python313\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "C:\Users\Inquisitor\AppData\Local\Programs\Python\Python313\Lib\asyncio\base_events.py", line 725, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
  File "C:\Users\Inquisitor\PycharmProjects\TestTaskiq\broker_example.py", line 17, in main
    await broker.startup()
  File "C:\Users\Inquisitor\PycharmProjects\TestTaskiq\.venv\Lib\site-packages\taskiq_nats\broker.py", line 156, in startup
    await self._startup_consumer()
  File "C:\Users\Inquisitor\PycharmProjects\TestTaskiq\.venv\Lib\site-packages\taskiq_nats\broker.py", line 208, in _startup_consumer
    self.consumer = await self.js.subscribe(
                    ^^^^^^^^^^^^^^^^^^^^^^^^
    ...<3 lines>...
    )
    ^
  File "C:\Users\Inquisitor\PycharmProjects\TestTaskiq\.venv\Lib\site-packages\nats\js\client.py", line 476, in subscribe
    consumer_info = await self._jsm.add_consumer(stream, config=config)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Inquisitor\PycharmProjects\TestTaskiq\.venv\Lib\site-packages\nats\js\manager.py", line 262, in add_consumer
    resp = await self._api_request(subject, req_data, timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Inquisitor\PycharmProjects\TestTaskiq\.venv\Lib\site-packages\nats\js\manager.py", line 484, in _api_request
    raise APIError.from_error(resp["error"])
          ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^
  File "C:\Users\Inquisitor\PycharmProjects\TestTaskiq\.venv\Lib\site-packages\nats\js\errors.py", line 89, in from_error
    raise BadRequestError(**err)
nats.js.errors.BadRequestError: nats: BadRequestError: code=400 err_code=10013 description='consumer name already in use'

I found one workaround: the queue name must match the durable name.

With this change, I no longer get the error:

# broker_example.py
import asyncio
from taskiq_nats import PushBasedJetStreamBroker

broker = PushBasedJetStreamBroker(
    servers='localhost',
    queue="taskiq_consumer",
)

@broker.task
async def my_lovely_task():
    print("I love taskiq")


async def main():
    await broker.startup()
    await my_lovely_task.kiq()
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

The reason lies in the subscribe method of JetStreamContext in the nats-py library:

# nats-py: class JetStreamContext(JetStreamManager), async def subscribe(...)
# ...
        # If using a queue, that will be the consumer/durable name.
        if queue:
            if durable and durable != queue:
                raise nats.js.errors.Error(
                    f"cannot create queue subscription '{queue}' to consumer '{durable}'"
                )
            else:
                durable = queue    # ← this is the key line
# ...
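
To make the effect of that branch easier to see, here is a minimal standalone sketch that mirrors only the name-resolution logic from the excerpt above. It does not import nats-py or talk to a server; the function name resolve_durable and the exception class QueueDurableMismatchError are hypothetical and exist only for illustration.

```python
from typing import Optional


class QueueDurableMismatchError(Exception):
    """Hypothetical stand-in for nats.js.errors.Error in this sketch."""


def resolve_durable(queue: Optional[str], durable: Optional[str]) -> Optional[str]:
    """Mirror the queue/durable branch quoted from nats-py's subscribe().

    This is an illustrative re-implementation, not the library's code.
    """
    if queue:
        # A queue name and a different explicit durable name cannot be combined.
        if durable and durable != queue:
            raise QueueDurableMismatchError(
                f"cannot create queue subscription '{queue}' to consumer '{durable}'"
            )
        # Otherwise the queue name becomes the durable name.
        return queue
    # Without a queue, the durable name is left as passed in.
    return durable


if __name__ == "__main__":
    # Custom queue, no explicit durable: the durable name follows the queue.
    print(resolve_durable("broker_example_queue", None))
    # Queue equal to the durable name: both agree, nothing changes.
    print(resolve_durable("taskiq_consumer", "taskiq_consumer"))
    # Queue and durable differ: the branch raises.
    try:
        resolve_durable("broker_example_queue", "taskiq_consumer")
    except QueueDurableMismatchError as exc:
        print(f"error: {exc}")
```

With a custom queue name and no explicit durable argument, the durable name silently becomes the queue name, which would be consistent with the error going away only when queue is set to "taskiq_consumer".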

Is this correct, or am I doing something wrong?
