diff --git a/README.md b/README.md index c3013b5e..1dd2559e 100644 --- a/README.md +++ b/README.md @@ -41,16 +41,29 @@ Run any command with `--help` for full options. ## Workflows -| Command | Description | -| ---------------- | ------------------------------------------------------------------------------------------------ | -| `rbc anatomical` | Brain extraction (ANTs), tissue segmentation (FSL FAST), registration to MNI152 | -| `rbc functional` | Motion correction, slice timing, BBR coregistration, single-step resampling, nuisance regression | -| `rbc metrics` | ALFF/fALFF, ReHo, smoothing, z-scoring, atlas-based timeseries and correlation matrices | -| `rbc qc` | XCP-D format quality metrics, framewise displacement, DVARS, RBC pass/fail thresholds | -| `rbc all` | Runs all four stages in sequence, passing results in memory between stages | +| Command | Description | +| -------------------- | ------------------------------------------------------------------------------------------------ | +| `rbc anatomical` | Brain extraction (ANTs), tissue segmentation (FSL FAST), registration to MNI152 | +| `rbc functional` | Motion correction, slice timing, BBR coregistration, single-step resampling, nuisance regression | +| `rbc metrics` | ALFF/fALFF, ReHo, smoothing, z-scoring, atlas-based timeseries and correlation matrices | +| `rbc qc` | XCP-D format quality metrics, framewise displacement, DVARS, RBC pass/fail thresholds | +| `rbc all` | Runs all four stages in sequence, passing results in memory between stages | +| `rbc longitudinal …` | Multi-session workflows: `template`, `anatomical`, `functional`, `metrics`, `qc`, `all` | Workflows must be run in order: `anatomical` → `functional` → `metrics` / `qc`. The `all` command handles this automatically. +Longitudinal workflows consume cross-sectional derivatives. Typical flow: + +```bash +rbc all /data -o /data/derivatives --runner docker +rbc longitudinal template /data/derivatives -o /data/derivatives --runner docker +rbc longitudinal anatomical /data/derivatives -o /data/derivatives --runner docker +``` + +`rbc long` is an alias for `rbc longitudinal`. The `metrics`, `qc`, and `all` +longitudinal stages are registered but raise `NotImplementedError` until +Stage 6 of the longitudinal refactor lands (tracker: #301). + ## Outputs See the [data dictionary](docs/data_dictionary.md) for a complete description of every output file, including format details and the processing step that produces each one. diff --git a/docs/data_dictionary.md b/docs/data_dictionary.md index 5319297b..40f4d4c7 100644 --- a/docs/data_dictionary.md +++ b/docs/data_dictionary.md @@ -125,7 +125,7 @@ Produced by `rbc qc`. A single summary file per functional run containing qualit ## Longitudinal outputs -Produced by `rbc longitudinal`. Anatomical outputs aligned to a subject-specific longitudinal template built from multiple sessions. Note: longitudinal functional processing is not yet implemented. +Produced by the `rbc longitudinal` subcommand group (`template`, `anatomical`, `functional`, `metrics`, `qc`, `all`). Anatomical outputs aligned to a subject-specific longitudinal template built from multiple sessions. Note: longitudinal metrics, QC, and the combined `all` stage are not yet implemented (tracker: #301). | File | Suffix | Description | Created by | Format | | -------------------------------------------------- | ------ | ------------------------------------------------------------------------- | ------------------------------------------ | ---------------------------- | diff --git a/src/rbc/cli/longitudinal/__init__.py b/src/rbc/cli/longitudinal/__init__.py index 883f51a9..0cc27d77 100644 --- a/src/rbc/cli/longitudinal/__init__.py +++ b/src/rbc/cli/longitudinal/__init__.py @@ -1,15 +1,25 @@ """``rbc longitudinal`` parent command and nested subcommand registration. -Stage 2 introduces the nested subcommand layout. The pre-existing -``--anatomical --functional`` flow lives under ``rbc longitudinal process`` -until Stage 3 splits it into per-stage subcommands. +Stage 3 lands the full nested subcommand layout. Every stage of the +longitudinal pipeline has a dedicated subcommand; ``metrics``, ``qc``, and +``all`` are registered so they surface in ``--help`` but raise +``NotImplementedError`` until Stage 6. """ from __future__ import annotations from typing import TYPE_CHECKING -from rbc.cli.longitudinal import process, template +from rbc.cli.longitudinal import ( + all as all_, +) +from rbc.cli.longitudinal import ( + anatomical, + functional, + metrics, + qc, + template, +) if TYPE_CHECKING: import argparse @@ -26,7 +36,7 @@ def register_command( aliases=["long"], description="RBC longitudinal workflows", help="Longitudinal workflows", - usage="rbc longitudinal {template,process} ...", + usage=("rbc longitudinal {template,anatomical,functional,metrics,qc,all} ..."), ) nested = parser.add_subparsers( title="longitudinal stages", @@ -36,4 +46,8 @@ def register_command( help="Stage help", ) template.register_command(nested, parents=parents) - process.register_command(nested, parents=parents) + anatomical.register_command(nested, parents=parents) + functional.register_command(nested, parents=parents) + metrics.register_command(nested, parents=parents) + qc.register_command(nested, parents=parents) + all_.register_command(nested, parents=parents) diff --git a/src/rbc/cli/longitudinal/_base.py b/src/rbc/cli/longitudinal/_base.py new file mode 100644 index 00000000..edc36f48 --- /dev/null +++ b/src/rbc/cli/longitudinal/_base.py @@ -0,0 +1,46 @@ +"""Shared base arguments for ``rbc longitudinal`` subcommands.""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING + +from rbc.cli.base import BaseArgs + +if TYPE_CHECKING: + import argparse + + +@dataclass(frozen=True) +class LongitudinalBaseArgs(BaseArgs): + """Base args for longitudinal subcommands. + + Adds ``--fs-license`` on top of :class:`~rbc.cli.base.BaseArgs`. + Only the template stage currently consumes the license, but the flag is + accepted across all longitudinal subcommands for a consistent surface. + """ + + fs_license: Path | None + + @classmethod + def validate_namespace(cls, ns: argparse.Namespace) -> LongitudinalBaseArgs: + """Validate base args plus the optional ``--fs-license`` path.""" + fs_license: Path | None = ns.fs_license + if fs_license is not None and not fs_license.exists(): + raise ValueError(f"FreeSurfer license not found: {fs_license}") + return cls(**BaseArgs.validate_namespace(ns).__dict__, fs_license=fs_license) + + +def add_fs_license_argument(parser: argparse.ArgumentParser) -> None: + """Attach the ``--fs-license`` argument to a subcommand parser.""" + parser.add_argument( + "--fs-license", + type=Path, + default=None, + help=( + "Optional path to a FreeSurfer license file. Falls back to the " + "FS_LICENSE environment variable, then to a license-free bypass " + "if neither is set. Only the ``template`` stage consumes it." + ), + ) diff --git a/src/rbc/cli/longitudinal/all.py b/src/rbc/cli/longitudinal/all.py new file mode 100644 index 00000000..493d0858 --- /dev/null +++ b/src/rbc/cli/longitudinal/all.py @@ -0,0 +1,110 @@ +"""``rbc longitudinal all`` subcommand (placeholder for Stage 6).""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from rbc.cli.base import _or_default, _validate_nifti_path, _validate_positive +from rbc.cli.longitudinal._base import LongitudinalBaseArgs, add_fs_license_argument +from rbc.cli.metrics import _resolve_atlas_args +from rbc.orchestration import Filters, RunnerConfig +from rbc.orchestration.longitudinal.all import run +from rbc_resources import ATLAS_REGISTRY, REGISTRATION_TEMPLATES + +if TYPE_CHECKING: + import argparse + from collections.abc import Sequence + from pathlib import Path + + +@dataclass(frozen=True) +class AllLongArgs(LongitudinalBaseArgs): + """Arguments for ``rbc longitudinal all``.""" + + registration_template: Path + atlas_files: dict[str, Path] + fwhm: float + + @classmethod + def validate_namespace(cls, ns: argparse.Namespace) -> AllLongArgs: + """Validate namespace for the full longitudinal pipeline subcommand.""" + _validate_positive(ns.fwhm, "FWHM") + return cls( + **LongitudinalBaseArgs.validate_namespace(ns).__dict__, + registration_template=_or_default( + ns.anat_template, REGISTRATION_TEMPLATES.brain_1mm + ), + atlas_files=_resolve_atlas_args(ns.atlas), + fwhm=ns.fwhm, + ) + + +def main(args: AllLongArgs) -> int: + """Run the combined longitudinal pipeline.""" + run( + input_dirs=args.input_dirs, + output_dir=args.output_dir, + filters=Filters( + participant_label=args.participant_label, + session_label=args.session_label, + ), + fs_license=args.fs_license, + atlas_files=args.atlas_files, + fwhm=args.fwhm, + runner_config=RunnerConfig( + runner=args.runner, + verbose=bool(args.verbose), + tmp_dir=args.tmp_dir, + ants_threads=args.ants_threads, + ), + ) + return 0 + + +def register_command( + subparsers: argparse._SubParsersAction, + parents: Sequence[argparse.ArgumentParser], +) -> None: + """Register ``rbc longitudinal all`` (Stage 6 placeholder).""" + parser = subparsers.add_parser( + "all", + parents=parents, + description=( + "Run the full longitudinal pipeline (template → anat → func → " + "metrics → qc). Placeholder wired up by Stage 3; full " + "implementation ships in Stage 6." + ), + help="Full longitudinal pipeline (Stage 6)", + usage=( + "rbc longitudinal all INPUT_DIR [INPUT_DIR ...] -o OUTPUT_DIR [options]" + ), + ) + add_fs_license_argument(parser) + parser.add_argument( + "--atlas", + nargs="+", + default=["schaefer_200"], + metavar="ATLAS", + help=( + "Atlas(es) for timeseries extraction. Accepts registry names " + f"({', '.join(sorted(ATLAS_REGISTRY))}) or paths to custom NIfTI " + "atlas files." + ), + ) + parser.add_argument( + "--fwhm", + type=float, + default=6.0, + help="Smoothing kernel FWHM in mm.", + ) + + templates = parser.add_argument_group("template overrides") + templates.add_argument( + "--anat-template", + type=_validate_nifti_path, + default=None, + help="Custom brain template for anatomical registration.", + ) + + parser.set_defaults(func=lambda args: main(AllLongArgs.validate_namespace(args))) diff --git a/src/rbc/cli/longitudinal/anatomical.py b/src/rbc/cli/longitudinal/anatomical.py new file mode 100644 index 00000000..d62bbc4e --- /dev/null +++ b/src/rbc/cli/longitudinal/anatomical.py @@ -0,0 +1,87 @@ +"""``rbc longitudinal anatomical`` subcommand.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from rbc.cli.base import _or_default, _validate_nifti_path +from rbc.cli.longitudinal._base import LongitudinalBaseArgs, add_fs_license_argument +from rbc.orchestration import Filters, RunnerConfig +from rbc.orchestration.longitudinal.anatomical import run +from rbc_resources import REGISTRATION_TEMPLATES + +if TYPE_CHECKING: + import argparse + from collections.abc import Sequence + from pathlib import Path + + +@dataclass(frozen=True) +class AnatomicalLongArgs(LongitudinalBaseArgs): + """Arguments for ``rbc longitudinal anatomical``.""" + + registration_template: Path + + @classmethod + def validate_namespace(cls, ns: argparse.Namespace) -> AnatomicalLongArgs: + """Validate namespace for the longitudinal anatomical subcommand.""" + return cls( + **LongitudinalBaseArgs.validate_namespace(ns).__dict__, + registration_template=_or_default( + ns.anat_template, REGISTRATION_TEMPLATES.brain_1mm + ), + ) + + +def main(args: AnatomicalLongArgs) -> int: + """Run the longitudinal anatomical stage.""" + run( + input_dirs=args.input_dirs, + output_dir=args.output_dir, + filters=Filters( + participant_label=args.participant_label, + session_label=args.session_label, + ), + registration_template=args.registration_template, + runner_config=RunnerConfig( + runner=args.runner, + verbose=bool(args.verbose), + tmp_dir=args.tmp_dir, + ants_threads=args.ants_threads, + ), + ) + return 0 + + +def register_command( + subparsers: argparse._SubParsersAction, + parents: Sequence[argparse.ArgumentParser], +) -> None: + """Register ``rbc longitudinal anatomical`` on a longitudinal subparser group.""" + parser = subparsers.add_parser( + "anatomical", + parents=parents, + description=( + "Warp preprocessed anatomical derivatives into each subject's " + "longitudinal template space." + ), + help="Longitudinal anatomical stage", + usage=( + "rbc longitudinal anatomical INPUT_DIR [INPUT_DIR ...] " + "-o OUTPUT_DIR [options]" + ), + ) + add_fs_license_argument(parser) + + templates = parser.add_argument_group("template overrides") + templates.add_argument( + "--anat-template", + type=_validate_nifti_path, + default=None, + help="Custom brain template for anatomical registration.", + ) + + parser.set_defaults( + func=lambda args: main(AnatomicalLongArgs.validate_namespace(args)) + ) diff --git a/src/rbc/cli/longitudinal/functional.py b/src/rbc/cli/longitudinal/functional.py new file mode 100644 index 00000000..5eea0cce --- /dev/null +++ b/src/rbc/cli/longitudinal/functional.py @@ -0,0 +1,81 @@ +"""``rbc longitudinal functional`` subcommand.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from rbc.cli.base import _validate_task +from rbc.cli.longitudinal._base import LongitudinalBaseArgs, add_fs_license_argument +from rbc.orchestration import Filters, RunnerConfig +from rbc.orchestration.longitudinal.functional import run + +if TYPE_CHECKING: + import argparse + from collections.abc import Sequence + + +@dataclass(frozen=True) +class FunctionalLongArgs(LongitudinalBaseArgs): + """Arguments for ``rbc longitudinal functional``.""" + + task: str | None + + @classmethod + def validate_namespace(cls, ns: argparse.Namespace) -> FunctionalLongArgs: + """Validate namespace for the longitudinal functional subcommand.""" + _validate_task(ns.task) + return cls( + **LongitudinalBaseArgs.validate_namespace(ns).__dict__, + task=ns.task, + ) + + +def main(args: FunctionalLongArgs) -> int: + """Run the longitudinal functional stage.""" + run( + input_dirs=args.input_dirs, + output_dir=args.output_dir, + filters=Filters( + participant_label=args.participant_label, + session_label=args.session_label, + task=args.task, + ), + runner_config=RunnerConfig( + runner=args.runner, + verbose=bool(args.verbose), + tmp_dir=args.tmp_dir, + ants_threads=args.ants_threads, + ), + ) + return 0 + + +def register_command( + subparsers: argparse._SubParsersAction, + parents: Sequence[argparse.ArgumentParser], +) -> None: + """Register ``rbc longitudinal functional`` on a longitudinal subparser group.""" + parser = subparsers.add_parser( + "functional", + parents=parents, + description=( + "Warp preprocessed BOLD derivatives into each subject's " + "longitudinal template space." + ), + help="Longitudinal functional stage", + usage=( + "rbc longitudinal functional INPUT_DIR [INPUT_DIR ...] " + "-o OUTPUT_DIR [options]" + ), + ) + add_fs_license_argument(parser) + parser.add_argument( + "--task", + default=None, + help="Task label to filter BOLD runs (without 'task-' prefix).", + ) + + parser.set_defaults( + func=lambda args: main(FunctionalLongArgs.validate_namespace(args)) + ) diff --git a/src/rbc/cli/longitudinal/metrics.py b/src/rbc/cli/longitudinal/metrics.py new file mode 100644 index 00000000..e642bb96 --- /dev/null +++ b/src/rbc/cli/longitudinal/metrics.py @@ -0,0 +1,107 @@ +"""``rbc longitudinal metrics`` subcommand (placeholder for Stage 6).""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from rbc.cli.base import _validate_positive, _validate_task +from rbc.cli.longitudinal._base import LongitudinalBaseArgs, add_fs_license_argument +from rbc.cli.metrics import _resolve_atlas_args +from rbc.orchestration import Filters, RunnerConfig +from rbc.orchestration.longitudinal.metrics import run +from rbc_resources import ATLAS_REGISTRY + +if TYPE_CHECKING: + import argparse + from collections.abc import Sequence + from pathlib import Path + + +@dataclass(frozen=True) +class MetricsLongArgs(LongitudinalBaseArgs): + """Arguments for ``rbc longitudinal metrics``.""" + + atlas_files: dict[str, Path] + fwhm: float + task: str | None + + @classmethod + def validate_namespace(cls, ns: argparse.Namespace) -> MetricsLongArgs: + """Validate namespace for the longitudinal metrics subcommand.""" + _validate_task(ns.task) + _validate_positive(ns.fwhm, "FWHM") + return cls( + **LongitudinalBaseArgs.validate_namespace(ns).__dict__, + atlas_files=_resolve_atlas_args(ns.atlas), + fwhm=ns.fwhm, + task=ns.task, + ) + + +def main(args: MetricsLongArgs) -> int: + """Run resting-state metrics in longitudinal space.""" + run( + input_dirs=args.input_dirs, + output_dir=args.output_dir, + filters=Filters( + participant_label=args.participant_label, + session_label=args.session_label, + task=args.task, + ), + atlas_files=args.atlas_files, + fwhm=args.fwhm, + runner_config=RunnerConfig( + runner=args.runner, + verbose=bool(args.verbose), + tmp_dir=args.tmp_dir, + ants_threads=args.ants_threads, + ), + ) + return 0 + + +def register_command( + subparsers: argparse._SubParsersAction, + parents: Sequence[argparse.ArgumentParser], +) -> None: + """Register ``rbc longitudinal metrics`` (Stage 6 placeholder).""" + parser = subparsers.add_parser( + "metrics", + parents=parents, + description=( + "Compute resting-state metrics in longitudinal space. Placeholder " + "wired up by Stage 3; full implementation ships in Stage 6." + ), + help="Longitudinal metrics stage (Stage 6)", + usage=( + "rbc longitudinal metrics INPUT_DIR [INPUT_DIR ...] -o OUTPUT_DIR [options]" + ), + ) + add_fs_license_argument(parser) + parser.add_argument( + "--atlas", + nargs="+", + default=["schaefer_200"], + metavar="ATLAS", + help=( + "Atlas(es) for timeseries extraction. Accepts registry names " + f"({', '.join(sorted(ATLAS_REGISTRY))}) or paths to custom NIfTI " + "atlas files." + ), + ) + parser.add_argument( + "--fwhm", + type=float, + default=6.0, + help="Smoothing kernel FWHM in mm.", + ) + parser.add_argument( + "--task", + default=None, + help="Task label to filter BOLD runs (without 'task-' prefix).", + ) + + parser.set_defaults( + func=lambda args: main(MetricsLongArgs.validate_namespace(args)) + ) diff --git a/src/rbc/cli/longitudinal/process.py b/src/rbc/cli/longitudinal/process.py deleted file mode 100644 index 9d4c13eb..00000000 --- a/src/rbc/cli/longitudinal/process.py +++ /dev/null @@ -1,112 +0,0 @@ -"""Legacy ``rbc longitudinal process`` subcommand. - -Carries the pre-Stage-2 ``--anatomical --functional`` flag flow under a -nested subcommand so the new ``rbc longitudinal template`` can sit beside -it. Stage 3 will replace this with dedicated ``anatomical`` / ``functional`` -subcommands. -""" - -from __future__ import annotations - -from dataclasses import dataclass -from typing import TYPE_CHECKING - -from rbc.cli.base import BaseArgs, _or_default, _validate_nifti_path -from rbc.orchestration import Filters, RunnerConfig -from rbc.orchestration.longitudinal import run -from rbc_resources import REGISTRATION_TEMPLATES - -if TYPE_CHECKING: - import argparse - from collections.abc import Sequence - from pathlib import Path - - -@dataclass(frozen=True) -class LongitudinalArgs(BaseArgs): - """Arguments for the legacy ``longitudinal process`` subcommand.""" - - anatomical: bool - functional: bool - registration_template: Path - - @classmethod - def validate_namespace(cls, ns: argparse.Namespace) -> LongitudinalArgs: - """Validate the legacy longitudinal namespace.""" - if not ns.functional and not ns.anatomical: - raise ValueError( - "At least one of '--anatomical' or '--functional' is required." - ) - return cls( - **BaseArgs.validate_namespace(ns).__dict__, - anatomical=ns.anatomical, - functional=ns.functional, - registration_template=_or_default( - ns.anat_template, REGISTRATION_TEMPLATES.brain_1mm - ), - ) - - -def main(args: LongitudinalArgs) -> int: - """Run the legacy longitudinal anat+func dispatcher.""" - run( - input_dirs=args.input_dirs, - output_dir=args.output_dir, - filters=Filters( - participant_label=args.participant_label, - session_label=args.session_label, - ), - anatomical=args.anatomical, - functional=args.functional, - registration_template=args.registration_template, - runner_config=RunnerConfig( - runner=args.runner, - verbose=bool(args.verbose), - tmp_dir=args.tmp_dir, - ants_threads=args.ants_threads, - ), - ) - return 0 - - -def register_command( - subparsers: argparse._SubParsersAction, - parents: Sequence[argparse.ArgumentParser], -) -> None: - """Register ``rbc longitudinal process`` on a longitudinal subparser group.""" - parser = subparsers.add_parser( - "process", - parents=parents, - description=( - "Legacy longitudinal anat/func dispatcher (will be split into " - "dedicated subcommands in Stage 3)." - ), - help="Legacy longitudinal anat/func dispatcher", - usage=( - "rbc longitudinal process INPUT_DIR [INPUT_DIR ...] -o OUTPUT_DIR [options]" - ), - ) - parser.add_argument( - "--anatomical", - default=False, - action="store_true", - help="Use anatomical longitudinal pipeline for processing", - ) - parser.add_argument( - "--functional", - default=False, - action="store_true", - help="Use functional longitudinal pipeline for processing", - ) - - templates = parser.add_argument_group("template overrides") - templates.add_argument( - "--anat-template", - type=_validate_nifti_path, - default=None, - help="Custom brain template for anatomical registration.", - ) - - parser.set_defaults( - func=lambda args: main(LongitudinalArgs.validate_namespace(args)) - ) diff --git a/src/rbc/cli/longitudinal/qc.py b/src/rbc/cli/longitudinal/qc.py new file mode 100644 index 00000000..6ebbf264 --- /dev/null +++ b/src/rbc/cli/longitudinal/qc.py @@ -0,0 +1,56 @@ +"""``rbc longitudinal qc`` subcommand (placeholder for Stage 6).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from rbc.cli.longitudinal._base import LongitudinalBaseArgs, add_fs_license_argument +from rbc.orchestration import Filters, RunnerConfig +from rbc.orchestration.longitudinal.qc import run + +if TYPE_CHECKING: + import argparse + from collections.abc import Sequence + + +QCLongArgs = LongitudinalBaseArgs + + +def main(args: QCLongArgs) -> int: + """Run registration QC for longitudinal derivatives.""" + run( + input_dirs=args.input_dirs, + output_dir=args.output_dir, + filters=Filters( + participant_label=args.participant_label, + session_label=args.session_label, + ), + runner_config=RunnerConfig( + runner=args.runner, + verbose=bool(args.verbose), + tmp_dir=args.tmp_dir, + ants_threads=args.ants_threads, + ), + ) + return 0 + + +def register_command( + subparsers: argparse._SubParsersAction, + parents: Sequence[argparse.ArgumentParser], +) -> None: + """Register ``rbc longitudinal qc`` (Stage 6 placeholder).""" + parser = subparsers.add_parser( + "qc", + parents=parents, + description=( + "Run registration QC for longitudinal derivatives. Placeholder " + "wired up by Stage 3; full implementation ships in Stage 6." + ), + help="Longitudinal QC stage (Stage 6)", + usage=("rbc longitudinal qc INPUT_DIR [INPUT_DIR ...] -o OUTPUT_DIR [options]"), + ) + add_fs_license_argument(parser) + parser.set_defaults( + func=lambda args: main(LongitudinalBaseArgs.validate_namespace(args)) + ) diff --git a/src/rbc/cli/longitudinal/template.py b/src/rbc/cli/longitudinal/template.py index 079e04bd..87ef4cc7 100644 --- a/src/rbc/cli/longitudinal/template.py +++ b/src/rbc/cli/longitudinal/template.py @@ -2,11 +2,9 @@ from __future__ import annotations -from dataclasses import dataclass -from pathlib import Path from typing import TYPE_CHECKING -from rbc.cli.base import BaseArgs +from rbc.cli.longitudinal._base import LongitudinalBaseArgs, add_fs_license_argument from rbc.orchestration import Filters, RunnerConfig from rbc.orchestration.longitudinal.template import run @@ -15,19 +13,7 @@ from collections.abc import Sequence -@dataclass(frozen=True) -class TemplateArgs(BaseArgs): - """Arguments for ``rbc longitudinal template``.""" - - fs_license: Path | None - - @classmethod - def validate_namespace(cls, ns: argparse.Namespace) -> TemplateArgs: - """Validate base args and the optional --fs-license path.""" - fs_license: Path | None = ns.fs_license - if fs_license is not None and not fs_license.exists(): - raise ValueError(f"FreeSurfer license not found: {fs_license}") - return cls(**BaseArgs.validate_namespace(ns).__dict__, fs_license=fs_license) +TemplateArgs = LongitudinalBaseArgs def main(args: TemplateArgs) -> int: @@ -68,14 +54,7 @@ def register_command( "-o OUTPUT_DIR [options]" ), ) - parser.add_argument( - "--fs-license", - type=Path, - default=None, - help=( - "Optional path to a FreeSurfer license file. Falls back to the " - "FS_LICENSE environment variable, then to a license-free bypass " - "if neither is set." - ), + add_fs_license_argument(parser) + parser.set_defaults( + func=lambda args: main(LongitudinalBaseArgs.validate_namespace(args)) ) - parser.set_defaults(func=lambda args: main(TemplateArgs.validate_namespace(args))) diff --git a/src/rbc/orchestration/longitudinal/__init__.py b/src/rbc/orchestration/longitudinal/__init__.py index c0967357..41fc5345 100644 --- a/src/rbc/orchestration/longitudinal/__init__.py +++ b/src/rbc/orchestration/longitudinal/__init__.py @@ -2,107 +2,8 @@ from __future__ import annotations -import logging -from typing import TYPE_CHECKING - -import polars as pl -from tqdm import tqdm - -from rbc.bids import ( - FUNC_GROUP_ENTITIES, - SUB_SES_QUERY, - load_table, -) -from rbc.bids.session import iter_session_files, load_session -from rbc.context import RunContext -from rbc.orchestration import Filters, RunnerConfig, init_runner +from rbc.orchestration.longitudinal._iter import iter_sessions_with_template from rbc.orchestration.longitudinal.anatomical import process_anat from rbc.orchestration.longitudinal.functional import process_func -from rbc_resources import REGISTRATION_TEMPLATES - -if TYPE_CHECKING: - from collections.abc import Sequence - from pathlib import Path - -__all__ = ["process_anat", "process_func", "run"] - -_logger = logging.getLogger(__name__) - - -def run( - input_dirs: Sequence[Path], - output_dir: Path, - *, - filters: Filters, - anatomical: bool = True, - functional: bool = True, - registration_template: Path = REGISTRATION_TEMPLATES.brain_1mm, - runner_config: RunnerConfig | None = None, -) -> None: - """Run the longitudinal pipeline for all matching subjects/sessions. - - Args: - input_dirs: BIDS dataset directories. - output_dir: Output directory for derivatives. - filters: Participant/session/task filters. - anatomical: Run anatomical longitudinal processing. - functional: Run functional longitudinal processing. - registration_template: Brain template for ANTs registration. - runner_config: Execution backend configuration. - """ - config = runner_config or RunnerConfig() - init_runner(config) - verbose = config.verbose - - _logger.warning( - "This workflow is experimental and may be sensitive to input file " - "naming conventions." - ) - _logger.info("Preparing to run RBC longitudinal workflow") - df = load_table( - dataset_dirs=input_dirs, index_fpath=None, max_workers=0, verbose=verbose - ) - - group_df = filters.apply(df, pl.col("ses") != "longitudinal") - - for _, sub_ses_group in tqdm( - group_df.group_by(SUB_SES_QUERY, maintain_order=True), - disable=not verbose, - ): - pipe_ctx = RunContext( - sub=sub_ses_group["sub"][0], - ses=sub_ses_group["ses"][0], - output_dir=output_dir, - ) - if pipe_ctx.ses is None: - raise ValueError( - "No session data, unable to perform longitudinal processing" - ) - session = load_session(sub_ses_group, pipe_ctx.sub, pipe_ctx.ses) - tpl_df = df.filter( - pl.all_horizontal( - pl.col("sub") == pipe_ctx.sub, - pl.col("ses") == "longitudinal", - ) - ) - if tpl_df.is_empty(): - raise ValueError("No longitudinal template found") - - if anatomical: - for _, anat_df in session.anat.filter(pl.col("suffix") == "T1w").group_by( - ("run", "acq"), maintain_order=True - ): - process_anat( - pipe_ctx=pipe_ctx, - anat_df=anat_df, - tpl_df=tpl_df, - registration_template=registration_template, - ) - - if functional: - for func_df, _ in iter_session_files(session, groupby=FUNC_GROUP_ENTITIES): - process_func(pipe_ctx=pipe_ctx, func_df=func_df, tpl_df=tpl_df) - - pipe_ctx.ensure_dataset_description() - _logger.info("RBC longitudinal workflow complete") +__all__ = ["iter_sessions_with_template", "process_anat", "process_func"] diff --git a/src/rbc/orchestration/longitudinal/_iter.py b/src/rbc/orchestration/longitudinal/_iter.py new file mode 100644 index 00000000..2b5c924a --- /dev/null +++ b/src/rbc/orchestration/longitudinal/_iter.py @@ -0,0 +1,86 @@ +"""Shared subject/session iteration primitive for longitudinal orchestration. + +Lives in its own module (rather than the package ``__init__``) so per-stage +``run()`` entrypoints can import it without introducing a cycle with the +package root, which re-exports ``process_anat`` / ``process_func``. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +import polars as pl +from tqdm import tqdm + +from rbc.bids import SUB_SES_QUERY, load_table +from rbc.bids.session import load_session +from rbc.context import RunContext + +if TYPE_CHECKING: + from collections.abc import Iterator, Sequence + from pathlib import Path + + from rbc.bids.session import SessionTables + from rbc.orchestration import Filters + +__all__ = ["iter_sessions_with_template"] + +_logger = logging.getLogger(__name__) + + +def iter_sessions_with_template( + input_dirs: Sequence[Path], + output_dir: Path, + *, + filters: Filters, + verbose: bool = False, +) -> Iterator[tuple[RunContext, SessionTables, pl.DataFrame]]: + """Yield ``(pipe_ctx, session, tpl_df)`` for each matching subject/session. + + Shared iteration primitive for the longitudinal anatomical and functional + stages. Loads the BIDS table, filters out ``ses-longitudinal`` rows, then + groups by subject/session. For each non-template session a longitudinal + template must exist for that subject, otherwise ``ValueError`` is raised. + + Args: + input_dirs: BIDS dataset directories. + output_dir: Output directory for derivatives. + filters: Participant/session filters applied before grouping. + verbose: Whether to show a progress bar. + + Yields: + Tuples of ``(pipe_ctx, session, tpl_df)`` for each subject/session. + + Raises: + ValueError: If a matching session has no longitudinal template, or + has no session label. + """ + df = load_table( + dataset_dirs=input_dirs, index_fpath=None, max_workers=0, verbose=verbose + ) + group_df = filters.apply(df, pl.col("ses") != "longitudinal") + + for _, sub_ses_group in tqdm( + group_df.group_by(SUB_SES_QUERY, maintain_order=True), + disable=not verbose, + ): + pipe_ctx = RunContext( + sub=sub_ses_group["sub"][0], + ses=sub_ses_group["ses"][0], + output_dir=output_dir, + ) + if pipe_ctx.ses is None: + raise ValueError( + "No session data, unable to perform longitudinal processing" + ) + session = load_session(sub_ses_group, pipe_ctx.sub, pipe_ctx.ses) + tpl_df = df.filter( + pl.all_horizontal( + pl.col("sub") == pipe_ctx.sub, + pl.col("ses") == "longitudinal", + ) + ) + if tpl_df.is_empty(): + raise ValueError("No longitudinal template found") + yield pipe_ctx, session, tpl_df diff --git a/src/rbc/orchestration/longitudinal/all.py b/src/rbc/orchestration/longitudinal/all.py new file mode 100644 index 00000000..40fb36f3 --- /dev/null +++ b/src/rbc/orchestration/longitudinal/all.py @@ -0,0 +1,35 @@ +"""Orchestration for the combined longitudinal pipeline (not yet implemented).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from collections.abc import Sequence + from pathlib import Path + + from rbc.orchestration import Filters, RunnerConfig + +__all__ = ["run"] + + +def run( + input_dirs: Sequence[Path], + output_dir: Path, + *, + filters: Filters, + fs_license: Path | None = None, + atlas_files: dict[str, Path] | None = None, + fwhm: float | None = None, + runner_config: RunnerConfig | None = None, +) -> None: + """Run the full longitudinal pipeline (template → anat → func → metrics → qc). + + Placeholder wired up by Stage 3; full implementation ships in Stage 6. + """ + del input_dirs, output_dir, filters, fs_license + del atlas_files, fwhm, runner_config + raise NotImplementedError( + "rbc longitudinal all is planned for Stage 6 of the longitudinal " + "refactor (tracker: #301). It is not yet implemented." + ) diff --git a/src/rbc/orchestration/longitudinal/anatomical.py b/src/rbc/orchestration/longitudinal/anatomical.py index 7162ad3f..5e46ac50 100644 --- a/src/rbc/orchestration/longitudinal/anatomical.py +++ b/src/rbc/orchestration/longitudinal/anatomical.py @@ -1,7 +1,8 @@ -"""Per-group anatomical longitudinal processing.""" +"""Orchestration for the longitudinal anatomical workflow.""" from __future__ import annotations +import logging from typing import TYPE_CHECKING import polars as pl @@ -11,16 +12,23 @@ export_longitudinal_anat, resolve_longitudinal_anat, ) +from rbc.orchestration import Filters, RunnerConfig, init_runner +from rbc.orchestration.longitudinal._iter import iter_sessions_with_template from rbc.workflows.longitudinal.anatomical import ( longitudinal_process as anatomical_longitudinal, ) from rbc_resources import REGISTRATION_TEMPLATES if TYPE_CHECKING: + from collections.abc import Sequence from pathlib import Path from rbc.context import RunContext +__all__ = ["process_anat", "run"] + +_logger = logging.getLogger(__name__) + def process_anat( pipe_ctx: RunContext, @@ -55,3 +63,48 @@ def process_anat( ) aex = anat_q.derive(entities=ents, space="longitudinal") export_longitudinal_anat(aex, outputs) + + +def run( + input_dirs: Sequence[Path], + output_dir: Path, + *, + filters: Filters, + registration_template: Path = REGISTRATION_TEMPLATES.brain_1mm, + runner_config: RunnerConfig | None = None, +) -> None: + """Run longitudinal anatomical processing for all matching subjects/sessions. + + Args: + input_dirs: BIDS dataset directories (must include preprocessed + cross-sectional anatomical derivatives and longitudinal templates). + output_dir: Output directory for derivatives. + filters: Participant/session filters applied before grouping. + registration_template: Brain template for ANTs registration. + runner_config: Execution backend configuration. + """ + config = runner_config or RunnerConfig() + init_runner(config) + verbose = config.verbose + + _logger.warning( + "This workflow is experimental and may be sensitive to input file " + "naming conventions." + ) + _logger.info("Preparing to run RBC longitudinal anatomical workflow") + + for pipe_ctx, session, tpl_df in iter_sessions_with_template( + input_dirs, output_dir, filters=filters, verbose=verbose + ): + for _, anat_df in session.anat.filter(pl.col("suffix") == "T1w").group_by( + ("run", "acq"), maintain_order=True + ): + process_anat( + pipe_ctx=pipe_ctx, + anat_df=anat_df, + tpl_df=tpl_df, + registration_template=registration_template, + ) + pipe_ctx.ensure_dataset_description() + + _logger.info("RBC longitudinal anatomical workflow complete") diff --git a/src/rbc/orchestration/longitudinal/functional.py b/src/rbc/orchestration/longitudinal/functional.py index 21b70f81..724634f9 100644 --- a/src/rbc/orchestration/longitudinal/functional.py +++ b/src/rbc/orchestration/longitudinal/functional.py @@ -1,23 +1,34 @@ -"""Per-run functional longitudinal processing.""" +"""Orchestration for the longitudinal functional workflow.""" from __future__ import annotations +import logging from typing import TYPE_CHECKING -from rbc.bids import Datatype, Suffix, extract_entities +from rbc.bids import FUNC_GROUP_ENTITIES, Datatype, Suffix, extract_entities from rbc.bids.longitudinal.functional import ( export_longitudinal_func, resolve_longitudinal_func, ) +from rbc.bids.session import iter_session_files +from rbc.orchestration import Filters, RunnerConfig, init_runner +from rbc.orchestration.longitudinal._iter import iter_sessions_with_template from rbc.workflows.longitudinal.functional import ( longitudinal_process as functional_longitudinal, ) if TYPE_CHECKING: + from collections.abc import Sequence + from pathlib import Path + import polars as pl from rbc.context import RunContext +__all__ = ["process_func", "run"] + +_logger = logging.getLogger(__name__) + def process_func( pipe_ctx: RunContext, @@ -47,3 +58,40 @@ def process_func( func_outputs = functional_longitudinal(**resolved) # type: ignore[arg-type] fex = func_q.derive(space="longitudinal") export_longitudinal_func(fex, func_outputs) + + +def run( + input_dirs: Sequence[Path], + output_dir: Path, + *, + filters: Filters, + runner_config: RunnerConfig | None = None, +) -> None: + """Run longitudinal functional processing for all matching subjects/sessions. + + Args: + input_dirs: BIDS dataset directories (must include preprocessed + cross-sectional functional derivatives, anatomical derivatives, + and longitudinal templates). + output_dir: Output directory for derivatives. + filters: Participant/session/task filters applied before grouping. + runner_config: Execution backend configuration. + """ + config = runner_config or RunnerConfig() + init_runner(config) + verbose = config.verbose + + _logger.warning( + "This workflow is experimental and may be sensitive to input file " + "naming conventions." + ) + _logger.info("Preparing to run RBC longitudinal functional workflow") + + for pipe_ctx, session, tpl_df in iter_sessions_with_template( + input_dirs, output_dir, filters=filters, verbose=verbose + ): + for func_df, _ in iter_session_files(session, groupby=FUNC_GROUP_ENTITIES): + process_func(pipe_ctx=pipe_ctx, func_df=func_df, tpl_df=tpl_df) + pipe_ctx.ensure_dataset_description() + + _logger.info("RBC longitudinal functional workflow complete") diff --git a/src/rbc/orchestration/longitudinal/metrics.py b/src/rbc/orchestration/longitudinal/metrics.py new file mode 100644 index 00000000..8aaf699f --- /dev/null +++ b/src/rbc/orchestration/longitudinal/metrics.py @@ -0,0 +1,33 @@ +"""Orchestration for the longitudinal metrics workflow (not yet implemented).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from collections.abc import Sequence + from pathlib import Path + + from rbc.orchestration import Filters, RunnerConfig + +__all__ = ["run"] + + +def run( + input_dirs: Sequence[Path], + output_dir: Path, + *, + filters: Filters, + atlas_files: dict[str, Path], + fwhm: float, + runner_config: RunnerConfig | None = None, +) -> None: + """Compute resting-state metrics in longitudinal space. + + Placeholder wired up by Stage 3; full implementation ships in Stage 6. + """ + del input_dirs, output_dir, filters, atlas_files, fwhm, runner_config + raise NotImplementedError( + "rbc longitudinal metrics is planned for Stage 6 of the longitudinal " + "refactor (tracker: #301). It is not yet implemented." + ) diff --git a/src/rbc/orchestration/longitudinal/qc.py b/src/rbc/orchestration/longitudinal/qc.py new file mode 100644 index 00000000..278a41ed --- /dev/null +++ b/src/rbc/orchestration/longitudinal/qc.py @@ -0,0 +1,31 @@ +"""Orchestration for the longitudinal QC workflow (not yet implemented).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from collections.abc import Sequence + from pathlib import Path + + from rbc.orchestration import Filters, RunnerConfig + +__all__ = ["run"] + + +def run( + input_dirs: Sequence[Path], + output_dir: Path, + *, + filters: Filters, + runner_config: RunnerConfig | None = None, +) -> None: + """Run registration QC for longitudinal derivatives. + + Placeholder wired up by Stage 3; full implementation ships in Stage 6. + """ + del input_dirs, output_dir, filters, runner_config + raise NotImplementedError( + "rbc longitudinal qc is planned for Stage 6 of the longitudinal " + "refactor (tracker: #301). It is not yet implemented." + ) diff --git a/tests/unit/cli/test_longitudinal.py b/tests/unit/cli/test_longitudinal.py index 794ffacc..4608cd01 100644 --- a/tests/unit/cli/test_longitudinal.py +++ b/tests/unit/cli/test_longitudinal.py @@ -1,64 +1,215 @@ -"""Unit tests for Longitudinal CLI module.""" +"""Unit tests for ``rbc longitudinal`` subcommand argparse wiring.""" from __future__ import annotations import argparse from typing import TYPE_CHECKING +from unittest.mock import patch import pytest +from rbc.cli.longitudinal._base import LongitudinalBaseArgs +from rbc.cli.longitudinal.all import AllLongArgs +from rbc.cli.longitudinal.anatomical import AnatomicalLongArgs +from rbc.cli.longitudinal.functional import FunctionalLongArgs +from rbc.cli.longitudinal.metrics import MetricsLongArgs +from rbc.cli.main import cli + if TYPE_CHECKING: from pathlib import Path -from rbc.cli.longitudinal.process import LongitudinalArgs - @pytest.fixture -def base_args(tmp_path: Path) -> argparse.Namespace: - """Fixture for base argument namespace.""" +def base_ns(tmp_path: Path) -> argparse.Namespace: + """Minimal argparse namespace shared across longitudinal subcommands.""" input_dir = tmp_path / "input" - input_dir.touch() + input_dir.mkdir() return argparse.Namespace( runner="local", - verbose=False, + verbose=0, input_dirs=[input_dir], output_dir=tmp_path / "output", participant_label=[], session_label=[], - anatomical=True, - functional=False, tmp_dir=None, - anat_template=None, ants_threads=1, + fs_license=None, ) -class TestLongitudinalArgs: - """Tests for LongitudinalArgs validation.""" +class TestLongitudinalBaseArgs: + """Tests for the shared longitudinal base-args validator.""" - @pytest.mark.parametrize( - ("anat", "func"), [(True, False), (False, True), (True, True)] - ) - def test_valid_flag_combinations( - self, - base_args: argparse.Namespace, - anat: bool, # noqa: FBT001 - func: bool, # noqa: FBT001 + def test_no_license_is_allowed(self, base_ns: argparse.Namespace) -> None: + """``--fs-license`` is optional; ``None`` is a valid default.""" + args = LongitudinalBaseArgs.validate_namespace(base_ns) + assert args.fs_license is None + + def test_existing_license_accepted( + self, base_ns: argparse.Namespace, tmp_path: Path ) -> None: - """Test different combination of valid longitudinal flags.""" - base_args.anatomical, base_args.functional = anat, func - args = LongitudinalArgs.validate_namespace(base_args) - assert args.anatomical is anat - assert args.functional is func - - def test_no_flags_raises(self, base_args: argparse.Namespace) -> None: - """Test error raised if no processing selected.""" - base_args.anatomical = base_args.functional = False - with pytest.raises(ValueError, match="At least one of"): - LongitudinalArgs.validate_namespace(base_args) - - def test_defaults(self, base_args: argparse.Namespace) -> None: - """Test defaults.""" - args = LongitudinalArgs.validate_namespace(base_args) - assert args.participant_label == [] - assert args.session_label == [] + """An existing ``--fs-license`` path round-trips through validation.""" + lic = tmp_path / "license.txt" + lic.touch() + base_ns.fs_license = lic + args = LongitudinalBaseArgs.validate_namespace(base_ns) + assert args.fs_license == lic + + def test_missing_license_rejected( + self, base_ns: argparse.Namespace, tmp_path: Path + ) -> None: + """A non-existent ``--fs-license`` path raises ``ValueError``.""" + base_ns.fs_license = tmp_path / "nope.txt" + with pytest.raises(ValueError, match="not found"): + LongitudinalBaseArgs.validate_namespace(base_ns) + + +class TestAnatomicalLongArgs: + """Tests for the anatomical longitudinal subcommand validator.""" + + def test_defaults(self, base_ns: argparse.Namespace) -> None: + """Anat template defaults to the bundled 1 mm registration template.""" + base_ns.anat_template = None + args = AnatomicalLongArgs.validate_namespace(base_ns) + assert args.registration_template.name.endswith(".nii.gz") + + +class TestFunctionalLongArgs: + """Tests for the functional longitudinal subcommand validator.""" + + def test_valid_task(self, base_ns: argparse.Namespace) -> None: + """Alphanumeric task labels pass validation.""" + base_ns.task = "rest" + args = FunctionalLongArgs.validate_namespace(base_ns) + assert args.task == "rest" + + def test_invalid_task_rejected(self, base_ns: argparse.Namespace) -> None: + """Task labels with special characters are rejected.""" + base_ns.task = "rest/invalid" + with pytest.raises(ValueError, match="Task"): + FunctionalLongArgs.validate_namespace(base_ns) + + +class TestMetricsLongArgs: + """Tests for the metrics longitudinal subcommand validator.""" + + def test_defaults(self, base_ns: argparse.Namespace) -> None: + """FWHM defaults to 6 mm and atlas resolves from the registry.""" + base_ns.atlas = ["schaefer_200"] + base_ns.fwhm = 6.0 + base_ns.task = None + args = MetricsLongArgs.validate_namespace(base_ns) + assert args.fwhm == 6.0 + assert "schaefer_200" in args.atlas_files + + def test_nonpositive_fwhm_rejected(self, base_ns: argparse.Namespace) -> None: + """FWHM must be strictly positive.""" + base_ns.atlas = ["schaefer_200"] + base_ns.fwhm = 0.0 + base_ns.task = None + with pytest.raises(ValueError, match="FWHM"): + MetricsLongArgs.validate_namespace(base_ns) + + +class TestAllLongArgs: + """Tests for the combined longitudinal subcommand validator.""" + + def test_defaults(self, base_ns: argparse.Namespace) -> None: + """Defaults resolve to atlas registry + bundled 1 mm template.""" + base_ns.anat_template = None + base_ns.atlas = ["schaefer_200"] + base_ns.fwhm = 6.0 + args = AllLongArgs.validate_namespace(base_ns) + assert args.fwhm == 6.0 + assert "schaefer_200" in args.atlas_files + assert args.registration_template.name.endswith(".nii.gz") + + +class TestParentSubparser: + """Tests for ``rbc longitudinal --help`` and alias wiring.""" + + def test_help_lists_all_stages(self, capsys: pytest.CaptureFixture[str]) -> None: + """All six stages show up in the parent-subparser help output.""" + cli(["longitudinal", "--help"]) + out = capsys.readouterr().out + for stage in ("template", "anatomical", "functional", "metrics", "qc", "all"): + assert stage in out + + def test_long_alias_help_matches(self, capsys: pytest.CaptureFixture[str]) -> None: + """``rbc long --help`` exposes the same stages as ``rbc longitudinal``.""" + cli(["long", "--help"]) + out = capsys.readouterr().out + for stage in ("template", "anatomical", "functional", "metrics", "qc", "all"): + assert stage in out + + +class TestLongitudinalDispatch: + """Tests for end-to-end argparse → orchestration wiring per subcommand.""" + + def test_anatomical_dispatches(self, tmp_path: Path) -> None: + """``rbc longitudinal anatomical`` routes to the anat orchestration.""" + input_dir = tmp_path / "input" + input_dir.mkdir() + output_dir = tmp_path / "output" + + with patch("rbc.cli.longitudinal.anatomical.run") as mock_run: + rc = cli( + [ + "longitudinal", + "anatomical", + str(input_dir), + "-o", + str(output_dir), + ] + ) + assert rc == 0 + mock_run.assert_called_once() + kwargs = mock_run.call_args.kwargs + assert kwargs["input_dirs"] == (input_dir,) + assert kwargs["output_dir"] == output_dir + + def test_functional_dispatches(self, tmp_path: Path) -> None: + """``rbc longitudinal functional`` routes to the func orchestration.""" + input_dir = tmp_path / "input" + input_dir.mkdir() + output_dir = tmp_path / "output" + + with patch("rbc.cli.longitudinal.functional.run") as mock_run: + rc = cli( + [ + "longitudinal", + "functional", + str(input_dir), + "-o", + str(output_dir), + ] + ) + assert rc == 0 + mock_run.assert_called_once() + + def test_metrics_subcommand_registers(self, tmp_path: Path) -> None: + """Metrics subcommand is registered; Stage 6 raises NotImplementedError.""" + input_dir = tmp_path / "input" + input_dir.mkdir() + output_dir = tmp_path / "output" + + with pytest.raises(NotImplementedError, match="Stage 6"): + cli(["longitudinal", "metrics", str(input_dir), "-o", str(output_dir)]) + + def test_qc_subcommand_registers(self, tmp_path: Path) -> None: + """QC subcommand is registered; Stage 6 raises NotImplementedError.""" + input_dir = tmp_path / "input" + input_dir.mkdir() + output_dir = tmp_path / "output" + + with pytest.raises(NotImplementedError, match="Stage 6"): + cli(["longitudinal", "qc", str(input_dir), "-o", str(output_dir)]) + + def test_all_subcommand_registers(self, tmp_path: Path) -> None: + """All subcommand is registered; Stage 6 raises NotImplementedError.""" + input_dir = tmp_path / "input" + input_dir.mkdir() + output_dir = tmp_path / "output" + + with pytest.raises(NotImplementedError, match="Stage 6"): + cli(["longitudinal", "all", str(input_dir), "-o", str(output_dir)]) diff --git a/tests/unit/orchestration/test_longitudinal.py b/tests/unit/orchestration/test_longitudinal.py index 6d01a51a..86d80024 100644 --- a/tests/unit/orchestration/test_longitudinal.py +++ b/tests/unit/orchestration/test_longitudinal.py @@ -130,23 +130,6 @@ def func_df_full() -> pl.DataFrame: ) -@pytest.fixture -def mixed_df() -> pl.DataFrame: - """Anat + func dataframe for both-flags tests.""" - return _df( - _anat_row("01", "baseline"), - _anat_row("01", "vis2"), - _anat_row("02", "baseline"), - _anat_row("02", "vis2"), - _func_row("01", "baseline"), - _func_row("01", "vis2"), - _func_row("02", "baseline"), - _func_row("02", "vis2"), - _anat_row("01", "longitudinal"), - _anat_row("02", "longitudinal"), - ) - - def _make_groups( df: pl.DataFrame, participant: list[str], session: list[str] ) -> list[tuple]: @@ -190,13 +173,10 @@ def _side_effect(*_args, **_kwargs) -> list: # noqa: ANN002, ANN003 @contextmanager -def _patch_run( +def _patch_anat_run( full_df: pl.DataFrame, - groups: list[tuple], - *, - with_bold_mask: bool = True, -) -> Generator[tuple[Mock, Mock, Mock], None, None]: - """Patch external calls made by orchestration.longitudinal.run().""" +) -> Generator[tuple[Mock, Mock], None, None]: + """Patch external calls made by orchestration.longitudinal.anatomical.run().""" from rbc.bids.session import SessionTables mock_anat_df = pl.DataFrame( @@ -213,12 +193,11 @@ def _patch_run( ) mock_session = SessionTables(anat=mock_anat_df, func=None) with ( - patch("rbc.orchestration.longitudinal.init_runner"), - patch("rbc.orchestration.longitudinal.load_table", return_value=full_df), - patch("rbc.orchestration.longitudinal.load_session", return_value=mock_session), + patch("rbc.orchestration.longitudinal.anatomical.init_runner"), + patch("rbc.orchestration.longitudinal._iter.load_table", return_value=full_df), patch( - "rbc.orchestration.longitudinal.iter_session_files", - side_effect=_build_iter_side_effect(groups), + "rbc.orchestration.longitudinal._iter.load_session", + return_value=mock_session, ), patch( "rbc.bids.query.find_file", @@ -228,13 +207,56 @@ def _patch_run( "rbc.orchestration.longitudinal.anatomical.anatomical_longitudinal", return_value=_mock_anat_outputs(), ) as mock_anat, + patch("rbc.orchestration.longitudinal._iter.RunContext") as mock_ctx_cls, + ): + yield mock_anat, mock_ctx_cls + + +@contextmanager +def _patch_func_run( + full_df: pl.DataFrame, + groups: list[tuple], + *, + with_bold_mask: bool = True, +) -> Generator[tuple[Mock, Mock], None, None]: + """Patch external calls made by orchestration.longitudinal.functional.run().""" + from rbc.bids.session import SessionTables + + mock_anat_df = pl.DataFrame( + { + "suffix": ["T1w"], + "ext": [".nii.gz"], + "run": [None], + "acq": [None], + "space": [None], + "desc": [None], + "root": ["/data"], + "path": ["sub-01/ses-baseline/anat/sub-01_ses-baseline_T1w.nii.gz"], + } + ) + mock_session = SessionTables(anat=mock_anat_df, func=None) + with ( + patch("rbc.orchestration.longitudinal.functional.init_runner"), + patch("rbc.orchestration.longitudinal._iter.load_table", return_value=full_df), + patch( + "rbc.orchestration.longitudinal._iter.load_session", + return_value=mock_session, + ), + patch( + "rbc.orchestration.longitudinal.functional.iter_session_files", + side_effect=_build_iter_side_effect(groups), + ), + patch( + "rbc.bids.query.find_file", + return_value=Path("fake_workdir/file.nii.gz"), + ), patch( "rbc.orchestration.longitudinal.functional.functional_longitudinal", return_value=_mock_func_outputs(with_bold_mask=with_bold_mask), ) as mock_func, - patch("rbc.orchestration.longitudinal.RunContext") as mock_ctx_cls, + patch("rbc.orchestration.longitudinal._iter.RunContext") as mock_ctx_cls, ): - yield mock_anat, mock_func, mock_ctx_cls + yield mock_func, mock_ctx_cls class TestProcessAnat: @@ -381,8 +403,8 @@ def test_optional_bold_mask_file_not_found( assert mock_copy.call_count == 3 -class TestLongitudinalDispatch: - """Tests for run() dispatch logic (not filtering, which is in test_filters).""" +class TestLongitudinalAnatomicalRun: + """Tests for the longitudinal anatomical orchestration entrypoint.""" def test_missing_template_raises( self, @@ -390,14 +412,10 @@ def test_missing_template_raises( tmp_path: Path, ) -> None: """Missing longitudinal template raises ValueError.""" - from rbc.orchestration.longitudinal import run + from rbc.orchestration.longitudinal.anatomical import run df_no_tpl = anat_df_full.filter(pl.col("ses") != "longitudinal") - with _patch_run(df_no_tpl, _make_groups(df_no_tpl, [], [])) as ( - _, - __, - mock_ctx_cls, - ): + with _patch_anat_run(df_no_tpl) as (_, mock_ctx_cls): mock_ctx_cls.return_value = Mock(sub="01", ses="baseline") with pytest.raises(ValueError, match="No longitudinal template found"): run( @@ -406,75 +424,43 @@ def test_missing_template_raises( filters=Filters(), ) - def test_functional_false_skips_func( + def test_dispatches_anat_processing( self, anat_df_full: pl.DataFrame, tmp_path: Path, ) -> None: - """Functional processing is skipped when functional=False.""" - from rbc.orchestration.longitudinal import run + """Anatomical processing dispatches for each matching session.""" + from rbc.orchestration.longitudinal.anatomical import run - with ( - _patch_run(anat_df_full, _make_groups(anat_df_full, [], [])) as ( - _, - __, - mock_ctx_cls, - ), - patch("rbc.orchestration.longitudinal.process_func") as mock_pf, - ): + with _patch_anat_run(anat_df_full) as (mock_anat, mock_ctx_cls): mock_ctx_cls.return_value = Mock(sub="01", ses="baseline") run( input_dirs=[tmp_path], output_dir=tmp_path, - filters=Filters(), - functional=False, + filters=Filters(participant_label=["01"], session_label=["baseline"]), ) - mock_pf.assert_not_called() + mock_anat.assert_called_once() - def test_functional_true_dispatches( - self, - func_df_full: pl.DataFrame, - tmp_path: Path, - ) -> None: - """Functional processing dispatches when functional=True.""" - from rbc.orchestration.longitudinal import run - with _patch_run( - func_df_full, _make_groups(func_df_full, ["01"], ["baseline"]) - ) as (mock_anat, mock_func, mock_ctx_cls): - mock_ctx_cls.return_value = Mock(sub="01", ses="baseline") - run( - input_dirs=[tmp_path], - output_dir=tmp_path, - filters=Filters(participant_label=["01"], session_label=["baseline"]), - anatomical=False, - functional=True, - ) - mock_func.assert_called_once() - mock_anat.assert_not_called() +class TestLongitudinalFunctionalRun: + """Tests for the longitudinal functional orchestration entrypoint.""" - def test_both_flags_dispatch( + def test_dispatches_func_processing( self, - mixed_df: pl.DataFrame, + func_df_full: pl.DataFrame, tmp_path: Path, ) -> None: - """Both anatomical and functional dispatch when both flags are True.""" - from rbc.orchestration.longitudinal import run + """Functional processing dispatches for each matching BOLD run.""" + from rbc.orchestration.longitudinal.functional import run - with _patch_run(mixed_df, _make_groups(mixed_df, ["01"], ["baseline"])) as ( - mock_anat, - mock_func, - mock_ctx_cls, - ): + groups = _make_groups(func_df_full, ["01"], ["baseline"]) + with _patch_func_run(func_df_full, groups) as (mock_func, mock_ctx_cls): mock_ctx_cls.return_value = Mock(sub="01", ses="baseline") run( input_dirs=[tmp_path], output_dir=tmp_path, filters=Filters(participant_label=["01"], session_label=["baseline"]), - anatomical=True, - functional=True, ) - mock_anat.assert_called_once() mock_func.assert_called_once() def test_experimental_warning_emitted( @@ -483,15 +469,17 @@ def test_experimental_warning_emitted( caplog: pytest.LogCaptureFixture, ) -> None: """Experimental warning is logged when run() starts.""" - from rbc.orchestration.longitudinal import run + from rbc.orchestration.longitudinal.functional import run empty_df = pl.DataFrame( {c: [] for c in ["sub", "ses", "datatype", "suffix", "space", "task"]} ) with ( caplog.at_level(logging.WARNING), - patch("rbc.orchestration.longitudinal.init_runner"), - patch("rbc.orchestration.longitudinal.load_table", return_value=empty_df), + patch("rbc.orchestration.longitudinal.functional.init_runner"), + patch( + "rbc.orchestration.longitudinal._iter.load_table", return_value=empty_df + ), ): run( input_dirs=[tmp_path],