Skip to content

Commit abd12ba

Browse files
bshifawbshifaw
and
bshifaw
authored
Add list-outputs (#248)
* added list-outputs command * Added option to get workflow level outputs or task level outputs * Added option to print json summary and text * add integration test_list_outputs.py draft * Added options to utility_test_functions.py run cromshell function * Added function to confirm results from cromwell outputs endpoint contain outputs else throws an error. * Added variable to hold workflow id in cromshellconfig.py * add check of outputs for detailed list-outputs option --------- Co-authored-by: bshifaw <[email protected]>
1 parent 3a323e4 commit abd12ba

17 files changed

+843
-2
lines changed

src/cromshell/__main__.py

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from .cost import command as cost
1212
from .counts import command as counts
1313
from .list import command as list
14+
from .list_outputs import command as list_outputs
1415
from .logs import command as logs
1516
from .metadata import command as metadata
1617
from .slim_metadata import command as slim_metadata
@@ -168,6 +169,7 @@ def version():
168169
main_entry.add_command(update_server.main)
169170
main_entry.add_command(timing.main)
170171
main_entry.add_command(list.main)
172+
main_entry.add_command(list_outputs.main)
171173

172174

173175
if __name__ == "__main__":

src/cromshell/list_outputs/__init__.py

Whitespace-only changes.

src/cromshell/list_outputs/command.py

+220
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
import logging
2+
3+
import click
4+
import requests
5+
6+
import cromshell.utilities.http_utils as http_utils
7+
import cromshell.utilities.io_utils as io_utils
8+
from cromshell.metadata import command as metadata_command
9+
from cromshell.utilities import command_setup_utils
10+
11+
LOGGER = logging.getLogger(__name__)
12+
13+
14+
@click.command(name="list-outputs")
15+
@click.argument("workflow_ids", required=True, nargs=-1)
16+
@click.option(
17+
"-d",
18+
"--detailed",
19+
is_flag=True,
20+
default=False,
21+
help="Get the output for a workflow at the task level",
22+
)
23+
@click.option(
24+
"-j",
25+
"--json-summary",
26+
is_flag=True,
27+
default=False,
28+
help="Print a json summary of outputs, including non-file types.",
29+
)
30+
@click.pass_obj
31+
def main(config, workflow_ids, detailed, json_summary):
32+
"""List all output files produced by a workflow."""
33+
34+
LOGGER.info("list-outputs")
35+
36+
return_code = 0
37+
38+
for workflow_id in workflow_ids:
39+
command_setup_utils.resolve_workflow_id_and_server(
40+
workflow_id=workflow_id, cromshell_config=config
41+
)
42+
43+
if not detailed:
44+
workflow_outputs = get_workflow_level_outputs(config).get("outputs")
45+
46+
if json_summary:
47+
io_utils.pretty_print_json(format_json=workflow_outputs)
48+
else:
49+
print_file_like_value_in_dict(
50+
outputs_metadata=workflow_outputs,
51+
indent=False,
52+
)
53+
else:
54+
task_outputs = get_task_level_outputs(config)
55+
56+
if json_summary:
57+
io_utils.pretty_print_json(format_json=task_outputs)
58+
else:
59+
print_task_level_outputs(task_outputs)
60+
61+
return return_code
62+
63+
64+
def get_workflow_level_outputs(config) -> dict:
65+
"""Get the workflow level outputs from the workflow outputs
66+
67+
Args:
68+
config (dict): The cromshell config object
69+
"""
70+
71+
requests_out = requests.get(
72+
f"{config.cromwell_api_workflow_id}/outputs",
73+
timeout=config.requests_connect_timeout,
74+
verify=config.requests_verify_certs,
75+
headers=http_utils.generate_headers(config),
76+
)
77+
78+
if requests_out.ok:
79+
check_for_empty_output(requests_out.json().get("outputs"), config.workflow_id)
80+
return requests_out.json()
81+
else:
82+
http_utils.check_http_request_status_code(
83+
short_error_message="Failed to retrieve outputs for "
84+
f"workflow: {config.workflow_id}",
85+
response=requests_out,
86+
# Raising exception is set false to allow
87+
# command to retrieve outputs of remaining workflows.
88+
raise_exception=False,
89+
)
90+
91+
92+
def get_task_level_outputs(config) -> dict:
93+
"""Get the task level outputs from the workflow metadata
94+
95+
Args:
96+
config (dict): The cromshell config object
97+
"""
98+
# Get metadata
99+
formatted_metadata_parameter = metadata_command.format_metadata_params(
100+
list_of_keys=config.METADATA_KEYS_TO_OMIT,
101+
exclude_keys=True,
102+
expand_subworkflows=True,
103+
)
104+
105+
workflow_metadata = metadata_command.get_workflow_metadata(
106+
meta_params=formatted_metadata_parameter,
107+
api_workflow_id=config.cromwell_api_workflow_id,
108+
timeout=config.requests_connect_timeout,
109+
verify_certs=config.requests_verify_certs,
110+
headers=http_utils.generate_headers(config),
111+
)
112+
113+
return filter_outputs_from_workflow_metadata(workflow_metadata)
114+
115+
116+
def filter_outputs_from_workflow_metadata(workflow_metadata: dict) -> dict:
117+
"""Get the outputs from the workflow metadata
118+
119+
Args:
120+
workflow_metadata (dict): The workflow metadata
121+
"""
122+
calls_metadata = workflow_metadata["calls"]
123+
output_metadata = {}
124+
extract_task_key = "outputs"
125+
126+
for call, index_list in calls_metadata.items():
127+
if "subWorkflowMetadata" in calls_metadata[call][0]:
128+
output_metadata[call] = []
129+
for scatter in calls_metadata[call]:
130+
output_metadata[call].append(
131+
filter_outputs_from_workflow_metadata(
132+
scatter["subWorkflowMetadata"]
133+
)
134+
)
135+
else:
136+
output_metadata[call] = []
137+
for index in index_list:
138+
output_metadata[call].append(index.get(extract_task_key))
139+
140+
check_for_empty_output(output_metadata, workflow_metadata["id"])
141+
142+
return output_metadata
143+
144+
145+
def print_task_level_outputs(output_metadata: dict) -> None:
146+
"""Print the outputs from the workflow metadata
147+
output_metadata: {call_name:[index1{output_name: outputvalue}, index2{...}, ...], call_name:[], ...}
148+
149+
Args:
150+
output_metadata (dict): The output metadata from the workflow
151+
"""
152+
for call, index_list in output_metadata.items():
153+
print(call)
154+
for call_index in index_list:
155+
if call_index is not None:
156+
print_file_like_value_in_dict(outputs_metadata=call_index, indent=True)
157+
158+
159+
def print_file_like_value_in_dict(outputs_metadata: dict, indent: bool) -> None:
160+
"""Print the file like values in the output metadata dictionary
161+
162+
Args:
163+
outputs_metadata (dict): The output metadata
164+
indent (bool): Whether to indent the output
165+
"""
166+
167+
for output_name, output_value in outputs_metadata.items():
168+
if isinstance(output_value, str):
169+
print_output_name_and_file(output_name, output_value, indent=indent)
170+
elif isinstance(output_value, list):
171+
for output_value_item in output_value:
172+
print_output_name_and_file(
173+
output_name, output_value_item, indent=indent
174+
)
175+
176+
177+
def print_output_name_and_file(
178+
output_name: str, output_value: str, indent: bool = True
179+
) -> None:
180+
"""Print the task name and the file name
181+
182+
Args:
183+
output_name (str): The task output name
184+
output_value (str): The task output value
185+
indent (bool): Whether to indent the output"""
186+
187+
i = "\t" if indent else ""
188+
189+
if isinstance(output_value, str):
190+
if is_path_or_url_like(output_value):
191+
print(f"{i}{output_name}: {output_value}")
192+
193+
194+
def is_path_or_url_like(in_string: str) -> bool:
195+
"""Check if the string is a path or url
196+
197+
Args:
198+
in_string (str): The string to check for path or url like-ness
199+
"""
200+
if (
201+
in_string.startswith("gs://")
202+
or in_string.startswith("/")
203+
or in_string.startswith("http://")
204+
or in_string.startswith("https://")
205+
):
206+
return True
207+
else:
208+
return False
209+
210+
211+
def check_for_empty_output(workflow_outputs: dict, workflow_id: str) -> None:
212+
"""Check if the workflow outputs are empty
213+
214+
Args:
215+
cromwell_outputs (dict): Dictionary of workflow outputs
216+
:param workflow_id: The workflow id
217+
"""
218+
if not workflow_outputs:
219+
LOGGER.error(f"No outputs found for workflow: {workflow_id}")
220+
raise Exception(f"No outputs found for workflow: {workflow_id}")

src/cromshell/utilities/command_setup_utils.py

+12
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,17 @@ def resolve_workflow_id_and_server(workflow_id: str, cromshell_config) -> str:
1717
http_utils.set_and_check_cromwell_server(
1818
config=cromshell_config, workflow_id=resolved_workflow_id
1919
)
20+
set_workflow_id(workflow_id=resolved_workflow_id, cromshell_config=cromshell_config)
2021

2122
return resolved_workflow_id
23+
24+
25+
def set_workflow_id(workflow_id: str, cromshell_config) -> None:
26+
"""
27+
Sets the workflow id in the config object
28+
29+
:param workflow_id: workflow UUID
30+
:param cromshell_config:
31+
:return: None
32+
"""
33+
cromshell_config.workflow_id = workflow_id

src/cromshell/utilities/cromshellconfig.py

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
]
2828
CROMWELL_API_STRING = "/api/workflows/v1"
2929
WOMTOOL_API_STRING = "/api/womtool/v1"
30+
workflow_id = None
3031
# Concatenate the cromwell url, api string, and workflow ID. Set in subcommand.
3132
cromwell_api_workflow_id = None
3233
# Defaults for variables will be set after functions have been defined
+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
from pathlib import Path
2+
3+
import pytest
4+
5+
from tests.integration import utility_test_functions
6+
7+
workflows_path = Path(__file__).parents[1].joinpath("workflows/")
8+
9+
10+
class TestListOutputs:
11+
@pytest.mark.parametrize(
12+
"wdl, json_file, options, output_template",
13+
[
14+
(
15+
"tests/workflows/helloWorld.wdl",
16+
"tests/workflows/helloWorld.json",
17+
None,
18+
[
19+
"HelloWorld.output_file: /cromwell-executions/HelloWorld/<workflow-id>/call-HelloWorldTask/execution/stdout",
20+
"",
21+
],
22+
),
23+
(
24+
"tests/workflows/helloWorld.wdl",
25+
"tests/workflows/helloWorld.json",
26+
["-d"],
27+
[
28+
"HelloWorld.HelloWorldTask",
29+
"\toutput_file: /cromwell-executions/HelloWorld/<workflow-id>/call-HelloWorldTask/execution/stdout",
30+
"",
31+
],
32+
),
33+
(
34+
"tests/workflows/helloWorld.wdl",
35+
"tests/workflows/helloWorld.json",
36+
["-j"],
37+
[
38+
"{",
39+
' "HelloWorld.output_file": "/cromwell-executions/HelloWorld/<workflow-id>/call-HelloWorldTask/execution/stdout"',
40+
"}",
41+
"",
42+
],
43+
),
44+
(
45+
"tests/workflows/helloWorld.wdl",
46+
"tests/workflows/helloWorld.json",
47+
["-j", "-d"],
48+
[
49+
"{",
50+
' "HelloWorld.HelloWorldTask": [',
51+
" {",
52+
' "output_file": "/cromwell-executions/HelloWorld/<workflow-id>/call-HelloWorldTask/execution/stdout"',
53+
" }",
54+
" ]",
55+
"}",
56+
"",
57+
],
58+
),
59+
],
60+
)
61+
def test_list_outputs(
62+
self,
63+
local_cromwell_url: str,
64+
wdl: str,
65+
json_file: str,
66+
options: list,
67+
output_template: list,
68+
ansi_escape,
69+
):
70+
# submit workflow
71+
test_workflow_id = utility_test_functions.submit_workflow(
72+
local_cromwell_url=local_cromwell_url,
73+
wdl=wdl,
74+
json_file=json_file,
75+
exit_code=0,
76+
)
77+
78+
utility_test_functions.wait_for_workflow_completion(
79+
test_workflow_id=test_workflow_id
80+
)
81+
82+
# run list-outputs
83+
status_result = utility_test_functions.run_cromshell_command(
84+
command=["list-outputs", test_workflow_id],
85+
exit_code=0,
86+
subcommand_options=options,
87+
)
88+
89+
status_result_per_line = status_result.stdout.split("\n")
90+
91+
workflow_outputs = [
92+
sub.replace("<workflow-id>", test_workflow_id) for sub in output_template
93+
]
94+
95+
print("Print workflow list-outputs results:")
96+
for line in status_result_per_line:
97+
print(line)
98+
99+
assert status_result_per_line == workflow_outputs

tests/integration/utility_test_functions.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@
1010
from cromshell.utilities import cromshellconfig
1111

1212

13-
def run_cromshell_command(command: list, exit_code: int):
13+
def run_cromshell_command(
14+
command: list, exit_code: int, subcommand_options: list = None
15+
):
1416
"""
1517
Run cromshell alias using CliRunner and assert job is successful
1618
19+
:param subcommand_options: The options to pass to the subcommand
1720
:param command: The subcommand, options, and arguments in list form e.g.
1821
[
1922
"alias",
@@ -25,12 +28,18 @@ def run_cromshell_command(command: list, exit_code: int):
2528
:return: results from execution
2629
"""
2730

31+
if subcommand_options:
32+
command_with_options = command[:1] + subcommand_options + command[1:]
33+
else:
34+
command_with_options = command
35+
2836
runner = CliRunner(mix_stderr=False)
2937
# The absolute path will be passed to the invoke command because
3038
# the test is being run in temp directory created by CliRunner.
3139
with runner.isolated_filesystem():
32-
result = runner.invoke(cromshell, command)
40+
result = runner.invoke(cromshell, command_with_options)
3341
assert result.exit_code == exit_code, (
42+
f"\nCOMMAND:\n{command_with_options}"
3443
f"\nSTDOUT:\n{result.stdout}"
3544
f"\nSTDERR:\n{result.stderr}"
3645
f"\nExceptions:\n{result.exception}"

0 commit comments

Comments
 (0)