|
| 1 | +"""Functionality for handling cases where anatomical images are not available.""" |
| 2 | +import nibabel |
| 3 | +import numpy |
| 4 | +import shutil |
| 5 | +import os |
| 6 | +import re |
| 7 | +from pathlib import Path |
| 8 | +from typing import Union, Dict, Optional |
| 9 | + |
| 10 | + |
| 11 | +def get_data_path(filename: str) -> Path: |
| 12 | + """Get the path to a data file included in the package. |
| 13 | + |
| 14 | + Parameters |
| 15 | + ---------- |
| 16 | + filename : str |
| 17 | + Name of the file to get the path for. This should be relative to the data directory. |
| 18 | + For example: "sub-01/ses-baseline/anat/sub-01_ses-baseline_T1w.nii" |
| 19 | + |
| 20 | + Returns |
| 21 | + ------- |
| 22 | + Path |
| 23 | + Path to the requested data file |
| 24 | + |
| 25 | + Raises |
| 26 | + ------ |
| 27 | + FileNotFoundError |
| 28 | + If the requested file is not found in the package data |
| 29 | + """ |
| 30 | + # Get the path to the data directory |
| 31 | + data_dir = Path(__file__).parent.parent / "data" |
| 32 | + |
| 33 | + # Construct the full path |
| 34 | + full_path = data_dir / filename |
| 35 | + |
| 36 | + # Check if the file exists |
| 37 | + if not full_path.exists(): |
| 38 | + raise FileNotFoundError(f"Could not find data file {filename} in data directory") |
| 39 | + |
| 40 | + return full_path |
| 41 | + |
| 42 | + |
| 43 | +def get_default_anat() -> Path: |
| 44 | + """Get the path to the default anatomical image. |
| 45 | + |
| 46 | + Returns |
| 47 | + ------- |
| 48 | + Path |
| 49 | + Path to the default T1w image that should be used when no anatomical |
| 50 | + image is available for a PET scan. |
| 51 | + """ |
| 52 | + return get_data_path("sub-01/ses-baseline/anat/sub-01_ses-baseline_T1w.nii") |
| 53 | + |
| 54 | + |
| 55 | +def get_default_anat_data() -> nibabel.Nifti1Image: |
| 56 | + """Get the default anatomical image as a nibabel image object. |
| 57 | + |
| 58 | + Returns |
| 59 | + ------- |
| 60 | + nibabel.Nifti1Image |
| 61 | + The default T1w image loaded as a nibabel image object |
| 62 | + """ |
| 63 | + return nibabel.load(get_default_anat()) |
| 64 | + |
| 65 | + |
| 66 | +def extract_subject_id(input_str: str) -> str: |
| 67 | + """Extract subject ID from various input formats. |
| 68 | + |
| 69 | + This function can handle: |
| 70 | + - Full paths (e.g., "/path/to/sub-123/anat/file.nii") |
| 71 | + - Subject IDs with prefix (e.g., "sub-123") |
| 72 | + - Raw subject IDs (e.g., "123") |
| 73 | + |
| 74 | + Parameters |
| 75 | + ---------- |
| 76 | + input_str : str |
| 77 | + Input string containing a subject ID in any format |
| 78 | + |
| 79 | + Returns |
| 80 | + ------- |
| 81 | + str |
| 82 | + Extracted subject ID without the 'sub-' prefix |
| 83 | + |
| 84 | + Raises |
| 85 | + ------ |
| 86 | + ValueError |
| 87 | + If no valid subject ID can be extracted |
| 88 | + """ |
| 89 | + # Pattern to match 'sub-XXXX' in various contexts |
| 90 | + # The pattern captures everything after 'sub-' until it hits a non-alphanumeric character or the end of the string |
| 91 | + pattern = r'sub-([a-zA-Z0-9]+)(?:_|$)' |
| 92 | + |
| 93 | + # Try to find a match |
| 94 | + match = re.search(pattern, input_str) |
| 95 | + |
| 96 | + if match: |
| 97 | + # Return the captured group (the subject ID without 'sub-' prefix) |
| 98 | + return match.group(1) |
| 99 | + else: |
| 100 | + # If no 'sub-' prefix is found, check if the input is a valid subject ID |
| 101 | + if re.match(r'^[a-zA-Z0-9]+$', input_str): |
| 102 | + return input_str |
| 103 | + else: |
| 104 | + # Try a more flexible approach for paths |
| 105 | + path_match = re.search(r'sub-([a-zA-Z0-9]+)', input_str) |
| 106 | + if path_match: |
| 107 | + return path_match.group(1) |
| 108 | + else: |
| 109 | + raise ValueError(f"Could not extract a valid subject ID from '{input_str}'") |
| 110 | + |
| 111 | + |
| 112 | +def copy_default_anat_to_subject(bids_dir: Union[str, Path], subject_id: str) -> dict: |
| 113 | + """Copy the default anatomical image to a PET subject's folder in a BIDS dataset. |
| 114 | + |
| 115 | + This function extracts the subject ID from the provided string using regex, |
| 116 | + then copies the default anatomical image and its JSON metadata to the subject's folder in the BIDS dataset. |
| 117 | + |
| 118 | + Parameters |
| 119 | + ---------- |
| 120 | + bids_dir : Union[str, Path] |
| 121 | + Path to the BIDS dataset |
| 122 | + subject_id : str |
| 123 | + Subject ID in any format: |
| 124 | + - Full path (e.g., "/path/to/sub-123/anat/file.nii") |
| 125 | + - Subject ID with prefix (e.g., "sub-123") |
| 126 | + - Raw subject ID (e.g., "123") |
| 127 | + |
| 128 | + Returns |
| 129 | + ------- |
| 130 | + dict |
| 131 | + A dictionary containing information about the created files and directories: |
| 132 | + { |
| 133 | + 'subject_dir': Path to the subject directory, |
| 134 | + 'anat_dir': Path to the anatomical directory, |
| 135 | + 'created_dirs': List of paths to newly created directories, |
| 136 | + 'created_files': List of paths to newly created files |
| 137 | + } |
| 138 | + |
| 139 | + Raises |
| 140 | + ------ |
| 141 | + FileNotFoundError |
| 142 | + If the BIDS directory or subject directory does not exist |
| 143 | + ValueError |
| 144 | + If the subject ID is invalid or cannot be extracted |
| 145 | + """ |
| 146 | + # Convert bids_dir to Path if it's a string |
| 147 | + bids_dir = Path(bids_dir) |
| 148 | + |
| 149 | + # Check if the BIDS directory exists |
| 150 | + if not bids_dir.exists(): |
| 151 | + raise FileNotFoundError(f"BIDS directory {bids_dir} does not exist") |
| 152 | + |
| 153 | + # Extract subject ID using regex |
| 154 | + try: |
| 155 | + extracted_id = extract_subject_id(subject_id) |
| 156 | + except ValueError as e: |
| 157 | + raise ValueError(f"Invalid subject ID: {e}") |
| 158 | + |
| 159 | + # Create the subject directory structure |
| 160 | + subject_dir = bids_dir / f"sub-{extracted_id}" |
| 161 | + |
| 162 | + # Check if the subject directory exists |
| 163 | + if not subject_dir.exists(): |
| 164 | + raise FileNotFoundError(f"Subject directory {subject_dir} does not exist") |
| 165 | + |
| 166 | + anat_dir = subject_dir / "anat" |
| 167 | + |
| 168 | + # Create the anatomical directory if it doesn't exist |
| 169 | + created_dirs = [] |
| 170 | + if not anat_dir.exists(): |
| 171 | + anat_dir.mkdir(parents=True, exist_ok=True) |
| 172 | + created_dirs.append(anat_dir) |
| 173 | + |
| 174 | + # Define the target file paths |
| 175 | + target_nii = anat_dir / f"sub-{extracted_id}_T1w.nii" |
| 176 | + target_json = anat_dir / f"sub-{extracted_id}_T1w.json" |
| 177 | + |
| 178 | + # Get the source file paths |
| 179 | + source_nii = get_default_anat() |
| 180 | + source_json = get_data_path("sub-01/ses-baseline/anat/sub-01_ses-baseline_T1w.json") |
| 181 | + |
| 182 | + # Copy the files |
| 183 | + created_files = [] |
| 184 | + shutil.copy2(source_nii, target_nii) |
| 185 | + created_files.append(target_nii) |
| 186 | + |
| 187 | + shutil.copy2(source_json, target_json) |
| 188 | + created_files.append(target_json) |
| 189 | + |
| 190 | + print(f"Copied default anatomical image to {target_nii}") |
| 191 | + print(f"Copied default anatomical metadata to {target_json}") |
| 192 | + |
| 193 | + # Return information about created files and directories |
| 194 | + return { |
| 195 | + 'subject_dir': subject_dir, |
| 196 | + 'anat_dir': anat_dir, |
| 197 | + 'created_dirs': created_dirs, |
| 198 | + 'created_files': created_files |
| 199 | + } |
| 200 | + |
| 201 | + |
| 202 | +def remove_default_anat( |
| 203 | + bids_dir: Union[str, Path], |
| 204 | + subject_id: Optional[str] = None, |
| 205 | + created_items: Optional[Dict] = None |
| 206 | +) -> None: |
| 207 | + """Remove the default anatomical image and directory created for a subject. |
| 208 | + |
| 209 | + This function can be used in two ways: |
| 210 | + 1. With the bids_dir and subject_id to identify what to remove |
| 211 | + 2. With the bids_dir and the dictionary returned by copy_default_anat_to_subject |
| 212 | + |
| 213 | + Parameters |
| 214 | + ---------- |
| 215 | + bids_dir : Union[str, Path] |
| 216 | + Path to the BIDS dataset |
| 217 | + subject_id : Optional[str] |
| 218 | + Subject ID in any format (if created_items is not provided) |
| 219 | + created_items : Optional[Dict] |
| 220 | + Dictionary returned by copy_default_anat_to_subject (if subject_id is not provided) |
| 221 | + |
| 222 | + Returns |
| 223 | + ------- |
| 224 | + None |
| 225 | + |
| 226 | + Raises |
| 227 | + ------ |
| 228 | + ValueError |
| 229 | + If neither subject_id nor created_items is provided, or if both are provided |
| 230 | + FileNotFoundError |
| 231 | + If the BIDS directory or subject directory does not exist |
| 232 | + """ |
| 233 | + # Convert bids_dir to Path if it's a string |
| 234 | + bids_dir = Path(bids_dir) |
| 235 | + |
| 236 | + # Check if the BIDS directory exists |
| 237 | + if not bids_dir.exists(): |
| 238 | + raise FileNotFoundError(f"BIDS directory {bids_dir} does not exist") |
| 239 | + |
| 240 | + # Determine which approach to use |
| 241 | + if subject_id is not None and created_items is not None: |
| 242 | + raise ValueError("Cannot provide both subject_id and created_items") |
| 243 | + elif subject_id is None and created_items is None: |
| 244 | + raise ValueError("Must provide either subject_id or created_items") |
| 245 | + |
| 246 | + if created_items is not None: |
| 247 | + # Use the dictionary returned by copy_default_anat_to_subject |
| 248 | + anat_dir = created_items['anat_dir'] |
| 249 | + created_files = created_items['created_files'] |
| 250 | + created_dirs = created_items['created_dirs'] |
| 251 | + else: |
| 252 | + # Extract subject ID using regex |
| 253 | + try: |
| 254 | + extracted_id = extract_subject_id(subject_id) |
| 255 | + except ValueError as e: |
| 256 | + raise ValueError(f"Invalid subject ID: {e}") |
| 257 | + |
| 258 | + # Create the subject directory structure |
| 259 | + subject_dir = bids_dir / f"sub-{extracted_id}" |
| 260 | + |
| 261 | + # Check if the subject directory exists |
| 262 | + if not subject_dir.exists(): |
| 263 | + raise FileNotFoundError(f"Subject directory {subject_dir} does not exist") |
| 264 | + |
| 265 | + anat_dir = subject_dir / "anat" |
| 266 | + |
| 267 | + # Define the file paths |
| 268 | + target_nii = anat_dir / f"sub-{extracted_id}_T1w.nii" |
| 269 | + target_json = anat_dir / f"sub-{extracted_id}_T1w.json" |
| 270 | + |
| 271 | + # Check if the files exist |
| 272 | + created_files = [] |
| 273 | + if target_nii.exists(): |
| 274 | + created_files.append(target_nii) |
| 275 | + if target_json.exists(): |
| 276 | + created_files.append(target_json) |
| 277 | + |
| 278 | + # Check if the directory exists and is empty |
| 279 | + created_dirs = [] |
| 280 | + if anat_dir.exists(): |
| 281 | + # Only add to created_dirs if it's empty or only contains our files |
| 282 | + remaining_files = set(anat_dir.iterdir()) - set(created_files) |
| 283 | + if not remaining_files: |
| 284 | + created_dirs.append(anat_dir) |
| 285 | + |
| 286 | + # Remove the files |
| 287 | + for file_path in created_files: |
| 288 | + if file_path.exists(): |
| 289 | + file_path.unlink() |
| 290 | + print(f"Removed file: {file_path}") |
| 291 | + |
| 292 | + # Remove the directories (only if they're empty) |
| 293 | + for dir_path in created_dirs: |
| 294 | + if dir_path.exists() and not any(dir_path.iterdir()): |
| 295 | + dir_path.rmdir() |
| 296 | + print(f"Removed directory: {dir_path}") |
| 297 | + |
| 298 | + print("Cleanup completed successfully") |
| 299 | + |
0 commit comments