Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion docs/ro-crate.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ sapporo-service accepts arbitrary workflow engines and workflow languages via th
| Non-numeric `exit_code` | `FailedActionStatus` set; `exitCode` property omitted |
| Output file disappeared after listing | File skipped; remaining outputs processed normally |
| Missing `run_request.json` | `TypeError` raised with a descriptive message (generation cannot proceed) |
| tataki Docker unavailable or fails | Warning logged; `encodingFormat` left unchanged |

## Entity Graph

Expand Down Expand Up @@ -174,6 +175,14 @@ For output files with VCF format (`.vcf`, `.vcf.gz`), `vcf-stats` is run in a Do

Output files are automatically annotated with [EDAM ontology](http://edamontology.org/) format identifiers based on file extension. EDAM entities use `@type: "Thing"` as they represent ontology terms rather than web resources. The mapping is defined in `sapporo/ro_crate.py` (`EDAM_MAPPING` dict). Common non-bioinformatics formats (JSON, CSV, TSV, HTML, YAML, Markdown, ZIP, gzip, plain text) are also mapped to their IANA media types.

### tataki Content-Based Format Detection

[tataki](https://github.com/sapporo-wes/tataki) is run in a Docker container (`ghcr.io/sapporo-wes/tataki:latest`) against all output files after the extension-based EDAM detection. tataki detects file formats by inspecting file content (magic bytes, structure analysis) rather than relying on file extensions, covering both bioinformatics formats (BAM, VCF, FASTQ, ...) and common formats (TSV, CSV, JSON, HTML, PDF, PNG, SVG).

When tataki identifies a file's format, the file's `encodingFormat` is replaced with the EDAM ontology entity returned by tataki. Files that tataki cannot identify retain their original `encodingFormat` (extension-based EDAM + MIME type).

This enrichment enables [tonkaz](https://github.com/sapporo-wes/tonkaz) Level 1-3 file-content comparison on typical workflow outputs. If Docker is not available or tataki fails, the enrichment is silently skipped.

## API Endpoint

### `GET /runs/{run_id}/ro-crate`
Expand Down Expand Up @@ -207,7 +216,8 @@ The generation flow:
4. Build the `CreateAction` with inputs, outputs, logs, and metadata.
5. Run MultiQC in Docker and attach statistics (skipped if Docker is unavailable).
6. Run samtools/vcftools in Docker on applicable output files (skipped if Docker is unavailable).
7. Write `ro-crate-metadata.json` and `README.md` to the run directory.
7. Run tataki in Docker to enrich output files with EDAM format IDs (skipped if Docker is unavailable).
8. Write `ro-crate-metadata.json` and `README.md` to the run directory.

## Validation

Expand Down
73 changes: 73 additions & 0 deletions sapporo/ro_crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
BIOSCHEMAS_COMPUTATIONAL_WORKFLOW = "https://bioschemas.org/profiles/ComputationalWorkflow/1.0-RELEASE"

_STDERR_TAIL_LINES = 20
_TATAKI_IMAGE = "ghcr.io/sapporo-wes/tataki:latest"
_DOCKER_IMAGE_RE = re.compile(
r"(?:^|\s)"
r"((?:[\w.-]+(?::\d+)?/)?[\w.-]+(?:/[\w.-]+)*:[\w.+-]+)"
Expand Down Expand Up @@ -1028,6 +1029,77 @@ def add_readme_entity(crate: ROCrate, run_dir: Path, run_id: str) -> None:
crate.add(file_ins)


# === tataki EDAM enrichment ===


def add_tataki_edam(crate: ROCrate, run_dir: Path) -> None:
"""Enrich output File entities with EDAM format IDs from tataki.

Runs tataki via Docker against all files in the outputs directory and
replaces their ``encodingFormat`` with a proper EDAM ontology entity.
Silently skips if Docker is unavailable or tataki fails — enrichment
is always best-effort and never causes the run to fail.
"""
if shutil.which("docker") is None:
return

outputs_dir = run_dir / RUN_DIR_STRUCTURE["outputs_dir"]
if not outputs_dir.exists():
return

rel_paths = [p.relative_to(outputs_dir) for p in outputs_dir.glob("**/*") if p.is_file()]
if not rel_paths:
return

cmd = [
"docker",
"run",
"--rm",
"-v",
f"{outputs_dir}:/work",
"-w",
"/work",
_TATAKI_IMAGE,
"-f",
"json",
"--quiet",
*[f"/work/{rel}" for rel in rel_paths],
]
try:
proc = subprocess.run(cmd, capture_output=True, check=False, timeout=300)
except subprocess.TimeoutExpired:
LOGGER.warning("tataki Docker command timed out after 300s")
return
if proc.returncode != 0:
LOGGER.warning("tataki Docker command failed (rc=%d): %s", proc.returncode, proc.stderr.decode()[:500])
return
try:
detections: dict[str, Any] = json.loads(proc.stdout)
except json.JSONDecodeError:
LOGGER.warning("Failed to parse tataki JSON output")
return

for container_path, detection in detections.items():
edam_id: str | None = detection.get("id")
edam_label: str | None = detection.get("label")
if not edam_id:
continue

rel_from_work = container_path.removeprefix("/work/")
entity_rel = str(Path(RUN_DIR_STRUCTURE["outputs_dir"]) / rel_from_work)
entity = crate.get(entity_rel)
if entity is None:
continue

edam_entity = ContextEntity(
crate,
edam_id,
properties={"@type": "Thing", "name": edam_label or edam_id},
)
crate.add(edam_entity)
entity["encodingFormat"] = edam_entity


# === Entry points ===


Expand Down Expand Up @@ -1083,6 +1155,7 @@ def generate_ro_crate_metadata(run_dir: Path) -> dict[str, Any]:
action.append_to("executedBy", sapporo_ins, compact=True)

add_readme_entity(crate, run_dir, run_id)
add_tataki_edam(crate, run_dir)

result: dict[str, Any] = crate.metadata.generate()

Expand Down
169 changes: 169 additions & 0 deletions tests/unit/test_ro_crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
add_file_stats,
add_multiqc_stats,
add_samtools_stats,
add_tataki_edam,
add_vcftools_stats,
add_workflow_entity,
compute_sha256,
Expand Down Expand Up @@ -1716,6 +1717,174 @@ def test_generated_by_has_vcftools_software(self, tmp_path: Path, mocker: "Mocke
assert generated_by is not None


class TestAddTatakiEdam:
_TATAKI_JSON = json.dumps(
{
"/work/result.tsv": {
"id": "http://edamontology.org/format_3475",
"label": "TSV",
"decompressed": {"id": None, "label": None},
},
}
).encode()

def _make_run_dir_with_outputs(self, tmp_path: Path, output_files: dict[str, str | bytes]) -> Path:
"""Create a run directory with output files and generate an RO-Crate."""
return create_run_dir(
tmp_path,
RUN_ID,
exit_code="0",
end_time="2024-01-01T00:10:00",
wf_params="{}",
output_files=output_files,
outputs_json=[],
)

def test_skips_when_docker_not_available(self, tmp_path: Path) -> None:
"""Should return immediately when docker binary is not found."""
rd = self._make_run_dir_with_outputs(tmp_path, {"result.tsv": "a\tb\n"})
crate = create_base_crate()
with patch("sapporo.ro_crate.shutil.which", return_value=None):
add_tataki_edam(crate, rd)
# No EDAM entities added
edam_entities = [e for e in crate.get_entities() if "edamontology.org" in str(e.id)]
assert edam_entities == []

def test_skips_when_outputs_dir_missing(self, tmp_path: Path) -> None:
"""Should return immediately when outputs directory does not exist."""
rd = create_run_dir(tmp_path, RUN_ID, exit_code="0", end_time="2024-01-01T00:10:00", wf_params="{}")
crate = create_base_crate()
add_tataki_edam(crate, rd)
edam_entities = [e for e in crate.get_entities() if "edamontology.org" in str(e.id)]
assert edam_entities == []

def test_skips_when_no_output_files(self, tmp_path: Path) -> None:
"""Should return immediately when outputs directory is empty."""
rd = create_run_dir(tmp_path, RUN_ID, exit_code="0", end_time="2024-01-01T00:10:00", wf_params="{}")
from sapporo.config import RUN_DIR_STRUCTURE

rd.joinpath(RUN_DIR_STRUCTURE["outputs_dir"]).mkdir(parents=True, exist_ok=True)
crate = create_base_crate()
add_tataki_edam(crate, rd)
edam_entities = [e for e in crate.get_entities() if "edamontology.org" in str(e.id)]
assert edam_entities == []

def test_skips_on_docker_failure(self, tmp_path: Path, mocker: "MockerFixture") -> None:
"""Should log warning and return when Docker command fails."""
from subprocess import CompletedProcess

rd = self._make_run_dir_with_outputs(tmp_path, {"result.tsv": "a\tb\n"})
crate = create_base_crate()
mocker.patch("sapporo.ro_crate.shutil.which", return_value="/usr/bin/docker")
mocker.patch(
"sapporo.ro_crate.subprocess.run",
return_value=CompletedProcess(args=[], returncode=1, stdout=b"", stderr=b"image not found"),
)
add_tataki_edam(crate, rd)
edam_entities = [e for e in crate.get_entities() if "edamontology.org" in str(e.id)]
assert edam_entities == []

def test_skips_on_timeout(self, tmp_path: Path, mocker: "MockerFixture") -> None:
"""Should log warning and return when Docker command times out."""
import subprocess as sp

rd = self._make_run_dir_with_outputs(tmp_path, {"result.tsv": "a\tb\n"})
crate = create_base_crate()
mocker.patch("sapporo.ro_crate.shutil.which", return_value="/usr/bin/docker")
mocker.patch("sapporo.ro_crate.subprocess.run", side_effect=sp.TimeoutExpired(cmd="docker", timeout=300))
add_tataki_edam(crate, rd)
edam_entities = [e for e in crate.get_entities() if "edamontology.org" in str(e.id)]
assert edam_entities == []

def test_skips_on_invalid_json(self, tmp_path: Path, mocker: "MockerFixture") -> None:
"""Should log warning and return when tataki output is not valid JSON."""
from subprocess import CompletedProcess

rd = self._make_run_dir_with_outputs(tmp_path, {"result.tsv": "a\tb\n"})
crate = create_base_crate()
mocker.patch("sapporo.ro_crate.shutil.which", return_value="/usr/bin/docker")
mocker.patch(
"sapporo.ro_crate.subprocess.run",
return_value=CompletedProcess(args=[], returncode=0, stdout=b"not json", stderr=b""),
)
add_tataki_edam(crate, rd)
edam_entities = [e for e in crate.get_entities() if "edamontology.org" in str(e.id)]
assert edam_entities == []

def test_enriches_encoding_format(self, tmp_path: Path, mocker: "MockerFixture") -> None:
"""Should replace encodingFormat with EDAM entity from tataki."""
from subprocess import CompletedProcess

from sapporo.config import RUN_DIR_STRUCTURE

rd = self._make_run_dir_with_outputs(tmp_path, {"result.tsv": "col1\tcol2\nval1\tval2\n"})
mocker.patch("sapporo.ro_crate.shutil.which", return_value="/usr/bin/docker")
mocker.patch(
"sapporo.ro_crate.subprocess.run",
return_value=CompletedProcess(args=[], returncode=0, stdout=self._TATAKI_JSON, stderr=b""),
)

jsonld = generate_ro_crate_metadata(rd)
graph = jsonld["@graph"]

# Find the output file entity
outputs_prefix = RUN_DIR_STRUCTURE["outputs_dir"]
file_entity = next(
(e for e in graph if e.get("@id", "").startswith(outputs_prefix) and "result.tsv" in e.get("@id", "")), None
)
assert file_entity is not None

# encodingFormat should be the EDAM entity reference
enc = file_entity["encodingFormat"]
assert enc == {"@id": "http://edamontology.org/format_3475"}

# EDAM entity should exist in the graph
edam_entity = next((e for e in graph if e.get("@id") == "http://edamontology.org/format_3475"), None)
assert edam_entity is not None
assert edam_entity["@type"] == "Thing"
assert edam_entity["name"] == "TSV"

def test_skips_undetected_files(self, tmp_path: Path, mocker: "MockerFixture") -> None:
"""Should not modify encodingFormat when tataki returns null id."""
from subprocess import CompletedProcess

from sapporo.config import RUN_DIR_STRUCTURE

tataki_null = json.dumps(
{
"/work/unknown.bin": {
"id": None,
"label": None,
"decompressed": {"id": None, "label": None},
},
}
).encode()

rd = self._make_run_dir_with_outputs(tmp_path, {"unknown.bin": "\x00\x01\x02"})
mocker.patch("sapporo.ro_crate.shutil.which", return_value="/usr/bin/docker")
mocker.patch(
"sapporo.ro_crate.subprocess.run",
return_value=CompletedProcess(args=[], returncode=0, stdout=tataki_null, stderr=b""),
)

jsonld = generate_ro_crate_metadata(rd)
graph = jsonld["@graph"]

# File entity should exist with original encodingFormat (not replaced)
outputs_prefix = RUN_DIR_STRUCTURE["outputs_dir"]
file_entity = next(
(e for e in graph if e.get("@id", "").startswith(outputs_prefix) and "unknown.bin" in e.get("@id", "")),
None,
)
assert file_entity is not None

# No EDAM entity from tataki should be in the graph
tataki_edam = [
e for e in graph if e.get("@id", "").startswith("http://edamontology.org/format_") and e.get("name") is None
]
assert tataki_edam == []


class TestAddFileStats:
def test_skips_when_docker_not_available(self, tmp_path: Path) -> None:
"""Should return immediately when docker binary is not found."""
Expand Down