Skip to content

Commit fcc04f5

Browse files
Bugfix: Change logic of completed_count property for group (#814)
* debug completed_count property for group * Add test for group.completion_count - Add test for for when the jobs in a group finish out of order. --------- Co-authored-by: LincolnPuzey <[email protected]>
1 parent bbc3db9 commit fcc04f5

File tree

3 files changed

+37
-2
lines changed

3 files changed

+37
-2
lines changed

CONTRIBUTORS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,4 @@ of those changes to CLEARTYPE SRL.
7676
| [@ksoviero-zengrc](https://github.com/ksoviero-zengrc) | Kevin Soviero |
7777
| [@mikeroll](https://github.com/mikeroll) | Mikhail Bulash |
7878
| [@janek-cosmose](https://github.com/janek-cosmose) | Jan Szejko |
79+
| [@ABolouk](https://github.com/ABolouk) | Amirhossein Bolouk Asli |

dramatiq/composition.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,14 +242,16 @@ def completed_count(self):
242242
Returns:
243243
int: The total number of results.
244244
"""
245-
for count, child in enumerate(self.children, start=1):
245+
count = 0
246+
for child in self.children:
246247
try:
247248
if isinstance(child, group):
248249
child.get_results()
249250
else:
250251
child.get_result()
252+
count += 1
251253
except ResultMissing:
252-
return count - 1
254+
pass
253255

254256
return count
255257

tests/test_composition.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,38 @@ def wait(n):
268268
assert g.completed
269269

270270

271+
def test_groups_completion_count_works_for_tasks_finishing_out_of_order(stub_broker, stub_worker, result_backend):
272+
# Regression test for https://github.com/Bogdanp/dramatiq/issues/452
273+
274+
# Given that I have a result backend
275+
stub_broker.add_middleware(Results(backend=result_backend))
276+
277+
# And an actor that waits some amount of time, before notifying the Condition
278+
condition = Condition()
279+
280+
@dramatiq.actor(store_results=True)
281+
def wait(n):
282+
time.sleep(n)
283+
with condition:
284+
condition.notify_all()
285+
return n
286+
287+
# When I group messages of varying durations together and run the group.
288+
# NOTE the durations ensure the jobs finish in a different order to what they are in the group.
289+
g = group(wait.message(t) for t in [3.5, 1, 2.2])
290+
g.run()
291+
292+
# Then every time a job in the group completes, the completed_count should increase by 1
293+
for count in [1, 2, 3]:
294+
with condition:
295+
condition.wait(5)
296+
time.sleep(0.1) # give the worker time to set the result
297+
assert g.completed_count == count
298+
299+
# Finally, completed should be true
300+
assert g.completed
301+
302+
271303
def test_pipeline_does_not_continue_to_next_actor_when_message_is_marked_as_failed(stub_broker, stub_worker):
272304
# Given that I have an actor that fails messages
273305
class FailMessageMiddleware(middleware.Middleware):

0 commit comments

Comments
 (0)