Delayed messages are being duplicated on work queue which leads to one message being consumed and processed by many WorkerThreads #431

Open
@somnam

OS

CentOS Linux release 7.5.1804 (Core)

Python version

3.7.3

Dramatiq version

v1.11.0

Gevent version:

gevent==21.1.2
greenlet==1.0.0

Broker:

Redis server v=3.2.12

What did you do?

I run the latest dramatiq version using the dramatiq-gevent command.

On the host machine I'm running two separate dramatiq instances that consume messages from separate queues:

dramatiq-gevent -p 8 -t 50 --use-spawn --queues primary module.with.dramatiq.actors
dramatiq-gevent -p 8 -t 50 --use-spawn --queues default module.with.dramatiq.actors

I've enqueued multiple messages for an actor on the default queue, and each of these messages has a delay so that it runs in the future. This is done using the send_with_options method:

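import dramatiq

# Actor options are passed as keyword arguments, not as a dict.
@dramatiq.actor(priority=2, max_retries=1, min_backoff=600000, max_backoff=1200000)
def calculate_account_stats_report(account_uuid: str) -> None:
    ...

# schedule is an iterable of (args, delay) pairs; delay is in milliseconds.
for args, delay in schedule:
    calculate_account_stats_report.send_with_options(args=args, delay=delay)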

What did you expect would happen?

Each delayed message is consumed and processed only once by a single WorkerThread.

What happened?

Some of the messages get duplicated when the broker enqueues them on the 'default' queue.

As a result, a single message is processed by multiple WorkerThreads, either concurrently or within a short time of each other.
For the actor shown above, this leads to duplicate entries in the database.

Example delayed message enqueued in default.DQ queue:

{"queue_name":"default.DQ","actor_name":"calculate_account_stats_report","args":["4ca963ee-54ce-5e0d-a4f7-6bb91eea3aa0"],"kwargs":{},"options":{"redis_message_id":"061ca9c8-b756-4559-b91c-c27cd4a63b26","eta":1633610993462},"message_id":"d67db818-1b83-44bf-ac74-f04d84ca428b","message_timestamp":1633557600177}

The messages get pushed to the dramatiq:default.msgs queue after the time set in the eta field.
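For reference, the delay carried by the example message can be read off its payload. This is a minimal sketch that assumes both eta and message_timestamp are Unix timestamps in milliseconds; the values are copied from the example message above:

```python
from datetime import datetime, timedelta, timezone

# Values copied from the example message in default.DQ.
eta_ms = 1633610993462
message_timestamp_ms = 1633557600177

# Assumption: both fields are Unix timestamps in milliseconds.
delay = timedelta(milliseconds=eta_ms - message_timestamp_ms)
eta_utc = datetime.fromtimestamp(eta_ms / 1000, tz=timezone.utc)

print("requested delay:", delay)
print("eta (UTC):", eta_utc.isoformat())
```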

When inspecting dramatiq:default.msgs I can see that a single delayed message has been duplicated several times:

$ redis-cli HGETALL "dramatiq:default.msgs" | grep "calculate_account_stats_report"
{"queue_name":"default","actor_name":"calculate_account_stats_report","args":["1975b0ea-d3f1-51f0-999e-b33b627f9532"],"kwargs":{},"options":{"redis_message_id":"64f577c9-0b7b-43ad-91c1-e2636a7777e3"},"message_id":"5a77fb7b-7b2b-44fc-913c-3c40edbb7800","message_timestamp":1633557600166}
{"queue_name":"default","actor_name":"calculate_account_stats_report","args":["1975b0ea-d3f1-51f0-999e-b33b627f9532"],"kwargs":{},"options":{"redis_message_id":"5fca8c15-6e9a-4ec6-924f-493bd50a3529"},"message_id":"5a77fb7b-7b2b-44fc-913c-3c40edbb7800","message_timestamp":1633557600166}
{"queue_name":"default","actor_name":"calculate_account_stats_report","args":["1975b0ea-d3f1-51f0-999e-b33b627f9532"],"kwargs":{},"options":{"redis_message_id":"4ccf6d96-9180-44bc-9149-a439ec3cdd41"},"message_id":"5a77fb7b-7b2b-44fc-913c-3c40edbb7800","message_timestamp":1633557600166}

Each message has the same message_id but a different redis_message_id.
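To check a whole queue for this pattern rather than grepping by hand, the HGETALL output can be grouped by message_id. The following is a rough sketch, not part of dramatiq: find_duplicates is a hypothetical helper, and the sample payloads are trimmed copies of the entries above.

```python
import json
from collections import defaultdict


def find_duplicates(lines):
    """Return {message_id: [redis_message_id, ...]} for ids stored more than once."""
    by_message_id = defaultdict(list)
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):
            continue  # HGETALL also prints the hash keys; skip anything that is not JSON
        message = json.loads(line)
        by_message_id[message["message_id"]].append(
            message["options"]["redis_message_id"]
        )
    return {mid: rids for mid, rids in by_message_id.items() if len(rids) > 1}


# Trimmed copies of the three entries shown above (only the relevant fields kept).
sample = [
    '{"options":{"redis_message_id":"64f577c9-0b7b-43ad-91c1-e2636a7777e3"},"message_id":"5a77fb7b-7b2b-44fc-913c-3c40edbb7800"}',
    '{"options":{"redis_message_id":"5fca8c15-6e9a-4ec6-924f-493bd50a3529"},"message_id":"5a77fb7b-7b2b-44fc-913c-3c40edbb7800"}',
    '{"options":{"redis_message_id":"4ccf6d96-9180-44bc-9149-a439ec3cdd41"},"message_id":"5a77fb7b-7b2b-44fc-913c-3c40edbb7800"}',
]

duplicates = find_duplicates(sample)
for message_id, redis_ids in duplicates.items():
    print(message_id, "is stored", len(redis_ids), "times:", redis_ids)
```

In practice the lines could come from piping the redis-cli HGETALL output into the script and passing sys.stdin to find_duplicates.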

After inspecting workers I can see that they have consumed these duplicates and will process them:

127.0.0.1:6379> SMEMBERS "dramatiq:__acks__.b44694ee-d5f6-45ad-baa0-c7a206f75899.default"
1) "4ccf6d96-9180-44bc-9149-a439ec3cdd41"
2) "fc374c21-3d75-464a-84f5-e4de0d4c86a6"
3) "e26e899a-518b-4e1c-a723-4e649aa0a6e4"
4) "7b0b6c8c-6586-428a-9dec-c2d0c5d6aa7b"
127.0.0.1:6379> SMEMBERS "dramatiq:__acks__.820e5157-2c3b-4d90-9cd9-158cd2161c10.default"
1) "5fca8c15-6e9a-4ec6-924f-493bd50a3529"
2) "64f577c9-0b7b-43ad-91c1-e2636a7777e3"
3) "6938f2fa-9af1-4a79-98d1-8b24a9a08f7a"
4) "afe4b678-7d4a-4311-8b3b-16785899eb0e"

The number of messages with the same ID stored in dramatiq:default.msgs looks random: sometimes (correctly) only one, sometimes two or more.

Example debug logs when one message gets picked up by two Workers:

2021-10-07 13:45:10,323 DEBUG [Process-4:10793] [Thread-2] [dramatiq.broker.RedisBroker redis:180] Enqueueing message '41e3df3d-c637-44b7-8a84-7c3375e00eeb' on queue 'default'.
2021-10-07 13:45:10,324 DEBUG [Process-4:10793] [Thread-2] [dramatiq.worker.ConsumerThread(default.DQ) worker:345] Acknowledging message '41e3df3d-c637-44b7-8a84-7c3375e00eeb'.
2021-10-07 13:45:10,334 DEBUG [Process-3:10792] [Thread-1] [dramatiq.worker.ConsumerThread(default) worker:321] Pushing message '41e3df3d-c637-44b7-8a84-7c3375e00eeb' onto work queue.
2021-10-07 13:45:10,335 DEBUG [Process-3:10792] [Thread-7] [dramatiq.worker.WorkerThread worker:471] Received message calculate_account_stats_report('7b881686-cafb-57e8-b525-ebc9a941a7a4') with id '41e3df3d-c637-44b7-8a84-7c3375e00eeb'.
2021-10-07 13:46:11,784 DEBUG [Process-2:10791] [Thread-1] [dramatiq.worker.ConsumerThread(default) worker:321] Pushing message '41e3df3d-c637-44b7-8a84-7c3375e00eeb' onto work queue.
2021-10-07 13:46:11,786 DEBUG [Process-2:10791] [Thread-43] [dramatiq.worker.WorkerThread worker:471] Received message calculate_account_stats_report('7b881686-cafb-57e8-b525-ebc9a941a7a4') with id '41e3df3d-c637-44b7-8a84-7c3375e00eeb'.
2021-10-07 13:46:52,050 DEBUG [Process-3:10792] [Thread-7] [dramatiq.worker.ConsumerThread(default) worker:345] Acknowledging message '41e3df3d-c637-44b7-8a84-7c3375e00eeb'.
2021-10-07 13:46:52,049 DEBUG [Process-2:10791] [Thread-43] [dramatiq.worker.ConsumerThread(default) worker:345] Acknowledging message '41e3df3d-c637-44b7-8a84-7c3375e00eeb'.

The same pattern for a different message:

2021-10-07 14:49:53,730 DEBUG [Process-4:10793] [Thread-2] [dramatiq.broker.RedisBroker redis:180] Enqueueing message 'd67db818-1b83-44bf-ac74-f04d84ca428b' on queue 'default'.
2021-10-07 14:49:53,731 DEBUG [Process-4:10793] [Thread-2] [dramatiq.worker.ConsumerThread(default.DQ) worker:345] Acknowledging message 'd67db818-1b83-44bf-ac74-f04d84ca428b'.
2021-10-07 14:49:53,742 DEBUG [Process-1:10790] [Thread-1] [dramatiq.worker.ConsumerThread(default) worker:321] Pushing message 'd67db818-1b83-44bf-ac74-f04d84ca428b' onto work queue.
2021-10-07 14:49:53,742 DEBUG [Process-1:10790] [Thread-35] [dramatiq.worker.WorkerThread worker:471] Received message calculate_account_stats_report('4ca963ee-54ce-5e0d-a4f7-6bb91eea3aa0') with id 'd67db818-1b83-44bf-ac74-f04d84ca428b'.
2021-10-07 14:51:05,252 DEBUG [Process-5:10794] [Thread-1] [dramatiq.worker.ConsumerThread(default) worker:321] Pushing message 'd67db818-1b83-44bf-ac74-f04d84ca428b' onto work queue.
2021-10-07 14:51:05,253 DEBUG [Process-5:10794] [Thread-26] [dramatiq.worker.WorkerThread worker:471] Received message calculate_account_stats_report('4ca963ee-54ce-5e0d-a4f7-6bb91eea3aa0') with id 'd67db818-1b83-44bf-ac74-f04d84ca428b'.
2021-10-07 14:52:01,254 DEBUG [Process-5:10794] [Thread-26] [dramatiq.worker.ConsumerThread(default) worker:345] Acknowledging message 'd67db818-1b83-44bf-ac74-f04d84ca428b'.
2021-10-07 14:52:01,259 DEBUG [Process-1:10790] [Thread-35] [dramatiq.worker.ConsumerThread(default) worker:345] Acknowledging message 'd67db818-1b83-44bf-ac74-f04d84ca428b'.
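The same pattern can be searched for across a full debug log. Below is a small sketch that assumes the log format shown above; the regular expression and the received_more_than_once helper are illustrative only and are not part of dramatiq.

```python
import re
from collections import defaultdict

# Matches WorkerThread "Received message ... with id '...'" lines and captures
# the process name and the message id. Assumes the log format shown above.
RECEIVED = re.compile(
    r"\[(Process-\d+):\d+\].*Received message .* with id '([0-9a-f-]+)'"
)


def received_more_than_once(log_lines):
    """Return {message_id: [process, ...]} for ids received more than once."""
    seen = defaultdict(list)
    for line in log_lines:
        match = RECEIVED.search(line)
        if match:
            process, message_id = match.groups()
            seen[message_id].append(process)
    return {mid: procs for mid, procs in seen.items() if len(procs) > 1}


# Two of the log lines from the second example above.
sample_log = [
    "2021-10-07 14:49:53,742 DEBUG [Process-1:10790] [Thread-35] [dramatiq.worker.WorkerThread worker:471] Received message calculate_account_stats_report('4ca963ee-54ce-5e0d-a4f7-6bb91eea3aa0') with id 'd67db818-1b83-44bf-ac74-f04d84ca428b'.",
    "2021-10-07 14:51:05,253 DEBUG [Process-5:10794] [Thread-26] [dramatiq.worker.WorkerThread worker:471] Received message calculate_account_stats_report('4ca963ee-54ce-5e0d-a4f7-6bb91eea3aa0') with id 'd67db818-1b83-44bf-ac74-f04d84ca428b'.",
]

repeated = received_more_than_once(sample_log)
for message_id, processes in repeated.items():
    print(message_id, "received by", processes)
```

Note that a message id can legitimately appear more than once when a message is retried, so hits from a script like this are candidates to inspect rather than proof of duplication.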

Reproducible example:

Unfortunately I wasn't able to reproduce the issue in the development environment, but it happens regularly in the production environment.
