diff --git a/src/rbc/bids/metrics.py b/src/rbc/bids/metrics.py index b2bda843..422a9918 100644 --- a/src/rbc/bids/metrics.py +++ b/src/rbc/bids/metrics.py @@ -47,13 +47,13 @@ def resolve_metrics( deriv_df, suffix=Suffix.BOLD, desc="regressed", - extra={"reg": regressor}, + extra={"reg": bids_safe_label(regressor)}, ), "cleaned_bold": mni_q.expect( deriv_df, suffix=Suffix.BOLD, desc="preproc", - extra={"reg": regressor}, + extra={"reg": bids_safe_label(regressor)}, ), } diff --git a/src/rbc/bids/qc.py b/src/rbc/bids/qc.py index f249d619..d036c05a 100644 --- a/src/rbc/bids/qc.py +++ b/src/rbc/bids/qc.py @@ -51,13 +51,15 @@ def resolve_qc( Dict with keys matching ``single_session_qc`` parameters. """ return { - "template_bold": func_mni.expect(deriv_df, suffix=Suffix.BOLD, desc="preproc"), + "template_bold": func_mni.expect( + deriv_df, suffix=Suffix.BOLD, desc="preproc", extra={"reg": False} + ), "cleaned_bold": { reg: func_mni.expect( deriv_df, suffix=Suffix.BOLD, desc="preproc", - extra={"reg": reg}, + extra={"reg": bids_safe_label(reg)}, ) for reg in regressors }, diff --git a/src/rbc/orchestration/all.py b/src/rbc/orchestration/all.py index 2b920778..2a5454fa 100644 --- a/src/rbc/orchestration/all.py +++ b/src/rbc/orchestration/all.py @@ -84,7 +84,7 @@ def run( df = filters.apply( df, - pl.col("ses") != "longitudinal", + pl.col("ses").ne_missing("longitudinal"), pl.col("space").is_null(), pl.col("desc").is_null(), ) diff --git a/src/rbc/orchestration/anatomical.py b/src/rbc/orchestration/anatomical.py index 1c229a02..db377455 100644 --- a/src/rbc/orchestration/anatomical.py +++ b/src/rbc/orchestration/anatomical.py @@ -93,7 +93,7 @@ def run( df = filters.apply( df, - pl.col("ses") != "longitudinal", + pl.col("ses").ne_missing("longitudinal"), pl.col("space").is_null(), pl.col("desc").is_null(), ) diff --git a/src/rbc/orchestration/functional.py b/src/rbc/orchestration/functional.py index 084d5199..d6058814 100644 --- a/src/rbc/orchestration/functional.py +++ b/src/rbc/orchestration/functional.py @@ -132,7 +132,7 @@ def run( df = filters.apply( df, - pl.col("ses") != "longitudinal", + pl.col("ses").ne_missing("longitudinal"), pl.col("space").is_null(), ) diff --git a/src/rbc/workflows/metrics.py b/src/rbc/workflows/metrics.py index 06e4da0f..929f2d9d 100644 --- a/src/rbc/workflows/metrics.py +++ b/src/rbc/workflows/metrics.py @@ -17,6 +17,7 @@ from rbc.core.metrics.smoothing import smooth from rbc.core.metrics.standardization import compute_zscore from rbc.core.metrics.timeseries import compute_timeseries +from rbc.core.niwrap import generate_exec_folder if TYPE_CHECKING: from collections.abc import Mapping @@ -76,15 +77,23 @@ def single_session_metrics( Returns: All metric outputs bundled in a :class:`MetricsOutputs` tuple. """ + work_dir = generate_exec_folder("metrics") + # 1. ALFF / fALFF on regressed BOLD (non-bandpassed) _logger.info("Computing ALFF/fALFF") alff_path, falff_path = compute_alff( - regressed_bold, template_brain_mask, tr=tr, method="qm" + regressed_bold, + template_brain_mask, + tr=tr, + method="qm", + out_file=work_dir / "alff.nii.gz", ) # 2. ReHo on bandpass-filtered cleaned BOLD _logger.info("Computing ReHo") - reho_path = compute_reho(cleaned_bold, template_brain_mask) + reho_path = compute_reho( + cleaned_bold, template_brain_mask, out_file=work_dir / "reho.nii.gz" + ) # 3. Smooth raw maps _logger.info("Smoothing maps (FWHM=%.1f mm)", fwhm) @@ -103,7 +112,9 @@ def single_session_metrics( ts_outputs = {} for label, atlas_path in atlas_files.items(): _logger.info("Extracting atlas timeseries (%s)", label) - ts_outputs[label] = compute_timeseries(cleaned_bold, atlas_path) + ts_outputs[label] = compute_timeseries( + cleaned_bold, atlas_path, out_dir=work_dir + ) return MetricsOutputs( alff=alff_path, diff --git a/src/rbc/workflows/qc.py b/src/rbc/workflows/qc.py index 096cb14c..d2558428 100644 --- a/src/rbc/workflows/qc.py +++ b/src/rbc/workflows/qc.py @@ -17,6 +17,7 @@ from niwrap import fsl from rbc.bids import TemplateSpace +from rbc.core.niwrap import generate_exec_folder from rbc.core.qc.dvars import dvars_qc_metrics from rbc.core.qc.motion import framewise_displacement_jenkinson, motion_qc_metrics from rbc.core.qc.registration import registration_qc_metrics @@ -85,6 +86,8 @@ def single_session_qc( All QC outputs bundled in a :class:`QCOutputs` tuple. """ _logger.info("Computing QC metrics") + work_dir = generate_exec_folder("qc") + # 1. Load motion data rms_values = np.loadtxt(rms_rel) motion_data = np.loadtxt(motion_params) @@ -157,8 +160,7 @@ def single_session_qc( # 9. Write QC TSV qc_outputs.qc_file[regressor] = write_xcp_qc( qc_outputs.metrics[regressor], - template_bold.parent - / f"sub-{sub}_ses-{ses}_task-{task}_run-{run}_reg-{regressor}_qc.tsv", + work_dir / f"reg-{regressor}_qc.tsv", ) # 10. RBC pass/fail diff --git a/tests/integration/test_all.py b/tests/integration/test_all.py new file mode 100644 index 00000000..fc6159fd --- /dev/null +++ b/tests/integration/test_all.py @@ -0,0 +1,236 @@ +"""Integration test for ``rbc all`` pipeline stage handoff. + +Runs the full pipeline via ``rbc all`` (in-memory handoff) and via the +individual subcommands in sequence (disk round-trip), then verifies that +both approaches complete without errors, produce key derivative files, +and yield identical outputs. + +The sequential run reuses anatomical outputs from the ``rbc all`` run to +avoid running brain extraction and registration twice. +""" + +from __future__ import annotations + +import shutil +import subprocess +from pathlib import Path +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from collections.abc import Sequence + +_TEST_DATASET = Path(__file__).parents[1] / "data" / "ds000001" + +# Subject with no session in ds000001. +_SUB = "01" +_TASK = "balloonanalogrisktask" +_RUN = "1" + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _rbc_exe() -> str: + exe = shutil.which("rbc") + assert exe is not None, "rbc CLI not found on PATH" + return exe + + +def _run_rbc( + args: Sequence[str], *, timeout: int = 7200 +) -> subprocess.CompletedProcess[str]: + result = subprocess.run( # noqa: S603 + [_rbc_exe(), *args], + capture_output=True, + text=True, + timeout=timeout, + ) + assert result.returncode == 0, ( + f"rbc {args[0]} exited with code {result.returncode}\n" + f"--- stdout ---\n{result.stdout[-2000:]}\n" + f"--- stderr ---\n{result.stderr[-2000:]}" + ) + return result + + +_COMMON_ARGS: list[str] = ["--participant-label", _SUB] + + +def _relative_files(root: Path) -> set[str]: + """Return the set of file paths relative to *root*.""" + return {str(p.relative_to(root)) for p in root.rglob("*") if p.is_file()} + + +def _file_tree(root: Path) -> str: + """Return a newline-separated listing of all files under *root*.""" + files = sorted(p.relative_to(root) for p in root.rglob("*") if p.is_file()) + return "\n".join(str(f) for f in files) if files else "(empty)" + + +# --------------------------------------------------------------------------- +# Fixtures — each pipeline variant runs once per session +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="session") +def _runner(request: pytest.FixtureRequest) -> str: + return request.config.getoption("--runner") + + +@pytest.fixture(scope="session") +def all_output(tmp_path_factory: pytest.TempPathFactory, _runner: str) -> Path: + """Run ``rbc all`` once and return the output directory.""" + out = tmp_path_factory.mktemp("all") / "derivatives" + out.mkdir() + _run_rbc( + [ + "all", + str(_TEST_DATASET), + "-o", + str(out), + "--runner", + _runner, + *_COMMON_ARGS, + ] + ) + return out + + +@pytest.fixture(scope="session") +def sequential_output( + tmp_path_factory: pytest.TempPathFactory, + _runner: str, + all_output: Path, +) -> Path: + """Run functional/metrics/qc using anat outputs from ``rbc all``. + + Copies the anatomical derivatives produced by the ``all_output`` + fixture so that only the functional, metrics, and QC stages run, + saving ~30-40 min of redundant brain extraction and registration. + """ + out = tmp_path_factory.mktemp("sequential") / "derivatives" + out.mkdir() + + # Seed with anatomical outputs + dataset_description from rbc all + anat_src = all_output / f"sub-{_SUB}" / "anat" + anat_dst = out / f"sub-{_SUB}" / "anat" + shutil.copytree(anat_src, anat_dst) + shutil.copy2( + all_output / "dataset_description.json", + out / "dataset_description.json", + ) + + runner_args = ["--runner", _runner, *_COMMON_ARGS] + raw = str(_TEST_DATASET) + deriv = str(out) + + # functional (raw BIDS + anat derivatives) + _run_rbc(["functional", raw, deriv, "-o", deriv, *runner_args]) + + # metrics (derivatives from previous stages) + _run_rbc(["metrics", raw, deriv, "-o", deriv, *runner_args]) + + # qc (derivatives from previous stages) + _run_rbc(["qc", raw, deriv, "-o", deriv, *runner_args]) + + return out + + +# --------------------------------------------------------------------------- +# Assertion helpers +# --------------------------------------------------------------------------- + + +def _assert_derivatives_exist(output_dir: Path) -> None: + """Check that all expected derivative files are present.""" + tree = _file_tree(output_dir) + sub_dir = output_dir / f"sub-{_SUB}" + + # -- Dataset-level metadata -- + assert (output_dir / "dataset_description.json").is_file(), ( + f"Missing dataset_description.json\n--- file tree ---\n{tree}" + ) + + # -- Anatomical derivatives -- + anat = sub_dir / "anat" + anat_files = [ + f"sub-{_SUB}_desc-brain_T1w.nii.gz", + f"sub-{_SUB}_desc-T1w_mask.nii.gz", + f"sub-{_SUB}_desc-csf_mask.nii.gz", + f"sub-{_SUB}_desc-gm_mask.nii.gz", + f"sub-{_SUB}_desc-wm_mask.nii.gz", + f"sub-{_SUB}_desc-wmBBR_mask.nii.gz", + ] + for name in anat_files: + assert (anat / name).is_file(), ( + f"Missing anatomical file: {name}\n--- file tree ---\n{tree}" + ) + + # -- Functional derivatives -- + func = sub_dir / "func" + bold_stem = f"sub-{_SUB}_task-{_TASK}_run-{_RUN}" + func_files = [ + f"{bold_stem}_sbref.nii.gz", + f"{bold_stem}_desc-preproc_bold.nii.gz", + f"{bold_stem}_desc-motionParams_motion.1D", + f"{bold_stem}_desc-brain_mask.nii.gz", + ] + for name in func_files: + assert (func / name).is_file(), ( + f"Missing functional file: {name}\n--- file tree ---\n{tree}" + ) + + # -- QC -- + qc_files = list(func.glob(f"{bold_stem}_space-*_*_quality.tsv")) + assert qc_files, f"No QC quality TSV files found\n--- file tree ---\n{tree}" + + # -- Metrics -- + assert list(func.glob(f"{bold_stem}_space-*_*_timeseries.tsv")), ( + f"No timeseries TSV files found\n--- file tree ---\n{tree}" + ) + assert list(func.glob(f"{bold_stem}_space-*_*_correlations.tsv")), ( + f"No correlation matrix TSV files found\n--- file tree ---\n{tree}" + ) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.slow +def test_rbc_all_produces_derivatives(all_output: Path) -> None: + """``rbc all`` runs end-to-end and writes expected derivative files.""" + _assert_derivatives_exist(all_output) + + +@pytest.mark.slow +def test_sequential_produces_derivatives(sequential_output: Path) -> None: + """Running functional/metrics/qc individually produces the same files.""" + _assert_derivatives_exist(sequential_output) + + +@pytest.mark.slow +def test_all_vs_sequential_outputs_match( + all_output: Path, + sequential_output: Path, +) -> None: + """Both invocation styles must produce the same set of derivative files.""" + all_files = _relative_files(all_output) + seq_files = _relative_files(sequential_output) + + missing_from_seq = all_files - seq_files + extra_in_seq = seq_files - all_files + + assert not missing_from_seq, ( + "Files produced by 'rbc all' but missing from sequential run:\n" + + "\n".join(sorted(missing_from_seq)) + ) + assert not extra_in_seq, ( + "Files produced by sequential run but missing from 'rbc all':\n" + + "\n".join(sorted(extra_in_seq)) + )