Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 2 additions & 18 deletions CPAC/distortion_correction/distortion_correction.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from CPAC.pipeline import nipype_pipeline_engine as pe
from CPAC.pipeline.nodeblock import nodeblock
from CPAC.utils import function
from CPAC.utils.datasource import match_epi_fmaps
from CPAC.utils.datasource import match_epi_fmaps_function_node
from CPAC.utils.interfaces.function import Function


Expand Down Expand Up @@ -406,23 +406,7 @@ def distcor_blip_afni_qwarp(wf, cfg, strat_pool, pipe_num, opt=None):
3dQWarp. The output of this can then proceed to
func_preproc.
"""
match_epi_imports = ["import json"]
match_epi_fmaps_node = pe.Node(
Function(
input_names=[
"bold_pedir",
"epi_fmap_one",
"epi_fmap_params_one",
"epi_fmap_two",
"epi_fmap_params_two",
],
output_names=["opposite_pe_epi", "same_pe_epi"],
function=match_epi_fmaps,
imports=match_epi_imports,
as_module=True,
),
name=f"match_epi_fmaps_{pipe_num}",
)
match_epi_fmaps_node = match_epi_fmaps_function_node(f"match_epi_fmaps_{pipe_num}")

node, out = strat_pool.get_data("epi-1")
wf.connect(node, out, match_epi_fmaps_node, "epi_fmap_one")
Expand Down
47 changes: 38 additions & 9 deletions CPAC/utils/datasource.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2012-2024 C-PAC Developers
# Copyright (C) 2012-2025 C-PAC Developers

# This file is part of C-PAC.

Expand All @@ -20,6 +20,7 @@
import json
from pathlib import Path
import re
from typing import Any, Optional

from voluptuous import RequiredFieldInvalid
from nipype.interfaces import utility as util
Expand Down Expand Up @@ -463,12 +464,12 @@ def gather_echo_times(echotime_1, echotime_2, echotime_3=None, echotime_4=None):


def match_epi_fmaps(
bold_pedir,
epi_fmap_one,
epi_fmap_params_one,
epi_fmap_two=None,
epi_fmap_params_two=None,
):
bold_pedir: str,
epi_fmap_one: str,
epi_fmap_params_one: dict[str, Any],
epi_fmap_two: Optional[str] = None,
epi_fmap_params_two: Optional[dict[str, Any]] = None,
) -> tuple[str, str]:
"""Match EPI field maps to the BOLD scan.

Parse the field map files in the data configuration and determine which
Expand Down Expand Up @@ -504,13 +505,41 @@ def match_epi_fmaps(
with open(scan_params, "r") as f:
scan_params = json.load(f)
if "PhaseEncodingDirection" in scan_params:
epi_pedir = scan_params["PhaseEncodingDirection"]
epi_pedir: str | bytes = scan_params["PhaseEncodingDirection"]
if isinstance(epi_pedir, bytes):
epi_pedir = epi_pedir.decode("utf-8")
if epi_pedir == bold_pedir:
same_pe_epi = epi_scan
elif epi_pedir[0] == bold_pedir[0]:
opposite_pe_epi = epi_scan

return (opposite_pe_epi, same_pe_epi)
if same_pe_epi is None:
msg = f"Same phase encoding EPI: {bold_pedir}"
raise FileNotFoundError(msg)
if opposite_pe_epi is None:
msg = f"Opposite phase encoding EPI: {bold_pedir}"
raise FileNotFoundError(msg)

return opposite_pe_epi, same_pe_epi


def match_epi_fmaps_function_node(name: str = "match_epi_fmaps"):
"""Return a Function node for `~CPAC.utils.datasource.match_epi_fmaps`."""
return pe.Node(
Function(
input_names=[
"bold_pedir",
"epi_fmap_one",
"epi_fmap_params_one",
"epi_fmap_two",
"epi_fmap_params_two",
],
output_names=["opposite_pe_epi", "same_pe_epi"],
function=match_epi_fmaps,
as_module=True,
),
name=name,
)


def ingress_func_metadata(
Expand Down
42 changes: 21 additions & 21 deletions CPAC/utils/test_resources.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2019-2024 C-PAC Developers
# Copyright (C) 2019-2025 C-PAC Developers

# This file is part of C-PAC.

Expand All @@ -14,29 +14,32 @@

# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
from CPAC.utils.monitoring import WFLOGGER
"""Resources for testing utilities."""

import os
import shutil
from typing import Optional

def setup_test_wf(s3_prefix, paths_list, test_name, workdirs_to_keep=None):
"""Set up a basic template Nipype workflow for testing single nodes or
small sub-workflows.
"""
import os
import shutil
from CPAC.pipeline import nipype_pipeline_engine as pe
from CPAC.utils.datasource import check_for_s3
from CPAC.utils.interfaces.datasink import DataSink
from CPAC.utils.monitoring import WFLOGGER

from CPAC.pipeline import nipype_pipeline_engine as pe
from CPAC.utils.datasource import check_for_s3
from CPAC.utils.interfaces.datasink import DataSink

test_dir = os.path.join(os.getcwd(), test_name)
def setup_test_wf(
s3_prefix,
paths_list,
test_name,
workdirs_to_keep=None,
test_dir: Optional[str] = None,
) -> tuple[pe.Workflow, pe.Node, dict[str, str]]:
"""Set up a basic template Nipype workflow for testing small workflows."""
test_dir = os.path.join(test_dir if test_dir else os.getcwd(), test_name)
work_dir = os.path.join(test_dir, "workdir")
out_dir = os.path.join(test_dir, "output")

if os.path.exists(out_dir):
try:
shutil.rmtree(out_dir)
except:
pass
shutil.rmtree(out_dir, ignore_errors=True)

if os.path.exists(work_dir):
for dirname in os.listdir(work_dir):
Expand All @@ -45,10 +48,7 @@ def setup_test_wf(s3_prefix, paths_list, test_name, workdirs_to_keep=None):
WFLOGGER.info("%s --- %s\n", dirname, keepdir)
if keepdir in dirname:
continue
try:
shutil.rmtree(os.path.join(work_dir, dirname))
except:
pass
shutil.rmtree(os.path.join(work_dir, dirname), ignore_errors=True)

local_paths = {}
for subpath in paths_list:
Expand All @@ -67,4 +67,4 @@ def setup_test_wf(s3_prefix, paths_list, test_name, workdirs_to_keep=None):
ds.inputs.base_directory = out_dir
ds.inputs.parameterization = True

return (wf, ds, local_paths)
return wf, ds, local_paths
Loading