Skip to content

Commit b7513cc

Browse files
committed
delegate time bucket naming to the backend; ensure same-slot hashing
1 parent 0c48baa commit b7513cc

File tree

3 files changed

+21
-1
lines changed

3 files changed

+21
-1
lines changed

dramatiq/rate_limits/backend.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,19 @@ def decr(self, key, amount, minimum, ttl): # pragma: no cover
6565
"""
6666
raise NotImplementedError
6767

68+
def format_key_variants(self, key, variants): # pragma: no cover
69+
"""Build a list of related key names from a "base" key name and and a list of
70+
distinct "variants" that can act as e.g. name suffixes.
71+
72+
Parameters:
73+
key(str): The base key name.
74+
variants(list[str]): Distinct values to incorporate into the resulting names.
75+
76+
Returns:
77+
list[str]: The list of resulting key names.
78+
"""
79+
return [f"{key}@{variant}" for variant in variants]
80+
6881
def incr_and_sum(self, key, keys, amount, maximum, ttl): # pragma: no cover
6982
"""Atomically increment a key unless the sum of keys is greater
7083
than the given maximum.

dramatiq/rate_limits/backends/redis.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,13 @@ def decr(self, key, amount, minimum, ttl):
6363
decr_down_to = self.scripts["decr_down_to"]
6464
return decr_down_to([key], [amount, minimum, ttl]) == 1
6565

66+
def format_key_variants(self, key, variants):
67+
# NOTE the extra { } around the key - this is to make use of Hash tags [0]
68+
# to make sure that multi-key commands execute on the keys in same hash slots.
69+
# This helps avoid a ClusterCrossSlotError in case this redis is a Redis Cluster.
70+
# [0]: https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/#keys-hash-tags
71+
return [f"{{{key}}}@{variant}" for variant in variants]
72+
6673
def incr_and_sum(self, key, keys, amount, maximum, ttl):
6774
# TODO: Drop non-callable keys in Dramatiq v2.
6875
keys_list = keys() if callable(keys) else keys

dramatiq/rate_limits/window.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def __init__(self, backend, key, *, limit=1, window=1):
5252

5353
def _get_keys(self):
5454
timestamp = int(time.time())
55-
return ["%s@%s" % (self.key, timestamp - i) for i in range(self.window)]
55+
return self.backend.format_key_variants(self.key, [str(timestamp - i) for i in range(self.window)])
5656

5757
def _acquire(self):
5858
keys = self._get_keys()

0 commit comments

Comments
 (0)