diff --git a/CPAC/distortion_correction/distortion_correction.py b/CPAC/distortion_correction/distortion_correction.py
index 4457ab91f..0ddf005e9 100644
--- a/CPAC/distortion_correction/distortion_correction.py
+++ b/CPAC/distortion_correction/distortion_correction.py
@@ -36,7 +36,7 @@
from CPAC.pipeline import nipype_pipeline_engine as pe
from CPAC.pipeline.nodeblock import nodeblock
from CPAC.utils import function
-from CPAC.utils.datasource import match_epi_fmaps
+from CPAC.utils.datasource import match_epi_fmaps_function_node
from CPAC.utils.interfaces.function import Function
@@ -406,23 +406,7 @@ def distcor_blip_afni_qwarp(wf, cfg, strat_pool, pipe_num, opt=None):
3dQWarp. The output of this can then proceed to
func_preproc.
"""
- match_epi_imports = ["import json"]
- match_epi_fmaps_node = pe.Node(
- Function(
- input_names=[
- "bold_pedir",
- "epi_fmap_one",
- "epi_fmap_params_one",
- "epi_fmap_two",
- "epi_fmap_params_two",
- ],
- output_names=["opposite_pe_epi", "same_pe_epi"],
- function=match_epi_fmaps,
- imports=match_epi_imports,
- as_module=True,
- ),
- name=f"match_epi_fmaps_{pipe_num}",
- )
+ match_epi_fmaps_node = match_epi_fmaps_function_node(f"match_epi_fmaps_{pipe_num}")
node, out = strat_pool.get_data("epi-1")
wf.connect(node, out, match_epi_fmaps_node, "epi_fmap_one")
diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py
index 25adb1eec..a23f37348 100644
--- a/CPAC/utils/datasource.py
+++ b/CPAC/utils/datasource.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2012-2024 C-PAC Developers
+# Copyright (C) 2012-2025 C-PAC Developers
# This file is part of C-PAC.
@@ -20,6 +20,7 @@
import json
from pathlib import Path
import re
+from typing import Any, Optional
from voluptuous import RequiredFieldInvalid
from nipype.interfaces import utility as util
@@ -463,12 +464,12 @@ def gather_echo_times(echotime_1, echotime_2, echotime_3=None, echotime_4=None):
def match_epi_fmaps(
- bold_pedir,
- epi_fmap_one,
- epi_fmap_params_one,
- epi_fmap_two=None,
- epi_fmap_params_two=None,
-):
+ bold_pedir: str,
+ epi_fmap_one: str,
+ epi_fmap_params_one: dict[str, Any],
+ epi_fmap_two: Optional[str] = None,
+ epi_fmap_params_two: Optional[dict[str, Any]] = None,
+) -> tuple[str, str]:
"""Match EPI field maps to the BOLD scan.
Parse the field map files in the data configuration and determine which
@@ -504,13 +505,41 @@ def match_epi_fmaps(
with open(scan_params, "r") as f:
scan_params = json.load(f)
if "PhaseEncodingDirection" in scan_params:
- epi_pedir = scan_params["PhaseEncodingDirection"]
+ epi_pedir: str | bytes = scan_params["PhaseEncodingDirection"]
+ if isinstance(epi_pedir, bytes):
+ epi_pedir = epi_pedir.decode("utf-8")
if epi_pedir == bold_pedir:
same_pe_epi = epi_scan
elif epi_pedir[0] == bold_pedir[0]:
opposite_pe_epi = epi_scan
- return (opposite_pe_epi, same_pe_epi)
+ if same_pe_epi is None:
+ msg = f"Same phase encoding EPI: {bold_pedir}"
+ raise FileNotFoundError(msg)
+ if opposite_pe_epi is None:
+ msg = f"Opposite phase encoding EPI: {bold_pedir}"
+ raise FileNotFoundError(msg)
+
+ return opposite_pe_epi, same_pe_epi
+
+
+def match_epi_fmaps_function_node(name: str = "match_epi_fmaps"):
+ """Return a Function node for `~CPAC.utils.datasource.match_epi_fmaps`."""
+ return pe.Node(
+ Function(
+ input_names=[
+ "bold_pedir",
+ "epi_fmap_one",
+ "epi_fmap_params_one",
+ "epi_fmap_two",
+ "epi_fmap_params_two",
+ ],
+ output_names=["opposite_pe_epi", "same_pe_epi"],
+ function=match_epi_fmaps,
+ as_module=True,
+ ),
+ name=name,
+ )
def ingress_func_metadata(
diff --git a/CPAC/utils/test_resources.py b/CPAC/utils/test_resources.py
index da58e4e0f..5d447292f 100644
--- a/CPAC/utils/test_resources.py
+++ b/CPAC/utils/test_resources.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019-2024 C-PAC Developers
+# Copyright (C) 2019-2025 C-PAC Developers
# This file is part of C-PAC.
@@ -14,29 +14,32 @@
# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see .
-from CPAC.utils.monitoring import WFLOGGER
+"""Resources for testing utilities."""
+import os
+import shutil
+from typing import Optional
-def setup_test_wf(s3_prefix, paths_list, test_name, workdirs_to_keep=None):
- """Set up a basic template Nipype workflow for testing single nodes or
- small sub-workflows.
- """
- import os
- import shutil
+from CPAC.pipeline import nipype_pipeline_engine as pe
+from CPAC.utils.datasource import check_for_s3
+from CPAC.utils.interfaces.datasink import DataSink
+from CPAC.utils.monitoring import WFLOGGER
- from CPAC.pipeline import nipype_pipeline_engine as pe
- from CPAC.utils.datasource import check_for_s3
- from CPAC.utils.interfaces.datasink import DataSink
- test_dir = os.path.join(os.getcwd(), test_name)
+def setup_test_wf(
+ s3_prefix,
+ paths_list,
+ test_name,
+ workdirs_to_keep=None,
+ test_dir: Optional[str] = None,
+) -> tuple[pe.Workflow, pe.Node, dict[str, str]]:
+ """Set up a basic template Nipype workflow for testing small workflows."""
+ test_dir = os.path.join(test_dir if test_dir else os.getcwd(), test_name)
work_dir = os.path.join(test_dir, "workdir")
out_dir = os.path.join(test_dir, "output")
if os.path.exists(out_dir):
- try:
- shutil.rmtree(out_dir)
- except:
- pass
+ shutil.rmtree(out_dir, ignore_errors=True)
if os.path.exists(work_dir):
for dirname in os.listdir(work_dir):
@@ -45,10 +48,7 @@ def setup_test_wf(s3_prefix, paths_list, test_name, workdirs_to_keep=None):
WFLOGGER.info("%s --- %s\n", dirname, keepdir)
if keepdir in dirname:
continue
- try:
- shutil.rmtree(os.path.join(work_dir, dirname))
- except:
- pass
+ shutil.rmtree(os.path.join(work_dir, dirname), ignore_errors=True)
local_paths = {}
for subpath in paths_list:
@@ -67,4 +67,4 @@ def setup_test_wf(s3_prefix, paths_list, test_name, workdirs_to_keep=None):
ds.inputs.base_directory = out_dir
ds.inputs.parameterization = True
- return (wf, ds, local_paths)
+ return wf, ds, local_paths
diff --git a/CPAC/utils/tests/test_datasource.py b/CPAC/utils/tests/test_datasource.py
index be7c2255c..6e9e52c0d 100644
--- a/CPAC/utils/tests/test_datasource.py
+++ b/CPAC/utils/tests/test_datasource.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019-2024 C-PAC Developers
+# Copyright (C) 2019-2025 C-PAC Developers
# This file is part of C-PAC.
@@ -14,72 +14,370 @@
# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see .
+"""Test datasource utilities."""
+
+from dataclasses import dataclass
import json
+from pathlib import Path
+from typing import Any, Literal, TypeAlias
+from networkx.classes.digraph import DiGraph
import pytest
from CPAC.pipeline import nipype_pipeline_engine as pe
-from CPAC.utils.datasource import match_epi_fmaps
-from CPAC.utils.interfaces import Function
+from CPAC.utils.datasource import match_epi_fmaps, match_epi_fmaps_function_node
from CPAC.utils.test_resources import setup_test_wf
+from CPAC.utils.utils import PE_DIRECTION
-@pytest.mark.skip(reason="needs refactoring")
-def test_match_epi_fmaps():
- # good data to use
- s3_prefix = "s3://fcp-indi/data/Projects/HBN/MRI/Site-CBIC/sub-NDARAB708LM5"
- s3_paths = [
- "func/sub-NDARAB708LM5_task-rest_run-1_bold.json",
- "fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.nii.gz",
- "fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.json",
- "fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.nii.gz",
- "fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.json",
- ]
+@dataclass
+class MatchEpiFmapsInputs:
+ """Store test data for `match_epi_fmaps`."""
- wf, ds, local_paths = setup_test_wf(s3_prefix, s3_paths, "test_match_epi_fmaps")
+ bold_pedir: PE_DIRECTION
+ epi_fmaps: list[tuple[str, dict[str, Any]]]
- opposite_pe_json = local_paths["fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.json"]
- same_pe_json = local_paths["fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.json"]
- func_json = local_paths["func/sub-NDARAB708LM5_task-rest_run-1_bold.json"]
- with open(opposite_pe_json, "r") as f:
- opposite_pe_params = json.load(f)
+def match_epi_fmaps_inputs(
+ generate: bool, path: Path
+) -> tuple[pe.Workflow, MatchEpiFmapsInputs]:
+ """Return inputs for `~CPAC.utils.datasource.match_epi_fmaps`."""
+ if generate:
+ # good data to use
+ s3_prefix = "s3://fcp-indi/data/Projects/HBN/MRI/Site-CBIC/sub-NDARAB708LM5"
+ s3_paths = [
+ "func/sub-NDARAB708LM5_task-rest_run-1_bold.json",
+ "fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.nii.gz",
+ "fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.json",
+ "fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.nii.gz",
+ "fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.json",
+ ]
- with open(same_pe_json, "r") as f:
- same_pe_params = json.load(f)
+ wf, ds, local_paths = setup_test_wf(
+ s3_prefix, s3_paths, "test_match_epi_fmaps", test_dir=str(path)
+ )
- with open(func_json, "r") as f:
- func_params = json.load(f)
- bold_pedir = func_params["PhaseEncodingDirection"]
+ opposite_pe_json = local_paths["fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.json"]
+ same_pe_json = local_paths["fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.json"]
+ func_json = local_paths["func/sub-NDARAB708LM5_task-rest_run-1_bold.json"]
- fmap_paths_dct = {
- "epi_PA": {
- "scan": local_paths["fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.nii.gz"],
- "scan_parameters": opposite_pe_params,
- },
- "epi_AP": {
- "scan": local_paths["fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.nii.gz"],
- "scan_parameters": same_pe_params,
- },
- }
+ with open(opposite_pe_json, "r") as f:
+ opposite_pe_params = json.load(f)
+
+ with open(same_pe_json, "r") as f:
+ same_pe_params = json.load(f)
- match_fmaps = pe.Node(
- Function(
- input_names=["fmap_dct", "bold_pedir"],
- output_names=["opposite_pe_epi", "same_pe_epi"],
- function=match_epi_fmaps,
- as_module=True,
- ),
- name="match_epi_fmaps",
+ with open(func_json, "r") as f:
+ func_params = json.load(f)
+ bold_pedir = func_params["PhaseEncodingDirection"]
+
+ fmap_paths_dct = {
+ "epi_PA": {
+ "scan": local_paths["fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.nii.gz"],
+ "scan_parameters": opposite_pe_params,
+ },
+ "epi_AP": {
+ "scan": local_paths["fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.nii.gz"],
+ "scan_parameters": same_pe_params,
+ },
+ }
+ ds.inputs.func_json = func_json
+ ds.inputs.opposite_pe_json = opposite_pe_json
+ ds.inputs.same_pe_json = same_pe_json
+ return wf, MatchEpiFmapsInputs(
+ bold_pedir,
+ [
+ (scan["scan"], scan["scan_parameters"])
+ for scan in fmap_paths_dct.values()
+ ],
+ )
+ _paths = [
+ f"{path}/sub-NDARAB514MAJ_dir-AP_acq-fMRI_epi.nii.gz",
+ f"{path}/sub-NDARAB514MAJ_dir-PA_acq-fMRI_epi.nii.gz",
+ ]
+ for _ in _paths:
+ Path(_).touch(exist_ok=True)
+ return pe.Workflow("test_match_epi_fmaps", path), MatchEpiFmapsInputs(
+ "j-",
+ [
+ (
+ _paths[0],
+ {
+ "AcquisitionMatrixPE": 84,
+ "BandwidthPerPixelPhaseEncode": 23.81,
+ "BaseResolution": 84,
+ "BodyPartExamined": b"BRAIN",
+ "ConsistencyInfo": b"N4_VE11B_LATEST_20150530",
+ "ConversionSoftware": b"dcm2niix",
+ "ConversionSoftwareVersion": b"v1.0.20171215 GCC4.8.4",
+ "DerivedVendorReportedEchoSpacing": 0.00049999,
+ "DeviceSerialNumber": b"67080",
+ "DwellTime": 2.6e-06,
+ "EchoTime": 0.0512,
+ "EchoTrainLength": 84,
+ "EffectiveEchoSpacing": 0.00049999,
+ "FlipAngle": 90,
+ "ImageOrientationPatientDICOM": [1, 0, 0, 0, 1, 0],
+ "ImageType": ["ORIGINAL", "PRIMARY", "M", "ND", "MOSAIC"],
+ "InPlanePhaseEncodingDirectionDICOM": b"COL",
+ "MRAcquisitionType": b"2D",
+ "MagneticFieldStrength": 3,
+ "Manufacturer": b"Siemens",
+ "ManufacturersModelName": b"Prisma_fit",
+ "Modality": b"MR",
+ "PartialFourier": 1,
+ "PatientPosition": b"HFS",
+ "PercentPhaseFOV": 100,
+ "PhaseEncodingDirection": b"j-",
+ "PhaseEncodingSteps": 84,
+ "PhaseResolution": 1,
+ "PixelBandwidth": 2290,
+ "ProcedureStepDescription": b"CMI_HBN-CBIC",
+ "ProtocolName": b"cmrr_fMRI_DistortionMap_AP",
+ "PulseSequenceDetails": b"%CustomerSeq%_cmrr_mbep2d_se",
+ "ReceiveCoilActiveElements": b"HEA;HEP",
+ "ReceiveCoilName": b"Head_32",
+ "ReconMatrixPE": 84,
+ "RepetitionTime": 5.301,
+ "SAR": 0.364379,
+ "ScanOptions": b"FS",
+ "ScanningSequence": b"EP",
+ "SequenceName": b"epse2d1_84",
+ "SequenceVariant": b"SK",
+ "SeriesDescription": b"cmrr_fMRI_DistortionMap_AP",
+ "ShimSetting": [208, -10464, -5533, 615, -83, -88, 55, 30],
+ "SliceThickness": 2.4,
+ "SliceTiming": [
+ 2.64,
+ 0,
+ 2.7275,
+ 0.0875,
+ 2.815,
+ 0.175,
+ 2.9025,
+ 0.2625,
+ 2.9925,
+ 0.3525,
+ 3.08,
+ 0.44,
+ 3.1675,
+ 0.5275,
+ 3.255,
+ 0.615,
+ 3.3425,
+ 0.7025,
+ 3.4325,
+ 0.7925,
+ 3.52,
+ 0.88,
+ 3.6075,
+ 0.9675,
+ 3.695,
+ 1.055,
+ 3.785,
+ 1.1425,
+ 3.8725,
+ 1.2325,
+ 3.96,
+ 1.32,
+ 4.0475,
+ 1.4075,
+ 4.135,
+ 1.495,
+ 4.225,
+ 1.5825,
+ 4.3125,
+ 1.6725,
+ 4.4,
+ 1.76,
+ 4.4875,
+ 1.8475,
+ 4.575,
+ 1.935,
+ 4.665,
+ 2.0225,
+ 4.7525,
+ 2.1125,
+ 4.84,
+ 2.2,
+ 4.9275,
+ 2.2875,
+ 5.015,
+ 2.375,
+ 5.105,
+ 2.4625,
+ 5.1925,
+ 2.5525,
+ ],
+ "SoftwareVersions": b"syngo_MR_E11",
+ "SpacingBetweenSlices": 2.4,
+ "StationName": b"MRTRIO3TX72",
+ "TotalReadoutTime": 0.0414992,
+ "TxRefAmp": 209.923,
+ },
+ ),
+ (
+ _paths[1],
+ {
+ "AcquisitionMatrixPE": 84,
+ "BandwidthPerPixelPhaseEncode": 23.81,
+ "BaseResolution": 84,
+ "BodyPartExamined": b"BRAIN",
+ "ConsistencyInfo": b"N4_VE11B_LATEST_20150530",
+ "ConversionSoftware": b"dcm2niix",
+ "ConversionSoftwareVersion": b"v1.0.20171215 GCC4.8.4",
+ "DerivedVendorReportedEchoSpacing": 0.00049999,
+ "DeviceSerialNumber": b"67080",
+ "DwellTime": 2.6e-06,
+ "EchoTime": 0.0512,
+ "EchoTrainLength": 84,
+ "EffectiveEchoSpacing": 0.00049999,
+ "FlipAngle": 90,
+ "ImageOrientationPatientDICOM": [1, 0, 0, 0, 1, 0],
+ "ImageType": ["ORIGINAL", "PRIMARY", "M", "ND", "MOSAIC"],
+ "InPlanePhaseEncodingDirectionDICOM": b"COL",
+ "MRAcquisitionType": b"2D",
+ "MagneticFieldStrength": 3,
+ "Manufacturer": b"Siemens",
+ "ManufacturersModelName": b"Prisma_fit",
+ "Modality": b"MR",
+ "PartialFourier": 1,
+ "PatientPosition": b"HFS",
+ "PercentPhaseFOV": 100,
+ "PhaseEncodingDirection": b"j",
+ "PhaseEncodingSteps": 84,
+ "PhaseResolution": 1,
+ "PixelBandwidth": 2290,
+ "ProcedureStepDescription": b"CMI_HBN-CBIC",
+ "ProtocolName": b"cmrr_fMRI_DistortionMap_PA",
+ "PulseSequenceDetails": b"%CustomerSeq%_cmrr_mbep2d_se",
+ "ReceiveCoilActiveElements": b"HEA;HEP",
+ "ReceiveCoilName": b"Head_32",
+ "ReconMatrixPE": 84,
+ "RepetitionTime": 5.301,
+ "SAR": 0.364379,
+ "ScanOptions": b"FS",
+ "ScanningSequence": b"EP",
+ "SequenceName": b"epse2d1_84",
+ "SequenceVariant": b"SK",
+ "SeriesDescription": b"cmrr_fMRI_DistortionMap_PA",
+ "ShimSetting": [208, -10464, -5533, 615, -83, -88, 55, 30],
+ "SliceThickness": 2.4,
+ "SliceTiming": [
+ 2.64,
+ 0,
+ 2.73,
+ 0.09,
+ 2.8175,
+ 0.1775,
+ 2.905,
+ 0.265,
+ 2.9925,
+ 0.3525,
+ 3.08,
+ 0.44,
+ 3.17,
+ 0.53,
+ 3.2575,
+ 0.6175,
+ 3.345,
+ 0.705,
+ 3.4325,
+ 0.7925,
+ 3.52,
+ 0.88,
+ 3.61,
+ 0.97,
+ 3.6975,
+ 1.0575,
+ 3.785,
+ 1.145,
+ 3.8725,
+ 1.2325,
+ 3.9625,
+ 1.32,
+ 4.05,
+ 1.41,
+ 4.1375,
+ 1.4975,
+ 4.225,
+ 1.585,
+ 4.3125,
+ 1.6725,
+ 4.4025,
+ 1.76,
+ 4.49,
+ 1.85,
+ 4.5775,
+ 1.9375,
+ 4.665,
+ 2.025,
+ 4.7525,
+ 2.1125,
+ 4.8425,
+ 2.2,
+ 4.93,
+ 2.29,
+ 5.0175,
+ 2.3775,
+ 5.105,
+ 2.465,
+ 5.1925,
+ 2.5525,
+ ],
+ "SoftwareVersions": b"syngo_MR_E11",
+ "SpacingBetweenSlices": 2.4,
+ "StationName": b"MRTRIO3TX72",
+ "TotalReadoutTime": 0.0414992,
+ "TxRefAmp": 209.923,
+ },
+ ),
+ ],
)
- match_fmaps.inputs.fmap_dct = fmap_paths_dct
- match_fmaps.inputs.bold_pedir = bold_pedir
- ds.inputs.func_json = func_json
- ds.inputs.opposite_pe_json = opposite_pe_json
- ds.inputs.same_pe_json = same_pe_json
- wf.connect(match_fmaps, "opposite_pe_epi", ds, "should_be_dir-PA")
- wf.connect(match_fmaps, "same_pe_epi", ds, "should_be_dir-AP")
+RunType: TypeAlias = Literal["nipype"] | Literal["direct"]
+Direction: TypeAlias = Literal["opposite"] | Literal["same"]
- wf.run()
+
+@pytest.mark.parametrize("generate", [True, False])
+def test_match_epi_fmaps(generate: bool, tmp_path: Path) -> None:
+ """Test `~CPAC.utils.datasource.match_epi_fmaps`."""
+ wf, data = match_epi_fmaps_inputs(generate, tmp_path)
+
+ match_fmaps = match_epi_fmaps_function_node()
+ match_fmaps.inputs.bold_pedir = data.bold_pedir
+ match_fmaps.inputs.epi_fmap_one = data.epi_fmaps[0][0]
+ match_fmaps.inputs.epi_fmap_params_one = data.epi_fmaps[0][1]
+ match_fmaps.inputs.epi_fmap_two = data.epi_fmaps[1][0]
+ match_fmaps.inputs.epi_fmap_params_two = data.epi_fmaps[1][1]
+
+ wf.add_nodes([match_fmaps])
+
+ graph: DiGraph = wf.run()
+ result = list(graph.nodes)[-1].run()
+ str_outputs: dict[RunType, dict[Direction, str]] = {
+ "nipype": {
+ "opposite": result.outputs.opposite_pe_epi,
+ "same": result.outputs.same_pe_epi,
+ },
+ "direct": {},
+ }
+ path_outputs: dict[RunType, dict[Direction, Path]] = {"nipype": {}, "direct": {}}
+ str_outputs["direct"]["opposite"], str_outputs["direct"]["same"] = match_epi_fmaps(
+ data.bold_pedir,
+ data.epi_fmaps[0][0],
+ data.epi_fmaps[0][1],
+ data.epi_fmaps[1][0],
+ data.epi_fmaps[1][1],
+ )
+ directions: list[Direction] = ["opposite", "same"]
+ runtypes: list[RunType] = ["nipype", "direct"]
+ for direction in directions:
+ for runtype in runtypes:
+ path_outputs[runtype][direction] = Path(str_outputs[runtype][direction])
+ assert path_outputs[runtype][direction].exists()
+ assert (
+ path_outputs["nipype"][direction].name
+ == path_outputs["direct"][direction].name
+ )