Skip to content

Commit ae65aa0

Browse files
authored
Merge branch 'main' into callback-serialization-optimization
2 parents 2aaf564 + 0e6ce41 commit ae65aa0

File tree

3 files changed

+85
-2
lines changed

3 files changed

+85
-2
lines changed

README.md

+44
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,50 @@ workflow = Workflow(
153153
In this example, Task 2 will run roughly 1 second after Task 1 finishes, and
154154
Task 3 will run 2 seconds after Task 2 finishes.
155155

156+
### Large Workflows
157+
158+
Because of how `dramatiq-workflow` is implemented, each task in a workflow has
159+
to know about the remaining tasks in the workflow that could potentially run
160+
after it. When a workflow has a large number of tasks, this can lead to an
161+
increase in memory usage in the broker and increased network traffic between
162+
the broker and the workers, especially when using `Group` tasks: Each task in a
163+
`Group` can potentially be the last one to finish, so each task has to retain a
164+
copy of the remaining tasks that run after the `Group`.
165+
166+
There are a few things you can do to alleviate this issue:
167+
168+
- Minimize the usage of parameters in the `message` method. Instead, consider
169+
using a database to store data that is required by your tasks.
170+
- Limit the size of groups to a reasonable number of tasks. Instead of
171+
scheduling one group with 1000 tasks, consider scheduling 10 groups
172+
with 100 tasks each and chaining them together.
173+
- Consider breaking down large workflows into smaller partial workflows that
174+
then schedule a subsequent workflow at the very end of the outermost `Chain`.
175+
176+
Lastly, you can use compression to reduce the size of the messages in your
177+
queue. While dramatiq does not provide a compression implementation by default,
178+
one can be added with just a few lines of code. For example:
179+
180+
```python
181+
import dramatiq
182+
from dramatiq.encoder import JSONEncoder, MessageData
183+
import lz4.frame
184+
185+
class DramatiqLz4JSONEncoder(JSONEncoder):
    """JSON message encoder that LZ4-compresses the encoded payload.

    Encoding delegates to ``JSONEncoder`` and compresses the resulting bytes
    with ``lz4.frame``. Decoding reverses the process and falls back to
    treating the input as plain, uncompressed JSON when decompression fails.
    """

    def encode(self, data: MessageData) -> bytes:
        """Serialize *data* with the parent encoder, then compress it."""
        serialized = super().encode(data)
        return lz4.frame.compress(serialized)

    def decode(self, data: bytes) -> MessageData:
        """Decompress *data* if possible, then decode it with the parent encoder."""
        try:
            payload = lz4.frame.decompress(data)
        except RuntimeError:
            # Uncompressed data from before the switch to lz4
            payload = data
        return super().decode(payload)
196+
197+
dramatiq.set_encoder(DramatiqLz4JSONEncoder())
198+
```
199+
156200
## License
157201

158202
This project is licensed under the MIT License. See the [LICENSE](LICENSE) file

dramatiq_workflow/_barrier.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def __init__(self, backend, key, *args, ttl=900000):
2020
self.ran_key = f"{key}_ran"
2121

2222
def create(self, parties):
23-
self.backend.add(self.ran_key, 0, self.ttl)
23+
self.backend.add(self.ran_key, -1, self.ttl)
2424
return super().create(parties)
2525

2626
def wait(self, *args, block=True, timeout=None):
@@ -32,7 +32,7 @@ def wait(self, *args, block=True, timeout=None):
3232

3333
released = super().wait(*args, block=False)
3434
if released:
35-
never_released = self.backend.incr(self.ran_key, 1, 1, self.ttl)
35+
never_released = self.backend.incr(self.ran_key, 1, 0, self.ttl)
3636
return never_released
3737

3838
return False
+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import unittest
2+
3+
from dramatiq.rate_limits.backends import StubBackend
4+
5+
from .._barrier import AtMostOnceBarrier
6+
7+
8+
class AtMostOnceBarrierTests(unittest.TestCase):
    """Tests for the release behaviour of ``AtMostOnceBarrier``."""

    def setUp(self):
        # Fresh backend and barrier for every test.
        self.key = "test_barrier"
        self.parties = 3
        self.ttl = 900000
        self.backend = StubBackend()
        self.barrier = AtMostOnceBarrier(self.backend, self.key, ttl=self.ttl)

    def test_wait_block_true_raises(self):
        """Calling ``wait`` with ``block=True`` raises ``ValueError``."""
        with self.assertRaises(ValueError) as raised:
            self.barrier.wait(block=True)
        self.assertEqual(str(raised.exception), "Blocking is not supported by AtMostOnceBarrier")

    def test_wait_releases_once(self):
        """Only the last expected party sees the release; later calls do not."""
        self.barrier.create(self.parties)

        # Every party except the last one must not release the barrier.
        for _ in range(self.parties - 1):
            self.assertFalse(self.barrier.wait(block=False))

        # The final party releases it exactly once ...
        self.assertTrue(self.barrier.wait(block=False))
        # ... and any further call reports no release.
        self.assertFalse(self.barrier.wait(block=False))

    def test_wait_does_not_release_when_db_emptied(self):
        """
        If the store is emptied, the barrier should not be released.
        """
        self.barrier.create(self.parties)
        # Drop everything the barrier stored in the backend.
        self.backend.db = {}
        for _ in range(self.parties):
            self.assertFalse(self.barrier.wait(block=False))

0 commit comments

Comments
 (0)