Skip to content

Commit cc81586

Browse files
committed
redis: add support for broker priority
Create different queues for priority steps and consuming them in that order.
1 parent 0c02bb0 commit cc81586

File tree

7 files changed

+297
-17
lines changed

7 files changed

+297
-17
lines changed

docs/source/advanced.rst

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,49 @@ failover if the currently connected node fails.
7676
.. _high availability cluster: https://www.rabbitmq.com/ha.html
7777
.. _connection parameters: https://pika.readthedocs.io/en/0.12.0/modules/parameters.html
7878

79+
Broker Priority Queues
80+
^^^^^^^^^^^^^^^^^^^^^^^
81+
Dramatiq supports priority queues on RabbitMQ and Redis.
82+
To configure the broker to work with priority queues, you should set the ``max_priority`` parameter of the broker.
83+
To enqueue a message with a priority, you should set the ``broker_priority`` parameter of the |Message|'s options.
84+
85+
86+
.. code-block:: python
87+
88+
import dramatiq
89+
from dramatiq.brokers.rabbitmq import RabbitmqBroker
90+
91+
# Using max_priority parameter:
92+
93+
rabbitmq_broker = RabbitmqBroker(url="amqp://guest:[email protected]:5672", max_priority=10)
94+
95+
# Define an actor with priority:
96+
@dramatiq.actor(broker=rabbitmq_broker)
97+
def operation(priority):
98+
print(priority)
99+
100+
# Enqueue a message with priority (lower number means higher priority):
101+
operation.send_with_options(args=(3,), options={"broker_priority": 3})
102+
operation.send_with_options(args=(2,), options={"broker_priority": 2})
103+
operation.send_with_options(args=(1,), options={"broker_priority": 1})
104+
105+
106+
RabbitMQ
107+
~~~~~~~~
108+
Dramatiq supports RabbitMQ's `priority queues`_ feature.
109+
To use it, you should set the ``max_priority`` parameter of the |RabbitmqBroker| to a value
110+
between 0 and 255 (10 is the recommended values).
111+
112+
.. `priority queues`_: https://www.rabbitmq.com/priority.html
113+
114+
115+
Redis
116+
~~~~~
117+
Dramatiq take similar approach as celery to implement priority queues on Redis.
118+
To use it, you should set the ``max_priority`` parameter of the |RedisBroker| to a value up to 10.
119+
The broker will created multiple queues for each priority level defined by ``priority_steps``, (default is 4 steps).
120+
The consumer will consume messages from the highest priority queue first.
121+
This method isn't as reliable as RabbitMQ's, for example, large number of messages ending up in the same queue.
79122

80123
Other brokers
81124
^^^^^^^^^^^^^

dramatiq/broker.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,16 @@ def join(self, queue_name, *, timeout=None): # pragma: no cover
299299
"""
300300
raise NotImplementedError()
301301

302+
def get_queue_size(self, queue_name: str):
303+
"""
304+
Get the number of messages in a queue. This method is only meant to be used in unit and integration tests.
305+
Parameters:
306+
queue_name(str): The queue whose message counts to get.
307+
308+
Returns: The number of messages in the queue, including the delay queue
309+
"""
310+
raise NotImplementedError()
311+
302312

303313
class Consumer:
304314
"""Consumers iterate over messages on a queue.

dramatiq/brokers/rabbitmq.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,16 @@ def join(self, queue_name, min_successes=10, idle_time=100, *, timeout=None):
431431

432432
self.connection.sleep(idle_time / 1000)
433433

434+
def get_queue_size(self, queue_name: str):
435+
"""
436+
Get the number of messages in a queue. This method is only meant to be used in unit and integration tests.
437+
Parameters:
438+
queue_name(str): The queue whose message counts to get.
439+
440+
Returns: The number of messages in the queue, including the delay queue
441+
"""
442+
return sum(self.get_queue_message_counts(queue_name)[:-1])
443+
434444

435445
def URLRabbitmqBroker(url, *, middleware=None):
436446
"""Alias for the RabbitMQ broker that takes a connection URL as a

dramatiq/brokers/redis.py

Lines changed: 103 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import random
2020
import time
2121
import warnings
22+
from bisect import bisect
23+
from collections import defaultdict
2224
from os import path
2325
from threading import Lock
2426
from uuid import uuid4
@@ -32,7 +34,7 @@
3234
from ..message import Message
3335

3436
MAINTENANCE_SCALE = 1000000
35-
MAINTENANCE_COMMAND_BLACKLIST = {"ack", "nack"}
37+
MAINTENANCE_COMMAND_BLACKLIST = {"ack", "nack", "qsize"}
3638

3739
#: How many commands out of a million should trigger queue
3840
#: maintenance.
@@ -50,6 +52,33 @@
5052
#: the first time it's run, but it may be overwritten using this var.
5153
DEFAULT_LUA_MAX_STACK = getenv_int("dramatiq_lua_max_stack")
5254

55+
#: The default priority steps. Each step will create a new queue
56+
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]
57+
58+
59+
def _get_all_priority_queue_names(queue_name, priority_steps):
60+
"""
61+
Yields the queue names for a given queue name and a list of priority steps.
62+
Parameters:
63+
queue_name(str): The queue name
64+
priority_steps(list[int]): The configured priority steps
65+
Returns: The queue names for the given queue name and priority steps
66+
"""
67+
if dq_name(queue_name) == queue_name:
68+
return
69+
for step in priority_steps:
70+
yield pri_name(queue_name, step)
71+
72+
73+
def pri_name(queue_name, priority):
74+
"""Returns the queue name for a given queue name and a priority. If the given
75+
queue name already belongs to a priority queue, then it is returned
76+
unchanged.
77+
"""
78+
if queue_name.endswith(".PR{}".format(priority)):
79+
return queue_name
80+
return "{}.PR{}".format(queue_name, priority)
81+
5382

5483
class RedisBroker(Broker):
5584
"""A broker than can be used with Redis.
@@ -81,6 +110,11 @@ class RedisBroker(Broker):
81110
dead-lettered messages are kept in Redis for.
82111
requeue_deadline(int): Deprecated. Does nothing.
83112
requeue_interval(int): Deprecated. Does nothing.
113+
max_priority(int): Configure queues with max priority to support message’s broker_priority option.
114+
The queuing is done by having multiple queues for each named queue.
115+
The queues are then consumed by in order of priority. The max value of max_priority is 10.
116+
priority_steps(list[int]): The priority range that is collapsed into the queues (4 by default).
117+
The number of steps can be configured by providing a list of numbers in sorted order
84118
client(redis.StrictRedis): A redis client to use.
85119
**parameters: Connection parameters are passed directly
86120
to :class:`redis.Redis`.
@@ -97,6 +131,8 @@ def __init__(
97131
requeue_deadline=None,
98132
requeue_interval=None,
99133
client=None,
134+
max_priority=None,
135+
priority_steps=None,
100136
**parameters
101137
):
102138
super().__init__(middleware=middleware)
@@ -114,6 +150,14 @@ def __init__(
114150
self.heartbeat_timeout = heartbeat_timeout
115151
self.dead_message_ttl = dead_message_ttl
116152
self.queues = set()
153+
if max_priority:
154+
if max_priority > 10:
155+
raise ValueError("max priority is supported up to 10")
156+
if not priority_steps:
157+
self.priority_steps = DEFAULT_PRIORITY_STEPS[:bisect(DEFAULT_PRIORITY_STEPS, max_priority) - 1]
158+
self.priority_steps = priority_steps or []
159+
else:
160+
self.priority_steps = []
117161
# TODO: Replace usages of StrictRedis (redis-py 2.x) with Redis in Dramatiq 2.0.
118162
self.client = client or redis.StrictRedis(**parameters)
119163
self.scripts = {name: self.client.register_script(script) for name, script in _scripts.items()}
@@ -163,6 +207,9 @@ def enqueue(self, message, *, delay=None):
163207
ValueError: If ``delay`` is longer than 7 days.
164208
"""
165209
queue_name = message.queue_name
210+
if "broker_priority" in message.options and delay is None:
211+
priority = message.options["broker_priority"]
212+
queue_name = self.priority_queue_name(queue_name, priority)
166213

167214
# Each enqueued message must have a unique id in Redis so
168215
# using the Message's id isn't safe because messages may be
@@ -202,7 +249,7 @@ def flush(self, queue_name):
202249
Parameters:
203250
queue_name(str): The queue to flush.
204251
"""
205-
for name in (queue_name, dq_name(queue_name)):
252+
for name in (queue_name, dq_name(queue_name), *_get_all_priority_queue_names(queue_name, self.priority_steps)):
206253
self.do_purge(name)
207254

208255
def flush_all(self):
@@ -231,13 +278,36 @@ def join(self, queue_name, *, interval=100, timeout=None):
231278
if deadline and time.monotonic() >= deadline:
232279
raise QueueJoinTimeout(queue_name)
233280

234-
size = self.do_qsize(queue_name)
281+
size = self.get_queue_size(queue_name)
235282

236283
if size == 0:
237284
return
238285

239286
time.sleep(interval / 1000)
240287

288+
def get_queue_size(self, queue_name):
289+
"""
290+
Get the number of messages in a queue. This method is only meant to be used in unit and integration tests.
291+
Parameters:
292+
queue_name(str): The queue whose message counts to get.
293+
294+
Returns: The number of messages in the queue, including the delay queue
295+
"""
296+
size = 0
297+
if self.priority_steps:
298+
for queue_name in _get_all_priority_queue_names(queue_name, self.priority_steps):
299+
qsize = self.do_qsize(queue_name)
300+
size += qsize
301+
size += self.do_qsize(queue_name)
302+
return size
303+
304+
def priority_queue_name(self, queue, priority):
305+
if priority is None or dq_name(queue) == queue:
306+
return queue
307+
308+
queue_number = self.priority_steps[bisect(self.priority_steps, priority) - 1]
309+
return pri_name(queue, queue_number)
310+
241311
def _should_do_maintenance(self, command):
242312
return int(
243313
command not in MAINTENANCE_COMMAND_BLACKLIST and
@@ -310,7 +380,8 @@ def ack(self, message):
310380
# The current queue might be different from message.queue_name
311381
# if the message has been delayed so we want to ack on the
312382
# current queue.
313-
self.broker.do_ack(self.queue_name, message.options["redis_message_id"])
383+
queue_name = self.broker.priority_queue_name(self.queue_name, message.options.get("broker_priority"))
384+
self.broker.do_ack(queue_name, message.options["redis_message_id"])
314385
except redis.ConnectionError as e:
315386
raise ConnectionClosed(e) from None
316387
finally:
@@ -319,19 +390,26 @@ def ack(self, message):
319390
def nack(self, message):
320391
try:
321392
# Same deal as above.
322-
self.broker.do_nack(self.queue_name, message.options["redis_message_id"])
393+
queue_name = self.broker.priority_queue_name(self.queue_name, message.options.get("broker_priority"))
394+
self.broker.do_nack(queue_name, message.options["redis_message_id"])
323395
except redis.ConnectionError as e:
324396
raise ConnectionClosed(e) from None
325397
finally:
326398
self.queued_message_ids.discard(message.message_id)
327399

328400
def requeue(self, messages):
329-
message_ids = [message.options["redis_message_id"] for message in messages]
330-
if not message_ids:
331-
return
332-
333-
self.logger.debug("Re-enqueueing %r on queue %r.", message_ids, self.queue_name)
334-
self.broker.do_requeue(self.queue_name, *message_ids)
401+
messages_id_by_queue = defaultdict(list)
402+
for message in messages:
403+
priority = message.options.get("broker_priority")
404+
if priority is None:
405+
queue_name = self.queue_name
406+
else:
407+
queue_name = self.broker.priority_queue_name(self.queue_name, priority)
408+
messages_id_by_queue[queue_name].append(message.options["redis_message_id"])
409+
410+
for queue_name, message_ids in messages_id_by_queue.items():
411+
self.logger.debug("Re-enqueueing %r on queue %r.", message_ids, self.queue_name)
412+
self.broker.do_requeue(queue_name, *message_ids)
335413

336414
def __next__(self):
337415
try:
@@ -360,11 +438,16 @@ def __next__(self):
360438
# prefetch up to that number of messages.
361439
messages = []
362440
if self.outstanding_message_count < self.prefetch:
363-
self.message_cache = messages = self.broker.do_fetch(
364-
self.queue_name,
365-
self.prefetch - self.outstanding_message_count,
366-
)
367-
441+
for queue_name in self.queue_names():
442+
# Ideally, we would want to sort the messages by their priority,
443+
# but that will require decoding them now
444+
self.message_cache = messages = self.broker.do_fetch(
445+
queue_name,
446+
self.prefetch - self.outstanding_message_count,
447+
)
448+
449+
if messages:
450+
break
368451
# Because we didn't get any messages, we should
369452
# progressively long poll up to the idle timeout.
370453
if not messages:
@@ -374,6 +457,10 @@ def __next__(self):
374457
except redis.ConnectionError as e:
375458
raise ConnectionClosed(e) from None
376459

460+
def queue_names(self):
461+
yield from _get_all_priority_queue_names(self.queue_name, self.broker.priority_steps)
462+
yield self.queue_name
463+
377464

378465
_scripts = {}
379466
_scripts_path = path.join(path.abspath(path.dirname(__file__)), "redis")

dramatiq/brokers/redis/dispatch.lua

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ local queue_acks = acks .. "." .. queue_name
6868
local queue_full_name = namespace .. ":" .. queue_name
6969
local queue_messages = queue_full_name .. ".msgs"
7070
local xqueue_full_name = namespace .. ":" .. queue_canonical_name .. ".XQ"
71+
if string.sub(queue_canonical_name, -4, -2) == ".PR" then
72+
xqueue_full_name = namespace .. ":" .. string.sub(queue_canonical_name, 1, -5) .. ".XQ"
73+
end
7174
local xqueue_messages = xqueue_full_name .. ".msgs"
7275

7376
-- Command-specific arguments.

dramatiq/brokers/stub.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,16 @@ def join(self, queue_name, *, fail_fast=False, timeout=None):
175175
raise message._exception from None
176176

177177
return
178+
def get_queue_size(self, queue_name):
179+
"""Returns the number of messages in a queue.
180+
181+
Parameters:
182+
queue_name(str): The queue to inspect.
183+
184+
Returns:
185+
int: The number of messages in the queue.
186+
"""
187+
return self.queues[queue_name].qsize() + self.queues[dq_name(queue_name)].qsize()
178188

179189

180190
class _StubConsumer(Consumer):

0 commit comments

Comments
 (0)