50 changes: 50 additions & 0 deletions cubids/cli.py
@@ -582,6 +582,16 @@ def _parse_datalad_save():
action="store",
help="message for this commit",
)
parser.add_argument(
"--n-cpus",
"--n_cpus",
type=int,
action="store",
dest="n_cpus",
default=1,
help="Number of CPUs (jobs) to use for `datalad save --jobs`. Defaults to 1.",
required=False,
)

return parser

@@ -801,6 +811,20 @@ def _parse_add_nifti_info():
default=False,
help="unlock dataset before adding nifti info ",
)
parser.add_argument(
"--n-cpus",
"--n_cpus",
type=int,
action="store",
dest="n_cpus",
default=1,
help=(
"Number of CPUs to use for parallel add-nifti-info. "
"Defaults to 1 (sequential processing). When --use-datalad is set, "
"this will set parallel jobs for `datalad save -J <n-cpus>`."
),
required=False,
)
return parser


@@ -858,6 +882,19 @@ def _parse_add_file_collections():
default=False,
help="unlock dataset before adding file collection metadata",
)
parser.add_argument(
"--n-cpus",
"--n_cpus",
type=int,
action="store",
dest="n_cpus",
default=1,
help=(
"Number of CPUs to use for `datalad save --jobs`. "
"Defaults to 1 (sequential processing)."
),
required=False,
)
return parser


@@ -922,6 +959,19 @@ def _parse_purge():
default=False,
help="ensure that there are no untracked changes before finding groups",
)
parser.add_argument(
"--n-cpus",
"--n_cpus",
type=int,
action="store",
dest="n_cpus",
default=1,
help=(
"Number of CPUs to use for `datalad save --jobs`. "
"Defaults to 1 (sequential processing)."
),
required=False,
)
return parser


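A minimal standalone sketch (a throwaway argparse parser, not the actual cubids subcommand parsers) of how the argument added above behaves: both the `--n-cpus` and `--n_cpus` spellings store an integer on `args.n_cpus`, and the default of 1 preserves the current sequential behaviour.

import argparse

# Hypothetical parser mirroring the argument added to each cubids subcommand.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--n-cpus",
    "--n_cpus",
    type=int,
    action="store",
    dest="n_cpus",
    default=1,
    required=False,
    help="Number of CPUs (jobs) to use for `datalad save --jobs`. Defaults to 1.",
)

print(parser.parse_args([]).n_cpus)                 # 1 (sequential default)
print(parser.parse_args(["--n-cpus", "4"]).n_cpus)  # 4
print(parser.parse_args(["--n_cpus", "8"]).n_cpus)  # 8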
166 changes: 108 additions & 58 deletions cubids/cubids.py
@@ -11,6 +11,7 @@
import subprocess
import warnings
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from shutil import copyfile, copytree

@@ -239,7 +240,7 @@ def init_datalad(self):
self.path, cfg_proc="text2git", force=True, annex=True
)

def datalad_save(self, message=None):
def datalad_save(self, message=None, jobs=None):
"""Perform a DataLad Save operation on the BIDS tree.

This method checks for an active DataLad handle and ensures that the
Expand All @@ -250,6 +251,9 @@ def datalad_save(self, message=None):
message : str or None, optional
Commit message to use with DataLad save. If None, a default message
"CuBIDS Save" will be used.
jobs : int or None, optional
Number of parallel jobs to use for the save operation (maps to
DataLad `-J/--jobs`). If None, DataLad's default is used.

Raises
------
Expand All @@ -259,7 +263,7 @@ def datalad_save(self, message=None):
if not self.datalad_ready:
raise Exception("DataLad has not been initialized. use datalad_init()")

statuses = self.datalad_handle.save(message=message or "CuBIDS Save")
statuses = self.datalad_handle.save(message=message or "CuBIDS Save", jobs=jobs)
saved_status = set([status["status"] for status in statuses])
if not saved_status == set(["ok"]):
raise Exception("Failed to save in DataLad")
@@ -297,7 +301,7 @@ def datalad_undo_last_commit(self):
reset_proc = subprocess.run(["git", "reset", "--hard", "HEAD~1"], cwd=self.path)
reset_proc.check_returncode()

def add_nifti_info(self):
def add_nifti_info(self, n_cpus=1):
"""
Add information from NIfTI files to their corresponding JSON sidecars.

@@ -326,72 +330,59 @@ def add_nifti_info(self):
directory structure.
- The method will skip any files in hidden directories (directories starting with a dot).
- If a JSON sidecar file does not exist for a NIfTI file, it will be skipped.

Parameters
----------
n_cpus : :obj:`int`
Number of CPUs to use for parallel add-nifti-info. Default is 1 (sequential).

"""
# check if force_unlock is set
if self.force_unlock:
# CHANGE TO SUBPROCESS.CALL IF NOT BLOCKING
subprocess.run(["datalad", "unlock"], cwd=self.path)

# loop through all niftis in the bids dir
# build list of nifti paths in the bids dir
nifti_paths = []
for path in Path(self.path).rglob("sub-*/**/*.*"):
# ignore all dot directories
if "/." in str(path):
continue

if str(path).endswith(".nii") or str(path).endswith(".nii.gz"):
try:
img = nb.load(str(path))
except Exception:
print("Empty Nifti File: ", str(path))
continue

# get important info from niftis
obliquity = np.any(nb.affines.obliquity(img.affine) > 1e-4)
voxel_sizes = img.header.get_zooms()
matrix_dims = img.shape
# add nifti info to corresponding sidecars
sidecar = utils.img_to_new_ext(str(path), ".json")
if Path(sidecar).exists():
try:
with open(sidecar) as f:
data = json.load(f)
except Exception:
print("Error parsing this sidecar: ", sidecar)

if "Obliquity" not in data.keys():
data["Obliquity"] = str(obliquity)
if "VoxelSizeDim1" not in data.keys():
data["VoxelSizeDim1"] = float(voxel_sizes[0])
if "VoxelSizeDim2" not in data.keys():
data["VoxelSizeDim2"] = float(voxel_sizes[1])
if "VoxelSizeDim3" not in data.keys():
data["VoxelSizeDim3"] = float(voxel_sizes[2])
if "Dim1Size" not in data.keys():
data["Dim1Size"] = matrix_dims[0]
if "Dim2Size" not in data.keys():
data["Dim2Size"] = matrix_dims[1]
if "Dim3Size" not in data.keys():
data["Dim3Size"] = matrix_dims[2]
if "NumVolumes" not in data.keys():
if img.ndim == 4:
data["NumVolumes"] = matrix_dims[3]
elif img.ndim == 3:
data["NumVolumes"] = 1
if "ImageOrientation" not in data.keys():
orient = nb.orientations.aff2axcodes(img.affine)
orient = [str(orientation) for orientation in orient]
joined = "".join(orient) + "+"
data["ImageOrientation"] = joined

with open(sidecar, "w") as file:
json.dump(data, file, indent=4)
nifti_paths.append(str(path))

# Ensure n_cpus is at least 1
try:
n_cpus = int(n_cpus)
except Exception:
n_cpus = 1
n_cpus = max(1, n_cpus)

if n_cpus > 1 and len(nifti_paths) > 0:
with ProcessPoolExecutor(n_cpus) as executor:
list(
tqdm(
executor.map(_add_metadata_single_nifti, nifti_paths),
total=len(nifti_paths),
desc="Processing NIfTI files",
unit="file",
)
)
else:
for nifti_path in tqdm(nifti_paths, desc="Processing NIfTI files", unit="file"):
_add_metadata_single_nifti(nifti_path)

if self.use_datalad:
self.datalad_save(message="Added nifti info to sidecars")
# Check if there are any changes to save
if self.is_datalad_clean():
print("nothing to save, working tree clean")
else:
# Use parallel jobs for DataLad save
dl_jobs = n_cpus if n_cpus and n_cpus > 1 else None
self.datalad_save(message="Added nifti info to sidecars", jobs=dl_jobs)

self.reset_bids_layout()

def add_file_collections(self):
def add_file_collections(self, n_cpus=1):
"""Add file collections to the dataset.

This method processes all files in the BIDS directory specified by `self.path`.
@@ -435,7 +426,8 @@ def add_file_collections(self):
json.dump(data, f, sort_keys=True, indent=4)

if self.use_datalad:
self.datalad_save(message="Added file collection metadata to sidecars")
dl_jobs = n_cpus if n_cpus and n_cpus > 1 else None
self.datalad_save(message="Added file collection metadata to sidecars", jobs=dl_jobs)

self.reset_bids_layout()

@@ -823,7 +815,7 @@ def copy_exemplars(self, exemplars_dir, exemplars_tsv, min_group_size):
if self.use_datalad:
subprocess.run(["datalad", "save", "-d", exemplars_dir, "-m", msg])

def purge(self, scans_txt):
def purge(self, scans_txt, n_cpus=1):
"""Purge all associations of desired scans from a bids dataset.

Parameters
@@ -844,9 +836,9 @@ def purge(self, scans_txt):

# check to ensure scans are all real files in the ds!

self._purge_associations(scans)
self._purge_associations(scans, n_cpus=n_cpus)

def _purge_associations(self, scans):
def _purge_associations(self, scans, n_cpus=1):
"""Purge field map JSONs' IntendedFor references.

Parameters
@@ -886,7 +878,8 @@ def _purge_associations(self, scans):
s1 = "Purged IntendedFor references to files "
s2 = "requested for removal"
message = s1 + s2
self.datalad_save(message=message)
dl_jobs = n_cpus if n_cpus and n_cpus > 1 else None
self.datalad_save(message=message, jobs=dl_jobs)
self.reset_bids_layout()

# NOW WE WANT TO PURGE ALL ASSOCIATIONS
@@ -1567,3 +1560,60 @@ def get_fieldmap_lookup(self):
def get_layout(self):
"""Get layout."""
return self.layout


def _add_metadata_single_nifti(nifti_path):
"""Extract metadata from a single NIfTI and write to its sidecar JSON.

Parameters
----------
nifti_path : :obj:`str`
Path to a NIfTI file.
"""
try:
img = nb.load(str(nifti_path))
except Exception:
print("Empty Nifti File: ", str(nifti_path))
return

# get important info from niftis
obliquity = np.any(nb.affines.obliquity(img.affine) > 1e-4)
voxel_sizes = img.header.get_zooms()
matrix_dims = img.shape
# add nifti info to corresponding sidecars
sidecar = utils.img_to_new_ext(str(nifti_path), ".json")
if Path(sidecar).exists():
try:
with open(sidecar) as f:
data = json.load(f)
except Exception:
print("Error parsing this sidecar: ", sidecar)
return

if "Obliquity" not in data.keys():
data["Obliquity"] = str(obliquity)
if "VoxelSizeDim1" not in data.keys():
data["VoxelSizeDim1"] = float(voxel_sizes[0])
if "VoxelSizeDim2" not in data.keys():
data["VoxelSizeDim2"] = float(voxel_sizes[1])
if "VoxelSizeDim3" not in data.keys():
data["VoxelSizeDim3"] = float(voxel_sizes[2])
if "Dim1Size" not in data.keys():
data["Dim1Size"] = matrix_dims[0]
if "Dim2Size" not in data.keys():
data["Dim2Size"] = matrix_dims[1]
if "Dim3Size" not in data.keys():
data["Dim3Size"] = matrix_dims[2]
if "NumVolumes" not in data.keys():
if img.ndim == 4:
data["NumVolumes"] = matrix_dims[3]
elif img.ndim == 3:
data["NumVolumes"] = 1
if "ImageOrientation" not in data.keys():
orient = nb.orientations.aff2axcodes(img.affine)
orient = [str(orientation) for orientation in orient]
joined = "".join(orient) + "+"
data["ImageOrientation"] = joined

with open(sidecar, "w") as file:
json.dump(data, file, indent=4)
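For reference, the dispatch added in `add_nifti_info` is the standard `ProcessPoolExecutor.map` plus `tqdm` pattern. A self-contained sketch of that pattern, with a stand-in worker in place of `_add_metadata_single_nifti`:

from concurrent.futures import ProcessPoolExecutor

from tqdm import tqdm


def _worker(path):
    """Stand-in for _add_metadata_single_nifti; does the per-file work."""
    return path.upper()


def process(paths, n_cpus=1):
    n_cpus = max(1, int(n_cpus))
    if n_cpus > 1 and paths:
        with ProcessPoolExecutor(n_cpus) as executor:
            # executor.map is lazy and order-preserving; list() drives the
            # iterator so the tqdm bar advances as each file finishes.
            return list(tqdm(executor.map(_worker, paths), total=len(paths), unit="file"))
    return [_worker(p) for p in tqdm(paths, unit="file")]


if __name__ == "__main__":
    print(process(["a.nii.gz", "b.nii.gz"], n_cpus=2))

Because the worker is pickled and sent to child processes, it must be a module-level function, which is why the per-file logic was hoisted out of the method into `_add_metadata_single_nifti`.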