Skip to content

Commit 0e6ce41

Browse files
authored
Make sure AtMostOnceBarrier only releases once (or never) (#4)
I realized the implementation of `incr(self, key, amount, maximum, ttl)` assumes a default value of `0` if the key does not exist. This means, when the backend (e.g. Redis) loses its contents, we would assume every `AtMostOnceBarrier` never ran. To fix this, the key is now created with a value of `-1` (instead of `0`) and we only release if by incrementing by `1` we stay under the maximum of `0` - which can only be true if the previous value was `-1` and not with the default of `0`. I also added some tests to confirm this bug existed and to confirm this PR fixes it.
1 parent 2f0a94a commit 0e6ce41

File tree

2 files changed

+41
-2
lines changed

2 files changed

+41
-2
lines changed

dramatiq_workflow/_barrier.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def __init__(self, backend, key, *args, ttl=900000):
2020
self.ran_key = f"{key}_ran"
2121

2222
def create(self, parties):
23-
self.backend.add(self.ran_key, 0, self.ttl)
23+
self.backend.add(self.ran_key, -1, self.ttl)
2424
return super().create(parties)
2525

2626
def wait(self, *args, block=True, timeout=None):
@@ -32,7 +32,7 @@ def wait(self, *args, block=True, timeout=None):
3232

3333
released = super().wait(*args, block=False)
3434
if released:
35-
never_released = self.backend.incr(self.ran_key, 1, 1, self.ttl)
35+
never_released = self.backend.incr(self.ran_key, 1, 0, self.ttl)
3636
return never_released
3737

3838
return False
+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import unittest
2+
3+
from dramatiq.rate_limits.backends import StubBackend
4+
5+
from .._barrier import AtMostOnceBarrier
6+
7+
8+
class AtMostOnceBarrierTests(unittest.TestCase):
9+
def setUp(self):
10+
self.backend = StubBackend()
11+
self.key = "test_barrier"
12+
self.parties = 3
13+
self.ttl = 900000
14+
self.barrier = AtMostOnceBarrier(self.backend, self.key, ttl=self.ttl)
15+
16+
def test_wait_block_true_raises(self):
17+
with self.assertRaises(ValueError) as context:
18+
self.barrier.wait(block=True)
19+
self.assertEqual(str(context.exception), "Blocking is not supported by AtMostOnceBarrier")
20+
21+
def test_wait_releases_once(self):
22+
self.barrier.create(self.parties)
23+
for _ in range(self.parties - 1):
24+
result = self.barrier.wait(block=False)
25+
self.assertFalse(result)
26+
result = self.barrier.wait(block=False)
27+
self.assertTrue(result)
28+
result = self.barrier.wait(block=False)
29+
self.assertFalse(result)
30+
31+
def test_wait_does_not_release_when_db_emptied(self):
32+
"""
33+
If the store is emptied, the barrier should not be released.
34+
"""
35+
self.barrier.create(self.parties)
36+
self.backend.db = {}
37+
for _ in range(self.parties):
38+
result = self.barrier.wait(block=False)
39+
self.assertFalse(result)

0 commit comments

Comments
 (0)