Skip to content

Commit a847b39

Browse files
authored
Merge pull request #203 from djarecka/mnt/wf_copyfile
[mnt] copy files to the wf directory (closes # 145)
2 parents 3cd9880 + 071bb69 commit a847b39

File tree

5 files changed

+138
-1
lines changed

5 files changed

+138
-1
lines changed

pydra/engine/helpers.py

+30-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import subprocess as sp
1111

1212
from .specs import Runtime, File, attr_fields
13-
from .helpers_file import is_existing_file, hash_file
13+
from .helpers_file import is_existing_file, hash_file, copyfile, is_existing_file
1414

1515

1616
def ensure_list(obj, tuple2list=False):
@@ -112,13 +112,42 @@ def save(task_path: Path, result=None, task=None):
112112
raise ValueError("Nothing to be saved")
113113
task_path.mkdir(parents=True, exist_ok=True)
114114
if result:
115+
if Path(task_path).name.startswith("Workflow"):
116+
# copy files to the workflow directory
117+
result = copyfile_workflow(wf_path=task_path, result=result)
115118
with (task_path / "_result.pklz").open("wb") as fp:
116119
cp.dump(result, fp)
117120
if task:
118121
with (task_path / "_task.pklz").open("wb") as fp:
119122
cp.dump(task, fp)
120123

121124

125+
def copyfile_workflow(wf_path, result):
126+
""" if file in the wf results, the file will be copied to the workflow directory"""
127+
for field in attr_fields(result.output):
128+
value = getattr(result.output, field.name)
129+
new_value = _copyfile_single_value(wf_path=wf_path, value=value)
130+
if new_value != value:
131+
setattr(result.output, field.name, new_value)
132+
return result
133+
134+
135+
def _copyfile_single_value(wf_path, value):
136+
""" checking a single value for files that need to be copied to the wf dir"""
137+
if isinstance(value, (tuple, list)):
138+
return [_copyfile_single_value(wf_path, val) for val in value]
139+
elif isinstance(value, dict):
140+
return {
141+
key: _copyfile_single_value(wf_path, val) for (key, val) in value.items()
142+
}
143+
elif is_existing_file(value):
144+
new_path = wf_path / Path(value).name
145+
copyfile(originalfile=value, newfile=new_path, copy=True, use_hardlink=True)
146+
return new_path
147+
else:
148+
return value
149+
150+
122151
def task_hash(task):
123152
"""
124153
Calculate the checksum of a task.

pydra/engine/helpers_file.py

+2
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,8 @@ def is_local_file(f):
516516

517517
def is_existing_file(f):
518518
""" checking if an object is an existing file"""
519+
if not f:
520+
return False
519521
try:
520522
return Path(f).exists()
521523
except TypeError:

pydra/engine/tests/test_shelltask.py

+2
Original file line numberDiff line numberDiff line change
@@ -2018,6 +2018,8 @@ def test_shell_cmd_outputspec_wf_1(plugin):
20182018
res = wf.result()
20192019
assert res.output.stdout == ""
20202020
assert res.output.newfile.exists()
2021+
# checking if the file was copied to the wf dir
2022+
assert res.output.newfile.parent == wf.output_dir
20212023

20222024

20232025
def no_fsl():

pydra/engine/tests/test_workflow.py

+72
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
add2_sub2_res,
1515
fun_addvar_none,
1616
fun_addvar_default,
17+
fun_write_file,
18+
fun_write_file_list,
19+
fun_write_file_list2dict,
1720
)
1821
from ..submitter import Submitter
1922
from ..core import Workflow
@@ -2825,3 +2828,72 @@ def test_wf_lzoutall_st_2a(plugin):
28252828
{"out_add": [8, 62], "out_sub": [4, 58]},
28262829
{"out_add": [62, 602], "out_sub": [58, 598]},
28272830
]
2831+
2832+
2833+
# worfklows that have files in the result, the files should be copied to the wf dir
2834+
2835+
2836+
@pytest.mark.parametrize("plugin", Plugins)
2837+
def test_wf_resultfile_1(plugin):
2838+
""" workflow with a file in the result, file should be copied to the wf dir"""
2839+
wf = Workflow(name="wf_file_1", input_spec=["x"])
2840+
wf.add(fun_write_file(name="writefile", filename=wf.lzin.x))
2841+
wf.inputs.x = "file_1.txt"
2842+
wf.plugin = plugin
2843+
wf.set_output([("wf_out", wf.writefile.lzout.out)])
2844+
2845+
with Submitter(plugin=plugin) as sub:
2846+
sub(wf)
2847+
2848+
results = wf.result()
2849+
# checking if the file exists and if it is in the Workflow directory
2850+
assert results.output.wf_out.exists()
2851+
assert results.output.wf_out == wf.output_dir / "file_1.txt"
2852+
2853+
2854+
@pytest.mark.parametrize("plugin", Plugins)
2855+
def test_wf_resultfile_2(plugin):
2856+
""" workflow with a list of files in the wf result,
2857+
all files should be copied to the wf dir
2858+
"""
2859+
wf = Workflow(name="wf_file_1", input_spec=["x"])
2860+
wf.add(fun_write_file_list(name="writefile", filename_list=wf.lzin.x))
2861+
file_list = ["file_1.txt", "file_2.txt", "file_3.txt"]
2862+
wf.inputs.x = file_list
2863+
wf.plugin = plugin
2864+
wf.set_output([("wf_out", wf.writefile.lzout.out)])
2865+
2866+
with Submitter(plugin=plugin) as sub:
2867+
sub(wf)
2868+
2869+
results = wf.result()
2870+
# checking if the file exists and if it is in the Workflow directory
2871+
for ii, file in enumerate(results.output.wf_out):
2872+
assert file.exists()
2873+
assert file == wf.output_dir / file_list[ii]
2874+
2875+
2876+
@pytest.mark.parametrize("plugin", Plugins)
2877+
def test_wf_resultfile_3(plugin):
2878+
""" workflow with a dictionaries of files in the wf result,
2879+
all files should be copied to the wf dir
2880+
"""
2881+
wf = Workflow(name="wf_file_1", input_spec=["x"])
2882+
wf.add(fun_write_file_list2dict(name="writefile", filename_list=wf.lzin.x))
2883+
file_list = ["file_1.txt", "file_2.txt", "file_3.txt"]
2884+
wf.inputs.x = file_list
2885+
wf.plugin = plugin
2886+
wf.set_output([("wf_out", wf.writefile.lzout.out)])
2887+
2888+
with Submitter(plugin=plugin) as sub:
2889+
sub(wf)
2890+
2891+
results = wf.result()
2892+
# checking if the file exists and if it is in the Workflow directory
2893+
for key, val in results.output.wf_out.items():
2894+
if key == "random_int":
2895+
assert val == 20
2896+
else:
2897+
assert val.exists()
2898+
ii = int(key.split("_")[1])
2899+
assert val == wf.output_dir / file_list[ii]

pydra/engine/tests/utils.py

+32
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# Tasks for testing
22
import time
3+
import typing as tp
4+
from pathlib import Path
35

46
from ..core import Workflow
57
from ... import mark
@@ -102,6 +104,36 @@ def fun_dict(d):
102104
return "_".join(kv_list)
103105

104106

107+
@mark.task
108+
def fun_write_file(filename: tp.Union[str, File, Path], text="hello"):
109+
with open(filename, "w") as f:
110+
f.write(text)
111+
return Path(filename).absolute()
112+
113+
114+
@mark.task
115+
def fun_write_file_list(filename_list: tp.List[tp.Union[str, File, Path]], text="hi"):
116+
for ii, filename in enumerate(filename_list):
117+
with open(filename, "w") as f:
118+
f.write(f"from file {ii}: {text}")
119+
filename_list = [Path(filename).absolute() for filename in filename_list]
120+
return filename_list
121+
122+
123+
@mark.task
124+
def fun_write_file_list2dict(
125+
filename_list: tp.List[tp.Union[str, File, Path]], text="hi"
126+
):
127+
filename_dict = {}
128+
for ii, filename in enumerate(filename_list):
129+
with open(filename, "w") as f:
130+
f.write(f"from file {ii}: {text}")
131+
filename_dict[f"file_{ii}"] = Path(filename).absolute()
132+
# adding an additional field with int
133+
filename_dict["random_int"] = 20
134+
return filename_dict
135+
136+
105137
@mark.task
106138
def fun_file(filename: File):
107139
with open(filename) as f:

0 commit comments

Comments
 (0)