Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,17 @@ These are changes that while technically breaking, we believe are unlikely to ef
* The deprecated ``requeue_deadline`` and ``requeue_interval`` arguments of |RedisBroker| have been removed.
These have been deprecated and have had no effect since version 1.2.0.
(`#782`_, `@mikeroll`_)
* |RedisBroker|: The code for compatibility with pre-v1.2.0 acks has been removed.
If you are using the |RedisBroker|, you must first upgrade Dramatiq to a version >=1.2.0, <2.0.0,
and run it for some time, before upgrading to a version >=2.0.0.
This is to allow Dramatiq to migrate the pre-v1.2.0 ack data structures in redis to the v1.2.0+
versions. This migration code is what has been removed in version 2.0.0.
(`#771`_, `@mikeroll`_)

.. _#766: https://github.com/Bogdanp/dramatiq/pull/766
.. _PEP-735: https://peps.python.org/pep-0735/
.. _#741: https://github.com/Bogdanp/dramatiq/issues/741
.. _#771: https://github.com/Bogdanp/dramatiq/pull/771
.. _#772: https://github.com/Bogdanp/dramatiq/pull/772
.. _#782: https://github.com/Bogdanp/dramatiq/pull/782
.. _@mikeroll: https://github.com/mikeroll
Expand Down
12 changes: 0 additions & 12 deletions dramatiq/brokers/redis/dispatch.lua
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,6 @@ if do_maintenance == "1" then
redis.call("hdel", xqueue_messages, unpack(dead_message_ids_batch))
end
end

-- The following code is required for backwards-compatibility with
-- the old way acks used to be implemented. It hoists any
-- existing acks zsets into the per-worker sets.
local compat_queue_acks = queue_full_name .. ".acks"
local compat_message_ids = redis.call("zrangebyscore", compat_queue_acks, 0, timestamp - 86400000 * 7.5)
if next(compat_message_ids) then
for compat_message_ids_batch in iter_chunks(compat_message_ids) do
redis.call("sadd", queue_acks, unpack(compat_message_ids_batch))
redis.call("zrem", compat_queue_acks, unpack(compat_message_ids_batch))
end
end
end


Expand Down
34 changes: 0 additions & 34 deletions tests/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,40 +340,6 @@ def test_redis_broker_raises_attribute_error_when_given_an_invalid_attribute(red
redis_broker.idontexist


def test_redis_broker_maintains_backwards_compat_with_old_acks(redis_broker):
# Given that I have an actor
@dramatiq.actor
def do_work(self):
pass

# And that actor has some old-style unacked messages
expired_message_ids = set()
valid_message_ids = set()
for i in range(LUA_MAX_UNPACK_SIZE * 2):
expired_message_id = b"expired-old-school-ack-%r" % i
valid_message_id = b"valid-old-school-ack-%r" % i
expired_message_ids.add(expired_message_id)
valid_message_ids.add(valid_message_id)
if redis.__version__.startswith("2."):
redis_broker.client.zadd("dramatiq:default.acks", 0, expired_message_id)
redis_broker.client.zadd("dramatiq:default.acks", current_millis(), valid_message_id)
else:
redis_broker.client.zadd("dramatiq:default.acks", {expired_message_id: 0})
redis_broker.client.zadd("dramatiq:default.acks", {valid_message_id: current_millis()})

# When maintenance runs for that actor's queue
redis_broker.maintenance_chance = MAINTENANCE_SCALE
redis_broker.do_qsize(do_work.queue_name)

# Then maintenance should move the expired message to the new style acks set
unacked = redis_broker.client.smembers("dramatiq:__acks__.%s.default" % redis_broker.broker_id)
assert set(unacked) == expired_message_ids

# And the valid message should stay in that set
compat_unacked = redis_broker.client.zrangebyscore("dramatiq:default.acks", 0, "+inf")
assert set(compat_unacked) == valid_message_ids


def test_redis_consumer_ack_can_retry_on_connection_error(redis_broker, redis_worker):
# Given that I have an actor
@dramatiq.actor
Expand Down