From ddd9c7a48d8259e707105914ed77249c32a8a595 Mon Sep 17 00:00:00 2001 From: Jon Cluce Date: Wed, 18 Jun 2025 22:08:17 -0400 Subject: [PATCH 1/6] :goal_net: Handle more datetime edge cases in logging --- CPAC/utils/monitoring/draw_gantt_chart.py | 12 +- CPAC/utils/monitoring/monitoring.py | 144 ++++++++++++++++++++-- 2 files changed, 142 insertions(+), 14 deletions(-) diff --git a/CPAC/utils/monitoring/draw_gantt_chart.py b/CPAC/utils/monitoring/draw_gantt_chart.py index a7a0aaac9..a299b5108 100644 --- a/CPAC/utils/monitoring/draw_gantt_chart.py +++ b/CPAC/utils/monitoring/draw_gantt_chart.py @@ -39,8 +39,7 @@ # You should have received a copy of the GNU Lesser General Public # License along with C-PAC. If not, see . -"""Module to draw an html gantt chart from logfile produced by -``CPAC.utils.monitoring.log_nodes_cb()``. +"""Module to draw an html gantt chart from logfile produced by `~CPAC.utils.monitoring.log_nodes_cb`. See https://nipype.readthedocs.io/en/latest/api/generated/nipype.utils.draw_gantt_chart.html """ @@ -430,9 +429,12 @@ def generate_gantt_chart( html_string += "

Cores: " + str(cores) + "

" html_string += close_header # Draw nipype nodes Gantt chart and runtimes - html_string += draw_lines( - start_node["start"], duration, minute_scale, space_between_minutes - ) + try: + html_string += draw_lines( + start_node["start"], duration, minute_scale, space_between_minutes + ) + except: + breakpoint() html_string += draw_nodes( start_node["start"], nodes_list, diff --git a/CPAC/utils/monitoring/monitoring.py b/CPAC/utils/monitoring/monitoring.py index 6e9466c33..9a6ce3c2f 100644 --- a/CPAC/utils/monitoring/monitoring.py +++ b/CPAC/utils/monitoring/monitoring.py @@ -16,14 +16,16 @@ # License along with C-PAC. If not, see . """Monitoring utilities for C-PAC.""" -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import glob import json import math import os import socketserver +import struct import threading -from typing import Any, Optional, TypeAlias +from typing import Any, Optional, overload, TypeAlias +from zoneinfo import available_timezones, ZoneInfo import networkx as nx from traits.trait_base import Undefined @@ -72,16 +74,104 @@ def __sub__(self, other: "DatetimeWithSafeNone | _NoTime") -> datetime | timedel """Subtract between None and a datetime or timedelta or None.""" return _safe_none_diff(self, other) + def isoformat(self) -> str: + """Return an ISO 8601-like string of 0s for display.""" + return "0000-00-00" + NoTime = _NoTime() """A singleton None that can be used in place of a datetime object.""" class DatetimeWithSafeNone(datetime, _NoTime): - """Time class that can be None or a time value.""" - - def __new__(cls, dt: "OptionalDatetime") -> "DatetimeWithSafeNone | _NoTime": + """Time class that can be None or a time value. + + Examples + -------- + >>> from datetime import datetime + >>> DatetimeWithSafeNone(datetime(2025, 6, 18, 21, 6, 43, 730004)).isoformat() + '2025-06-18T21:06:43.730004' + >>> DatetimeWithSafeNone("2025-06-18T21:06:43.730004").isoformat() + '2025-06-18T21:06:43.730004' + >>> DatetimeWithSafeNone(b"\\x07\\xe9\\x06\\x12\\x10\\x18\\x1c\\x88\\x6d\\x01").isoformat() + '2025-06-18T16:24:28.028040+00:00' + >>> DatetimeWithSafeNone(b'\\x07\\xe9\\x06\\x12\\x10\\x18\\x1c\\x88m\\x00').isoformat() + '2025-06-18T16:24:28.028040' + >>> DatetimeWithSafeNone(DatetimeWithSafeNone("2025-06-18")).isoformat() + '2025-06-18T00:00:00' + >>> DatetimeWithSafeNone(None) + NoTime + >>> DatetimeWithSafeNone(None).isoformat() + '0000-00-00' + """ + + @overload + def __new__( + cls, + year: "OptionalDatetime", + month: None = None, + day: None = None, + hour: None = None, + minute: None = None, + second: None = None, + microsecond: None = None, + tzinfo: None = None, + *, + fold: None = None, + ) -> "DatetimeWithSafeNone | _NoTime": ... + @overload + def __new__( + cls, + year: int, + month: Optional[int] = None, + day: Optional[int] = None, + hour: int = 0, + minute: int = 0, + second: int = 0, + microsecond: int = 0, + tzinfo: Optional[timezone | ZoneInfo] = None, + *, + fold: int = 0, + ) -> "DatetimeWithSafeNone": ... + + def __new__( + cls, + year: "int | OptionalDatetime", + month: Optional[int] = None, + day: Optional[int] = None, + hour: Optional[int] = 0, + minute: Optional[int] = 0, + second: Optional[int] = 0, + microsecond: Optional[int] = 0, + tzinfo: Optional[timezone | ZoneInfo] = None, + *, + fold: Optional[int] = 0, + ) -> "DatetimeWithSafeNone | _NoTime": """Create a new instance of the class.""" + if ( + isinstance(year, int) + and isinstance(month, int) + and isinstance(day, int) + and isinstance(hour, int) + and isinstance(minute, int) + and isinstance(second, int) + and isinstance(microsecond, int) + and isinstance(fold, int) + ): + return datetime.__new__( + cls, + year, + month, + day, + hour, + minute, + second, + microsecond, + tzinfo, + fold=fold, + ) + else: + dt = year if dt is None: return NoTime if isinstance(dt, datetime): @@ -98,9 +188,43 @@ def __new__(cls, dt: "OptionalDatetime") -> "DatetimeWithSafeNone | _NoTime": ) if isinstance(dt, bytes): try: - dt = dt.decode("utf-8") + tzflag: Optional[int] + year, month, day, hour, minute, second = struct.unpack(">H5B", dt[:7]) + microsecond, tzflag = struct.unpack(" bool: """Return True if not NoTime.""" return self is not NoTime - def __sub__(self, other: "DatetimeWithSafeNone | _NoTime") -> datetime | timedelta: + def __sub__(self, other: "DatetimeWithSafeNone | _NoTime") -> datetime | timedelta: # type: ignore[reportIncompatibleMethodOverride] """Subtract between a datetime or timedelta or None.""" return _safe_none_diff(self, other) @@ -146,7 +270,9 @@ def json_dumps(obj: Any, **kwargs) -> str: return json.dumps(obj, cls=DatetimeJSONEncoder, **kwargs) -OptionalDatetime: TypeAlias = Optional[datetime | str | DatetimeWithSafeNone | _NoTime] +OptionalDatetime: TypeAlias = Optional[ + datetime | str | bytes | DatetimeWithSafeNone | _NoTime +] """Type alias for a datetime, ISO-format string or None.""" From c8f45a910bfe79b64487522b468f3cd67822fae6 Mon Sep 17 00:00:00 2001 From: Jon Cluce Date: Wed, 18 Jun 2025 14:10:07 -0400 Subject: [PATCH 2/6] :white_check_mark: Refactor `test_match_epi_fmaps` --- CPAC/utils/test_resources.py | 42 ++++++++++++++--------------- CPAC/utils/tests/test_datasource.py | 28 +++++++++++++------ 2 files changed, 41 insertions(+), 29 deletions(-) diff --git a/CPAC/utils/test_resources.py b/CPAC/utils/test_resources.py index da58e4e0f..5d447292f 100644 --- a/CPAC/utils/test_resources.py +++ b/CPAC/utils/test_resources.py @@ -1,4 +1,4 @@ -# Copyright (C) 2019-2024 C-PAC Developers +# Copyright (C) 2019-2025 C-PAC Developers # This file is part of C-PAC. @@ -14,29 +14,32 @@ # You should have received a copy of the GNU Lesser General Public # License along with C-PAC. If not, see . -from CPAC.utils.monitoring import WFLOGGER +"""Resources for testing utilities.""" +import os +import shutil +from typing import Optional -def setup_test_wf(s3_prefix, paths_list, test_name, workdirs_to_keep=None): - """Set up a basic template Nipype workflow for testing single nodes or - small sub-workflows. - """ - import os - import shutil +from CPAC.pipeline import nipype_pipeline_engine as pe +from CPAC.utils.datasource import check_for_s3 +from CPAC.utils.interfaces.datasink import DataSink +from CPAC.utils.monitoring import WFLOGGER - from CPAC.pipeline import nipype_pipeline_engine as pe - from CPAC.utils.datasource import check_for_s3 - from CPAC.utils.interfaces.datasink import DataSink - test_dir = os.path.join(os.getcwd(), test_name) +def setup_test_wf( + s3_prefix, + paths_list, + test_name, + workdirs_to_keep=None, + test_dir: Optional[str] = None, +) -> tuple[pe.Workflow, pe.Node, dict[str, str]]: + """Set up a basic template Nipype workflow for testing small workflows.""" + test_dir = os.path.join(test_dir if test_dir else os.getcwd(), test_name) work_dir = os.path.join(test_dir, "workdir") out_dir = os.path.join(test_dir, "output") if os.path.exists(out_dir): - try: - shutil.rmtree(out_dir) - except: - pass + shutil.rmtree(out_dir, ignore_errors=True) if os.path.exists(work_dir): for dirname in os.listdir(work_dir): @@ -45,10 +48,7 @@ def setup_test_wf(s3_prefix, paths_list, test_name, workdirs_to_keep=None): WFLOGGER.info("%s --- %s\n", dirname, keepdir) if keepdir in dirname: continue - try: - shutil.rmtree(os.path.join(work_dir, dirname)) - except: - pass + shutil.rmtree(os.path.join(work_dir, dirname), ignore_errors=True) local_paths = {} for subpath in paths_list: @@ -67,4 +67,4 @@ def setup_test_wf(s3_prefix, paths_list, test_name, workdirs_to_keep=None): ds.inputs.base_directory = out_dir ds.inputs.parameterization = True - return (wf, ds, local_paths) + return wf, ds, local_paths diff --git a/CPAC/utils/tests/test_datasource.py b/CPAC/utils/tests/test_datasource.py index be7c2255c..dea1a3877 100644 --- a/CPAC/utils/tests/test_datasource.py +++ b/CPAC/utils/tests/test_datasource.py @@ -1,4 +1,4 @@ -# Copyright (C) 2019-2024 C-PAC Developers +# Copyright (C) 2019-2025 C-PAC Developers # This file is part of C-PAC. @@ -14,9 +14,10 @@ # You should have received a copy of the GNU Lesser General Public # License along with C-PAC. If not, see . -import json +"""Test datasource utilities.""" -import pytest +import json +from pathlib import Path from CPAC.pipeline import nipype_pipeline_engine as pe from CPAC.utils.datasource import match_epi_fmaps @@ -24,8 +25,8 @@ from CPAC.utils.test_resources import setup_test_wf -@pytest.mark.skip(reason="needs refactoring") -def test_match_epi_fmaps(): +def test_match_epi_fmaps(tmp_path: Path) -> None: + """Test `~CPAC.utils.datasource.match_epi_fmaps`.""" # good data to use s3_prefix = "s3://fcp-indi/data/Projects/HBN/MRI/Site-CBIC/sub-NDARAB708LM5" s3_paths = [ @@ -36,7 +37,9 @@ def test_match_epi_fmaps(): "fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.json", ] - wf, ds, local_paths = setup_test_wf(s3_prefix, s3_paths, "test_match_epi_fmaps") + wf, ds, local_paths = setup_test_wf( + s3_prefix, s3_paths, "test_match_epi_fmaps", test_dir=str(tmp_path) + ) opposite_pe_json = local_paths["fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.json"] same_pe_json = local_paths["fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.json"] @@ -65,15 +68,24 @@ def test_match_epi_fmaps(): match_fmaps = pe.Node( Function( - input_names=["fmap_dct", "bold_pedir"], + input_names=[ + "bold_pedir", + "epi_fmap_one", + "epi_fmap_params_one", + "epi_fmap_two", + "epi_fmap_params_two", + ], output_names=["opposite_pe_epi", "same_pe_epi"], function=match_epi_fmaps, as_module=True, ), name="match_epi_fmaps", ) - match_fmaps.inputs.fmap_dct = fmap_paths_dct match_fmaps.inputs.bold_pedir = bold_pedir + match_fmaps.inputs.epi_fmap_one = fmap_paths_dct["epi_PA"]["scan"] + match_fmaps.inputs.epi_fmap_params_one = fmap_paths_dct["epi_PA"]["scan_parameters"] + match_fmaps.inputs.epi_fmap_two = fmap_paths_dct["epi_AP"]["scan"] + match_fmaps.inputs.epi_fmap_params_two = fmap_paths_dct["epi_AP"]["scan_parameters"] ds.inputs.func_json = func_json ds.inputs.opposite_pe_json = opposite_pe_json From 8a7f1036433630133f06152f01992348e45e7286 Mon Sep 17 00:00:00 2001 From: Jon Cluce Date: Wed, 18 Jun 2025 22:21:26 -0400 Subject: [PATCH 3/6] :goal_net: Handle bytestring in phase encoding metadata --- CPAC/utils/datasource.py | 28 ++- CPAC/utils/tests/test_datasource.py | 366 ++++++++++++++++++++++++---- 2 files changed, 339 insertions(+), 55 deletions(-) diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index 25adb1eec..aa102b0e3 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -1,4 +1,4 @@ -# Copyright (C) 2012-2024 C-PAC Developers +# Copyright (C) 2012-2025 C-PAC Developers # This file is part of C-PAC. @@ -20,6 +20,7 @@ import json from pathlib import Path import re +from typing import Any, Optional from voluptuous import RequiredFieldInvalid from nipype.interfaces import utility as util @@ -463,12 +464,12 @@ def gather_echo_times(echotime_1, echotime_2, echotime_3=None, echotime_4=None): def match_epi_fmaps( - bold_pedir, - epi_fmap_one, - epi_fmap_params_one, - epi_fmap_two=None, - epi_fmap_params_two=None, -): + bold_pedir: str, + epi_fmap_one: str, + epi_fmap_params_one: dict[str, Any], + epi_fmap_two: Optional[str] = None, + epi_fmap_params_two: Optional[dict[str, Any]] = None, +) -> tuple[str, str]: """Match EPI field maps to the BOLD scan. Parse the field map files in the data configuration and determine which @@ -504,13 +505,22 @@ def match_epi_fmaps( with open(scan_params, "r") as f: scan_params = json.load(f) if "PhaseEncodingDirection" in scan_params: - epi_pedir = scan_params["PhaseEncodingDirection"] + epi_pedir: str | bytes = scan_params["PhaseEncodingDirection"] + if isinstance(epi_pedir, bytes): + epi_pedir = epi_pedir.decode("utf-8") if epi_pedir == bold_pedir: same_pe_epi = epi_scan elif epi_pedir[0] == bold_pedir[0]: opposite_pe_epi = epi_scan - return (opposite_pe_epi, same_pe_epi) + if same_pe_epi is None: + msg = f"Same phase encoding EPI: {bold_pedir}" + raise FileNotFoundError(msg) + if opposite_pe_epi is None: + msg = f"Opposite phase encoding EPI: {bold_pedir}" + raise FileNotFoundError(msg) + + return opposite_pe_epi, same_pe_epi def ingress_func_metadata( diff --git a/CPAC/utils/tests/test_datasource.py b/CPAC/utils/tests/test_datasource.py index dea1a3877..0d1a5c675 100644 --- a/CPAC/utils/tests/test_datasource.py +++ b/CPAC/utils/tests/test_datasource.py @@ -16,55 +16,331 @@ # License along with C-PAC. If not, see . """Test datasource utilities.""" +from dataclasses import dataclass import json from pathlib import Path +from typing import Any + +from networkx.classes.digraph import DiGraph +import pytest from CPAC.pipeline import nipype_pipeline_engine as pe from CPAC.utils.datasource import match_epi_fmaps from CPAC.utils.interfaces import Function from CPAC.utils.test_resources import setup_test_wf +from CPAC.utils.utils import PE_DIRECTION -def test_match_epi_fmaps(tmp_path: Path) -> None: - """Test `~CPAC.utils.datasource.match_epi_fmaps`.""" - # good data to use - s3_prefix = "s3://fcp-indi/data/Projects/HBN/MRI/Site-CBIC/sub-NDARAB708LM5" - s3_paths = [ - "func/sub-NDARAB708LM5_task-rest_run-1_bold.json", - "fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.nii.gz", - "fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.json", - "fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.nii.gz", - "fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.json", - ] +@dataclass +class MatchEpiFmapsInputs: + """Store test data for `match_epi_fmaps`.""" - wf, ds, local_paths = setup_test_wf( - s3_prefix, s3_paths, "test_match_epi_fmaps", test_dir=str(tmp_path) - ) + bold_pedir: PE_DIRECTION + epi_fmaps: list[tuple[str, dict[str, Any]]] + + +def match_epi_fmaps_inputs( + generate: bool, path: Path +) -> tuple[pe.Workflow, MatchEpiFmapsInputs]: + """Return inputs for `~CPAC.utils.datasource.match_epi_fmaps`.""" + if generate: + # good data to use + s3_prefix = "s3://fcp-indi/data/Projects/HBN/MRI/Site-CBIC/sub-NDARAB708LM5" + s3_paths = [ + "func/sub-NDARAB708LM5_task-rest_run-1_bold.json", + "fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.nii.gz", + "fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.json", + "fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.nii.gz", + "fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.json", + ] + + wf, ds, local_paths = setup_test_wf( + s3_prefix, s3_paths, "test_match_epi_fmaps", test_dir=str(path) + ) - opposite_pe_json = local_paths["fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.json"] - same_pe_json = local_paths["fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.json"] - func_json = local_paths["func/sub-NDARAB708LM5_task-rest_run-1_bold.json"] + opposite_pe_json = local_paths["fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.json"] + same_pe_json = local_paths["fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.json"] + func_json = local_paths["func/sub-NDARAB708LM5_task-rest_run-1_bold.json"] - with open(opposite_pe_json, "r") as f: - opposite_pe_params = json.load(f) + with open(opposite_pe_json, "r") as f: + opposite_pe_params = json.load(f) - with open(same_pe_json, "r") as f: - same_pe_params = json.load(f) + with open(same_pe_json, "r") as f: + same_pe_params = json.load(f) - with open(func_json, "r") as f: - func_params = json.load(f) - bold_pedir = func_params["PhaseEncodingDirection"] + with open(func_json, "r") as f: + func_params = json.load(f) + bold_pedir = func_params["PhaseEncodingDirection"] + + fmap_paths_dct = { + "epi_PA": { + "scan": local_paths["fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.nii.gz"], + "scan_parameters": opposite_pe_params, + }, + "epi_AP": { + "scan": local_paths["fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.nii.gz"], + "scan_parameters": same_pe_params, + }, + } + ds.inputs.func_json = func_json + ds.inputs.opposite_pe_json = opposite_pe_json + ds.inputs.same_pe_json = same_pe_json + return wf, MatchEpiFmapsInputs( + bold_pedir, + [ + (scan["scan"], scan["scan_parameters"]) + for scan in fmap_paths_dct.values() + ], + ) + _paths = [ + f"{path}/sub-NDARAB514MAJ_dir-AP_acq-fMRI_epi.nii.gz", + f"{path}/sub-NDARAB514MAJ_dir-PA_acq-fMRI_epi.nii.gz", + ] + for _ in _paths: + Path(_).touch(exist_ok=True) + return pe.Workflow("test_match_epi_fmaps", path), MatchEpiFmapsInputs( + "j-", + [ + ( + _paths[0], + { + "AcquisitionMatrixPE": 84, + "BandwidthPerPixelPhaseEncode": 23.81, + "BaseResolution": 84, + "BodyPartExamined": b"BRAIN", + "ConsistencyInfo": b"N4_VE11B_LATEST_20150530", + "ConversionSoftware": b"dcm2niix", + "ConversionSoftwareVersion": b"v1.0.20171215 GCC4.8.4", + "DerivedVendorReportedEchoSpacing": 0.00049999, + "DeviceSerialNumber": b"67080", + "DwellTime": 2.6e-06, + "EchoTime": 0.0512, + "EchoTrainLength": 84, + "EffectiveEchoSpacing": 0.00049999, + "FlipAngle": 90, + "ImageOrientationPatientDICOM": [1, 0, 0, 0, 1, 0], + "ImageType": ["ORIGINAL", "PRIMARY", "M", "ND", "MOSAIC"], + "InPlanePhaseEncodingDirectionDICOM": b"COL", + "MRAcquisitionType": b"2D", + "MagneticFieldStrength": 3, + "Manufacturer": b"Siemens", + "ManufacturersModelName": b"Prisma_fit", + "Modality": b"MR", + "PartialFourier": 1, + "PatientPosition": b"HFS", + "PercentPhaseFOV": 100, + "PhaseEncodingDirection": b"j-", + "PhaseEncodingSteps": 84, + "PhaseResolution": 1, + "PixelBandwidth": 2290, + "ProcedureStepDescription": b"CMI_HBN-CBIC", + "ProtocolName": b"cmrr_fMRI_DistortionMap_AP", + "PulseSequenceDetails": b"%CustomerSeq%_cmrr_mbep2d_se", + "ReceiveCoilActiveElements": b"HEA;HEP", + "ReceiveCoilName": b"Head_32", + "ReconMatrixPE": 84, + "RepetitionTime": 5.301, + "SAR": 0.364379, + "ScanOptions": b"FS", + "ScanningSequence": b"EP", + "SequenceName": b"epse2d1_84", + "SequenceVariant": b"SK", + "SeriesDescription": b"cmrr_fMRI_DistortionMap_AP", + "ShimSetting": [208, -10464, -5533, 615, -83, -88, 55, 30], + "SliceThickness": 2.4, + "SliceTiming": [ + 2.64, + 0, + 2.7275, + 0.0875, + 2.815, + 0.175, + 2.9025, + 0.2625, + 2.9925, + 0.3525, + 3.08, + 0.44, + 3.1675, + 0.5275, + 3.255, + 0.615, + 3.3425, + 0.7025, + 3.4325, + 0.7925, + 3.52, + 0.88, + 3.6075, + 0.9675, + 3.695, + 1.055, + 3.785, + 1.1425, + 3.8725, + 1.2325, + 3.96, + 1.32, + 4.0475, + 1.4075, + 4.135, + 1.495, + 4.225, + 1.5825, + 4.3125, + 1.6725, + 4.4, + 1.76, + 4.4875, + 1.8475, + 4.575, + 1.935, + 4.665, + 2.0225, + 4.7525, + 2.1125, + 4.84, + 2.2, + 4.9275, + 2.2875, + 5.015, + 2.375, + 5.105, + 2.4625, + 5.1925, + 2.5525, + ], + "SoftwareVersions": b"syngo_MR_E11", + "SpacingBetweenSlices": 2.4, + "StationName": b"MRTRIO3TX72", + "TotalReadoutTime": 0.0414992, + "TxRefAmp": 209.923, + }, + ), + ( + _paths[1], + { + "AcquisitionMatrixPE": 84, + "BandwidthPerPixelPhaseEncode": 23.81, + "BaseResolution": 84, + "BodyPartExamined": b"BRAIN", + "ConsistencyInfo": b"N4_VE11B_LATEST_20150530", + "ConversionSoftware": b"dcm2niix", + "ConversionSoftwareVersion": b"v1.0.20171215 GCC4.8.4", + "DerivedVendorReportedEchoSpacing": 0.00049999, + "DeviceSerialNumber": b"67080", + "DwellTime": 2.6e-06, + "EchoTime": 0.0512, + "EchoTrainLength": 84, + "EffectiveEchoSpacing": 0.00049999, + "FlipAngle": 90, + "ImageOrientationPatientDICOM": [1, 0, 0, 0, 1, 0], + "ImageType": ["ORIGINAL", "PRIMARY", "M", "ND", "MOSAIC"], + "InPlanePhaseEncodingDirectionDICOM": b"COL", + "MRAcquisitionType": b"2D", + "MagneticFieldStrength": 3, + "Manufacturer": b"Siemens", + "ManufacturersModelName": b"Prisma_fit", + "Modality": b"MR", + "PartialFourier": 1, + "PatientPosition": b"HFS", + "PercentPhaseFOV": 100, + "PhaseEncodingDirection": b"j", + "PhaseEncodingSteps": 84, + "PhaseResolution": 1, + "PixelBandwidth": 2290, + "ProcedureStepDescription": b"CMI_HBN-CBIC", + "ProtocolName": b"cmrr_fMRI_DistortionMap_PA", + "PulseSequenceDetails": b"%CustomerSeq%_cmrr_mbep2d_se", + "ReceiveCoilActiveElements": b"HEA;HEP", + "ReceiveCoilName": b"Head_32", + "ReconMatrixPE": 84, + "RepetitionTime": 5.301, + "SAR": 0.364379, + "ScanOptions": b"FS", + "ScanningSequence": b"EP", + "SequenceName": b"epse2d1_84", + "SequenceVariant": b"SK", + "SeriesDescription": b"cmrr_fMRI_DistortionMap_PA", + "ShimSetting": [208, -10464, -5533, 615, -83, -88, 55, 30], + "SliceThickness": 2.4, + "SliceTiming": [ + 2.64, + 0, + 2.73, + 0.09, + 2.8175, + 0.1775, + 2.905, + 0.265, + 2.9925, + 0.3525, + 3.08, + 0.44, + 3.17, + 0.53, + 3.2575, + 0.6175, + 3.345, + 0.705, + 3.4325, + 0.7925, + 3.52, + 0.88, + 3.61, + 0.97, + 3.6975, + 1.0575, + 3.785, + 1.145, + 3.8725, + 1.2325, + 3.9625, + 1.32, + 4.05, + 1.41, + 4.1375, + 1.4975, + 4.225, + 1.585, + 4.3125, + 1.6725, + 4.4025, + 1.76, + 4.49, + 1.85, + 4.5775, + 1.9375, + 4.665, + 2.025, + 4.7525, + 2.1125, + 4.8425, + 2.2, + 4.93, + 2.29, + 5.0175, + 2.3775, + 5.105, + 2.465, + 5.1925, + 2.5525, + ], + "SoftwareVersions": b"syngo_MR_E11", + "SpacingBetweenSlices": 2.4, + "StationName": b"MRTRIO3TX72", + "TotalReadoutTime": 0.0414992, + "TxRefAmp": 209.923, + }, + ), + ], + ) - fmap_paths_dct = { - "epi_PA": { - "scan": local_paths["fmap/sub-NDARAB708LM5_dir-PA_acq-fMRI_epi.nii.gz"], - "scan_parameters": opposite_pe_params, - }, - "epi_AP": { - "scan": local_paths["fmap/sub-NDARAB708LM5_dir-AP_acq-fMRI_epi.nii.gz"], - "scan_parameters": same_pe_params, - }, - } + +@pytest.mark.parametrize("generate", [True, False]) +def test_match_epi_fmaps(generate: bool, tmp_path: Path) -> None: + """Test `~CPAC.utils.datasource.match_epi_fmaps`.""" + wf, data = match_epi_fmaps_inputs(generate, tmp_path) match_fmaps = pe.Node( Function( @@ -81,17 +357,15 @@ def test_match_epi_fmaps(tmp_path: Path) -> None: ), name="match_epi_fmaps", ) - match_fmaps.inputs.bold_pedir = bold_pedir - match_fmaps.inputs.epi_fmap_one = fmap_paths_dct["epi_PA"]["scan"] - match_fmaps.inputs.epi_fmap_params_one = fmap_paths_dct["epi_PA"]["scan_parameters"] - match_fmaps.inputs.epi_fmap_two = fmap_paths_dct["epi_AP"]["scan"] - match_fmaps.inputs.epi_fmap_params_two = fmap_paths_dct["epi_AP"]["scan_parameters"] - - ds.inputs.func_json = func_json - ds.inputs.opposite_pe_json = opposite_pe_json - ds.inputs.same_pe_json = same_pe_json + match_fmaps.inputs.bold_pedir = data.bold_pedir + match_fmaps.inputs.epi_fmap_one = data.epi_fmaps[0][0] + match_fmaps.inputs.epi_fmap_params_one = data.epi_fmaps[0][1] + match_fmaps.inputs.epi_fmap_two = data.epi_fmaps[1][0] + match_fmaps.inputs.epi_fmap_params_two = data.epi_fmaps[1][1] - wf.connect(match_fmaps, "opposite_pe_epi", ds, "should_be_dir-PA") - wf.connect(match_fmaps, "same_pe_epi", ds, "should_be_dir-AP") + wf.add_nodes([match_fmaps]) - wf.run() + graph: DiGraph = wf.run() + result = list(graph.nodes)[-1].run() + assert Path(result.outputs.opposite_pe_epi).exists() + assert Path(result.outputs.same_pe_epi).exists() From 73d64d3c82ccf5c56537b0bf50d962fec4b9d769 Mon Sep 17 00:00:00 2001 From: Jon Cluce Date: Wed, 18 Jun 2025 22:45:17 -0400 Subject: [PATCH 4/6] :rewind: Remove debugging code --- CPAC/utils/monitoring/draw_gantt_chart.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/CPAC/utils/monitoring/draw_gantt_chart.py b/CPAC/utils/monitoring/draw_gantt_chart.py index a299b5108..a7a0aaac9 100644 --- a/CPAC/utils/monitoring/draw_gantt_chart.py +++ b/CPAC/utils/monitoring/draw_gantt_chart.py @@ -39,7 +39,8 @@ # You should have received a copy of the GNU Lesser General Public # License along with C-PAC. If not, see . -"""Module to draw an html gantt chart from logfile produced by `~CPAC.utils.monitoring.log_nodes_cb`. +"""Module to draw an html gantt chart from logfile produced by +``CPAC.utils.monitoring.log_nodes_cb()``. See https://nipype.readthedocs.io/en/latest/api/generated/nipype.utils.draw_gantt_chart.html """ @@ -429,12 +430,9 @@ def generate_gantt_chart( html_string += "

Cores: " + str(cores) + "

" html_string += close_header # Draw nipype nodes Gantt chart and runtimes - try: - html_string += draw_lines( - start_node["start"], duration, minute_scale, space_between_minutes - ) - except: - breakpoint() + html_string += draw_lines( + start_node["start"], duration, minute_scale, space_between_minutes + ) html_string += draw_nodes( start_node["start"], nodes_list, From 1ed8dff3f5b81db4b9617fb0c51fc6b5834959ad Mon Sep 17 00:00:00 2001 From: Jon Cluce Date: Wed, 18 Jun 2025 23:00:02 -0400 Subject: [PATCH 5/6] :art: DRY `match_epi_fmaps_function_node` --- .../distortion_correction.py | 20 ++----------------- CPAC/utils/datasource.py | 19 ++++++++++++++++++ CPAC/utils/tests/test_datasource.py | 19 ++---------------- 3 files changed, 23 insertions(+), 35 deletions(-) diff --git a/CPAC/distortion_correction/distortion_correction.py b/CPAC/distortion_correction/distortion_correction.py index 4457ab91f..0ddf005e9 100644 --- a/CPAC/distortion_correction/distortion_correction.py +++ b/CPAC/distortion_correction/distortion_correction.py @@ -36,7 +36,7 @@ from CPAC.pipeline import nipype_pipeline_engine as pe from CPAC.pipeline.nodeblock import nodeblock from CPAC.utils import function -from CPAC.utils.datasource import match_epi_fmaps +from CPAC.utils.datasource import match_epi_fmaps_function_node from CPAC.utils.interfaces.function import Function @@ -406,23 +406,7 @@ def distcor_blip_afni_qwarp(wf, cfg, strat_pool, pipe_num, opt=None): 3dQWarp. The output of this can then proceed to func_preproc. """ - match_epi_imports = ["import json"] - match_epi_fmaps_node = pe.Node( - Function( - input_names=[ - "bold_pedir", - "epi_fmap_one", - "epi_fmap_params_one", - "epi_fmap_two", - "epi_fmap_params_two", - ], - output_names=["opposite_pe_epi", "same_pe_epi"], - function=match_epi_fmaps, - imports=match_epi_imports, - as_module=True, - ), - name=f"match_epi_fmaps_{pipe_num}", - ) + match_epi_fmaps_node = match_epi_fmaps_function_node(f"match_epi_fmaps_{pipe_num}") node, out = strat_pool.get_data("epi-1") wf.connect(node, out, match_epi_fmaps_node, "epi_fmap_one") diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index aa102b0e3..a23f37348 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -523,6 +523,25 @@ def match_epi_fmaps( return opposite_pe_epi, same_pe_epi +def match_epi_fmaps_function_node(name: str = "match_epi_fmaps"): + """Return a Function node for `~CPAC.utils.datasource.match_epi_fmaps`.""" + return pe.Node( + Function( + input_names=[ + "bold_pedir", + "epi_fmap_one", + "epi_fmap_params_one", + "epi_fmap_two", + "epi_fmap_params_two", + ], + output_names=["opposite_pe_epi", "same_pe_epi"], + function=match_epi_fmaps, + as_module=True, + ), + name=name, + ) + + def ingress_func_metadata( wf, cfg, diff --git a/CPAC/utils/tests/test_datasource.py b/CPAC/utils/tests/test_datasource.py index 0d1a5c675..c95113a9b 100644 --- a/CPAC/utils/tests/test_datasource.py +++ b/CPAC/utils/tests/test_datasource.py @@ -25,8 +25,7 @@ import pytest from CPAC.pipeline import nipype_pipeline_engine as pe -from CPAC.utils.datasource import match_epi_fmaps -from CPAC.utils.interfaces import Function +from CPAC.utils.datasource import match_epi_fmaps_function_node from CPAC.utils.test_resources import setup_test_wf from CPAC.utils.utils import PE_DIRECTION @@ -342,21 +341,7 @@ def test_match_epi_fmaps(generate: bool, tmp_path: Path) -> None: """Test `~CPAC.utils.datasource.match_epi_fmaps`.""" wf, data = match_epi_fmaps_inputs(generate, tmp_path) - match_fmaps = pe.Node( - Function( - input_names=[ - "bold_pedir", - "epi_fmap_one", - "epi_fmap_params_one", - "epi_fmap_two", - "epi_fmap_params_two", - ], - output_names=["opposite_pe_epi", "same_pe_epi"], - function=match_epi_fmaps, - as_module=True, - ), - name="match_epi_fmaps", - ) + match_fmaps = match_epi_fmaps_function_node() match_fmaps.inputs.bold_pedir = data.bold_pedir match_fmaps.inputs.epi_fmap_one = data.epi_fmaps[0][0] match_fmaps.inputs.epi_fmap_params_one = data.epi_fmaps[0][1] From 1156a214c91d48d301da9a48990cd119a23fab83 Mon Sep 17 00:00:00 2001 From: Jon Cluce Date: Fri, 20 Jun 2025 15:21:33 -0400 Subject: [PATCH 6/6] :white_check_mark: Add direct test for better coverage --- CPAC/utils/tests/test_datasource.py | 35 +++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/CPAC/utils/tests/test_datasource.py b/CPAC/utils/tests/test_datasource.py index c95113a9b..6e9e52c0d 100644 --- a/CPAC/utils/tests/test_datasource.py +++ b/CPAC/utils/tests/test_datasource.py @@ -19,13 +19,13 @@ from dataclasses import dataclass import json from pathlib import Path -from typing import Any +from typing import Any, Literal, TypeAlias from networkx.classes.digraph import DiGraph import pytest from CPAC.pipeline import nipype_pipeline_engine as pe -from CPAC.utils.datasource import match_epi_fmaps_function_node +from CPAC.utils.datasource import match_epi_fmaps, match_epi_fmaps_function_node from CPAC.utils.test_resources import setup_test_wf from CPAC.utils.utils import PE_DIRECTION @@ -336,6 +336,10 @@ def match_epi_fmaps_inputs( ) +RunType: TypeAlias = Literal["nipype"] | Literal["direct"] +Direction: TypeAlias = Literal["opposite"] | Literal["same"] + + @pytest.mark.parametrize("generate", [True, False]) def test_match_epi_fmaps(generate: bool, tmp_path: Path) -> None: """Test `~CPAC.utils.datasource.match_epi_fmaps`.""" @@ -352,5 +356,28 @@ def test_match_epi_fmaps(generate: bool, tmp_path: Path) -> None: graph: DiGraph = wf.run() result = list(graph.nodes)[-1].run() - assert Path(result.outputs.opposite_pe_epi).exists() - assert Path(result.outputs.same_pe_epi).exists() + str_outputs: dict[RunType, dict[Direction, str]] = { + "nipype": { + "opposite": result.outputs.opposite_pe_epi, + "same": result.outputs.same_pe_epi, + }, + "direct": {}, + } + path_outputs: dict[RunType, dict[Direction, Path]] = {"nipype": {}, "direct": {}} + str_outputs["direct"]["opposite"], str_outputs["direct"]["same"] = match_epi_fmaps( + data.bold_pedir, + data.epi_fmaps[0][0], + data.epi_fmaps[0][1], + data.epi_fmaps[1][0], + data.epi_fmaps[1][1], + ) + directions: list[Direction] = ["opposite", "same"] + runtypes: list[RunType] = ["nipype", "direct"] + for direction in directions: + for runtype in runtypes: + path_outputs[runtype][direction] = Path(str_outputs[runtype][direction]) + assert path_outputs[runtype][direction].exists() + assert ( + path_outputs["nipype"][direction].name + == path_outputs["direct"][direction].name + )