Skip to content

Commit f9e72bf

Browse files
authored
Merge pull request #159 from djarecka/empty_input_list
fixing issues with state input fields that are empty lists
2 parents d7e6b62 + b95597d commit f9e72bf

File tree

5 files changed

+182
-25
lines changed

5 files changed

+182
-25
lines changed

pydra/engine/core.py

+8
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,14 @@ def done(self):
427427
# TODO: only check for needed state result
428428
if self.result() and all(self.result()):
429429
return True
430+
# checking if self.result() is not an empty list only because
431+
# the states_ind is an empty list (input field might be an empty list)
432+
elif (
433+
self.result() == []
434+
and hasattr(self.state, "states_ind")
435+
and self.state.states_ind == []
436+
):
437+
return True
430438
else:
431439
if self.result():
432440
return True

pydra/engine/specs.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ def get_value(self, wf, state_index=None):
487487
node = getattr(wf, self.name)
488488
result = node.result(state_index=state_index)
489489
if isinstance(result, list):
490-
if isinstance(result[0], list):
490+
if len(result) and isinstance(result[0], list):
491491
results_new = []
492492
for res_l in result:
493493
if self.field == "all_":
@@ -534,6 +534,6 @@ def reset(self):
534534
def path_to_string(value):
535535
if isinstance(value, Path):
536536
value = str(value)
537-
elif isinstance(value, list) and isinstance(value[0], Path):
537+
elif isinstance(value, list) and len(value) and isinstance(value[0], Path):
538538
value = [str(val) for val in value]
539539
return value

pydra/engine/task.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ def _run_task(self):
159159
del inputs["_func"]
160160
self.output_ = None
161161
output = cp.loads(self.inputs._func)(**inputs)
162-
if output:
162+
if output is not None:
163163
output_names = [el[0] for el in self.output_spec.fields]
164164
self.output_ = {}
165165
if len(output_names) > 1:

pydra/engine/tests/test_node_task.py

+56
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,20 @@ def test_task_init_5c():
269269
assert nn.state.splitter_rpn_final == ["NA.a"]
270270

271271

272+
def test_task_init_6():
273+
""" task with splitter, but the input is an empty list"""
274+
nn = fun_addtwo(name="NA", a=[])
275+
nn.split(splitter="a")
276+
assert nn.inputs.a == []
277+
278+
assert nn.state.splitter == "NA.a"
279+
assert nn.state.splitter_rpn == ["NA.a"]
280+
281+
nn.state.prepare_states(nn.inputs)
282+
assert nn.state.states_ind == []
283+
assert nn.state.states_val == []
284+
285+
272286
def test_task_error():
273287
func = fun_div(name="div", a=1, b=0)
274288
with pytest.raises(ZeroDivisionError):
@@ -607,6 +621,27 @@ def test_task_state_singl_1(plugin):
607621
assert odir.exists()
608622

609623

624+
@pytest.mark.parametrize("plugin", Plugins)
625+
def test_task_state_2(plugin):
626+
""" task with the simplest splitter, the input is an empty list"""
627+
nn = fun_addtwo(name="NA").split(splitter="a", a=[])
628+
629+
assert nn.state.splitter == "NA.a"
630+
assert nn.state.splitter_rpn == ["NA.a"]
631+
assert nn.inputs.a == []
632+
633+
with Submitter(plugin=plugin) as sub:
634+
sub(nn)
635+
636+
# checking the results
637+
results = nn.result()
638+
expected = []
639+
for i, res in enumerate(expected):
640+
assert results[i].output.out == res[1]
641+
# checking the output_dir
642+
assert nn.output_dir == []
643+
644+
610645
@pytest.mark.parametrize("plugin", Plugins)
611646
def test_task_state_comb_1(plugin):
612647
""" task with the simplest splitter and combiner"""
@@ -775,6 +810,27 @@ def test_task_state_comb_singl_1(plugin):
775810
assert odir.exists()
776811

777812

813+
@pytest.mark.parametrize("plugin", Plugins)
814+
def test_task_state_comb_2(plugin):
815+
""" task with the simplest splitter, the input is an empty list"""
816+
nn = fun_addtwo(name="NA").split(splitter="a", a=[]).combine(combiner=["a"])
817+
818+
assert nn.state.splitter == "NA.a"
819+
assert nn.state.splitter_rpn == ["NA.a"]
820+
assert nn.inputs.a == []
821+
822+
with Submitter(plugin=plugin) as sub:
823+
sub(nn)
824+
825+
# checking the results
826+
results = nn.result()
827+
expected = []
828+
for i, res in enumerate(expected):
829+
assert results[i].output.out == res[1]
830+
# checking the output_dir
831+
assert nn.output_dir == []
832+
833+
778834
@pytest.mark.parametrize("plugin", Plugins)
779835
def test_task_state_comb_order(plugin):
780836
""" tasks with an outer splitter and various combiner;

pydra/engine/tests/test_workflow.py

+115-22
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,28 @@ def test_wf_st_1_call_plug(plugin):
302302
assert odir.exists()
303303

304304

305+
@pytest.mark.parametrize("plugin", Plugins)
306+
def test_wf_st_noinput_1(plugin):
307+
""" Workflow with one task, a splitter for the workflow"""
308+
wf = Workflow(name="wf_spl_1", input_spec=["x"])
309+
wf.add(add2(name="add2", x=wf.lzin.x))
310+
311+
wf.split(("x"))
312+
wf.inputs.x = []
313+
wf.set_output([("out", wf.add2.lzout.out)])
314+
wf.plugin = plugin
315+
316+
checksum_before = wf.checksum
317+
with Submitter(plugin=plugin) as sub:
318+
sub(wf)
319+
320+
assert wf.checksum == checksum_before
321+
results = wf.result()
322+
assert results == []
323+
# checking all directories
324+
assert wf.output_dir == []
325+
326+
305327
@pytest.mark.parametrize("plugin", Plugins)
306328
def test_wf_ndst_1(plugin):
307329
""" workflow with one task, a splitter on the task level"""
@@ -394,6 +416,26 @@ def test_wf_ndst_updateinp_1(plugin):
394416
assert wf.output_dir.exists()
395417

396418

419+
@pytest.mark.parametrize("plugin", Plugins)
420+
def test_wf_ndst_noinput_1(plugin):
421+
""" workflow with one task, a splitter on the task level"""
422+
wf = Workflow(name="wf_spl_1", input_spec=["x"])
423+
wf.add(add2(name="add2", x=wf.lzin.x).split("x"))
424+
wf.inputs.x = []
425+
wf.set_output([("out", wf.add2.lzout.out)])
426+
wf.plugin = plugin
427+
428+
checksum_before = wf.checksum
429+
with Submitter(plugin=plugin) as sub:
430+
sub(wf)
431+
432+
assert wf.checksum == checksum_before
433+
results = wf.result()
434+
435+
assert results.output.out == []
436+
assert wf.output_dir.exists()
437+
438+
397439
@pytest.mark.parametrize("plugin", Plugins)
398440
def test_wf_st_2(plugin):
399441
""" workflow with one task, splitters and combiner for workflow"""
@@ -538,12 +580,60 @@ def test_wf_ndst_4(plugin):
538580

539581
@pytest.mark.parametrize("plugin", Plugins)
540582
def test_wf_st_5(plugin):
541-
""" workflow with two tasks, outer splitter and combiner for the workflow"""
583+
""" workflow with two tasks, outer splitter and no combiner"""
542584
wf = Workflow(name="wf_st_5", input_spec=["x", "y"])
543585
wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y))
544586
wf.add(add2(name="add2", x=wf.mult.lzout.out))
545587

546588
wf.split(["x", "y"], x=[1, 2], y=[11, 12])
589+
wf.set_output([("out", wf.add2.lzout.out)])
590+
wf.plugin = plugin
591+
592+
with Submitter(plugin=plugin) as sub:
593+
sub(wf)
594+
595+
results = wf.result()
596+
assert results[0].output.out == 13
597+
assert results[1].output.out == 14
598+
assert results[2].output.out == 24
599+
assert results[3].output.out == 26
600+
# checking all directories
601+
assert wf.output_dir
602+
for odir in wf.output_dir:
603+
assert odir.exists()
604+
605+
606+
@pytest.mark.parametrize("plugin", Plugins)
607+
def test_wf_ndst_5(plugin):
608+
""" workflow with two tasks, outer splitter on tasks level and no combiner"""
609+
wf = Workflow(name="wf_ndst_5", input_spec=["x", "y"])
610+
wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y).split(["x", "y"]))
611+
wf.add(add2(name="add2", x=wf.mult.lzout.out))
612+
wf.inputs.x = [1, 2]
613+
wf.inputs.y = [11, 12]
614+
wf.set_output([("out", wf.add2.lzout.out)])
615+
wf.plugin = plugin
616+
617+
with Submitter(plugin=plugin) as sub:
618+
sub(wf)
619+
620+
results = wf.result()
621+
assert results.output.out[0] == 13
622+
assert results.output.out[1] == 14
623+
assert results.output.out[2] == 24
624+
assert results.output.out[3] == 26
625+
# checking the output directory
626+
assert wf.output_dir.exists()
627+
628+
629+
@pytest.mark.parametrize("plugin", Plugins)
630+
def test_wf_st_6(plugin):
631+
""" workflow with two tasks, outer splitter and combiner for the workflow"""
632+
wf = Workflow(name="wf_st_6", input_spec=["x", "y"])
633+
wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y))
634+
wf.add(add2(name="add2", x=wf.mult.lzout.out))
635+
636+
wf.split(["x", "y"], x=[1, 2, 3], y=[11, 12])
547637
wf.combine("x")
548638
wf.set_output([("out", wf.add2.lzout.out)])
549639
wf.plugin = plugin
@@ -554,21 +644,23 @@ def test_wf_st_5(plugin):
554644
results = wf.result()
555645
assert results[0][0].output.out == 13
556646
assert results[0][1].output.out == 24
647+
assert results[0][2].output.out == 35
557648
assert results[1][0].output.out == 14
558649
assert results[1][1].output.out == 26
650+
assert results[1][2].output.out == 38
559651
# checking all directories
560652
assert wf.output_dir
561653
for odir in wf.output_dir:
562654
assert odir.exists()
563655

564656

565657
@pytest.mark.parametrize("plugin", Plugins)
566-
def test_wf_ndst_5(plugin):
658+
def test_wf_ndst_6(plugin):
567659
""" workflow with two tasks, outer splitter and combiner on tasks level"""
568-
wf = Workflow(name="wf_ndst_5", input_spec=["x", "y"])
660+
wf = Workflow(name="wf_ndst_6", input_spec=["x", "y"])
569661
wf.add(multiply(name="mult", x=wf.lzin.x, y=wf.lzin.y).split(["x", "y"]))
570662
wf.add(add2(name="add2", x=wf.mult.lzout.out).combine("mult.x"))
571-
wf.inputs.x = [1, 2]
663+
wf.inputs.x = [1, 2, 3]
572664
wf.inputs.y = [11, 12]
573665
wf.set_output([("out", wf.add2.lzout.out)])
574666
wf.plugin = plugin
@@ -577,8 +669,9 @@ def test_wf_ndst_5(plugin):
577669
sub(wf)
578670

579671
results = wf.result()
580-
assert results.output.out[0] == [13, 24]
581-
assert results.output.out[1] == [14, 26]
672+
assert results.output.out[0] == [13, 24, 35]
673+
assert results.output.out[1] == [14, 26, 38]
674+
582675
# checking the output directory
583676
assert wf.output_dir.exists()
584677

@@ -587,11 +680,11 @@ def test_wf_ndst_5(plugin):
587680

588681

589682
@pytest.mark.parametrize("plugin", Plugins)
590-
def test_wf_st_6(plugin):
683+
def test_wf_st_7(plugin):
591684
""" workflow with three tasks, third one connected to two previous tasks,
592685
splitter on the workflow level
593686
"""
594-
wf = Workflow(name="wf_st_6", input_spec=["x", "y"])
687+
wf = Workflow(name="wf_st_7", input_spec=["x", "y"])
595688
wf.add(add2(name="add2x", x=wf.lzin.x))
596689
wf.add(add2(name="add2y", x=wf.lzin.y))
597690
wf.add(multiply(name="mult", x=wf.add2x.lzout.out, y=wf.add2y.lzout.out))
@@ -615,11 +708,11 @@ def test_wf_st_6(plugin):
615708

616709

617710
@pytest.mark.parametrize("plugin", Plugins)
618-
def test_wf_ndst_6(plugin):
711+
def test_wf_ndst_7(plugin):
619712
""" workflow with three tasks, third one connected to two previous tasks,
620713
splitter on the tasks levels
621714
"""
622-
wf = Workflow(name="wf_ndst_6", input_spec=["x", "y"])
715+
wf = Workflow(name="wf_ndst_7", input_spec=["x", "y"])
623716
wf.add(add2(name="add2x", x=wf.lzin.x).split("x"))
624717
wf.add(add2(name="add2y", x=wf.lzin.y).split("x"))
625718
wf.add(multiply(name="mult", x=wf.add2x.lzout.out, y=wf.add2y.lzout.out))
@@ -639,11 +732,11 @@ def test_wf_ndst_6(plugin):
639732

640733

641734
@pytest.mark.parametrize("plugin", Plugins)
642-
def test_wf_st_7(plugin):
735+
def test_wf_st_8(plugin):
643736
""" workflow with three tasks, third one connected to two previous tasks,
644737
splitter and partial combiner on the workflow level
645738
"""
646-
wf = Workflow(name="wf_st_7", input_spec=["x", "y"])
739+
wf = Workflow(name="wf_st_8", input_spec=["x", "y"])
647740
wf.add(add2(name="add2x", x=wf.lzin.x))
648741
wf.add(add2(name="add2y", x=wf.lzin.y))
649742
wf.add(multiply(name="mult", x=wf.add2x.lzout.out, y=wf.add2y.lzout.out))
@@ -670,11 +763,11 @@ def test_wf_st_7(plugin):
670763

671764

672765
@pytest.mark.parametrize("plugin", Plugins)
673-
def test_wf_ndst_7(plugin):
766+
def test_wf_ndst_8(plugin):
674767
""" workflow with three tasks, third one connected to two previous tasks,
675768
splitter and partial combiner on the tasks levels
676769
"""
677-
wf = Workflow(name="wf_ndst_7", input_spec=["x", "y"])
770+
wf = Workflow(name="wf_ndst_8", input_spec=["x", "y"])
678771
wf.add(add2(name="add2x", x=wf.lzin.x).split("x"))
679772
wf.add(add2(name="add2y", x=wf.lzin.y).split("x"))
680773
wf.add(
@@ -699,11 +792,11 @@ def test_wf_ndst_7(plugin):
699792

700793

701794
@pytest.mark.parametrize("plugin", Plugins)
702-
def test_wf_st_8(plugin):
795+
def test_wf_st_9(plugin):
703796
""" workflow with three tasks, third one connected to two previous tasks,
704797
splitter and partial combiner (from the second task) on the workflow level
705798
"""
706-
wf = Workflow(name="wf_st_8", input_spec=["x", "y"])
799+
wf = Workflow(name="wf_st_9", input_spec=["x", "y"])
707800
wf.add(add2(name="add2x", x=wf.lzin.x))
708801
wf.add(add2(name="add2y", x=wf.lzin.y))
709802
wf.add(multiply(name="mult", x=wf.add2x.lzout.out, y=wf.add2y.lzout.out))
@@ -730,11 +823,11 @@ def test_wf_st_8(plugin):
730823

731824

732825
@pytest.mark.parametrize("plugin", Plugins)
733-
def test_wf_ndst_8(plugin):
826+
def test_wf_ndst_9(plugin):
734827
""" workflow with three tasks, third one connected to two previous tasks,
735828
splitter and partial combiner (from the second task) on the tasks levels
736829
"""
737-
wf = Workflow(name="wf_ndst_8", input_spec=["x", "y"])
830+
wf = Workflow(name="wf_ndst_9", input_spec=["x", "y"])
738831
wf.add(add2(name="add2x", x=wf.lzin.x).split("x"))
739832
wf.add(add2(name="add2y", x=wf.lzin.y).split("x"))
740833
wf.add(
@@ -760,11 +853,11 @@ def test_wf_ndst_8(plugin):
760853

761854

762855
@pytest.mark.parametrize("plugin", Plugins)
763-
def test_wf_st_9(plugin):
856+
def test_wf_st_10(plugin):
764857
""" workflow with three tasks, third one connected to two previous tasks,
765858
splitter and full combiner on the workflow level
766859
"""
767-
wf = Workflow(name="wf_st_9", input_spec=["x", "y"])
860+
wf = Workflow(name="wf_st_10", input_spec=["x", "y"])
768861
wf.add(add2(name="add2x", x=wf.lzin.x))
769862
wf.add(add2(name="add2y", x=wf.lzin.y))
770863
wf.add(multiply(name="mult", x=wf.add2x.lzout.out, y=wf.add2y.lzout.out))
@@ -790,11 +883,11 @@ def test_wf_st_9(plugin):
790883

791884

792885
@pytest.mark.parametrize("plugin", Plugins)
793-
def test_wf_ndst_9(plugin):
886+
def test_wf_ndst_10(plugin):
794887
""" workflow with three tasks, third one connected to two previous tasks,
795888
splitter and full combiner on the tasks levels
796889
"""
797-
wf = Workflow(name="wf_ndst_9", input_spec=["x", "y"])
890+
wf = Workflow(name="wf_ndst_10", input_spec=["x", "y"])
798891
wf.add(add2(name="add2x", x=wf.lzin.x).split("x"))
799892
wf.add(add2(name="add2y", x=wf.lzin.y).split("x"))
800893
wf.add(

0 commit comments

Comments
 (0)