Skip to content

Commit d684aa7

Browse files
committed
Remove the dramatiq_group_callback_barrier_ttl environment variable
1 parent 62b2e9b commit d684aa7

File tree

4 files changed

+18
-14
lines changed

4 files changed

+18
-14
lines changed

docs/source/changelog.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,17 @@ Breaking Changes
1919
* The ``backend`` argument to the |Results| middleware is now required.
2020
Previously, not supplying this argument would result in a non-functional |Results| middleware.
2121
(`#728`_, `@LincolnPuzey`_)
22+
* The `dramatiq_group_callback_barrier_ttl` environment variable has been removed.
23+
Instead, use the `barrier_ttl` parameter of the `GroupCallbacks` middleware.
24+
(`#775`_, `@mikeroll`_)
2225

2326
.. _#95: https://github.com/Bogdanp/dramatiq/issues/95
2427
.. _#345: https://github.com/Bogdanp/dramatiq/issues/345
2528
.. _#688: https://github.com/Bogdanp/dramatiq/pull/688
2629
.. _@azmeuk: https://github.com/azmeuk
2730
.. _#728: https://github.com/Bogdanp/dramatiq/pull/728
31+
.. _#775: https://github.com/Bogdanp/dramatiq/pull/775
32+
.. _@mikeroll: https://github.com/mikeroll
2833

2934
Fixed
3035
^^^^^

docs/source/reference.rst

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ Message Composition
3535
:members:
3636
.. autoclass:: pipeline
3737
:members:
38+
.. autoclass:: dramatiq.middleware.GroupCallbacks
3839

3940
Message Encoders
4041
^^^^^^^^^^^^^^^^
@@ -105,6 +106,7 @@ The following middleware classes are available, but not enabled by default.
105106
.. autoclass:: dramatiq.middleware.CurrentMessage
106107
:members: get_current_message
107108
:member-order: bysource
109+
.. autoclass:: dramatiq.middleware.GroupCallbacks
108110
.. autoclass:: dramatiq.middleware.prometheus.Prometheus
109111

110112
.. py:class:: dramatiq.results.Results
@@ -259,9 +261,6 @@ These are the environment variables that dramatiq reads
259261
* - ``dramatiq_dead_message_ttl``
260262
- 604800000 (One week)
261263
- The maximum amount of time a message can be in the dead letter queue for the RabbitMQ Broker (in milliseconds).
262-
* - ``dramatiq_group_callback_barrier_ttl``
263-
- 86400000 (One day)
264-
-
265264
* - ``dramatiq_prom_db``
266265
- tempfile.gettempdir()/dramatiq-prometheus
267266
- The path to store the prometheus database files. See :ref:`gotchas-with-prometheus`.

dramatiq/composition.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -264,15 +264,13 @@ def run(self, *, delay=None):
264264
group: This same group.
265265
"""
266266
if self.completion_callbacks:
267-
from .middleware.group_callbacks import (
268-
GROUP_CALLBACK_BARRIER_TTL,
269-
GroupCallbacks,
270-
)
267+
from .middleware.group_callbacks import GroupCallbacks
271268

272269
rate_limiter_backend = None
273270
for middleware in self.broker.middleware:
274271
if isinstance(middleware, GroupCallbacks):
275272
rate_limiter_backend = middleware.rate_limiter_backend
273+
barrier_ttl = middleware.barrier_ttl
276274
break
277275
else:
278276
raise RuntimeError(
@@ -285,7 +283,7 @@ def run(self, *, delay=None):
285283
# group is re-run, the barriers are all separate.
286284
# Re-using a barrier's name is an unsafe operation.
287285
completion_uuid = str(uuid4())
288-
completion_barrier = Barrier(rate_limiter_backend, completion_uuid, ttl=GROUP_CALLBACK_BARRIER_TTL)
286+
completion_barrier = Barrier(rate_limiter_backend, completion_uuid, ttl=barrier_ttl)
289287
completion_barrier.create(len(self.children))
290288

291289
children = []

dramatiq/middleware/group_callbacks.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,19 @@
1717

1818
from __future__ import annotations
1919

20-
import os
21-
2220
from ..rate_limits import Barrier, RateLimiterBackend
2321
from .middleware import Middleware
2422

25-
GROUP_CALLBACK_BARRIER_TTL = int(os.getenv("dramatiq_group_callback_barrier_ttl", "86400000"))
26-
2723

2824
class GroupCallbacks(Middleware):
29-
def __init__(self, rate_limiter_backend: RateLimiterBackend) -> None:
25+
def __init__(
26+
self,
27+
rate_limiter_backend: RateLimiterBackend,
28+
*,
29+
barrier_ttl: int = 86400 * 1000,
30+
) -> None:
3031
self.rate_limiter_backend = rate_limiter_backend
32+
self.barrier_ttl = barrier_ttl
3133

3234
def after_process_message(self, broker, message, *, result=None, exception=None):
3335
from ..message import Message
@@ -39,7 +41,7 @@ def after_process_message(self, broker, message, *, result=None, exception=None)
3941
barrier = Barrier(
4042
self.rate_limiter_backend,
4143
group_completion_uuid,
42-
ttl=GROUP_CALLBACK_BARRIER_TTL,
44+
ttl=self.barrier_ttl,
4345
)
4446
if barrier.wait(block=False):
4547
for message in group_completion_callbacks:

0 commit comments

Comments
 (0)