diff --git a/src/rbc/bids/longitudinal/metrics.py b/src/rbc/bids/longitudinal/metrics.py new file mode 100644 index 00000000..93e55704 --- /dev/null +++ b/src/rbc/bids/longitudinal/metrics.py @@ -0,0 +1,64 @@ +"""BIDS resolve and export for the longitudinal metrics workflow. + +Reuses the cross-sectional :func:`~rbc.bids.metrics.export_metrics` for +export since the output structure (ALFF, fALFF, ReHo, timeseries, +correlations) is identical -- only the space changes to ``longitudinal``. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, TypedDict + +from rbc.bids import Suffix, bids_safe_label + +if TYPE_CHECKING: + from pathlib import Path + + import polars as pl + + from rbc.bids import Bids + + +class MetricsLongInputs(TypedDict): + """Resolved functional inputs for the longitudinal metrics workflow.""" + + template_brain_mask: Path + regressed_bold: Path + cleaned_bold: Path + + +def resolve_longitudinal_metrics( + func_long_q: Bids, + func_long_df: pl.DataFrame, + *, + regressor: str, +) -> MetricsLongInputs: + """Resolve longitudinal-space functional derivatives for metrics. + + Args: + func_long_q: Bids builder configured for ``space=longitudinal`` + func queries. + func_long_df: DataFrame of longitudinal-space derivative outputs. + regressor: Single regressor name (e.g. ``"36-parameter"``). + + Returns: + Dict with keys: ``template_brain_mask``, ``regressed_bold``, + ``cleaned_bold``. + """ + return { + "template_brain_mask": func_long_q.expect( + func_long_df, suffix=Suffix.MASK, desc="brain" + ), + "regressed_bold": func_long_q.expect( + func_long_df, + suffix=Suffix.BOLD, + desc="regressed", + extra={"reg": bids_safe_label(regressor)}, + ), + "cleaned_bold": func_long_q.expect( + func_long_df, + suffix=Suffix.BOLD, + desc="preproc", + extra={"reg": bids_safe_label(regressor)}, + ), + } diff --git a/src/rbc/bids/longitudinal/qc.py b/src/rbc/bids/longitudinal/qc.py new file mode 100644 index 00000000..821c880d --- /dev/null +++ b/src/rbc/bids/longitudinal/qc.py @@ -0,0 +1,149 @@ +"""BIDS resolve and export for the longitudinal QC workflow. + +Longitudinal QC is minimal: Dice/Jaccard overlap between the anatomical +brain mask and BOLD brain mask, both in longitudinal template space. +""" + +from __future__ import annotations + +import csv +from typing import TYPE_CHECKING, NamedTuple, TypedDict + +from rbc.bids import Suffix + +if TYPE_CHECKING: + from pathlib import Path + + import polars as pl + + from rbc.bids import Bids + + +class QCLongInputs(TypedDict): + """Resolved inputs for the longitudinal registration QC.""" + + anat_brain_mask: Path + bold_mask: Path + + +class LongitudinalQCOutputs(NamedTuple): + """QC outputs from the longitudinal registration overlap check. + + Attributes: + dice: Dice coefficient between anat and BOLD masks. + jaccard: Jaccard index between anat and BOLD masks. + coverage: Coverage of the smaller mask by the overlap. + cross_corr: Pearson correlation between flattened masks. + passed: Whether the run passes the Dice threshold. + qc_file: Path to the written single-row QC TSV. + """ + + dice: float + jaccard: float + coverage: float + cross_corr: float + passed: bool + qc_file: Path + + +def resolve_longitudinal_qc( + anat_long_q: Bids, + func_long_q: Bids, + anat_long_df: pl.DataFrame, + func_long_df: pl.DataFrame, +) -> QCLongInputs: + """Resolve longitudinal-space masks for registration QC. + + Args: + anat_long_q: Bids builder for ``space=longitudinal`` anat queries. + func_long_q: Bids builder for ``space=longitudinal`` func queries. + anat_long_df: DataFrame of longitudinal anatomical derivatives. + func_long_df: DataFrame of longitudinal functional derivatives. + + Returns: + Dict with ``anat_brain_mask`` and ``bold_mask`` paths. + """ + return { + "anat_brain_mask": anat_long_q.expect( + anat_long_df, suffix=Suffix.MASK, desc="T1w" + ), + "bold_mask": func_long_q.expect(func_long_df, suffix=Suffix.MASK, desc="brain"), + } + + +def export_longitudinal_qc( + func_long: Bids, + outputs: LongitudinalQCOutputs, +) -> None: + """Export longitudinal QC results as a single-row TSV. + + Args: + func_long: Bids builder with ``space="longitudinal"`` for func. + outputs: QC overlap metrics. + """ + func_long.save( + outputs.qc_file, + suffix="quality", + desc="registration", + extension=".tsv", + ) + + +def write_longitudinal_qc_tsv( + out_path: Path, + *, + sub: str, + ses: str, + task: str, + run: int | str, + dice: float, + jaccard: float, + coverage: float, + cross_corr: float, + passed: bool, +) -> Path: + """Write a single-row longitudinal QC TSV. + + Args: + out_path: Destination file path. + sub: Subject ID. + ses: Session label. + task: Task label. + run: Run number. + dice: Dice coefficient. + jaccard: Jaccard index. + coverage: Coverage metric. + cross_corr: Cross-correlation. + passed: Pass/fail flag. + + Returns: + The written file path. + """ + fieldnames = [ + "sub", + "ses", + "task", + "run", + "dice", + "jaccard", + "coverage", + "cross_corr", + "passed", + ] + row = { + "sub": sub, + "ses": ses, + "task": task, + "run": run, + "dice": f"{dice:.6f}", + "jaccard": f"{jaccard:.6f}", + "coverage": f"{coverage:.6f}", + "cross_corr": f"{cross_corr:.6f}", + "passed": str(passed), + } + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.DictWriter(fh, fieldnames=fieldnames, delimiter="\t") + writer.writeheader() + writer.writerow(row) + return out_path diff --git a/src/rbc/cli/longitudinal/all.py b/src/rbc/cli/longitudinal/all.py index 493d0858..7f0f27e8 100644 --- a/src/rbc/cli/longitudinal/all.py +++ b/src/rbc/cli/longitudinal/all.py @@ -1,11 +1,16 @@ -"""``rbc longitudinal all`` subcommand (placeholder for Stage 6).""" +"""``rbc longitudinal all`` subcommand.""" from __future__ import annotations from dataclasses import dataclass -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal -from rbc.cli.base import _or_default, _validate_nifti_path, _validate_positive +from rbc.cli.base import ( + _or_default, + _validate_nifti_path, + _validate_positive, + _validate_task, +) from rbc.cli.longitudinal._base import LongitudinalBaseArgs, add_fs_license_argument from rbc.cli.metrics import _resolve_atlas_args from rbc.orchestration import Filters, RunnerConfig @@ -25,11 +30,14 @@ class AllLongArgs(LongitudinalBaseArgs): registration_template: Path atlas_files: dict[str, Path] fwhm: float + regressor: Sequence[Literal["36-parameter", "aCompCor"]] + task: str | None @classmethod def validate_namespace(cls, ns: argparse.Namespace) -> AllLongArgs: """Validate namespace for the full longitudinal pipeline subcommand.""" _validate_positive(ns.fwhm, "FWHM") + _validate_task(ns.task) return cls( **LongitudinalBaseArgs.validate_namespace(ns).__dict__, registration_template=_or_default( @@ -37,6 +45,8 @@ def validate_namespace(cls, ns: argparse.Namespace) -> AllLongArgs: ), atlas_files=_resolve_atlas_args(ns.atlas), fwhm=ns.fwhm, + regressor=ns.regressor, + task=ns.task, ) @@ -48,7 +58,9 @@ def main(args: AllLongArgs) -> int: filters=Filters( participant_label=args.participant_label, session_label=args.session_label, + task=args.task, ), + regressors=args.regressor, fs_license=args.fs_license, atlas_files=args.atlas_files, fwhm=args.fwhm, @@ -66,21 +78,33 @@ def register_command( subparsers: argparse._SubParsersAction, parents: Sequence[argparse.ArgumentParser], ) -> None: - """Register ``rbc longitudinal all`` (Stage 6 placeholder).""" + """Register ``rbc longitudinal all``.""" parser = subparsers.add_parser( "all", parents=parents, description=( - "Run the full longitudinal pipeline (template → anat → func → " - "metrics → qc). Placeholder wired up by Stage 3; full " - "implementation ships in Stage 6." + "Run the full longitudinal pipeline (template -> anat -> func -> " + "metrics -> qc). Passes functional outputs in-memory to metrics " + "and QC stages." ), - help="Full longitudinal pipeline (Stage 6)", + help="Full longitudinal pipeline", usage=( "rbc longitudinal all INPUT_DIR [INPUT_DIR ...] -o OUTPUT_DIR [options]" ), ) add_fs_license_argument(parser) + parser.add_argument( + "--regressor", + nargs="+", + choices=["36-parameter", "aCompCor"], + default=["36-parameter"], + help="Space-delimited nuisance regression method(s) to apply.", + ) + parser.add_argument( + "--task", + default=None, + help="Task label to filter BOLD runs (without 'task-' prefix).", + ) parser.add_argument( "--atlas", nargs="+", diff --git a/src/rbc/cli/longitudinal/metrics.py b/src/rbc/cli/longitudinal/metrics.py index e642bb96..a09a0bd3 100644 --- a/src/rbc/cli/longitudinal/metrics.py +++ b/src/rbc/cli/longitudinal/metrics.py @@ -1,9 +1,9 @@ -"""``rbc longitudinal metrics`` subcommand (placeholder for Stage 6).""" +"""``rbc longitudinal metrics`` subcommand.""" from __future__ import annotations from dataclasses import dataclass -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal from rbc.cli.base import _validate_positive, _validate_task from rbc.cli.longitudinal._base import LongitudinalBaseArgs, add_fs_license_argument @@ -24,18 +24,23 @@ class MetricsLongArgs(LongitudinalBaseArgs): atlas_files: dict[str, Path] fwhm: float + tr: float | None task: str | None + regressor: Sequence[Literal["36-parameter", "aCompCor"]] @classmethod def validate_namespace(cls, ns: argparse.Namespace) -> MetricsLongArgs: """Validate namespace for the longitudinal metrics subcommand.""" _validate_task(ns.task) _validate_positive(ns.fwhm, "FWHM") + _validate_positive(ns.tr, "TR") return cls( **LongitudinalBaseArgs.validate_namespace(ns).__dict__, atlas_files=_resolve_atlas_args(ns.atlas), fwhm=ns.fwhm, + tr=ns.tr, task=ns.task, + regressor=ns.regressor, ) @@ -49,8 +54,10 @@ def main(args: MetricsLongArgs) -> int: session_label=args.session_label, task=args.task, ), + regressors=args.regressor, atlas_files=args.atlas_files, fwhm=args.fwhm, + tr=args.tr, runner_config=RunnerConfig( runner=args.runner, verbose=bool(args.verbose), @@ -65,20 +72,27 @@ def register_command( subparsers: argparse._SubParsersAction, parents: Sequence[argparse.ArgumentParser], ) -> None: - """Register ``rbc longitudinal metrics`` (Stage 6 placeholder).""" + """Register ``rbc longitudinal metrics``.""" parser = subparsers.add_parser( "metrics", parents=parents, description=( - "Compute resting-state metrics in longitudinal space. Placeholder " - "wired up by Stage 3; full implementation ships in Stage 6." + "Compute resting-state metrics (ALFF, fALFF, ReHo, atlas " + "timeseries) on longitudinal-space functional derivatives." ), - help="Longitudinal metrics stage (Stage 6)", + help="Compute resting-state metrics in longitudinal space", usage=( "rbc longitudinal metrics INPUT_DIR [INPUT_DIR ...] -o OUTPUT_DIR [options]" ), ) add_fs_license_argument(parser) + parser.add_argument( + "--regressor", + nargs="+", + choices=["36-parameter", "aCompCor"], + default=["36-parameter"], + help=("Space-delimited nuisance regression method(s) to compute metrics for."), + ) parser.add_argument( "--atlas", nargs="+", @@ -96,6 +110,12 @@ def register_command( default=6.0, help="Smoothing kernel FWHM in mm.", ) + parser.add_argument( + "--tr", + type=float, + default=None, + help="Repetition time in seconds. Overrides NIfTI header.", + ) parser.add_argument( "--task", default=None, diff --git a/src/rbc/cli/longitudinal/qc.py b/src/rbc/cli/longitudinal/qc.py index 6ebbf264..7c5ae7ce 100644 --- a/src/rbc/cli/longitudinal/qc.py +++ b/src/rbc/cli/longitudinal/qc.py @@ -1,4 +1,4 @@ -"""``rbc longitudinal qc`` subcommand (placeholder for Stage 6).""" +"""``rbc longitudinal qc`` subcommand.""" from __future__ import annotations @@ -39,15 +39,16 @@ def register_command( subparsers: argparse._SubParsersAction, parents: Sequence[argparse.ArgumentParser], ) -> None: - """Register ``rbc longitudinal qc`` (Stage 6 placeholder).""" + """Register ``rbc longitudinal qc``.""" parser = subparsers.add_parser( "qc", parents=parents, description=( - "Run registration QC for longitudinal derivatives. Placeholder " - "wired up by Stage 3; full implementation ships in Stage 6." + "Run registration QC for longitudinal derivatives. Computes " + "Dice/Jaccard overlap between anatomical and BOLD brain masks " + "in longitudinal template space." ), - help="Longitudinal QC stage (Stage 6)", + help="Longitudinal registration QC (mask overlap)", usage=("rbc longitudinal qc INPUT_DIR [INPUT_DIR ...] -o OUTPUT_DIR [options]"), ) add_fs_license_argument(parser) diff --git a/src/rbc/metadata.py b/src/rbc/metadata.py index e3708671..09161627 100644 --- a/src/rbc/metadata.py +++ b/src/rbc/metadata.py @@ -24,7 +24,7 @@ _TR_PLAUSIBLE_HIGH = 20.0 # very slow sparse designs -def _resolve_tr( +def resolve_tr( *, sidecar_tr: float | None, header_tr: float | None, @@ -90,7 +90,7 @@ def _resolve_tr( raise ValueError(msg) -def _warn_implausible_tr(tr: float) -> None: +def warn_implausible_tr(tr: float) -> None: """Log a warning if TR falls outside the plausible fMRI range.""" if tr < _TR_PLAUSIBLE_LOW: _logger.warning( @@ -169,12 +169,12 @@ def load( raw_pixdim = float(hdr["pixdim"][4]) # type: ignore[index] header_tr: float | None = raw_pixdim if raw_pixdim > 0 else None - tr = _resolve_tr( + tr = resolve_tr( sidecar_tr=sidecar_tr, header_tr=header_tr, override=tr_override, ) - _warn_implausible_tr(tr) + warn_implausible_tr(tr) slice_timing: list[float] | None = sidecar.get("SliceTiming") if slice_timing is not None: diff --git a/src/rbc/orchestration/longitudinal/__init__.py b/src/rbc/orchestration/longitudinal/__init__.py index 41fc5345..c783f39c 100644 --- a/src/rbc/orchestration/longitudinal/__init__.py +++ b/src/rbc/orchestration/longitudinal/__init__.py @@ -5,5 +5,13 @@ from rbc.orchestration.longitudinal._iter import iter_sessions_with_template from rbc.orchestration.longitudinal.anatomical import process_anat from rbc.orchestration.longitudinal.functional import process_func +from rbc.orchestration.longitudinal.metrics import process_metrics +from rbc.orchestration.longitudinal.qc import process_qc -__all__ = ["iter_sessions_with_template", "process_anat", "process_func"] +__all__ = [ + "iter_sessions_with_template", + "process_anat", + "process_func", + "process_metrics", + "process_qc", +] diff --git a/src/rbc/orchestration/longitudinal/all.py b/src/rbc/orchestration/longitudinal/all.py index 40fb36f3..bd8902d0 100644 --- a/src/rbc/orchestration/longitudinal/all.py +++ b/src/rbc/orchestration/longitudinal/all.py @@ -1,35 +1,179 @@ -"""Orchestration for the combined longitudinal pipeline (not yet implemented).""" +"""Orchestration for the combined longitudinal pipeline. + +Runs template, anatomical, functional, metrics, and QC in sequence for +each subject, passing outputs in-memory between stages where possible. + +The template step is inherently cross-session and writes to disk. +Subsequent per-session stages (anat, func) resolve their own inputs +from disk but return outputs for in-memory handoff to metrics and QC. +""" from __future__ import annotations +import logging from typing import TYPE_CHECKING +from tqdm import tqdm + +from rbc.bids import FUNC_GROUP_ENTITIES, Datatype, Suffix, extract_entities, load_table +from rbc.bids.longitudinal.template import discover_template_inputs +from rbc.bids.metrics import export_metrics +from rbc.bids.session import iter_session_files +from rbc.context import RunContext +from rbc.orchestration import Filters, RunnerConfig, init_runner +from rbc.orchestration.longitudinal._iter import iter_sessions_with_template +from rbc.orchestration.longitudinal.anatomical import process_anat +from rbc.orchestration.longitudinal.functional import process_func +from rbc.orchestration.longitudinal.metrics import _read_derivative_tr +from rbc.orchestration.longitudinal.qc import process_qc +from rbc.orchestration.longitudinal.template import ( + process_subject, + setup_freesurfer_auth, +) +from rbc.workflows.metrics import single_session_metrics + if TYPE_CHECKING: - from collections.abc import Sequence + from collections.abc import Mapping, Sequence from pathlib import Path - - from rbc.orchestration import Filters, RunnerConfig + from typing import Literal __all__ = ["run"] +_logger = logging.getLogger(__name__) + def run( input_dirs: Sequence[Path], output_dir: Path, *, filters: Filters, + regressors: Sequence[Literal["36-parameter", "aCompCor"]] = ("36-parameter",), fs_license: Path | None = None, - atlas_files: dict[str, Path] | None = None, - fwhm: float | None = None, + atlas_files: Mapping[str, Path] | None = None, + fwhm: float = 6.0, runner_config: RunnerConfig | None = None, ) -> None: - """Run the full longitudinal pipeline (template → anat → func → metrics → qc). + """Run the full longitudinal pipeline (template -> anat -> func -> metrics -> qc). - Placeholder wired up by Stage 3; full implementation ships in Stage 6. + Unlike running each workflow separately, this passes functional + outputs in-memory to the metrics and QC stages without disk + round-trips. + + Args: + input_dirs: BIDS dataset directories (must include preprocessed + cross-sectional derivatives). + output_dir: Output directory for derivatives. + filters: Participant/session/task filters. + regressors: Regressor strategies to apply. + fs_license: Optional FreeSurfer license for template generation. + atlas_files: Mapping of atlas labels to NIfTI file paths. + If ``None``, metrics are skipped. + fwhm: Smoothing kernel FWHM in mm. + runner_config: Execution backend configuration. """ - del input_dirs, output_dir, filters, fs_license - del atlas_files, fwhm, runner_config - raise NotImplementedError( - "rbc longitudinal all is planned for Stage 6 of the longitudinal " - "refactor (tracker: #301). It is not yet implemented." + config = runner_config or RunnerConfig() + init_runner(config) + verbose = config.verbose + + _logger.warning( + "This workflow is experimental and may be sensitive to input file " + "naming conventions." + ) + _logger.info("Preparing to run RBC full longitudinal pipeline") + + # --- Step 1: Template generation (cross-session, writes to disk) --- + setup_freesurfer_auth(fs_license) + + df = load_table( + dataset_dirs=input_dirs, index_fpath=None, max_workers=0, verbose=verbose ) + # Template discovery needs all sessions (not just the filtered ones), + # so apply only the participant filter here. + tpl_filters = Filters(participant_label=filters.participant_label) + tpl_inputs, skipped = discover_template_inputs(tpl_filters.apply(df)) + for sub in skipped: + _logger.warning( + "Skipping sub-%s: only one preprocessed T1w brain volume found.", + sub, + ) + + for subject_inputs in tqdm(tpl_inputs, desc="Templates", disable=not verbose): + pipe_ctx = RunContext(sub=subject_inputs.sub, ses=None, output_dir=output_dir) + process_subject(pipe_ctx, subject_inputs) + pipe_ctx.ensure_dataset_description() + + _logger.info("Template generation complete; proceeding to per-session stages") + + # --- Step 2: Per-session anat -> func -> metrics -> qc --- + for pipe_ctx, session, tpl_df in iter_sessions_with_template( + input_dirs, output_dir, filters=filters, verbose=verbose + ): + # Anatomical + for _, anat_df in session.anat.group_by(("run", "acq"), maintain_order=True): + anat_outputs = process_anat( + pipe_ctx=pipe_ctx, anat_df=anat_df, tpl_df=tpl_df + ) + + # Functional + Metrics + QC (per BOLD run) + for func_df, _ in iter_session_files(session, groupby=FUNC_GROUP_ENTITIES): + func_outputs = process_func( + pipe_ctx=pipe_ctx, + func_df=func_df, + tpl_df=tpl_df, + regressors=regressors, + ) + + row = func_df.filter(suffix=Suffix.BOLD).row(0, named=True) + ents = extract_entities(row, ["task", "run"]) + func_q = pipe_ctx.bids(datatype=Datatype.FUNC, entities=ents) + func_long = func_q.derive(space="longitudinal") + + # Metrics (per regressor, in-memory from func_outputs) + if atlas_files: + tr = _read_derivative_tr(func_outputs.bold) + for regressor in regressors: + _logger.info( + "Longitudinal metrics: sub-%s ses-%s regressor-%s", + pipe_ctx.sub, + pipe_ctx.ses, + regressor, + ) + metrics_outputs = single_session_metrics( + regressed_bold=func_outputs.regressed_bold[regressor], + cleaned_bold=func_outputs.cleaned_bold[regressor], + template_brain_mask=func_outputs.bold_mask, + tr=tr, + atlas_files=atlas_files, + fwhm=fwhm, + ) + export_metrics( + func_long, + metrics_outputs, + regressor=regressor, + atlases=list(atlas_files), + ) + + # QC (in-memory from func_outputs + anat_outputs) + if anat_outputs.brain_mask is None: + _logger.warning( + "Skipping longitudinal QC for sub-%s ses-%s: " + "no anatomical brain mask in longitudinal space.", + pipe_ctx.sub, + pipe_ctx.ses, + ) + continue + + _logger.info("Longitudinal QC: sub-%s ses-%s", pipe_ctx.sub, pipe_ctx.ses) + process_qc( + func_long, + anat_brain_mask=anat_outputs.brain_mask, + bold_mask=func_outputs.bold_mask, + sub=pipe_ctx.sub, + ses=pipe_ctx.ses or "", + task=ents.get("task", ""), + run=ents.get("run", 0), + ) + + pipe_ctx.ensure_dataset_description() + + _logger.info("RBC full longitudinal pipeline complete") diff --git a/src/rbc/orchestration/longitudinal/anatomical.py b/src/rbc/orchestration/longitudinal/anatomical.py index 5e46ac50..509116d1 100644 --- a/src/rbc/orchestration/longitudinal/anatomical.py +++ b/src/rbc/orchestration/longitudinal/anatomical.py @@ -14,6 +14,9 @@ ) from rbc.orchestration import Filters, RunnerConfig, init_runner from rbc.orchestration.longitudinal._iter import iter_sessions_with_template +from rbc.workflows.longitudinal.anatomical import ( + AnatomicalLongOutputs, +) from rbc.workflows.longitudinal.anatomical import ( longitudinal_process as anatomical_longitudinal, ) @@ -35,7 +38,7 @@ def process_anat( anat_df: pl.DataFrame, tpl_df: pl.DataFrame, registration_template: Path = REGISTRATION_TEMPLATES.brain_1mm, -) -> None: +) -> AnatomicalLongOutputs: """Handle anatomical longitudinal processing for one anat group. Args: @@ -43,6 +46,9 @@ def process_anat( anat_df: Anatomical derivative DataFrame for this group. tpl_df: Longitudinal template DataFrame. registration_template: Brain template for ANTs registration. + + Returns: + Workflow outputs for in-memory handoff to downstream stages. """ anat_df = anat_df.filter(pl.col("space").is_null()) ents = extract_entities(anat_df.row(0, named=True), ["run"]) @@ -63,6 +69,7 @@ def process_anat( ) aex = anat_q.derive(entities=ents, space="longitudinal") export_longitudinal_anat(aex, outputs) + return outputs def run( @@ -96,9 +103,7 @@ def run( for pipe_ctx, session, tpl_df in iter_sessions_with_template( input_dirs, output_dir, filters=filters, verbose=verbose ): - for _, anat_df in session.anat.filter(pl.col("suffix") == "T1w").group_by( - ("run", "acq"), maintain_order=True - ): + for _, anat_df in session.anat.group_by(("run", "acq"), maintain_order=True): process_anat( pipe_ctx=pipe_ctx, anat_df=anat_df, diff --git a/src/rbc/orchestration/longitudinal/functional.py b/src/rbc/orchestration/longitudinal/functional.py index d5145be1..dc971b38 100644 --- a/src/rbc/orchestration/longitudinal/functional.py +++ b/src/rbc/orchestration/longitudinal/functional.py @@ -13,6 +13,9 @@ from rbc.bids.session import iter_session_files from rbc.orchestration import Filters, RunnerConfig, init_runner from rbc.orchestration.longitudinal._iter import iter_sessions_with_template +from rbc.workflows.longitudinal.functional import ( + FunctionalLongOutputs, +) from rbc.workflows.longitudinal.functional import ( longitudinal_process as functional_longitudinal, ) @@ -37,7 +40,7 @@ def process_func( tpl_df: pl.DataFrame, *, regressors: Sequence[Literal["36-parameter", "aCompCor"]] = ("36-parameter",), -) -> None: +) -> FunctionalLongOutputs: """Handle functional longitudinal processing for one BOLD run. Args: @@ -45,6 +48,9 @@ def process_func( func_df: Functional derivative DataFrame for this run. tpl_df: Longitudinal template DataFrame. regressors: Regressor strategies to apply in longitudinal space. + + Returns: + Workflow outputs for in-memory handoff to downstream stages. """ row = func_df.filter(suffix=Suffix.BOLD).row(0, named=True) ents = extract_entities(row, ["task", "run"]) @@ -63,6 +69,7 @@ def process_func( func_outputs = functional_longitudinal(**resolved) # type: ignore[arg-type] fex = func_q.derive(space="longitudinal") export_longitudinal_func(fex, func_outputs, regressors=regressors) + return func_outputs def run( diff --git a/src/rbc/orchestration/longitudinal/metrics.py b/src/rbc/orchestration/longitudinal/metrics.py index 8aaf699f..8137eb54 100644 --- a/src/rbc/orchestration/longitudinal/metrics.py +++ b/src/rbc/orchestration/longitudinal/metrics.py @@ -1,16 +1,91 @@ -"""Orchestration for the longitudinal metrics workflow (not yet implemented).""" +"""Orchestration for the longitudinal metrics workflow. + +Computes resting-state metrics (ALFF, fALFF, ReHo, atlas timeseries) +on longitudinal-space functional derivatives produced by the functional +stage. Reuses :func:`~rbc.workflows.metrics.single_session_metrics` +unchanged; only the input resolution targets ``space=longitudinal``. +""" from __future__ import annotations +import logging from typing import TYPE_CHECKING +import nibabel as nib +import polars as pl + +from rbc.bids import FUNC_GROUP_ENTITIES, Datatype, Suffix, extract_entities +from rbc.bids.longitudinal.metrics import resolve_longitudinal_metrics +from rbc.bids.metrics import export_metrics +from rbc.bids.session import iter_session_files +from rbc.metadata import resolve_tr, warn_implausible_tr +from rbc.orchestration import Filters, RunnerConfig, init_runner +from rbc.orchestration.longitudinal._iter import iter_sessions_with_template +from rbc.workflows.metrics import single_session_metrics + if TYPE_CHECKING: - from collections.abc import Sequence + from collections.abc import Mapping, Sequence from pathlib import Path - from rbc.orchestration import Filters, RunnerConfig + from rbc.bids import Bids + from rbc.workflows.longitudinal.functional import FunctionalLongOutputs + from rbc.workflows.metrics import MetricsOutputs + +__all__ = ["process_metrics", "run"] + +_logger = logging.getLogger(__name__) + + +def _read_derivative_tr(nifti_path: Path, *, override: float | None = None) -> float: + """Resolve TR from a derivative NIfTI, with optional CLI override. + + Pipes through :func:`~rbc.metadata.resolve_tr` for validation and + plausibility warnings, matching the cross-sectional metadata path. + Derivatives have no BIDS sidecar, so ``sidecar_tr`` is always None. + """ + hdr = nib.nifti1.load(nifti_path).header + raw_pixdim = float(hdr["pixdim"][4]) # type: ignore[index] + header_tr = raw_pixdim if raw_pixdim > 0 else None + tr = resolve_tr(sidecar_tr=None, header_tr=header_tr, override=override) + warn_implausible_tr(tr) + return tr + + +def process_metrics( + func_long: Bids, + *, + func_outputs: FunctionalLongOutputs, + tr: float, + regressor: str, + atlas_files: Mapping[str, Path], + fwhm: float, +) -> MetricsOutputs: + """Run metrics for a single regressor on a single longitudinal BOLD run. + + Used by ``orchestration.longitudinal.all`` to process in-memory + functional outputs. + + Args: + func_long: Bids builder with ``space="longitudinal"`` for this run. + func_outputs: Functional outputs (in-memory from ``all`` pipeline). + tr: Repetition time in seconds. + regressor: Regressor name. + atlas_files: Mapping of atlas labels to resolved NIfTI file paths. + fwhm: Smoothing kernel FWHM in mm. -__all__ = ["run"] + Returns: + Metrics outputs for this run/regressor. + """ + outputs = single_session_metrics( + regressed_bold=func_outputs.regressed_bold[regressor], + cleaned_bold=func_outputs.cleaned_bold[regressor], + template_brain_mask=func_outputs.bold_mask, + tr=tr, + atlas_files=atlas_files, + fwhm=fwhm, + ) + export_metrics(func_long, outputs, regressor=regressor, atlases=list(atlas_files)) + return outputs def run( @@ -18,16 +93,74 @@ def run( output_dir: Path, *, filters: Filters, - atlas_files: dict[str, Path], + regressors: Sequence[str], + atlas_files: Mapping[str, Path], fwhm: float, + tr: float | None = None, runner_config: RunnerConfig | None = None, ) -> None: """Compute resting-state metrics in longitudinal space. - Placeholder wired up by Stage 3; full implementation ships in Stage 6. + Resolves per-regressor ``regressed_bold``/``cleaned_bold`` and the + brain mask from longitudinal-space functional derivatives, reads TR + from the NIfTI header (with optional CLI override), and calls + :func:`~rbc.workflows.metrics.single_session_metrics` unchanged. + + Args: + input_dirs: BIDS dataset directories (must include longitudinal + functional derivatives). + output_dir: Output directory for derivatives. + filters: Participant/session/task filters. + regressors: Regressor strategy names to compute metrics for. + atlas_files: Mapping of atlas labels to resolved NIfTI file paths. + fwhm: Smoothing kernel FWHM in mm. + tr: TR override in seconds, or ``None`` to read from headers. + runner_config: Execution backend configuration. """ - del input_dirs, output_dir, filters, atlas_files, fwhm, runner_config - raise NotImplementedError( - "rbc longitudinal metrics is planned for Stage 6 of the longitudinal " - "refactor (tracker: #301). It is not yet implemented." - ) + config = runner_config or RunnerConfig() + init_runner(config) + verbose = config.verbose + + _logger.info("Preparing to run RBC longitudinal metrics workflow") + + for pipe_ctx, session, tpl_df in iter_sessions_with_template( + input_dirs, output_dir, filters=filters, verbose=verbose + ): + for func_df, _ in iter_session_files(session, groupby=FUNC_GROUP_ENTITIES): + row = func_df.filter(suffix=Suffix.BOLD).row(0, named=True) + ents = extract_entities(row, ["task", "run"]) + + func_q = pipe_ctx.bids(datatype=Datatype.FUNC, entities=ents) + func_long_q = func_q.derive(space="longitudinal") + + # Build a DataFrame of longitudinal-space functional derivatives + full_df = pl.concat([func_df, tpl_df]) + func_long_df = full_df.filter(pl.col("space") == "longitudinal") + + for regressor in regressors: + resolved = resolve_longitudinal_metrics( + func_long_q, func_long_df, regressor=regressor + ) + run_tr = ( + tr + if tr is not None + else _read_derivative_tr(resolved["regressed_bold"]) + ) + outputs = single_session_metrics( + regressed_bold=resolved["regressed_bold"], + cleaned_bold=resolved["cleaned_bold"], + template_brain_mask=resolved["template_brain_mask"], + tr=run_tr, + atlas_files=atlas_files, + fwhm=fwhm, + ) + export_metrics( + func_long_q, + outputs, + regressor=regressor, + atlases=list(atlas_files), + ) + + pipe_ctx.ensure_dataset_description() + + _logger.info("RBC longitudinal metrics workflow complete") diff --git a/src/rbc/orchestration/longitudinal/qc.py b/src/rbc/orchestration/longitudinal/qc.py index 278a41ed..0fd8b7e0 100644 --- a/src/rbc/orchestration/longitudinal/qc.py +++ b/src/rbc/orchestration/longitudinal/qc.py @@ -1,16 +1,113 @@ -"""Orchestration for the longitudinal QC workflow (not yet implemented).""" +"""Orchestration for the longitudinal QC workflow. + +Minimal scope: Dice/Jaccard overlap between the anatomical brain mask +and BOLD brain mask in longitudinal template space. No visualizations; +see #303/#304 for future viz pipelines. +""" from __future__ import annotations +import logging from typing import TYPE_CHECKING +import nibabel as nib +import polars as pl + +from rbc.bids import FUNC_GROUP_ENTITIES, Datatype, Suffix, extract_entities, load_table +from rbc.bids.longitudinal.qc import ( + LongitudinalQCOutputs, + export_longitudinal_qc, + resolve_longitudinal_qc, + write_longitudinal_qc_tsv, +) +from rbc.bids.session import iter_session_files +from rbc.core.niwrap import generate_exec_folder +from rbc.core.qc.registration import registration_qc_metrics +from rbc.orchestration import Filters, RunnerConfig, init_runner +from rbc.orchestration.longitudinal._iter import iter_sessions_with_template + if TYPE_CHECKING: from collections.abc import Sequence from pathlib import Path - from rbc.orchestration import Filters, RunnerConfig + from rbc.bids import Bids + +__all__ = ["process_qc", "run"] -__all__ = ["run"] +_logger = logging.getLogger(__name__) + +# Dice threshold for pass/fail on longitudinal registration QC. +DICE_THRESHOLD = 0.85 + + +def process_qc( + func_long: Bids, + *, + anat_brain_mask: Path, + bold_mask: Path, + sub: str, + ses: str, + task: str, + run: int | str, +) -> LongitudinalQCOutputs: + """Compute longitudinal registration QC for a single BOLD run. + + Compares the anatomical brain mask with the BOLD brain mask, both in + longitudinal template space, and writes a single-row QC TSV. + + Args: + func_long: Bids builder with ``space="longitudinal"`` for this run. + anat_brain_mask: Anatomical brain mask in longitudinal space. + bold_mask: BOLD brain mask in longitudinal space. + sub: Subject ID. + ses: Session label. + task: Task label. + run: Run number. + + Returns: + QC outputs with overlap metrics and pass/fail flag. + """ + anat_mask_arr = nib.nifti1.load(anat_brain_mask).get_fdata() + bold_mask_arr = nib.nifti1.load(bold_mask).get_fdata() + reg_metrics = registration_qc_metrics(anat_mask_arr, bold_mask_arr) + + passed = reg_metrics.dice >= DICE_THRESHOLD + + work_dir = generate_exec_folder("longitudinal_qc") + qc_file = write_longitudinal_qc_tsv( + work_dir / "registration_quality.tsv", + sub=sub, + ses=ses, + task=task, + run=run, + dice=reg_metrics.dice, + jaccard=reg_metrics.jaccard, + coverage=reg_metrics.coverage, + cross_corr=reg_metrics.cross_corr, + passed=passed, + ) + + outputs = LongitudinalQCOutputs( + dice=reg_metrics.dice, + jaccard=reg_metrics.jaccard, + coverage=reg_metrics.coverage, + cross_corr=reg_metrics.cross_corr, + passed=passed, + qc_file=qc_file, + ) + export_longitudinal_qc(func_long, outputs) + + status = "PASSED" if passed else "FAILED" + _logger.info( + "Longitudinal QC %s (Dice=%.4f) for sub-%s ses-%s task-%s run-%s", + status, + reg_metrics.dice, + sub, + ses, + task, + run, + ) + return outputs def run( @@ -22,10 +119,61 @@ def run( ) -> None: """Run registration QC for longitudinal derivatives. - Placeholder wired up by Stage 3; full implementation ships in Stage 6. + For each BOLD run, computes Dice/Jaccard overlap between the + anatomical brain mask and BOLD brain mask in longitudinal space. + + Args: + input_dirs: BIDS dataset directories (must include longitudinal + anatomical and functional derivatives). + output_dir: Output directory for derivatives. + filters: Participant/session/task filters. + runner_config: Execution backend configuration. """ - del input_dirs, output_dir, filters, runner_config - raise NotImplementedError( - "rbc longitudinal qc is planned for Stage 6 of the longitudinal " - "refactor (tracker: #301). It is not yet implemented." + config = runner_config or RunnerConfig() + init_runner(config) + verbose = config.verbose + + _logger.info("Preparing to run RBC longitudinal QC workflow") + full_df = load_table( + dataset_dirs=input_dirs, index_fpath=None, max_workers=0, verbose=verbose ) + + for pipe_ctx, session, _tpl_df in iter_sessions_with_template( + input_dirs, output_dir, filters=filters, verbose=verbose + ): + anat_q = pipe_ctx.bids(datatype=Datatype.ANAT) + anat_long_q = anat_q.derive(space="longitudinal") + anat_long_df = full_df.filter( + pl.col("datatype") == "anat", + pl.col("space") == "longitudinal", + ) + + func_long_df = full_df.filter( + pl.col("datatype") == "func", + pl.col("space") == "longitudinal", + ) + + for func_df, _ in iter_session_files(session, groupby=FUNC_GROUP_ENTITIES): + row = func_df.filter(suffix=Suffix.BOLD).row(0, named=True) + ents = extract_entities(row, ["task", "run"]) + + func_q = pipe_ctx.bids(datatype=Datatype.FUNC, entities=ents) + func_long_q = func_q.derive(space="longitudinal") + + resolved = resolve_longitudinal_qc( + anat_long_q, func_long_q, anat_long_df, func_long_df + ) + + process_qc( + func_long_q, + anat_brain_mask=resolved["anat_brain_mask"], + bold_mask=resolved["bold_mask"], + sub=pipe_ctx.sub, + ses=pipe_ctx.ses or "", + task=ents.get("task", ""), + run=ents.get("run", 0), + ) + + pipe_ctx.ensure_dataset_description() + + _logger.info("RBC longitudinal QC workflow complete") diff --git a/src/rbc/orchestration/longitudinal/template.py b/src/rbc/orchestration/longitudinal/template.py index f858b30d..026c3b65 100644 --- a/src/rbc/orchestration/longitudinal/template.py +++ b/src/rbc/orchestration/longitudinal/template.py @@ -28,7 +28,7 @@ from rbc.bids.longitudinal.template import TemplateInputs -__all__ = ["process_subject", "run"] +__all__ = ["process_subject", "run", "setup_freesurfer_auth"] _logger = logging.getLogger(__name__) @@ -77,7 +77,7 @@ def run( """ config = runner_config or RunnerConfig() init_runner(config) - _setup_freesurfer_auth(fs_license) + setup_freesurfer_auth(fs_license) verbose = config.verbose _logger.warning( @@ -111,7 +111,7 @@ def run( _logger.info("Longitudinal template construction complete") -def _setup_freesurfer_auth(fs_license: Path | None) -> None: +def setup_freesurfer_auth(fs_license: Path | None) -> None: """Resolve the FS license; fall back to the chklc bypass if absent.""" license_path = fs_license if license_path is None and (env := os.environ.get("FS_LICENSE")): diff --git a/tests/full_pipeline/longitudinal/__init__.py b/tests/full_pipeline/longitudinal/__init__.py new file mode 100644 index 00000000..2642d657 --- /dev/null +++ b/tests/full_pipeline/longitudinal/__init__.py @@ -0,0 +1 @@ +"""Full pipeline tests for the longitudinal workflow.""" diff --git a/tests/full_pipeline/longitudinal/conftest.py b/tests/full_pipeline/longitudinal/conftest.py new file mode 100644 index 00000000..c6828e5d --- /dev/null +++ b/tests/full_pipeline/longitudinal/conftest.py @@ -0,0 +1,282 @@ +"""Shared fixtures for longitudinal full-pipeline tests. + +Mirrors the subprocess-driven style in ``tests/integration/test_all.py``: +session-scoped fixtures run each ``rbc`` invocation once and return the +output directory. The ds000114 dataset is used (sub-01, ses-test + +ses-retest). +""" + +from __future__ import annotations + +import shutil +import subprocess +from pathlib import Path +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from collections.abc import Sequence + +_REPO_ROOT = Path(__file__).resolve().parents[3] +_DOWNLOAD_SCRIPT = _REPO_ROOT / "scripts" / "download_ds000114.py" +_DATASET_DIR = _REPO_ROOT / "tests" / "data" / "ds000114" +_DATASET_SENTINEL = ( + _DATASET_DIR / "sub-01" / "ses-test" / "anat" / "sub-01_ses-test_T1w.nii.gz" +) + +_SUB = "01" +_TASK = "fingerfootlips" + + +def _rbc_exe() -> str: + exe = shutil.which("rbc") + assert exe is not None, "rbc CLI not found on PATH" + return exe + + +def _run_rbc( + args: Sequence[str], *, timeout: int = 7200 +) -> subprocess.CompletedProcess[str]: + """Run the ``rbc`` CLI and assert it exits cleanly.""" + result = subprocess.run( # noqa: S603 + [_rbc_exe(), *args], + capture_output=True, + text=True, + timeout=timeout, + ) + assert result.returncode == 0, ( + f"rbc {args[0]} exited with code {result.returncode}\n" + f"--- stdout ---\n{result.stdout[-2000:]}\n" + f"--- stderr ---\n{result.stderr[-2000:]}" + ) + return result + + +def _relative_files(root: Path) -> set[str]: + """Return the set of file paths relative to *root*.""" + return {str(p.relative_to(root)) for p in root.rglob("*") if p.is_file()} + + +# --------------------------------------------------------------------------- +# Dataset fixture +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="session") +def ds000114_dataset() -> Path: + """Return the ds000114 BIDS dataset root, downloading it on first use.""" + if _DATASET_SENTINEL.exists(): + return _DATASET_DIR + + if not _DOWNLOAD_SCRIPT.exists(): + pytest.skip(f"download script missing: {_DOWNLOAD_SCRIPT}") + + uv = shutil.which("uv") + if uv is None: + pytest.skip("uv not found on PATH; cannot run download script") + + result = subprocess.run( # noqa: S603 + [uv, "run", str(_DOWNLOAD_SCRIPT), str(_DATASET_DIR)], + capture_output=True, + text=True, + ) + if result.returncode != 0 or not _DATASET_SENTINEL.exists(): + pytest.skip( + "ds000114 download failed; skipping longitudinal tests.\n" + f"--- stdout ---\n{result.stdout[-1000:]}\n" + f"--- stderr ---\n{result.stderr[-1000:]}" + ) + return _DATASET_DIR + + +@pytest.fixture(scope="session") +def _runner(request: pytest.FixtureRequest) -> str: + return request.config.getoption("--runner") + + +# --------------------------------------------------------------------------- +# Stage-by-stage pipeline fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="session") +def longitudinal_pipeline_data( + ds000114_dataset: Path, + tmp_path_factory: pytest.TempPathFactory, + _runner: str, +) -> Path: + """Run the full longitudinal pipeline stage-by-stage. + + Executes: anatomical -> functional -> longitudinal template -> + longitudinal anatomical -> longitudinal functional -> + longitudinal metrics -> longitudinal qc. + + Session-scoped so the cost is paid once across all tests. + Returns the derivatives directory. + """ + out = tmp_path_factory.mktemp("long_pipeline") / "derivatives" + out.mkdir() + + raw = str(ds000114_dataset) + deriv = str(out) + common = [ + "--runner", + _runner, + "--participant-label", + _SUB, + ] + + # Cross-sectional anatomical + _run_rbc(["anatomical", raw, "-o", deriv, *common]) + + # Cross-sectional functional (ses-test only) + _run_rbc( + [ + "functional", + raw, + deriv, + "-o", + deriv, + *common, + "--session-label", + "test", + "--task", + _TASK, + ] + ) + + # Longitudinal template + _run_rbc( + [ + "longitudinal", + "template", + deriv, + "-o", + deriv, + *common, + ] + ) + + # Longitudinal anatomical + _run_rbc( + [ + "longitudinal", + "anatomical", + deriv, + "-o", + deriv, + *common, + "--session-label", + "test", + ] + ) + + # Longitudinal functional + _run_rbc( + [ + "longitudinal", + "functional", + deriv, + "-o", + deriv, + *common, + "--session-label", + "test", + "--task", + _TASK, + ] + ) + + # Longitudinal metrics + _run_rbc( + [ + "longitudinal", + "metrics", + deriv, + "-o", + deriv, + *common, + "--session-label", + "test", + "--task", + _TASK, + ] + ) + + # Longitudinal QC + _run_rbc( + [ + "longitudinal", + "qc", + deriv, + "-o", + deriv, + *common, + "--session-label", + "test", + ] + ) + + return out + + +@pytest.fixture(scope="session") +def longitudinal_all_data( + ds000114_dataset: Path, + tmp_path_factory: pytest.TempPathFactory, + _runner: str, +) -> Path: + """Run ``rbc longitudinal all`` and return the derivatives directory. + + Cross-sectional anatomical + functional must run first (``all`` does + not re-run cross-sectional stages). + """ + out = tmp_path_factory.mktemp("long_all") / "derivatives" + out.mkdir() + + raw = str(ds000114_dataset) + deriv = str(out) + common = [ + "--runner", + _runner, + "--participant-label", + _SUB, + ] + + # Cross-sectional anatomical + _run_rbc(["anatomical", raw, "-o", deriv, *common]) + + # Cross-sectional functional (ses-test only) + _run_rbc( + [ + "functional", + raw, + deriv, + "-o", + deriv, + *common, + "--session-label", + "test", + "--task", + _TASK, + ] + ) + + # Full longitudinal pipeline (ses-test only, matching sequential run) + _run_rbc( + [ + "longitudinal", + "all", + deriv, + "-o", + deriv, + *common, + "--session-label", + "test", + "--task", + _TASK, + ] + ) + + return out diff --git a/tests/full_pipeline/longitudinal/test_all.py b/tests/full_pipeline/longitudinal/test_all.py new file mode 100644 index 00000000..87c77869 --- /dev/null +++ b/tests/full_pipeline/longitudinal/test_all.py @@ -0,0 +1,98 @@ +"""Full pipeline test for ``rbc longitudinal all``. + +Verifies that the combined pipeline produces the same outputs as running +each stage individually, mirroring tests/integration/test_all.py for the +cross-sectional pipeline. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from pathlib import Path + + +_SUB = "01" +_SES = "test" +_TASK = "fingerfootlips" +_STEM = f"sub-{_SUB}_ses-{_SES}_task-{_TASK}" + + +def _relative_files(root: Path) -> set[str]: + """Return the set of file paths relative to *root*.""" + return {str(p.relative_to(root)) for p in root.rglob("*") if p.is_file()} + + +def _file_tree(root: Path) -> str: + files = sorted(p.relative_to(root) for p in root.rglob("*") if p.is_file()) + return "\n".join(str(f) for f in files) if files else "(empty)" + + +def _longitudinal_files(root: Path) -> set[str]: + """Return only longitudinal-space files relative to *root*.""" + return { + str(p.relative_to(root)) + for p in root.rglob("*") + if p.is_file() and "longitudinal" in p.name + } + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.slow +def test_longitudinal_all_produces_derivatives( + longitudinal_all_data: Path, +) -> None: + """``rbc longitudinal all`` runs end-to-end and writes expected files.""" + func = longitudinal_all_data / f"sub-{_SUB}" / f"ses-{_SES}" / "func" + tree = _file_tree(longitudinal_all_data) + + # Functional derivatives in longitudinal space + assert list(func.glob(f"{_STEM}_space-longitudinal_desc-preproc_bold.nii.gz")), ( + f"Missing longitudinal BOLD\n--- file tree ---\n{tree}" + ) + + # Metrics + assert list(func.glob(f"{_STEM}_space-longitudinal_*_alff.nii.gz")), ( + f"Missing ALFF\n--- file tree ---\n{tree}" + ) + assert list(func.glob(f"{_STEM}_space-longitudinal_*_timeseries.tsv")), ( + f"Missing timeseries\n--- file tree ---\n{tree}" + ) + + # QC + assert list(func.glob(f"{_STEM}_space-longitudinal_*quality*.tsv")), ( + f"Missing QC TSV\n--- file tree ---\n{tree}" + ) + + +@pytest.mark.slow +def test_longitudinal_all_vs_sequential_filenames_match( + longitudinal_pipeline_data: Path, + longitudinal_all_data: Path, +) -> None: + """Both invocation styles must produce the same longitudinal-space files. + + Compares only ``*longitudinal*`` filenames (not cross-sectional + derivatives, which may differ in tmp path layout). + """ + seq_files = _longitudinal_files(longitudinal_pipeline_data) + all_files = _longitudinal_files(longitudinal_all_data) + + missing_from_all = seq_files - all_files + extra_in_all = all_files - seq_files + + assert not missing_from_all, ( + "Files produced by sequential run but missing from 'rbc longitudinal all':\n" + + "\n".join(sorted(missing_from_all)) + ) + assert not extra_in_all, ( + "Files produced by 'rbc longitudinal all' but missing from sequential run:\n" + + "\n".join(sorted(extra_in_all)) + ) diff --git a/tests/full_pipeline/longitudinal/test_metrics.py b/tests/full_pipeline/longitudinal/test_metrics.py new file mode 100644 index 00000000..64d36f4c --- /dev/null +++ b/tests/full_pipeline/longitudinal/test_metrics.py @@ -0,0 +1,65 @@ +"""Full pipeline test for longitudinal metrics. + +Verifies that ``rbc longitudinal metrics`` produces ALFF, fALFF, ReHo, +and atlas timeseries/correlation outputs in ``space-longitudinal``. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from pathlib import Path + + +_SUB = "01" +_SES = "test" +_TASK = "fingerfootlips" +_STEM = f"sub-{_SUB}_ses-{_SES}_task-{_TASK}" + + +def _file_tree(root: Path) -> str: + files = sorted(p.relative_to(root) for p in root.rglob("*") if p.is_file()) + return "\n".join(str(f) for f in files) if files else "(empty)" + + +@pytest.mark.slow +def test_longitudinal_metrics_produces_expected_files( + longitudinal_pipeline_data: Path, +) -> None: + """Metrics stage writes ALFF, fALFF, ReHo, timeseries, and correlations.""" + func = longitudinal_pipeline_data / f"sub-{_SUB}" / f"ses-{_SES}" / "func" + tree = _file_tree(longitudinal_pipeline_data) + + expected_fragments = [ + f"{_STEM}_space-longitudinal_reg-36parameter_alff.nii.gz", + f"{_STEM}_space-longitudinal_reg-36parameter_falff.nii.gz", + f"{_STEM}_space-longitudinal_reg-36parameter_desc-smooth_alff.nii.gz", + f"{_STEM}_space-longitudinal_reg-36parameter_desc-smooth_falff.nii.gz", + f"{_STEM}_space-longitudinal_reg-36parameter_desc-smoothZstd_alff.nii.gz", + f"{_STEM}_space-longitudinal_reg-36parameter_desc-smoothZstd_falff.nii.gz", + f"{_STEM}_space-longitudinal_reg-36parameter_reho.nii.gz", + f"{_STEM}_space-longitudinal_reg-36parameter_desc-smooth_reho.nii.gz", + f"{_STEM}_space-longitudinal_reg-36parameter_desc-smoothZstd_reho.nii.gz", + ] + for name in expected_fragments: + assert (func / name).is_file(), ( + f"Missing metrics file: {name}\n--- file tree ---\n{tree}" + ) + + +@pytest.mark.slow +def test_longitudinal_metrics_timeseries_exist( + longitudinal_pipeline_data: Path, +) -> None: + """Atlas timeseries and correlation TSVs are produced.""" + func = longitudinal_pipeline_data / f"sub-{_SUB}" / f"ses-{_SES}" / "func" + tree = _file_tree(longitudinal_pipeline_data) + + timeseries = list(func.glob(f"{_STEM}_space-longitudinal_*_timeseries.tsv")) + assert timeseries, f"No timeseries TSV found\n--- file tree ---\n{tree}" + + correlations = list(func.glob(f"{_STEM}_space-longitudinal_*_correlations.tsv")) + assert correlations, f"No correlation TSV found\n--- file tree ---\n{tree}" diff --git a/tests/full_pipeline/longitudinal/test_qc.py b/tests/full_pipeline/longitudinal/test_qc.py new file mode 100644 index 00000000..89b3a6da --- /dev/null +++ b/tests/full_pipeline/longitudinal/test_qc.py @@ -0,0 +1,72 @@ +"""Full pipeline test for longitudinal QC. + +Verifies that ``rbc longitudinal qc`` produces a registration-quality +TSV with Dice/Jaccard metrics and a pass/fail flag. +""" + +from __future__ import annotations + +import csv +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from pathlib import Path + + +_SUB = "01" +_SES = "test" +_TASK = "fingerfootlips" +_STEM = f"sub-{_SUB}_ses-{_SES}_task-{_TASK}" + + +def _file_tree(root: Path) -> str: + files = sorted(p.relative_to(root) for p in root.rglob("*") if p.is_file()) + return "\n".join(str(f) for f in files) if files else "(empty)" + + +@pytest.mark.slow +def test_longitudinal_qc_tsv_exists( + longitudinal_pipeline_data: Path, +) -> None: + """QC stage writes a registration quality TSV.""" + func = longitudinal_pipeline_data / f"sub-{_SUB}" / f"ses-{_SES}" / "func" + tree = _file_tree(longitudinal_pipeline_data) + + qc_files = list(func.glob(f"{_STEM}_space-longitudinal_*quality*.tsv")) + assert qc_files, f"No QC quality TSV found\n--- file tree ---\n{tree}" + + +@pytest.mark.slow +def test_longitudinal_qc_tsv_has_expected_columns( + longitudinal_pipeline_data: Path, +) -> None: + """QC TSV contains dice, jaccard, coverage, cross_corr, passed columns.""" + func = longitudinal_pipeline_data / f"sub-{_SUB}" / f"ses-{_SES}" / "func" + qc_files = list(func.glob(f"{_STEM}_space-longitudinal_*quality*.tsv")) + assert qc_files, "No QC quality TSV found" + + with qc_files[0].open() as fh: + reader = csv.DictReader(fh, delimiter="\t") + rows = list(reader) + + assert len(rows) == 1, f"Expected 1 row, got {len(rows)}" + expected_cols = { + "sub", + "ses", + "task", + "run", + "dice", + "jaccard", + "coverage", + "cross_corr", + "passed", + } + assert expected_cols.issubset(rows[0].keys()), ( + f"Missing columns: {expected_cols - rows[0].keys()}" + ) + + # Dice should be a reasonable value (> 0 at minimum) + dice = float(rows[0]["dice"]) + assert dice > 0.0, f"Dice coefficient is {dice}, expected > 0" diff --git a/tests/unit/cli/test_longitudinal.py b/tests/unit/cli/test_longitudinal.py index 66f97918..1bcc62ac 100644 --- a/tests/unit/cli/test_longitudinal.py +++ b/tests/unit/cli/test_longitudinal.py @@ -106,16 +106,22 @@ def test_defaults(self, base_ns: argparse.Namespace) -> None: """FWHM defaults to 6 mm and atlas resolves from the registry.""" base_ns.atlas = ["schaefer_200"] base_ns.fwhm = 6.0 + base_ns.tr = None base_ns.task = None + base_ns.regressor = ["36-parameter"] args = MetricsLongArgs.validate_namespace(base_ns) assert args.fwhm == 6.0 + assert args.tr is None assert "schaefer_200" in args.atlas_files + assert args.regressor == ["36-parameter"] def test_nonpositive_fwhm_rejected(self, base_ns: argparse.Namespace) -> None: """FWHM must be strictly positive.""" base_ns.atlas = ["schaefer_200"] base_ns.fwhm = 0.0 + base_ns.tr = None base_ns.task = None + base_ns.regressor = ["36-parameter"] with pytest.raises(ValueError, match="FWHM"): MetricsLongArgs.validate_namespace(base_ns) @@ -128,10 +134,14 @@ def test_defaults(self, base_ns: argparse.Namespace) -> None: base_ns.anat_template = None base_ns.atlas = ["schaefer_200"] base_ns.fwhm = 6.0 + base_ns.regressor = ["36-parameter"] + base_ns.task = None args = AllLongArgs.validate_namespace(base_ns) assert args.fwhm == 6.0 assert "schaefer_200" in args.atlas_files assert args.registration_template.name.endswith(".nii.gz") + assert args.regressor == ["36-parameter"] + assert args.task is None class TestParentSubparser: @@ -196,29 +206,59 @@ def test_functional_dispatches(self, tmp_path: Path) -> None: assert rc == 0 mock_run.assert_called_once() - def test_metrics_subcommand_registers(self, tmp_path: Path) -> None: - """Metrics subcommand is registered; Stage 6 raises NotImplementedError.""" + def test_metrics_dispatches(self, tmp_path: Path) -> None: + """``rbc longitudinal metrics`` routes to the metrics orchestration.""" input_dir = tmp_path / "input" input_dir.mkdir() output_dir = tmp_path / "output" - with pytest.raises(NotImplementedError, match="Stage 6"): - cli(["longitudinal", "metrics", str(input_dir), "-o", str(output_dir)]) + with patch("rbc.cli.longitudinal.metrics.run") as mock_run: + rc = cli( + [ + "longitudinal", + "metrics", + str(input_dir), + "-o", + str(output_dir), + ] + ) + assert rc == 0 + mock_run.assert_called_once() - def test_qc_subcommand_registers(self, tmp_path: Path) -> None: - """QC subcommand is registered; Stage 6 raises NotImplementedError.""" + def test_qc_dispatches(self, tmp_path: Path) -> None: + """``rbc longitudinal qc`` routes to the QC orchestration.""" input_dir = tmp_path / "input" input_dir.mkdir() output_dir = tmp_path / "output" - with pytest.raises(NotImplementedError, match="Stage 6"): - cli(["longitudinal", "qc", str(input_dir), "-o", str(output_dir)]) + with patch("rbc.cli.longitudinal.qc.run") as mock_run: + rc = cli( + [ + "longitudinal", + "qc", + str(input_dir), + "-o", + str(output_dir), + ] + ) + assert rc == 0 + mock_run.assert_called_once() - def test_all_subcommand_registers(self, tmp_path: Path) -> None: - """All subcommand is registered; Stage 6 raises NotImplementedError.""" + def test_all_dispatches(self, tmp_path: Path) -> None: + """``rbc longitudinal all`` routes to the all orchestration.""" input_dir = tmp_path / "input" input_dir.mkdir() output_dir = tmp_path / "output" - with pytest.raises(NotImplementedError, match="Stage 6"): - cli(["longitudinal", "all", str(input_dir), "-o", str(output_dir)]) + with patch("rbc.cli.longitudinal.all.run") as mock_run: + rc = cli( + [ + "longitudinal", + "all", + str(input_dir), + "-o", + str(output_dir), + ] + ) + assert rc == 0 + mock_run.assert_called_once() diff --git a/tests/unit/orchestration/test_longitudinal_template.py b/tests/unit/orchestration/test_longitudinal_template.py index 9aaba19e..c2b6af28 100644 --- a/tests/unit/orchestration/test_longitudinal_template.py +++ b/tests/unit/orchestration/test_longitudinal_template.py @@ -10,7 +10,7 @@ import pytest from rbc.orchestration import Filters -from rbc.orchestration.longitudinal.template import _setup_freesurfer_auth, run +from rbc.orchestration.longitudinal.template import run, setup_freesurfer_auth if TYPE_CHECKING: from pathlib import Path @@ -70,7 +70,7 @@ def test_explicit_license_mounted(self, tmp_path: Path) -> None: "rbc.orchestration.longitudinal.template.mount_fs_license" ) as mock_mount, ): - _setup_freesurfer_auth(license_path) + setup_freesurfer_auth(license_path) mock_mount.assert_called_once_with(runner, license_path) def test_env_fallback( @@ -91,7 +91,7 @@ def test_env_fallback( "rbc.orchestration.longitudinal.template.mount_fs_license" ) as mock_mount, ): - _setup_freesurfer_auth(None) + setup_freesurfer_auth(None) mock_mount.assert_called_once_with(runner, license_path) def test_bypass_when_no_license(self, monkeypatch: pytest.MonkeyPatch) -> None: @@ -109,7 +109,7 @@ def test_bypass_when_no_license(self, monkeypatch: pytest.MonkeyPatch) -> None: "rbc.orchestration.longitudinal.template.mount_fs_license" ) as mock_mount, ): - _setup_freesurfer_auth(None) + setup_freesurfer_auth(None) assert runner.environ["SURFER_SIDEDOOR"] == "1" mock_mount.assert_not_called() @@ -129,13 +129,13 @@ def test_bypass_skipped_when_runner_lacks_environ( "rbc.orchestration.longitudinal.template.mount_fs_license" ) as mock_mount, ): - _setup_freesurfer_auth(None) + setup_freesurfer_auth(None) mock_mount.assert_not_called() def test_missing_license_raises(self, tmp_path: Path) -> None: """A non-existent license path raises before reaching the runner.""" with pytest.raises(FileNotFoundError, match="not found"): - _setup_freesurfer_auth(tmp_path / "missing.txt") + setup_freesurfer_auth(tmp_path / "missing.txt") class TestRunSkipWarning: @@ -152,7 +152,7 @@ def test_warns_on_skipped_subject( ) with ( patch("rbc.orchestration.longitudinal.template.init_runner"), - patch("rbc.orchestration.longitudinal.template._setup_freesurfer_auth"), + patch("rbc.orchestration.longitudinal.template.setup_freesurfer_auth"), patch( "rbc.orchestration.longitudinal.template.load_table", return_value=df, diff --git a/tests/unit/test_metadata.py b/tests/unit/test_metadata.py index 353bef7a..61821487 100644 --- a/tests/unit/test_metadata.py +++ b/tests/unit/test_metadata.py @@ -11,9 +11,9 @@ from rbc.metadata import ( FunctionalMetadata, - _resolve_tr, _validate_slice_timing, - _warn_implausible_tr, + resolve_tr, + warn_implausible_tr, ) if TYPE_CHECKING: @@ -25,68 +25,68 @@ class TestResolveTr: def test_override_wins(self) -> None: """CLI override takes priority over sidecar and header.""" - tr = _resolve_tr(sidecar_tr=2.0, header_tr=2.0, override=1.5) + tr = resolve_tr(sidecar_tr=2.0, header_tr=2.0, override=1.5) assert tr == 1.5 def test_override_warns_on_sidecar_mismatch( self, caplog: pytest.LogCaptureFixture ) -> None: """CLI override logs warning when sidecar disagrees.""" - _resolve_tr(sidecar_tr=2.0, header_tr=2.0, override=1.5) + resolve_tr(sidecar_tr=2.0, header_tr=2.0, override=1.5) assert any("differs from sidecar" in msg for msg in caplog.messages) def test_override_warns_on_header_mismatch( self, caplog: pytest.LogCaptureFixture ) -> None: """CLI override logs warning when header disagrees.""" - _resolve_tr(sidecar_tr=1.5, header_tr=2.0, override=1.5) + resolve_tr(sidecar_tr=1.5, header_tr=2.0, override=1.5) assert any("differs from NIfTI header" in msg for msg in caplog.messages) def test_sidecar_used_when_no_override(self) -> None: """Sidecar TR is used when no override is provided.""" - tr = _resolve_tr(sidecar_tr=2.0, header_tr=2.0, override=None) + tr = resolve_tr(sidecar_tr=2.0, header_tr=2.0, override=None) assert tr == 2.0 def test_sidecar_header_mismatch_raises(self) -> None: """Sidecar/header mismatch without override raises ValueError.""" with pytest.raises(ValueError, match="TR mismatch"): - _resolve_tr(sidecar_tr=2.0, header_tr=1.5, override=None) + resolve_tr(sidecar_tr=2.0, header_tr=1.5, override=None) def test_sidecar_header_within_tolerance(self) -> None: """Sidecar/header difference within tolerance is OK.""" - tr = _resolve_tr(sidecar_tr=2.0, header_tr=2.0005, override=None) + tr = resolve_tr(sidecar_tr=2.0, header_tr=2.0005, override=None) assert tr == 2.0 def test_header_fallback_when_no_sidecar(self) -> None: """Header TR is used when sidecar has no RepetitionTime.""" - tr = _resolve_tr(sidecar_tr=None, header_tr=2.0, override=None) + tr = resolve_tr(sidecar_tr=None, header_tr=2.0, override=None) assert tr == 2.0 def test_header_fallback_logs_warning( self, caplog: pytest.LogCaptureFixture ) -> None: """Header fallback logs a warning.""" - _resolve_tr(sidecar_tr=None, header_tr=2.0, override=None) + resolve_tr(sidecar_tr=None, header_tr=2.0, override=None) assert any("falling back to NIfTI header" in msg for msg in caplog.messages) def test_no_tr_anywhere_raises(self) -> None: """Missing TR from all sources raises ValueError.""" with pytest.raises(ValueError, match="Cannot determine TR"): - _resolve_tr(sidecar_tr=None, header_tr=None, override=None) + resolve_tr(sidecar_tr=None, header_tr=None, override=None) def test_zero_header_treated_as_missing(self) -> None: """Zero header TR is treated as missing.""" with pytest.raises(ValueError, match="Cannot determine TR"): - _resolve_tr(sidecar_tr=None, header_tr=0.0, override=None) + resolve_tr(sidecar_tr=None, header_tr=0.0, override=None) def test_zero_sidecar_treated_as_missing(self) -> None: """Zero sidecar TR falls through to header.""" - tr = _resolve_tr(sidecar_tr=0.0, header_tr=2.0, override=None) + tr = resolve_tr(sidecar_tr=0.0, header_tr=2.0, override=None) assert tr == 2.0 def test_negative_sidecar_treated_as_missing(self) -> None: """Negative sidecar TR falls through to header.""" - tr = _resolve_tr(sidecar_tr=-1.0, header_tr=2.0, override=None) + tr = resolve_tr(sidecar_tr=-1.0, header_tr=2.0, override=None) assert tr == 2.0 @@ -269,27 +269,27 @@ class TestWarnImplausibleTr: def test_normal_tr_no_warning(self, caplog: pytest.LogCaptureFixture) -> None: """Normal TR values produce no warning.""" - _warn_implausible_tr(2.0) + warn_implausible_tr(2.0) assert not any("unusually" in msg for msg in caplog.messages) def test_very_short_tr_warns(self, caplog: pytest.LogCaptureFixture) -> None: """TR below 0.1s triggers a warning.""" - _warn_implausible_tr(0.05) + warn_implausible_tr(0.05) assert any("unusually short" in msg for msg in caplog.messages) def test_very_long_tr_warns(self, caplog: pytest.LogCaptureFixture) -> None: """TR above 20s triggers a warning.""" - _warn_implausible_tr(30.0) + warn_implausible_tr(30.0) assert any("unusually long" in msg for msg in caplog.messages) def test_boundary_low_no_warning(self, caplog: pytest.LogCaptureFixture) -> None: """TR exactly at the low boundary does not warn.""" - _warn_implausible_tr(0.1) + warn_implausible_tr(0.1) assert not any("unusually" in msg for msg in caplog.messages) def test_boundary_high_no_warning(self, caplog: pytest.LogCaptureFixture) -> None: """TR exactly at the high boundary does not warn.""" - _warn_implausible_tr(20.0) + warn_implausible_tr(20.0) assert not any("unusually" in msg for msg in caplog.messages)