Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/rbc/bids/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ def resolve_metrics(
deriv_df,
suffix=Suffix.BOLD,
desc="regressed",
extra={"reg": regressor},
extra={"reg": bids_safe_label(regressor)},
),
"cleaned_bold": mni_q.expect(
deriv_df,
suffix=Suffix.BOLD,
desc="preproc",
extra={"reg": regressor},
extra={"reg": bids_safe_label(regressor)},
),
}

Expand Down
6 changes: 4 additions & 2 deletions src/rbc/bids/qc.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@ def resolve_qc(
Dict with keys matching ``single_session_qc`` parameters.
"""
return {
"template_bold": func_mni.expect(deriv_df, suffix=Suffix.BOLD, desc="preproc"),
"template_bold": func_mni.expect(
deriv_df, suffix=Suffix.BOLD, desc="preproc", extra={"reg": False}
),
"cleaned_bold": {
reg: func_mni.expect(
deriv_df,
suffix=Suffix.BOLD,
desc="preproc",
extra={"reg": reg},
extra={"reg": bids_safe_label(reg)},
)
for reg in regressors
},
Expand Down
2 changes: 1 addition & 1 deletion src/rbc/orchestration/all.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def run(

df = filters.apply(
df,
pl.col("ses") != "longitudinal",
pl.col("ses").ne_missing("longitudinal"),
pl.col("space").is_null(),
pl.col("desc").is_null(),
)
Expand Down
2 changes: 1 addition & 1 deletion src/rbc/orchestration/anatomical.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def run(

df = filters.apply(
df,
pl.col("ses") != "longitudinal",
pl.col("ses").ne_missing("longitudinal"),
pl.col("space").is_null(),
pl.col("desc").is_null(),
)
Expand Down
2 changes: 1 addition & 1 deletion src/rbc/orchestration/functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def run(

df = filters.apply(
df,
pl.col("ses") != "longitudinal",
pl.col("ses").ne_missing("longitudinal"),
pl.col("space").is_null(),
)

Expand Down
17 changes: 14 additions & 3 deletions src/rbc/workflows/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from rbc.core.metrics.smoothing import smooth
from rbc.core.metrics.standardization import compute_zscore
from rbc.core.metrics.timeseries import compute_timeseries
from rbc.core.niwrap import generate_exec_folder

if TYPE_CHECKING:
from collections.abc import Mapping
Expand Down Expand Up @@ -76,15 +77,23 @@ def single_session_metrics(
Returns:
All metric outputs bundled in a :class:`MetricsOutputs` tuple.
"""
work_dir = generate_exec_folder("metrics")

# 1. ALFF / fALFF on regressed BOLD (non-bandpassed)
_logger.info("Computing ALFF/fALFF")
alff_path, falff_path = compute_alff(
regressed_bold, template_brain_mask, tr=tr, method="qm"
regressed_bold,
template_brain_mask,
tr=tr,
method="qm",
out_file=work_dir / "alff.nii.gz",
)

# 2. ReHo on bandpass-filtered cleaned BOLD
_logger.info("Computing ReHo")
reho_path = compute_reho(cleaned_bold, template_brain_mask)
reho_path = compute_reho(
cleaned_bold, template_brain_mask, out_file=work_dir / "reho.nii.gz"
)

# 3. Smooth raw maps
_logger.info("Smoothing maps (FWHM=%.1f mm)", fwhm)
Expand All @@ -103,7 +112,9 @@ def single_session_metrics(
ts_outputs = {}
for label, atlas_path in atlas_files.items():
_logger.info("Extracting atlas timeseries (%s)", label)
ts_outputs[label] = compute_timeseries(cleaned_bold, atlas_path)
ts_outputs[label] = compute_timeseries(
cleaned_bold, atlas_path, out_dir=work_dir
)

return MetricsOutputs(
alff=alff_path,
Expand Down
6 changes: 4 additions & 2 deletions src/rbc/workflows/qc.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from niwrap import fsl

from rbc.bids import TemplateSpace
from rbc.core.niwrap import generate_exec_folder
from rbc.core.qc.dvars import dvars_qc_metrics
from rbc.core.qc.motion import framewise_displacement_jenkinson, motion_qc_metrics
from rbc.core.qc.registration import registration_qc_metrics
Expand Down Expand Up @@ -85,6 +86,8 @@ def single_session_qc(
All QC outputs bundled in a :class:`QCOutputs` tuple.
"""
_logger.info("Computing QC metrics")
work_dir = generate_exec_folder("qc")

# 1. Load motion data
rms_values = np.loadtxt(rms_rel)
motion_data = np.loadtxt(motion_params)
Expand Down Expand Up @@ -157,8 +160,7 @@ def single_session_qc(
# 9. Write QC TSV
qc_outputs.qc_file[regressor] = write_xcp_qc(
qc_outputs.metrics[regressor],
template_bold.parent
/ f"sub-{sub}_ses-{ses}_task-{task}_run-{run}_reg-{regressor}_qc.tsv",
work_dir / f"reg-{regressor}_qc.tsv",
)

# 10. RBC pass/fail
Expand Down
236 changes: 236 additions & 0 deletions tests/integration/test_all.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
"""Integration test for ``rbc all`` pipeline stage handoff.

Runs the full pipeline via ``rbc all`` (in-memory handoff) and via the
individual subcommands in sequence (disk round-trip), then verifies that
both approaches complete without errors, produce key derivative files,
and yield identical outputs.

The sequential run reuses anatomical outputs from the ``rbc all`` run to
avoid running brain extraction and registration twice.
"""

from __future__ import annotations

import shutil
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING

import pytest

if TYPE_CHECKING:
from collections.abc import Sequence

_TEST_DATASET = Path(__file__).parents[1] / "data" / "ds000001"

# Subject with no session in ds000001.
_SUB = "01"
_TASK = "balloonanalogrisktask"
_RUN = "1"


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _rbc_exe() -> str:
exe = shutil.which("rbc")
assert exe is not None, "rbc CLI not found on PATH"
return exe


def _run_rbc(
args: Sequence[str], *, timeout: int = 7200
) -> subprocess.CompletedProcess[str]:
result = subprocess.run( # noqa: S603
[_rbc_exe(), *args],
capture_output=True,
text=True,
timeout=timeout,
)
assert result.returncode == 0, (
f"rbc {args[0]} exited with code {result.returncode}\n"
f"--- stdout ---\n{result.stdout[-2000:]}\n"
f"--- stderr ---\n{result.stderr[-2000:]}"
)
return result


_COMMON_ARGS: list[str] = ["--participant-label", _SUB]


def _relative_files(root: Path) -> set[str]:
"""Return the set of file paths relative to *root*."""
return {str(p.relative_to(root)) for p in root.rglob("*") if p.is_file()}


def _file_tree(root: Path) -> str:
"""Return a newline-separated listing of all files under *root*."""
files = sorted(p.relative_to(root) for p in root.rglob("*") if p.is_file())
return "\n".join(str(f) for f in files) if files else "(empty)"


# ---------------------------------------------------------------------------
# Fixtures — each pipeline variant runs once per session
# ---------------------------------------------------------------------------


@pytest.fixture(scope="session")
def _runner(request: pytest.FixtureRequest) -> str:
return request.config.getoption("--runner")


@pytest.fixture(scope="session")
def all_output(tmp_path_factory: pytest.TempPathFactory, _runner: str) -> Path:
"""Run ``rbc all`` once and return the output directory."""
out = tmp_path_factory.mktemp("all") / "derivatives"
out.mkdir()
_run_rbc(
[
"all",
str(_TEST_DATASET),
"-o",
str(out),
"--runner",
_runner,
*_COMMON_ARGS,
]
)
return out


@pytest.fixture(scope="session")
def sequential_output(
tmp_path_factory: pytest.TempPathFactory,
_runner: str,
all_output: Path,
) -> Path:
"""Run functional/metrics/qc using anat outputs from ``rbc all``.

Copies the anatomical derivatives produced by the ``all_output``
fixture so that only the functional, metrics, and QC stages run,
saving ~30-40 min of redundant brain extraction and registration.
"""
out = tmp_path_factory.mktemp("sequential") / "derivatives"
out.mkdir()

# Seed with anatomical outputs + dataset_description from rbc all
anat_src = all_output / f"sub-{_SUB}" / "anat"
anat_dst = out / f"sub-{_SUB}" / "anat"
shutil.copytree(anat_src, anat_dst)
shutil.copy2(
all_output / "dataset_description.json",
out / "dataset_description.json",
)

runner_args = ["--runner", _runner, *_COMMON_ARGS]
raw = str(_TEST_DATASET)
deriv = str(out)

# functional (raw BIDS + anat derivatives)
_run_rbc(["functional", raw, deriv, "-o", deriv, *runner_args])

# metrics (derivatives from previous stages)
_run_rbc(["metrics", raw, deriv, "-o", deriv, *runner_args])

# qc (derivatives from previous stages)
_run_rbc(["qc", raw, deriv, "-o", deriv, *runner_args])

return out


# ---------------------------------------------------------------------------
# Assertion helpers
# ---------------------------------------------------------------------------


def _assert_derivatives_exist(output_dir: Path) -> None:
"""Check that all expected derivative files are present."""
tree = _file_tree(output_dir)
sub_dir = output_dir / f"sub-{_SUB}"

# -- Dataset-level metadata --
assert (output_dir / "dataset_description.json").is_file(), (
f"Missing dataset_description.json\n--- file tree ---\n{tree}"
)

# -- Anatomical derivatives --
anat = sub_dir / "anat"
anat_files = [
f"sub-{_SUB}_desc-brain_T1w.nii.gz",
f"sub-{_SUB}_desc-T1w_mask.nii.gz",
f"sub-{_SUB}_desc-csf_mask.nii.gz",
f"sub-{_SUB}_desc-gm_mask.nii.gz",
f"sub-{_SUB}_desc-wm_mask.nii.gz",
f"sub-{_SUB}_desc-wmBBR_mask.nii.gz",
]
for name in anat_files:
assert (anat / name).is_file(), (
f"Missing anatomical file: {name}\n--- file tree ---\n{tree}"
)

# -- Functional derivatives --
func = sub_dir / "func"
bold_stem = f"sub-{_SUB}_task-{_TASK}_run-{_RUN}"
func_files = [
f"{bold_stem}_sbref.nii.gz",
f"{bold_stem}_desc-preproc_bold.nii.gz",
f"{bold_stem}_desc-motionParams_motion.1D",
f"{bold_stem}_desc-brain_mask.nii.gz",
]
for name in func_files:
assert (func / name).is_file(), (
f"Missing functional file: {name}\n--- file tree ---\n{tree}"
)

# -- QC --
qc_files = list(func.glob(f"{bold_stem}_space-*_*_quality.tsv"))
assert qc_files, f"No QC quality TSV files found\n--- file tree ---\n{tree}"

# -- Metrics --
assert list(func.glob(f"{bold_stem}_space-*_*_timeseries.tsv")), (
f"No timeseries TSV files found\n--- file tree ---\n{tree}"
)
assert list(func.glob(f"{bold_stem}_space-*_*_correlations.tsv")), (
f"No correlation matrix TSV files found\n--- file tree ---\n{tree}"
)


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------


@pytest.mark.slow
def test_rbc_all_produces_derivatives(all_output: Path) -> None:
"""``rbc all`` runs end-to-end and writes expected derivative files."""
_assert_derivatives_exist(all_output)


@pytest.mark.slow
def test_sequential_produces_derivatives(sequential_output: Path) -> None:
"""Running functional/metrics/qc individually produces the same files."""
_assert_derivatives_exist(sequential_output)


@pytest.mark.slow
def test_all_vs_sequential_outputs_match(
all_output: Path,
sequential_output: Path,
) -> None:
"""Both invocation styles must produce the same set of derivative files."""
all_files = _relative_files(all_output)
seq_files = _relative_files(sequential_output)

missing_from_seq = all_files - seq_files
extra_in_seq = seq_files - all_files

assert not missing_from_seq, (
"Files produced by 'rbc all' but missing from sequential run:\n"
+ "\n".join(sorted(missing_from_seq))
)
assert not extra_in_seq, (
"Files produced by sequential run but missing from 'rbc all':\n"
+ "\n".join(sorted(extra_in_seq))
)