@@ -391,6 +391,77 @@ def test_basic_items_list_with_concurrency(self):
391391 # Assert the workflow succeeded.
392392 self .assertEqual (conductor .get_workflow_status (), statuses .SUCCEEDED )
393393
394+ def test_basic_items_list_with_zero_concurrency (self ):
395+ wf_def = """
396+ version: 1.0
397+
398+ vars:
399+ - concurrency: 0
400+ - xs:
401+ - fee
402+ - fi
403+ - fo
404+ - fum
405+
406+ tasks:
407+ task1:
408+ with:
409+ items: <% ctx(xs) %>
410+ concurrency: <% ctx(concurrency) %>
411+ action: core.echo message=<% item() %>
412+ next:
413+ - publish:
414+ - items: <% result() %>
415+
416+ output:
417+ - items: <% ctx(items) %>
418+ """
419+
420+ # Set the concurrency to 1 since concurrency 0 is expected to be
421+ # overridden in the Orquesta concurrency scheduling code.
422+ concurrency = 1
423+
424+ spec = native_specs .WorkflowSpec (wf_def )
425+ self .assertDictEqual (spec .inspect (), {})
426+
427+ conductor = conducting .WorkflowConductor (spec )
428+ conductor .request_workflow_status (statuses .RUNNING )
429+
430+ # Mock the action execution for each item and assert expected task statuses.
431+ task_route = 0
432+ task_name = "task1"
433+ task_ctx = {"xs" : ["fee" , "fi" , "fo" , "fum" ], "concurrency" : 0 }
434+
435+ task_action_specs = [
436+ {"action" : "core.echo" , "input" : {"message" : "fee" }, "item_id" : 0 },
437+ {"action" : "core.echo" , "input" : {"message" : "fi" }, "item_id" : 1 },
438+ {"action" : "core.echo" , "input" : {"message" : "fo" }, "item_id" : 2 },
439+ {"action" : "core.echo" , "input" : {"message" : "fum" }, "item_id" : 3 },
440+ ]
441+
442+ mock_ac_ex_statuses = [statuses .SUCCEEDED ] * 4
443+ expected_task_statuses = [statuses .RUNNING ] * 3 + [statuses .SUCCEEDED ]
444+ expected_workflow_statuses = [statuses .RUNNING ] * 3 + [statuses .SUCCEEDED ]
445+
446+ self .assert_task_items (
447+ conductor ,
448+ task_name ,
449+ task_route ,
450+ task_ctx ,
451+ task_ctx ["xs" ],
452+ task_action_specs ,
453+ mock_ac_ex_statuses ,
454+ expected_task_statuses ,
455+ expected_workflow_statuses ,
456+ concurrency = concurrency ,
457+ )
458+
459+ # Assert the task is removed from staging.
460+ self .assertIsNone (conductor .workflow_state .get_staged_task (task_name , task_route ))
461+
462+ # Assert the workflow succeeded.
463+ self .assertEqual (conductor .get_workflow_status (), statuses .SUCCEEDED )
464+
394465 def test_multiple_items_list (self ):
395466 wf_def = """
396467 version: 1.0
0 commit comments