diff --git a/CPAC/func_preproc/func_preproc.py b/CPAC/func_preproc/func_preproc.py index 421f1535d..d098b6186 100644 --- a/CPAC/func_preproc/func_preproc.py +++ b/CPAC/func_preproc/func_preproc.py @@ -1,4 +1,4 @@ -# Copyright (C) 2012-2023 C-PAC Developers +# Copyright (C) 2012-2025 C-PAC Developers # This file is part of C-PAC. @@ -16,13 +16,15 @@ # License along with C-PAC. If not, see . """Functional preprocessing.""" +from typing import TYPE_CHECKING + # pylint: disable=ungrouped-imports,wrong-import-order,wrong-import-position from nipype.interfaces import afni, ants, fsl, utility as util from nipype.interfaces.afni import preprocess, utils as afni_utils from CPAC.func_preproc.utils import get_num_slices, interpolate_slice_timing, nullify from CPAC.pipeline import nipype_pipeline_engine as pe -from CPAC.pipeline.nodeblock import nodeblock +from CPAC.pipeline.nodeblock import nodeblock, NODEBLOCK_RETURN, POOL_RESOURCE_DICT from CPAC.utils.interfaces import Function from CPAC.utils.interfaces.ants import ( AI, # niworkflows @@ -31,6 +33,10 @@ ) from CPAC.utils.utils import add_afni_prefix, afni_3dwarp +if TYPE_CHECKING: + from CPAC.pipeline.engine import ResourcePool + from CPAC.utils.configuration import Configuration + def collect_arguments(*args): """Collect arguments.""" @@ -1890,7 +1896,9 @@ def bold_masking(wf, cfg, strat_pool, pipe_num, opt=None): ["functional_preproc", "run"], ["functional_preproc", "template_space_func_masking", "run"], ], - inputs=[("space-template_desc-preproc_bold", "space-template_desc-bold_mask")], + inputs=[ + ("space-template_desc-preproc_bold", "space-template_desc-bold_mask"), + ], outputs={ "space-template_desc-preproc_bold": { "Description": "The skull-stripped BOLD time-series.", @@ -1906,7 +1914,13 @@ def bold_masking(wf, cfg, strat_pool, pipe_num, opt=None): }, }, ) -def template_space_bold_masking(wf, cfg, strat_pool, pipe_num, opt=None): +def template_space_bold_masking( + wf: pe.Workflow, + cfg: "Configuration", + strat_pool: "ResourcePool", + pipe_num: int, + opt: None = None, +) -> NODEBLOCK_RETURN: """Mask the bold in template space.""" func_apply_mask = pe.Node( interface=afni_utils.Calc(), @@ -1924,13 +1938,13 @@ def template_space_bold_masking(wf, cfg, strat_pool, pipe_num, opt=None): node, out = strat_pool.get_data("space-template_desc-bold_mask") wf.connect(node, out, func_apply_mask, "in_file_b") - outputs = { + outputs: POOL_RESOURCE_DICT = { "space-template_desc-preproc_bold": (func_apply_mask, "out_file"), "space-template_desc-brain_bold": (func_apply_mask, "out_file"), "space-template_desc-head_bold": (node_head_bold, out_head_bold), } - return (wf, outputs) + return wf, outputs @nodeblock( diff --git a/CPAC/nuisance/nuisance.py b/CPAC/nuisance/nuisance.py index 90d39c18a..9504f0a3b 100644 --- a/CPAC/nuisance/nuisance.py +++ b/CPAC/nuisance/nuisance.py @@ -40,8 +40,8 @@ TR_string_to_float, ) from CPAC.pipeline import nipype_pipeline_engine as pe -from CPAC.pipeline.engine import ResourcePool -from CPAC.pipeline.nodeblock import nodeblock +from CPAC.pipeline.engine import NodeData, ResourcePool +from CPAC.pipeline.nodeblock import nodeblock, NODEBLOCK_RETURN, POOL_RESOURCE_DICT from CPAC.registration.registration import ( apply_transform, warp_timeseries_to_EPItemplate, @@ -2457,8 +2457,15 @@ def nuisance_regressors_generation_EPItemplate(wf, cfg, strat_pool, pipe_num, op inputs=[ ( "desc-preproc_bold", - "space-bold_desc-brain_mask", + "desc-reorient_bold", + "sbref", + [ + "space-bold_desc-brain_mask", + "space-template_desc-bold_mask", + "space-template_desc-brain_mask", + ], "from-bold_to-T1w_mode-image_desc-linear_xfm", + "from-template_to-bold_mode-image_xfm", "desc-movementParameters_motion", "framewise-displacement-jenkinson", "framewise-displacement-power", @@ -2486,7 +2493,13 @@ def nuisance_regressors_generation_EPItemplate(wf, cfg, strat_pool, pipe_num, op "lateral-ventricles-mask", "TR", ], - outputs=["desc-confounds_timeseries", "censor-indices"], + outputs={ + "desc-confounds_timeseries": {}, + "censor-indices": {}, + "space-bold_desc-brain_mask": { + "Description": "Binary brain mask of the BOLD functional time-series, transformed from template space." + }, + }, ) def nuisance_regressors_generation_T1w(wf, cfg, strat_pool, pipe_num, opt=None): return nuisance_regressors_generation(wf, cfg, strat_pool, pipe_num, opt, "T1w") @@ -2499,38 +2512,37 @@ def nuisance_regressors_generation( pipe_num: int, opt: dict, space: Literal["T1w", "bold"], -) -> tuple[Workflow, dict]: - """Generate nuisance regressors. - - Parameters - ---------- - wf : ~nipype.pipeline.engine.workflows.Workflow - - cfg : ~CPAC.utils.configuration.Configuration - - strat_pool : ~CPAC.pipeline.engine.ResourcePool - - pipe_num : int - - opt : dict - - space : str - T1w or bold - - Returns - ------- - wf : nipype.pipeline.engine.workflows.Workflow +) -> NODEBLOCK_RETURN: + """Generate nuisance regressors.""" + from CPAC.nuisance.utils.xfm import transform_bold_mask_to_native - outputs : dict - """ prefixes = [f"space-{space}_"] * 2 reg_tool = None + outputs: POOL_RESOURCE_DICT = {} + + brain_mask = ( + strat_pool.node_data("space-bold_desc-brain_mask") + if strat_pool.check_rpool("space-bold_desc-brain_mask") + else NodeData() + ) if space == "T1w": prefixes[0] = "" if strat_pool.check_rpool("from-template_to-T1w_mode-image_desc-linear_xfm"): reg_tool = strat_pool.reg_tool( "from-template_to-T1w_mode-image_desc-linear_xfm" ) + if brain_mask.node is NotImplemented: + if reg_tool and strat_pool.check_rpool( + ["space-template_desc-bold_mask", "space-template_desc-brain_mask"] + ): + outputs["space-bold_desc-brain_mask"] = ( + transform_bold_mask_to_native( + wf, strat_pool, cfg, pipe_num, reg_tool + ) + ) + brain_mask.node, brain_mask.out = outputs[ + "space-bold_desc-brain_mask" + ] elif space == "bold": reg_tool = strat_pool.reg_tool( "from-EPItemplate_to-bold_mode-image_desc-linear_xfm" @@ -2575,8 +2587,12 @@ def nuisance_regressors_generation( node, out = strat_pool.get_data("desc-preproc_bold") wf.connect(node, out, regressors, "inputspec.functional_file_path") - node, out = strat_pool.get_data("space-bold_desc-brain_mask") - wf.connect(node, out, regressors, "inputspec.functional_brain_mask_file_path") + wf.connect( + brain_mask.node, + brain_mask.out, + regressors, + "inputspec.functional_brain_mask_file_path", + ) if strat_pool.check_rpool(f"desc-brain_{space}"): node, out = strat_pool.get_data(f"desc-brain_{space}") @@ -2738,12 +2754,13 @@ def nuisance_regressors_generation( node, out = strat_pool.get_data("TR") wf.connect(node, out, regressors, "inputspec.tr") - outputs = { - "desc-confounds_timeseries": (regressors, "outputspec.regressors_file_path"), - "censor-indices": (regressors, "outputspec.censor_indices"), - } + outputs["desc-confounds_timeseries"] = ( + regressors, + "outputspec.regressors_file_path", + ) + outputs["censor-indices"] = (regressors, "outputspec.censor_indices") - return (wf, outputs) + return wf, outputs def nuisance_regression(wf, cfg, strat_pool, pipe_num, opt, space, res=None): diff --git a/CPAC/nuisance/utils/xfm.py b/CPAC/nuisance/utils/xfm.py new file mode 100644 index 000000000..3ee1b5942 --- /dev/null +++ b/CPAC/nuisance/utils/xfm.py @@ -0,0 +1,69 @@ +# Copyright (C) 2025 C-PAC Developers + +# This file is part of C-PAC. + +# C-PAC is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the +# Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. + +# C-PAC is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +# License for more details. + +# You should have received a copy of the GNU Lesser General Public +# License along with C-PAC. If not, see . +"""Transformation utilities for nuisance regression.""" + +from typing import cast, Literal + +from nipype.pipeline.engine import Workflow + +from CPAC.pipeline.engine import ResourcePool +from CPAC.registration.registration import apply_transform +from CPAC.utils.configuration import Configuration + + +def transform_bold_mask_to_native( + wf: Workflow, + strat_pool: ResourcePool, + cfg: Configuration, + pipe_num: int, + reg_tool: Literal["ants", "fsl"], +) -> tuple[Workflow, str]: + """Transform a template-space BOLD mask to native space.""" + num_cpus = cast( + int, cfg["pipeline_setup", "system_config", "max_cores_per_participant"] + ) + num_ants_cores = cast( + int, cfg["pipeline_setup", "system_config", "num_ants_threads"] + ) + apply_xfm = apply_transform( + f"xfm_from-template_to-bold_mask_{pipe_num}", + reg_tool, + time_series=True, + num_cpus=num_cpus, + num_ants_cores=num_ants_cores, + ) + apply_xfm.inputs.inputspec.interpolation = cfg[ + "registration_workflows", + "functional_registration", + "func_registration_to_template", + f"{'ANTs' if reg_tool == 'ants' else 'FNIRT'}_pipelines", + "interpolation", + ] + sbref = strat_pool.node_data("sbref") + bold_mask = strat_pool.node_data( + ["space-template_desc-bold_mask", "space-template_desc-brain_mask"] + ) + xfm = strat_pool.node_data("from-template_to-bold_mode-image_xfm") + wf.connect( + [ + (bold_mask.node, apply_xfm, [(bold_mask.out, "inputspec.input_image")]), + (sbref.node, apply_xfm, [(sbref.out, "inputspec.reference")]), + (xfm.node, apply_xfm, [(xfm.out, "inputspec.transform")]), + ] + ) + + return apply_xfm, "outputspec.output_image" diff --git a/CPAC/pipeline/cpac_runner.py b/CPAC/pipeline/cpac_runner.py index 425eefb91..0a0f58bbd 100644 --- a/CPAC/pipeline/cpac_runner.py +++ b/CPAC/pipeline/cpac_runner.py @@ -1,4 +1,4 @@ -# Copyright (C) 2022-2024 C-PAC Developers +# Copyright (C) 2022-2025 C-PAC Developers # This file is part of C-PAC. @@ -19,10 +19,12 @@ from multiprocessing import Process import os from time import strftime +from typing import Optional import warnings from voluptuous.error import Invalid import yaml +from nipype.pipeline.plugins.base import PluginBase as Plugin from CPAC.longitudinal_pipeline.longitudinal_workflow import anat_longitudinal_wf from CPAC.pipeline.utils import get_shell @@ -257,15 +259,15 @@ def run_T1w_longitudinal(sublist, cfg): def run( # noqa: PLR0915 - subject_list_file, - config_file=None, - p_name=None, - plugin=None, - plugin_args=None, - tracking=True, - num_subs_at_once=None, - debug=False, - test_config=False, + subject_list_file: str, + config_file: Optional[str] = None, + p_name: Optional[str] = None, + plugin: Optional[str | Plugin] = None, + plugin_args: Optional[dict] = None, + tracking: bool = True, + num_subs_at_once: Optional[int] = None, + debug: bool = False, + test_config: bool = False, ) -> int: """Run C-PAC subjects via job queue. diff --git a/CPAC/pipeline/test/test_connect_pipeline.py b/CPAC/pipeline/test/test_connect_pipeline.py new file mode 100644 index 000000000..6677fe7c2 --- /dev/null +++ b/CPAC/pipeline/test/test_connect_pipeline.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +# Copyright (C) 2025 C-PAC Developers + +# This file is part of C-PAC. + +# C-PAC is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the +# Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. + +# C-PAC is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +# License for more details. + +# You should have received a copy of the GNU Lesser General Public +# License along with C-PAC. If not, see . +"""Test pipeline connections.""" + +from logging import INFO +import multiprocessing.resource_tracker +from pathlib import Path +from typing import Callable + +import pytest +import yaml + +from CPAC.pipeline.cpac_runner import run +from CPAC.utils.configuration.configuration import Preconfiguration +from CPAC.utils.configuration.yaml_template import create_yaml_from_template +from CPAC.utils.monitoring import log_nodes_cb + +_unregister = multiprocessing.resource_tracker.unregister + + +def safe_unregister(name, rtype) -> None: + """Suppress unregister warnings.""" + try: + _unregister(name, rtype) + except KeyError: + pass + + +multiprocessing.resource_tracker.unregister = safe_unregister + + +@pytest.mark.parametrize("preconfig", ["abcd-options"]) +def test_config( + caplog: pytest.LogCaptureFixture, preconfig: str, tmp_path: Path +) -> None: + """Run 'test_config' analysis level.""" + caplog.set_level(INFO) + data_config_file = tmp_path / "data_config.yaml" + with data_config_file.open("w") as _f: + yaml.dump( + [ + { + "anat": "s3://fcp-indi/data/Projects/ADHD200/RawDataBIDS/KKI/sub-1019436/ses-1/anat/sub-1019436_ses-1_run-1_T1w.nii.gz", + "func": { + "rest_acq-1_run-1": { + "scan": "s3://fcp-indi/data/Projects/ADHD200/RawDataBIDS/KKI/sub-1019436/ses-1/func/sub-1019436_ses-1_task-rest_acq-1_run-1_bold.nii.gz", + "scan_parameters": "s3://fcp-indi/data/Projects/ADHD200/RawDataBIDS/KKI/task-rest_acq-1_bold.json", + } + }, + "site": "KKI", + "subject_id": "1019436", + "unique_id": "1", + } + ], + _f, + ) + + # output in tmp_path/outputs + pipeline = Preconfiguration(preconfig) + output_dir = tmp_path / "outputs" + output_dir.mkdir(parents=True, exist_ok=True) + pipeline["pipeline_setup", "log_directory", "path"] = str(output_dir / "log") + pipeline["pipeline_setup", "output_directory", "path"] = str(output_dir / "out") + pipeline["pipeline_setup", "working_directory", "path"] = str( + output_dir / "working" + ) + pipeline_file = tmp_path / "pipe_config.yaml" + with pipeline_file.open("w") as _f: + _f.write(create_yaml_from_template(pipeline, preconfig, preconfig, True)) + + plugin = "MultiProc" + plugin_args: dict[str, int | bool | Callable] = { + "n_procs": 2, + "memory_gb": 10, + "raise_insufficient": True, + "status_callback": log_nodes_cb, + } + tracking = False + exitcode = run( + str(data_config_file), + str(pipeline_file), + plugin=plugin, + plugin_args=plugin_args, + tracking=tracking, + test_config=True, + ) + if exitcode != 0: + records = list(caplog.records) + msg: str + msg = str(records[-1]) + if hasattr(records[-1], "exc_info"): + exc_info = records[-1].exc_info + if ( + exc_info + and exc_info[0] + and exc_info[1] + and hasattr(exc_info[1], "args") + ): + msg = exc_info[1].args[0] + raise exc_info[0](exc_info[1]) + raise AssertionError(msg) diff --git a/CPAC/registration/registration.py b/CPAC/registration/registration.py index 8d9795128..d001b0055 100644 --- a/CPAC/registration/registration.py +++ b/CPAC/registration/registration.py @@ -49,13 +49,13 @@ def apply_transform( - wf_name, - reg_tool, - time_series=False, - multi_input=False, - num_cpus=1, - num_ants_cores=1, -): + wf_name: str, + reg_tool: Literal["ants", "fsl"], + time_series: bool = False, + multi_input: bool = False, + num_cpus: int = 1, + num_ants_cores: int = 1, +) -> pe.Workflow: """Apply transform.""" if not reg_tool: msg = ( @@ -101,7 +101,7 @@ def apply_transform( ) apply_warp.inputs.dimension = 3 - apply_warp.interface.num_threads = int(num_ants_cores) + apply_warp.inputs.num_threads = int(num_ants_cores) if time_series: apply_warp.inputs.input_image_type = 3