Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: implement upgrade_all_documents #84

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 72 additions & 49 deletions cwlupgrader/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import argparse
import copy
import logging
import os
import stat
import sys
from collections.abc import MutableSequence, Sequence
from glob import iglob
from pathlib import Path
from typing import Any, Callable, Dict, List, MutableMapping, Optional, Set, Union

Expand Down Expand Up @@ -61,7 +63,6 @@ def main(args: Optional[List[str]] = None) -> int:

def run(args: argparse.Namespace) -> int:
"""Main function."""
imports: Set[str] = set()
for path in args.inputs:
_logger.info("Processing %s", path)
document = load_cwl_document(path)
Expand All @@ -85,22 +86,16 @@ def run(args: argparse.Namespace) -> int:
target_version = "latest"
upgraded_document = upgrade_document(
document,
args.dir,
target_version=target_version,
imports=imports,
)
write_cwl_document(upgraded_document, Path(path).name, args.dir)
return 0


def upgrade_document(
document: Any,
output_dir: str,
target_version: Optional[str] = "latest",
imports: Optional[Set[str]] = None,
) -> Any:
if imports is None:
imports = set()
supported_versions = ["v1.0", "v1.1", "v1.2", "latest"]
if target_version not in supported_versions:
_logger.error(f"Unsupported target cwlVersion: {target_version}")
Expand All @@ -109,16 +104,12 @@ def upgrade_document(
if version == "cwl:draft-3" or version == "draft-3":
if target_version == "v1.0":
main_updater = draft3_to_v1_0
inner_updater = _draft3_to_v1_0
elif target_version == "v1.1":
main_updater = draft3_to_v1_1
inner_updater = _draft3_to_v1_1
elif target_version == "v1.2":
main_updater = draft3_to_v1_2
inner_updater = _draft3_to_v1_2
elif target_version == "latest":
main_updater = draft3_to_v1_2
inner_updater = _draft3_to_v1_2
else:
pass # does not happen
elif version == "v1.0":
Expand All @@ -127,13 +118,10 @@ def upgrade_document(
return
elif target_version == "v1.1":
main_updater = v1_0_to_v1_1
inner_updater = _v1_0_to_v1_1
elif target_version == "v1.2":
main_updater = v1_0_to_v1_2
inner_updater = _v1_0_to_v1_2
elif target_version == "latest":
main_updater = v1_0_to_v1_2
inner_updater = _v1_0_to_v1_2
else:
pass # does not happen
elif version == "v1.1":
Expand All @@ -142,16 +130,49 @@ def upgrade_document(
return
elif target_version == "v1.2":
main_updater = v1_1_to_v1_2
inner_updater = _v1_1_to_v1_2
elif target_version == "latest":
main_updater = v1_1_to_v1_2
inner_updater = _v1_1_to_v1_2
else:
pass # does not happen? How to do the case that base version is v1.0?
else:
_logger.error(f"Unsupported cwlVersion: {version}")
process_imports(document, imports, inner_updater, output_dir)
return main_updater(document, output_dir)
return main_updater(document)


def upgrade_all_documents(
base_dir: str,
output_dir: str,
target_version: Optional[str] = "latest",
overwrite: bool = False,
) -> None:
"""
Upgrade all the CWL documents in basedir and write them to output_dir
while keeping the same directory structure.
"""
base = Path(base_dir).resolve()
if not base.exists():
raise Exception(f"Directory does not exist: {base_dir}")
if not base.is_dir():
raise Exception(f"{base_dir} is not a directory")

outdir = Path(output_dir).resolve()
if outdir.exists() and not overwrite:
raise Exception(f"{output_dir} already exists")

imports = set()

for cwl_ in iglob(str(base / "**/*.cwl"), recursive=True):
cwl = Path(cwl_).relative_to(base)
target = outdir / cwl
doc = load_cwl_document(base / cwl)
im = collect_imports(doc, cwl.parent)
imports = imports | im
upgraded = upgrade_document(doc, target_version=target_version)
os.makedirs(target.parent, exist_ok=True)
write_cwl_document(upgraded, target.name, target.parent)

for i in imports:
pass


def load_cwl_document(path: str) -> Any:
Expand Down Expand Up @@ -184,6 +205,12 @@ def write_cwl_document(document: Any, name: str, dirname: str) -> None:
path.chmod(path.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)


def collect_imports(document: Any, basedir: Path) -> Set[str]:
"""Find any '$import's and return them."""
# TODO
return set()


def process_imports(
document: Any, imports: Set[str], updater: Callable[[Any, str], Any], outdir: str
) -> None:
Expand All @@ -210,62 +237,62 @@ def process_imports(
process_imports(entry, imports, updater, outdir)


def v1_0_to_v1_1(document: CommentedMap, outdir: str) -> CommentedMap:
def v1_0_to_v1_1(document: CommentedMap) -> CommentedMap:
"""CWL v1.0.x to v1.1 transformation loop."""
_v1_0_to_v1_1(document, outdir)
_v1_0_to_v1_1(document)
if isinstance(document, MutableMapping):
for key, value in document.items():
with SourceLine(document, key, Exception):
if isinstance(value, CommentedMap):
document[key] = _v1_0_to_v1_1(value, outdir)
document[key] = _v1_0_to_v1_1(value)
elif isinstance(value, list):
for index, entry in enumerate(value):
if isinstance(entry, CommentedMap):
value[index] = _v1_0_to_v1_1(entry, outdir)
value[index] = _v1_0_to_v1_1(entry)
document["cwlVersion"] = "v1.1"
return sort_v1_0(document)


def v1_0_to_v1_2(document: CommentedMap, outdir: str) -> CommentedMap:
def v1_0_to_v1_2(document: CommentedMap) -> CommentedMap:
"""CWL v1.0.x to v1.2 transformation."""
document = v1_0_to_v1_1(document, outdir)
document = v1_0_to_v1_1(document)
document["cwlVersion"] = "v1.2"
return document


def v1_1_to_v1_2(document: CommentedMap, outdir: str) -> CommentedMap:
def v1_1_to_v1_2(document: CommentedMap) -> CommentedMap:
"""CWL v1.1 to v1.2 transformation."""
document["cwlVersion"] = "v1.2"
return document


def draft3_to_v1_0(document: CommentedMap, outdir: str) -> CommentedMap:
def draft3_to_v1_0(document: CommentedMap) -> CommentedMap:
"""Transformation loop."""
_draft3_to_v1_0(document, outdir)
_draft3_to_v1_0(document)
if isinstance(document, MutableMapping):
for key, value in document.items():
with SourceLine(document, key, Exception):
if isinstance(value, CommentedMap):
document[key] = _draft3_to_v1_0(value, outdir)
document[key] = _draft3_to_v1_0(value)
elif isinstance(value, list):
for index, entry in enumerate(value):
if isinstance(entry, CommentedMap):
value[index] = _draft3_to_v1_0(entry, outdir)
value[index] = _draft3_to_v1_0(entry)
document["cwlVersion"] = "v1.0"
return sort_v1_0(document)


def draft3_to_v1_1(document: CommentedMap, outdir: str) -> CommentedMap:
def draft3_to_v1_1(document: CommentedMap) -> CommentedMap:
"""transformation loop."""
return v1_0_to_v1_1(draft3_to_v1_0(document, outdir), outdir)
return v1_0_to_v1_1(draft3_to_v1_0(document))


def draft3_to_v1_2(document: CommentedMap, outdir: str) -> CommentedMap:
def draft3_to_v1_2(document: CommentedMap) -> CommentedMap:
"""transformation loop."""
return v1_1_to_v1_2(v1_0_to_v1_1(draft3_to_v1_0(document, outdir), outdir), outdir)
return v1_1_to_v1_2(v1_0_to_v1_1(draft3_to_v1_0(document)))


def _draft3_to_v1_0(document: CommentedMap, outdir: str) -> CommentedMap:
def _draft3_to_v1_0(document: CommentedMap) -> CommentedMap:
"""Inner loop for transforming draft-3 to v1.0."""
if "class" in document:
if document["class"] == "Workflow":
Expand All @@ -290,12 +317,12 @@ def _draft3_to_v1_0(document: CommentedMap, outdir: str) -> CommentedMap:
return document


def _draft3_to_v1_1(document: CommentedMap, outdir: str) -> CommentedMap:
return v1_0_to_v1_1(_draft3_to_v1_0(document, outdir), outdir)
def _draft3_to_v1_1(document: CommentedMap) -> CommentedMap:
return v1_0_to_v1_1(_draft3_to_v1_0(document))


def _draft3_to_v1_2(document: CommentedMap, outdir: str) -> CommentedMap:
return _draft3_to_v1_1(document, outdir) # nothing needs doing for 1.2
def _draft3_to_v1_2(document: CommentedMap) -> CommentedMap:
return _draft3_to_v1_1(document) # nothing needs doing for 1.2


WORKFLOW_INPUT_INPUTBINDING = (
Expand All @@ -313,7 +340,7 @@ def _draft3_to_v1_2(document: CommentedMap, outdir: str) -> CommentedMap:
}


def _v1_0_to_v1_1(document: CommentedMap, outdir: str) -> CommentedMap:
def _v1_0_to_v1_1(document: CommentedMap) -> CommentedMap:
"""Inner loop for transforming draft-3 to v1.0."""
if "class" in document:
if document["class"] == "Workflow":
Expand All @@ -327,7 +354,7 @@ def _v1_0_to_v1_1(document: CommentedMap, outdir: str) -> CommentedMap:
upgrade_v1_0_hints_and_reqs(entry)
if "run" in entry and isinstance(entry["run"], CommentedMap):
process = entry["run"]
_v1_0_to_v1_1(process, outdir)
_v1_0_to_v1_1(process)
if "cwlVersion" in process:
del process["cwlVersion"]
elif isinstance(steps, MutableMapping):
Expand All @@ -338,15 +365,11 @@ def _v1_0_to_v1_1(document: CommentedMap, outdir: str) -> CommentedMap:
if "run" in entry:
if isinstance(entry["run"], CommentedMap):
process = entry["run"]
_v1_0_to_v1_1(process, outdir)
_v1_0_to_v1_1(process)
if "cwlVersion" in process:
del process["cwlVersion"]
elif isinstance(entry["run"], str):
path = Path(document.lc.filename).parent / entry["run"]
process = v1_0_to_v1_1(
load_cwl_document(str(path)), outdir
)
write_cwl_document(process, path.name, outdir)
pass
else:
raise Exception(
"'run' entry was neither a CWL Process nor "
Expand Down Expand Up @@ -387,11 +410,11 @@ def _v1_0_to_v1_1(document: CommentedMap, outdir: str) -> CommentedMap:
return document


def _v1_0_to_v1_2(document: CommentedMap, outdir: str) -> CommentedMap:
return _v1_0_to_v1_1(document, outdir) # nothing needs doing for v1.2
def _v1_0_to_v1_2(document: CommentedMap) -> CommentedMap:
return _v1_0_to_v1_1(document) # nothing needs doing for v1.2


def _v1_1_to_v1_2(document: CommentedMap, outdir: str) -> CommentedMap:
def _v1_1_to_v1_2(document: CommentedMap) -> CommentedMap:
return document


Expand Down
34 changes: 24 additions & 10 deletions tests/test_complete.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
import filecmp
from pathlib import Path

from cwlupgrader.main import load_cwl_document, main, upgrade_document
from cwlupgrader.main import load_cwl_document, upgrade_all_documents, upgrade_document

from .util import get_data


def test_draft3_workflow(tmp_path: Path) -> None:
"""Basic draft3 to CWL v1.1 test."""
main([f"--dir={tmp_path}", "--v1-only", get_data("testdata/draft-3/wf.cwl")])
# def test_draft3_workflow(tmp_path: Path) -> None:
# """Basic draft3 to CWL v1.1 test."""
# main([f"--dir={tmp_path}", "--v1-only", get_data("testdata/draft-3/wf.cwl")])
# result = filecmp.cmp(
# get_data("testdata/v1.0/wf.cwl"),
# tmp_path / "wf.cwl",
# shallow=False,
# )
# assert result

def test_upgrade_all_documents(tmp_path: Path) -> None:
"""Test `upgrade_all_documents`"""
upgrade_all_documents(
base_dir=get_data("testdata/draft-3"),
output_dir=tmp_path,
target_version="v1.0",
overwrite=True)
result = filecmp.cmp(
get_data("testdata/v1.0/wf.cwl"),
tmp_path / "wf.cwl",
Expand All @@ -17,22 +31,22 @@ def test_draft3_workflow(tmp_path: Path) -> None:
assert result


def test_invalid_target(tmp_path: Path) -> None:
def test_invalid_target() -> None:
"""Test for invalid target version"""
doc = load_cwl_document(get_data("testdata/v1.0/listing_deep1.cwl"))
result = upgrade_document(doc, str(tmp_path), "invalid-version")
result = upgrade_document(doc, "invalid-version")
assert result is None


def test_v1_0_to_v1_1(tmp_path: Path) -> None:
def test_v1_0_to_v1_1() -> None:
"""Basic CWL v1.0 to CWL v1.1 test."""
doc = load_cwl_document(get_data("testdata/v1.0/listing_deep1.cwl"))
upgraded = upgrade_document(doc, str(tmp_path), "v1.1")
upgraded = upgrade_document(doc, "v1.1")
assert doc == upgraded


def test_v1_1_to_v1_2(tmp_path: Path) -> None:
def test_v1_1_to_v1_2() -> None:
"""Basic CWL v1.1 to CWL v1.2 test."""
doc = load_cwl_document(get_data("testdata/v1.1/listing_deep1.cwl"))
upgraded = upgrade_document(doc, str(tmp_path), "v1.2")
upgraded = upgrade_document(doc, "v1.2")
assert doc == upgraded