Skip to content

Commit 8ce7b51

Browse files
committed
Remove worker heartbeat compatibility code
This reverts commit d6cff30.
1 parent 62b2e9b commit 8ce7b51

File tree

4 files changed

+4
-65
lines changed

4 files changed

+4
-65
lines changed

docs/source/changelog.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@ Breaking Changes
1919
* The ``backend`` argument to the |Results| middleware is now required.
2020
Previously, not supplying this argument would result in a non-functional |Results| middleware.
2121
(`#728`_, `@LincolnPuzey`_)
22+
* The long-deprecated ``requeue_{deadline,interval}`` parameters of |RedisBroker|,
23+
as well as the obsolete heartbeat compat code, have been removed. (`TBD`_, `@mikeroll`_)
2224

2325
.. _#95: https://github.com/Bogdanp/dramatiq/issues/95
2426
.. _#345: https://github.com/Bogdanp/dramatiq/issues/345
2527
.. _#688: https://github.com/Bogdanp/dramatiq/pull/688
2628
.. _@azmeuk: https://github.com/azmeuk
2729
.. _#728: https://github.com/Bogdanp/dramatiq/pull/728
30+
.. _TBD: https://github.com/Bogdanp/dramatiq/pull/TBD
2831

2932
Fixed
3033
^^^^^

dramatiq/brokers/redis.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import glob
2121
import random
2222
import time
23-
import warnings
2423
from os import path
2524
from threading import Lock
2625
from typing import Optional
@@ -82,9 +81,7 @@ class RedisBroker(Broker):
8281
offline.
8382
dead_message_ttl(int): The amount of time (in ms) that
8483
dead-lettered messages are kept in Redis for.
85-
requeue_deadline(int): Deprecated. Does nothing.
86-
requeue_interval(int): Deprecated. Does nothing.
87-
client(redis.StrictRedis): A redis client to use.
84+
client(redis.Redis): A redis client to use.
8885
**parameters: Connection parameters are passed directly
8986
to :class:`redis.Redis`.
9087
@@ -100,8 +97,6 @@ def __init__(
10097
maintenance_chance=DEFAULT_MAINTENANCE_CHANCE,
10198
heartbeat_timeout=DEFAULT_HEARTBEAT_TIMEOUT,
10299
dead_message_ttl=DEFAULT_DEAD_MESSAGE_TTL,
103-
requeue_deadline=None,
104-
requeue_interval=None,
105100
client=None,
106101
**parameters,
107102
):
@@ -110,10 +105,6 @@ def __init__(
110105
if url:
111106
parameters["connection_pool"] = redis.ConnectionPool.from_url(url)
112107

113-
if requeue_deadline or requeue_interval:
114-
message = "requeue_{deadline,interval} have been deprecated and no longer do anything"
115-
warnings.warn(message, DeprecationWarning, stacklevel=2)
116-
117108
self.broker_id = str(uuid4())
118109
self.namespace = namespace
119110
self.maintenance_chance = maintenance_chance

dramatiq/brokers/redis/dispatch.lua

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -138,18 +138,6 @@ if do_maintenance == "1" then
138138
redis.call("hdel", xqueue_messages, unpack(dead_message_ids_batch))
139139
end
140140
end
141-
142-
-- The following code is required for backwards-compatibility with
143-
-- the old way acks used to be implemented. It hoists any
144-
-- existing acks zsets into the per-worker sets.
145-
local compat_queue_acks = queue_full_name .. ".acks"
146-
local compat_message_ids = redis.call("zrangebyscore", compat_queue_acks, 0, timestamp - 86400000 * 7.5)
147-
if next(compat_message_ids) then
148-
for compat_message_ids_batch in iter_chunks(compat_message_ids) do
149-
redis.call("sadd", queue_acks, unpack(compat_message_ids_batch))
150-
redis.call("zrem", compat_queue_acks, unpack(compat_message_ids_batch))
151-
end
152-
end
153141
end
154142

155143

tests/test_redis.py

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -332,15 +332,6 @@ def test_redis_broker_can_connect_via_client():
332332
assert broker.client is client
333333

334334

335-
def test_redis_broker_warns_about_deprecated_parameters():
336-
# When I pass deprecated params to RedisBroker
337-
# Then it should warn me that those params do nothing
338-
with pytest.warns(DeprecationWarning) as record:
339-
RedisBroker(requeue_deadline=1000)
340-
341-
assert str(record[0].message) == "requeue_{deadline,interval} have been deprecated and no longer do anything"
342-
343-
344335
def test_redis_broker_raises_attribute_error_when_given_an_invalid_attribute(redis_broker):
345336
# Given that I have a Redis broker
346337
# When I try to access an attribute that doesn't exist
@@ -349,40 +340,6 @@ def test_redis_broker_raises_attribute_error_when_given_an_invalid_attribute(red
349340
redis_broker.idontexist
350341

351342

352-
def test_redis_broker_maintains_backwards_compat_with_old_acks(redis_broker):
353-
# Given that I have an actor
354-
@dramatiq.actor
355-
def do_work(self):
356-
pass
357-
358-
# And that actor has some old-style unacked messages
359-
expired_message_ids = set()
360-
valid_message_ids = set()
361-
for i in range(LUA_MAX_UNPACK_SIZE * 2):
362-
expired_message_id = b"expired-old-school-ack-%r" % i
363-
valid_message_id = b"valid-old-school-ack-%r" % i
364-
expired_message_ids.add(expired_message_id)
365-
valid_message_ids.add(valid_message_id)
366-
if redis.__version__.startswith("2."):
367-
redis_broker.client.zadd("dramatiq:default.acks", 0, expired_message_id)
368-
redis_broker.client.zadd("dramatiq:default.acks", current_millis(), valid_message_id)
369-
else:
370-
redis_broker.client.zadd("dramatiq:default.acks", {expired_message_id: 0})
371-
redis_broker.client.zadd("dramatiq:default.acks", {valid_message_id: current_millis()})
372-
373-
# When maintenance runs for that actor's queue
374-
redis_broker.maintenance_chance = MAINTENANCE_SCALE
375-
redis_broker.do_qsize(do_work.queue_name)
376-
377-
# Then maintenance should move the expired message to the new style acks set
378-
unacked = redis_broker.client.smembers("dramatiq:__acks__.%s.default" % redis_broker.broker_id)
379-
assert set(unacked) == expired_message_ids
380-
381-
# And the valid message should stay in that set
382-
compat_unacked = redis_broker.client.zrangebyscore("dramatiq:default.acks", 0, "+inf")
383-
assert set(compat_unacked) == valid_message_ids
384-
385-
386343
def test_redis_consumer_ack_can_retry_on_connection_error(redis_broker, redis_worker):
387344
# Given that I have an actor
388345
@dramatiq.actor

0 commit comments

Comments
 (0)