Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,15 @@ Changed
.. _#738: https://github.com/Bogdanp/dramatiq/issues/738
.. _#764: https://github.com/Bogdanp/dramatiq/pull/764

Deprecated
^^^^^^^^^^

* The ``dramatiq_group_callback_barrier_ttl`` environment variable has been deprecated.
Instead, use the ``barrier_ttl`` parameter of the |GroupCallbacks| middleware.
(`#775`_, `@mikeroll`_)

.. _#775: https://github.com/Bogdanp/dramatiq/pull/775

Removed
^^^^^^^

Expand Down
2 changes: 1 addition & 1 deletion docs/source/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ These are the environment variables that dramatiq reads
- The maximum amount of time a message can be in the dead letter queue for the RabbitMQ Broker (in milliseconds).
* - ``dramatiq_group_callback_barrier_ttl``
- 86400000 (One day)
-
- Deprecated. Use `barrier_ttl` parameter of `GroupCallbacks` middleware.
* - ``dramatiq_prom_db``
- tempfile.gettempdir()/dramatiq-prometheus
- The path to store the prometheus database files. See :ref:`gotchas-with-prometheus`.
Expand Down
9 changes: 3 additions & 6 deletions dramatiq/composition.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,15 +264,12 @@ def run(self, *, delay=None):
group: This same group.
"""
if self.completion_callbacks:
from .middleware.group_callbacks import (
GROUP_CALLBACK_BARRIER_TTL,
GroupCallbacks,
)
from .middleware.group_callbacks import GroupCallbacks

rate_limiter_backend = None
for middleware in self.broker.middleware:
if isinstance(middleware, GroupCallbacks):
rate_limiter_backend = middleware.rate_limiter_backend
barrier_ttl = middleware.barrier_ttl
break
else:
raise RuntimeError(
Expand All @@ -285,7 +282,7 @@ def run(self, *, delay=None):
# group is re-run, the barriers are all separate.
# Re-using a barrier's name is an unsafe operation.
completion_uuid = str(uuid4())
completion_barrier = Barrier(rate_limiter_backend, completion_uuid, ttl=GROUP_CALLBACK_BARRIER_TTL)
completion_barrier = Barrier(rate_limiter_backend, completion_uuid, ttl=barrier_ttl)
completion_barrier.create(len(self.children))

children = []
Expand Down
18 changes: 14 additions & 4 deletions dramatiq/middleware/group_callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,29 @@
from __future__ import annotations

import os
import warnings

from ..rate_limits import Barrier, RateLimiterBackend
from .middleware import Middleware

GROUP_CALLBACK_BARRIER_TTL = int(os.getenv("dramatiq_group_callback_barrier_ttl", "86400000"))


class GroupCallbacks(Middleware):
"""Middleware that enables adding completion callbacks to |Groups|."""

def __init__(self, rate_limiter_backend: RateLimiterBackend) -> None:
def __init__(self, rate_limiter_backend: RateLimiterBackend, *, barrier_ttl: int = 86400 * 1000) -> None:
self.rate_limiter_backend = rate_limiter_backend

_barrier_ttl_env = os.getenv("dramatiq_group_callback_barrier_ttl", None)
if _barrier_ttl_env is not None:
warnings.warn(
"Configuring the barrier TTL via the environment variable is deprecated; "
"use the `barrier_ttl` argument instead.",
FutureWarning,
)
self.barrier_ttl = int(_barrier_ttl_env)
else:
self.barrier_ttl = barrier_ttl

def after_process_message(self, broker, message, *, result=None, exception=None):
from ..message import Message

Expand All @@ -41,7 +51,7 @@ def after_process_message(self, broker, message, *, result=None, exception=None)
barrier = Barrier(
self.rate_limiter_backend,
group_completion_uuid,
ttl=GROUP_CALLBACK_BARRIER_TTL,
ttl=self.barrier_ttl,
)
if barrier.wait(block=False):
for message in group_completion_callbacks:
Expand Down
Loading