Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 58 additions & 58 deletions rbc_qc_metrics_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ import numpy as np
def calculate_FD_J_from_rms(rms_file):
"""
Calculate FD-Jenkinson from MCFLIRT *_rel.rms output.

Parameters
----------
rms_file : str
Path to *_rel.rms file from MCFLIRT

Returns
-------
fdj : np.ndarray
Expand All @@ -82,29 +82,29 @@ import numpy as np
def calculate_FD_P(motion_params_file):
"""
Calculate FD-Power from 6-parameter motion file.

Parameters
----------
motion_params_file : str
Path to motion parameters file (6 columns: 3 rotation, 3 translation)
Format: [rot_x, rot_y, rot_z, trans_x, trans_y, trans_z] per row

Returns
-------
fd : np.ndarray
Framewise displacement array
"""
motion_params = np.genfromtxt(motion_params_file).T

# Rotations (first 3 params) - convert to mm using 50mm sphere radius
rotations = np.transpose(np.abs(np.diff(motion_params[0:3, :])))
# Translations (last 3 params)
translations = np.transpose(np.abs(np.diff(motion_params[3:6, :])))

# FD = sum of translations + (50mm * pi/180) * sum of rotations
fd = np.sum(translations, axis=1) + (50 * np.pi / 180) * np.sum(rotations, axis=1)
fd = np.insert(fd, 0, 0) # Prepend 0 for first volume

return fd
```

Expand All @@ -118,12 +118,12 @@ import numpy as np
def calculate_rms_motion(motion_params_file):
"""
Calculate RMS of translation parameters.

Parameters
----------
motion_params_file : str
Path to motion parameters (6 columns)

Returns
-------
mean_rms : float
Expand All @@ -132,10 +132,10 @@ def calculate_rms_motion(motion_params_file):
Maximum RMS motion
"""
mot = np.genfromtxt(motion_params_file).T

# RMS of translation parameters (columns 3, 4, 5 for x, y, z translation)
rms = np.sqrt(mot[3]**2 + mot[4]**2 + mot[5]**2)

return np.mean(rms), np.max(rms)
```

Expand All @@ -156,14 +156,14 @@ import nibabel as nib
def calculate_DVARS(func_file, mask_file):
"""
Calculate DVARS (temporal derivative of RMS variance).

Parameters
----------
func_file : str
Path to 4D functional image
mask_file : str
Path to brain mask

Returns
-------
dvars : np.ndarray
Expand All @@ -173,22 +173,22 @@ def calculate_DVARS(func_file, mask_file):
"""
func_data = nib.load(func_file).get_fdata().astype(np.float32)
mask_data = nib.load(mask_file).get_fdata().astype(bool)

# Temporal difference (derivative)
diff_data = np.diff(func_data, axis=3)

# Square of differences
squared_diff = np.square(diff_data)

# Apply mask
masked_data = squared_diff[mask_data]

# RMS across voxels for each timepoint
dvars = np.sqrt(np.mean(masked_data, axis=0))

# Prepend 0 for first volume
dvars = np.insert(dvars, 0, 0)

return dvars, np.mean(dvars)
```

Expand All @@ -207,33 +207,33 @@ import numpy as np
def calculate_motion_dvars_correlation(dvars_file, fdj_file):
"""
Calculate correlation between DVARS and FD-Jenkinson.

Note: DVARS has N-1 values (no derivative for first volume),
so we correlate with FD[1:] (skipping first FD value).

Parameters
----------
dvars_file : str
Path to DVARS 1D file
fdj_file : str
Path to FD-Jenkinson 1D file

Returns
-------
correlation : float
Pearson correlation coefficient
"""
dvars = np.loadtxt(dvars_file)
fdj = np.loadtxt(fdj_file)

# DVARS is 1 shorter than FD (or both have leading 0)
# If both have leading 0, skip it
if len(dvars) == len(fdj):
dvars = dvars[1:]
fdj = fdj[1:]
elif len(dvars) == len(fdj) - 1:
fdj = fdj[1:]

return np.corrcoef(dvars, fdj)[0, 1]
```

Expand Down Expand Up @@ -261,21 +261,21 @@ import nibabel as nib
def dice_coefficient(mask1_file, mask2_file):
"""
Calculate Dice coefficient between two binary masks.

DC = 2|A ∩ B| / (|A| + |B|)

Range: 0 (no overlap) to 1 (perfect overlap)
"""
mask1 = nib.load(mask1_file).get_fdata().astype(bool)
mask2 = nib.load(mask2_file).get_fdata().astype(bool)

intersection = np.count_nonzero(mask1 & mask2)
size1 = np.count_nonzero(mask1)
size2 = np.count_nonzero(mask2)

if size1 + size2 == 0:
return 0.0

return 2.0 * intersection / (size1 + size2)
```

Expand All @@ -285,20 +285,20 @@ def dice_coefficient(mask1_file, mask2_file):
def jaccard_coefficient(mask1_file, mask2_file):
"""
Calculate Jaccard coefficient between two binary masks.

JC = |A ∩ B| / |A ∪ B|

Range: 0 (no overlap) to 1 (perfect overlap)
"""
mask1 = nib.load(mask1_file).get_fdata().astype(bool)
mask2 = nib.load(mask2_file).get_fdata().astype(bool)

intersection = np.count_nonzero(mask1 & mask2)
union = np.count_nonzero(mask1 | mask2)

if union == 0:
return 0.0

return intersection / union
```

Expand All @@ -311,7 +311,7 @@ def cross_correlation(mask1_file, mask2_file):
"""
mask1 = nib.load(mask1_file).get_fdata().astype(bool).flatten()
mask2 = nib.load(mask2_file).get_fdata().astype(bool).flatten()

return np.corrcoef(mask1, mask2)[0, 1]
```

Expand All @@ -321,18 +321,18 @@ def cross_correlation(mask1_file, mask2_file):
def coverage(mask1_file, mask2_file):
"""
Calculate coverage: intersection / smaller mask.

Measures how much of the smaller mask is covered by the overlap.
"""
mask1 = nib.load(mask1_file).get_fdata().astype(bool)
mask2 = nib.load(mask2_file).get_fdata().astype(bool)

intersection = np.count_nonzero(mask1 & mask2)
smaller_size = min(np.count_nonzero(mask1), np.count_nonzero(mask2))

if smaller_size == 0:
return 0.0

return intersection / smaller_size
```

Expand All @@ -348,7 +348,7 @@ Number of volumes flagged for censoring based on motion/DVARS thresholds.
def count_censored_volumes(fd, dvars, fd_threshold=0.2, dvars_threshold=None):
"""
Count volumes exceeding motion thresholds.

Parameters
----------
fd : np.ndarray
Expand All @@ -359,7 +359,7 @@ def count_censored_volumes(fd, dvars, fd_threshold=0.2, dvars_threshold=None):
FD threshold (RBC uses 0.2 mm)
dvars_threshold : float, optional
DVARS threshold

Returns
-------
n_censored : int
Expand All @@ -368,10 +368,10 @@ def count_censored_volumes(fd, dvars, fd_threshold=0.2, dvars_threshold=None):
Indices of censored volumes
"""
censor_mask = fd > fd_threshold

if dvars_threshold is not None:
censor_mask |= dvars > dvars_threshold

return np.sum(censor_mask), np.where(censor_mask)[0].tolist()
```

Expand Down Expand Up @@ -407,7 +407,7 @@ def generate_xcp_qc(
):
"""
Generate XCP-style QC metrics TSV.

Returns
-------
qc_dict : dict
Expand All @@ -416,46 +416,46 @@ def generate_xcp_qc(
# Motion metrics
fdj = np.loadtxt(fd_jenkinson_file)
mean_fd = np.mean(fdj)

mot = np.genfromtxt(motion_params_file).T
rms = np.sqrt(mot[3]**2 + mot[4]**2 + mot[5]**2)

# DVARS
dvars_init = np.loadtxt(dvars_before_file)
dvars_final = np.loadtxt(dvars_after_file)

# Motion-DVARS correlation
def dvcorr(dvars, fd):
if len(dvars) == len(fd):
return np.corrcoef(dvars[1:], fd[1:])[0, 1]
return np.corrcoef(dvars, fd[1:])[0, 1]

# Registration metrics
def load_mask(f):
return nib.load(f).get_fdata().astype(bool)

def dice(m1, m2):
inter = np.count_nonzero(m1 & m2)
return 2.0 * inter / (np.count_nonzero(m1) + np.count_nonzero(m2))

def jaccard(m1, m2):
return np.count_nonzero(m1 & m2) / np.count_nonzero(m1 | m2)

def crosscorr(m1, m2):
return np.corrcoef(m1.flatten(), m2.flatten())[0, 1]

def coverage(m1, m2):
inter = np.count_nonzero(m1 & m2)
return inter / min(np.count_nonzero(m1), np.count_nonzero(m2))

# Load masks
coreg_m1, coreg_m2 = load_mask(bold2t1w_mask), load_mask(t1w_mask)
norm_m1, norm_m2 = load_mask(bold2template_mask), load_mask(template_mask)

# Volume counts
orig_nvols = nib.load(original_func).shape[3]
final_nvols = nib.load(final_func).shape[3]

return {
'sub': sub, 'ses': ses, 'task': task, 'run': run,
'desc': desc, 'regressors': regressors, 'space': space,
Expand Down Expand Up @@ -496,22 +496,22 @@ From the RBC paper (Shafiei et al., Neuron 2025):
def passes_rbc_qc(qc_dict, fd_timeseries):
"""
Apply RBC recommended QC thresholds.

Parameters
----------
qc_dict : dict
Output from generate_xcp_qc
fd_timeseries : np.ndarray
Full FD timeseries (for median calculation)

Returns
-------
passes : bool
True if scan passes QC
"""
median_fd = np.median(fd_timeseries)
norm_cc = qc_dict['normCrossCorr']

return (median_fd <= 0.2) and (norm_cc >= 0.8)
```

Expand Down
Loading
Loading