|
| 1 | +# Copyright (C) 2021-2025 C-PAC Developers |
| 2 | + |
| 3 | +# This file is part of C-PAC. |
| 4 | + |
| 5 | +# C-PAC is free software: you can redistribute it and/or modify it under |
| 6 | +# the terms of the GNU Lesser General Public License as published by the |
| 7 | +# Free Software Foundation, either version 3 of the License, or (at your |
| 8 | +# option) any later version. |
| 9 | + |
| 10 | +# C-PAC is distributed in the hope that it will be useful, but WITHOUT |
| 11 | +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| 12 | +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
| 13 | +# License for more details. |
| 14 | + |
| 15 | +# You should have received a copy of the GNU Lesser General Public |
| 16 | +# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>. |
| 17 | +"""Unit tests for the C-PAC pipeline engine.""" |
| 18 | + |
| 19 | +from argparse import Namespace |
1 | 20 | import os |
| 21 | +from pathlib import Path |
| 22 | +from typing import cast |
2 | 23 |
|
| 24 | +from _pytest.logging import LogCaptureFixture |
3 | 25 | import pytest |
4 | 26 |
|
5 | 27 | from CPAC.pipeline.cpac_pipeline import ( |
@@ -138,17 +160,129 @@ def test_build_workflow(pipe_config, bids_dir, test_dir): |
138 | 160 | wf.run() |
139 | 161 |
|
140 | 162 |
|
| 163 | +def test_missing_resource( |
| 164 | + bids_examples: Path, caplog: LogCaptureFixture, tmp_path: Path |
| 165 | +) -> None: |
| 166 | + """Test the error message thrown when a resource is missing.""" |
| 167 | + from datetime import datetime |
| 168 | + |
| 169 | + import yaml |
| 170 | + |
| 171 | + from CPAC.pipeline.cpac_runner import run |
| 172 | + from CPAC.utils.bids_utils import sub_list_filter_by_labels |
| 173 | + from CPAC.utils.configuration import Preconfiguration, set_subject |
| 174 | + from CPAC.utils.configuration.yaml_template import create_yaml_from_template |
| 175 | + |
| 176 | + st = datetime.now().strftime("%Y-%m-%dT%H-%M-%SZ") |
| 177 | + namespace = Namespace( |
| 178 | + bids_dir=str(bids_examples / "ds113b"), |
| 179 | + output_dir=str(tmp_path / "output"), |
| 180 | + analysis_level="test_config", |
| 181 | + participant_label="sub-01", |
| 182 | + ) |
| 183 | + c = Preconfiguration("anat-only") |
| 184 | + c["pipeline_setup", "output_directory", "path"] = namespace.output_dir |
| 185 | + c["pipeline_setup", "log_directory", "path"] = str(tmp_path / "logs") |
| 186 | + c["pipeline_setup", "working_directory", "path"] = str(tmp_path / "work") |
| 187 | + c["pipeline_setup", "system_config", "maximum_memory_per_participant"] = 1.0 |
| 188 | + c["pipeline_setup", "system_config", "max_cores_per_participant"] = 1 |
| 189 | + c["pipeline_setup", "system_config", "num_participants_at_once"] = 1 |
| 190 | + c["pipeline_setup", "system_config", "num_ants_threads"] = 1 |
| 191 | + c["pipeline_setup", "working_directory", "remove_working_dir"] = True |
| 192 | + sub_list = create_cpac_data_config( |
| 193 | + namespace.bids_dir, |
| 194 | + namespace.participant_label, |
| 195 | + None, |
| 196 | + True, |
| 197 | + only_one_anat=False, |
| 198 | + ) |
| 199 | + sub_list = sub_list_filter_by_labels(list(sub_list), {"T1w": None, "bold": None}) |
| 200 | + for i, sub in enumerate(sub_list): |
| 201 | + if isinstance(sub.get("anat"), dict): |
| 202 | + for anat_key in sub["anat"]: |
| 203 | + if isinstance(sub["anat"][anat_key], list) and len( |
| 204 | + sub["anat"][anat_key] |
| 205 | + ): |
| 206 | + sub_list[i]["anat"][anat_key] = sub["anat"][anat_key][0] |
| 207 | + if isinstance(sub.get("anat"), list) and len(sub["anat"]): |
| 208 | + sub_list[i]["anat"] = sub["anat"][0] |
| 209 | + data_config_file = f"cpac_data_config_{st}.yml" |
| 210 | + sublogdirs = [set_subject(sub, c)[2] for sub in sub_list] |
| 211 | + # write out the data configuration file |
| 212 | + data_config_file = os.path.join(sublogdirs[0], data_config_file) |
| 213 | + with open(data_config_file, "w", encoding="utf-8") as _f: |
| 214 | + noalias_dumper = yaml.dumper.SafeDumper |
| 215 | + noalias_dumper.ignore_aliases = lambda self, data: True |
| 216 | + yaml.dump(sub_list, _f, default_flow_style=False, Dumper=noalias_dumper) |
| 217 | + |
| 218 | + # update and write out pipeline config file |
| 219 | + pipeline_config_file = os.path.join(sublogdirs[0], f"cpac_pipeline_config_{st}.yml") |
| 220 | + with open(pipeline_config_file, "w", encoding="utf-8") as _f: |
| 221 | + _f.write(create_yaml_from_template(c)) |
| 222 | + minimized_config = f"{pipeline_config_file[:-4]}_min.yml" |
| 223 | + with open(minimized_config, "w", encoding="utf-8") as _f: |
| 224 | + _f.write(create_yaml_from_template(c, import_from="blank")) |
| 225 | + for config_file in (data_config_file, pipeline_config_file, minimized_config): |
| 226 | + os.chmod(config_file, 0o444) # Make config files readonly |
| 227 | + |
| 228 | + if len(sublogdirs) > 1: |
| 229 | + # If more than one run is included in the given data config |
| 230 | + # file, an identical copy of the data and pipeline config |
| 231 | + # will be included in the log directory for each run |
| 232 | + for sublogdir in sublogdirs[1:]: |
| 233 | + for config_file in ( |
| 234 | + data_config_file, |
| 235 | + pipeline_config_file, |
| 236 | + minimized_config, |
| 237 | + ): |
| 238 | + try: |
| 239 | + os.link(config_file, config_file.replace(sublogdirs[0], sublogdir)) |
| 240 | + except FileExistsError: |
| 241 | + pass |
| 242 | + |
| 243 | + run( |
| 244 | + data_config_file, |
| 245 | + pipeline_config_file, |
| 246 | + plugin="Linear", |
| 247 | + plugin_args={ |
| 248 | + "n_procs": int( |
| 249 | + cast( |
| 250 | + int | str, |
| 251 | + c["pipeline_setup", "system_config", "max_cores_per_participant"], |
| 252 | + ) |
| 253 | + ), |
| 254 | + "memory_gb": int( |
| 255 | + cast( |
| 256 | + int | str, |
| 257 | + c[ |
| 258 | + "pipeline_setup", |
| 259 | + "system_config", |
| 260 | + "maximum_memory_per_participant", |
| 261 | + ], |
| 262 | + ) |
| 263 | + ), |
| 264 | + "raise_insufficient": c[ |
| 265 | + "pipeline_setup", "system_config", "raise_insufficient" |
| 266 | + ], |
| 267 | + }, |
| 268 | + tracking=False, |
| 269 | + test_config=namespace.analysis_level == "test_config", |
| 270 | + ) |
| 271 | + |
| 272 | + assert "can be output from" in caplog.text |
| 273 | + |
| 274 | + |
141 | 275 | # bids_dir = "/Users/steven.giavasis/data/HBN-SI_dataset/rawdata" |
142 | 276 | # test_dir = "/test_dir" |
143 | 277 |
|
144 | 278 | # cfg = "/Users/hecheng.jin/GitHub/DevBranch/CPAC/resources/configs/pipeline_config_monkey-ABCD.yml" |
145 | | -cfg = "/Users/hecheng.jin/GitHub/pipeline_config_monkey-ABCDlocal.yml" |
146 | | -bids_dir = "/Users/hecheng.jin/Monkey/monkey_data_oxford/site-ucdavis" |
147 | | -test_dir = "/Users/hecheng.jin/GitHub/Test/T2preproc" |
148 | 279 |
|
149 | 280 | # test_ingress_func_raw_data(cfg, bids_dir, test_dir) |
150 | 281 | # test_ingress_anat_raw_data(cfg, bids_dir, test_dir) |
151 | 282 | # test_ingress_pipeconfig_data(cfg, bids_dir, test_dir) |
152 | 283 | # test_build_anat_preproc_stack(cfg, bids_dir, test_dir) |
153 | 284 | if __name__ == "__main__": |
| 285 | + cfg = "/Users/hecheng.jin/GitHub/pipeline_config_monkey-ABCDlocal.yml" |
| 286 | + bids_dir = "/Users/hecheng.jin/Monkey/monkey_data_oxford/site-ucdavis" |
| 287 | + test_dir = "/Users/hecheng.jin/GitHub/Test/T2preproc" |
154 | 288 | test_build_workflow(cfg, bids_dir, test_dir) |
0 commit comments