Skip to content

Commit 5a03f0c

Browse files
authored
Merge pull request #771 from mikeroll/remove-heartbeat-compat
redis: Remove worker heartbeat/ack compatibility code
2 parents 2293a55 + ab5b4c8 commit 5a03f0c

File tree

3 files changed

+7
-46
lines changed

3 files changed

+7
-46
lines changed

docs/source/changelog.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,17 @@ These are changes that while technically breaking, we believe are unlikely to ef
6767
* The deprecated ``requeue_deadline`` and ``requeue_interval`` arguments of |RedisBroker| have been removed.
6868
These have been deprecated and have had no effect since version 1.2.0.
6969
(`#782`_, `@mikeroll`_)
70+
* |RedisBroker|: The code for compatibility with pre-v1.2.0 acks has been removed.
71+
If you are using the |RedisBroker|, you must first upgrade Dramatiq to a version >=1.2.0, <2.0.0,
72+
and run it for some time, before upgrading to a version >=2.0.0.
73+
This is to allow Dramatiq to migrate the pre-v1.2.0 ack data structures in redis to the v1.2.0+
74+
versions. This migration code is what has been removed in version 2.0.0.
75+
(`#771`_, `@mikeroll`_)
7076

7177
.. _#766: https://github.com/Bogdanp/dramatiq/pull/766
7278
.. _PEP-735: https://peps.python.org/pep-0735/
7379
.. _#741: https://github.com/Bogdanp/dramatiq/issues/741
80+
.. _#771: https://github.com/Bogdanp/dramatiq/pull/771
7481
.. _#772: https://github.com/Bogdanp/dramatiq/pull/772
7582
.. _#782: https://github.com/Bogdanp/dramatiq/pull/782
7683
.. _@mikeroll: https://github.com/mikeroll

dramatiq/brokers/redis/dispatch.lua

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -138,18 +138,6 @@ if do_maintenance == "1" then
138138
redis.call("hdel", xqueue_messages, unpack(dead_message_ids_batch))
139139
end
140140
end
141-
142-
-- The following code is required for backwards-compatibility with
143-
-- the old way acks used to be implemented. It hoists any
144-
-- existing acks zsets into the per-worker sets.
145-
local compat_queue_acks = queue_full_name .. ".acks"
146-
local compat_message_ids = redis.call("zrangebyscore", compat_queue_acks, 0, timestamp - 86400000 * 7.5)
147-
if next(compat_message_ids) then
148-
for compat_message_ids_batch in iter_chunks(compat_message_ids) do
149-
redis.call("sadd", queue_acks, unpack(compat_message_ids_batch))
150-
redis.call("zrem", compat_queue_acks, unpack(compat_message_ids_batch))
151-
end
152-
end
153141
end
154142

155143

tests/test_redis.py

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -340,40 +340,6 @@ def test_redis_broker_raises_attribute_error_when_given_an_invalid_attribute(red
340340
redis_broker.idontexist
341341

342342

343-
def test_redis_broker_maintains_backwards_compat_with_old_acks(redis_broker):
344-
# Given that I have an actor
345-
@dramatiq.actor
346-
def do_work(self):
347-
pass
348-
349-
# And that actor has some old-style unacked messages
350-
expired_message_ids = set()
351-
valid_message_ids = set()
352-
for i in range(LUA_MAX_UNPACK_SIZE * 2):
353-
expired_message_id = b"expired-old-school-ack-%r" % i
354-
valid_message_id = b"valid-old-school-ack-%r" % i
355-
expired_message_ids.add(expired_message_id)
356-
valid_message_ids.add(valid_message_id)
357-
if redis.__version__.startswith("2."):
358-
redis_broker.client.zadd("dramatiq:default.acks", 0, expired_message_id)
359-
redis_broker.client.zadd("dramatiq:default.acks", current_millis(), valid_message_id)
360-
else:
361-
redis_broker.client.zadd("dramatiq:default.acks", {expired_message_id: 0})
362-
redis_broker.client.zadd("dramatiq:default.acks", {valid_message_id: current_millis()})
363-
364-
# When maintenance runs for that actor's queue
365-
redis_broker.maintenance_chance = MAINTENANCE_SCALE
366-
redis_broker.do_qsize(do_work.queue_name)
367-
368-
# Then maintenance should move the expired message to the new style acks set
369-
unacked = redis_broker.client.smembers("dramatiq:__acks__.%s.default" % redis_broker.broker_id)
370-
assert set(unacked) == expired_message_ids
371-
372-
# And the valid message should stay in that set
373-
compat_unacked = redis_broker.client.zrangebyscore("dramatiq:default.acks", 0, "+inf")
374-
assert set(compat_unacked) == valid_message_ids
375-
376-
377343
def test_redis_consumer_ack_can_retry_on_connection_error(redis_broker, redis_worker):
378344
# Given that I have an actor
379345
@dramatiq.actor

0 commit comments

Comments
 (0)