Skip to content

Commit e722a67

Browse files
authored
Add parallel processing to cubids apply (#481)
* add --n-cpus to cubids apply
* fix pytests
* fix pytests
* Handle both participants.tsv and participants.json in subject validation
* add logger warning
* add na_rep="n/a" to all to_csv
1 parent e5bc1c9 commit e722a67

File tree

8 files changed

+276
-156
lines changed

8 files changed

+276
-156
lines changed

cubids/cli.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,20 @@ def _parse_apply():
535535
required=False,
536536
)
537537

538+
parser.add_argument(
539+
"--n-cpus",
540+
"--n_cpus",
541+
type=int,
542+
action="store",
543+
dest="n_cpus",
544+
default=1,
545+
help=(
546+
"Number of CPUs to use for datalad jobs. "
547+
"Used as --jobs for datalad save and datalad run."
548+
),
549+
required=False,
550+
)
551+
538552
return parser
539553

540554

cubids/cubids.py

Lines changed: 201 additions & 118 deletions
Large diffs are not rendered by default.

cubids/metadata_merge.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ def group_by_acquisition_sets(files_tsv, output_prefix, acq_group_level, is_long
381381

382382
# Write the mapping of subject/session to
383383
acq_group_df = pd.DataFrame(grouped_sub_sess)
384-
acq_group_df.to_csv(output_prefix + "_AcqGrouping.tsv", sep="\t", index=False)
384+
acq_group_df.to_csv(output_prefix + "_AcqGrouping.tsv", sep="\t", index=False, na_rep="n/a")
385385

386386
# Create data dictionary for acq group tsv
387387
acq_dict = get_acq_dictionary(is_longitudinal)

cubids/tests/test_apply.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -323,12 +323,12 @@ def test_cubids_apply_intendedfor(
323323
# Create a CuBIDS summary tsv
324324
summary_tsv = tmpdir / "summary.tsv"
325325
df = pd.DataFrame(summary_data)
326-
df.to_csv(summary_tsv, sep="\t", index=False)
326+
df.to_csv(summary_tsv, sep="\t", index=False, na_rep="n/a")
327327

328328
# Create a CuBIDS files tsv
329329
files_tsv = tmpdir / "files.tsv"
330330
df = pd.DataFrame(fdata)
331-
df.to_csv(files_tsv, sep="\t", index=False)
331+
df.to_csv(files_tsv, sep="\t", index=False, na_rep="n/a")
332332

333333
# Run cubids apply
334334
if isinstance(expected, str):

cubids/tests/test_bond.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ def test_tsv_merge_no_datalad(tmp_path):
459459
summary_df.loc[fa_nan_dwi_row, "MergeInto"] = summary_df.ParamGroup[complete_dwi_row]
460460

461461
valid_tsv_file = tsv_prefix + "_valid_summary.tsv"
462-
summary_df.to_csv(valid_tsv_file, sep="\t", index=False)
462+
summary_df.to_csv(valid_tsv_file, sep="\t", index=False, na_rep="n/a")
463463

464464
# about to apply merges!
465465

@@ -472,7 +472,7 @@ def test_tsv_merge_no_datalad(tmp_path):
472472
complete_dwi_row
473473
]
474474
invalid_tsv_file = tsv_prefix + "_invalid_summary.tsv"
475-
summary_df.to_csv(invalid_tsv_file, sep="\t", index=False)
475+
summary_df.to_csv(invalid_tsv_file, sep="\t", index=False, na_rep="n/a")
476476

477477
with pytest.raises(Exception):
478478
bod.apply_tsv_changes(
@@ -572,7 +572,7 @@ def test_tsv_merge_changes(tmp_path):
572572
summary_df.loc[fa_nan_dwi_row, "MergeInto"] = summary_df.ParamGroup[complete_dwi_row]
573573

574574
valid_tsv_file = tsv_prefix + "_valid_summary.tsv"
575-
summary_df.to_csv(valid_tsv_file, sep="\t", index=False)
575+
summary_df.to_csv(valid_tsv_file, sep="\t", index=False, na_rep="n/a")
576576

577577
# about to merge
578578
bod.apply_tsv_changes(valid_tsv_file, original_files_tsv, str(tmp_path / "ok_modified"))
@@ -584,7 +584,7 @@ def test_tsv_merge_changes(tmp_path):
584584
complete_dwi_row
585585
]
586586
invalid_tsv_file = tsv_prefix + "_invalid_summary.tsv"
587-
summary_df.to_csv(invalid_tsv_file, sep="\t", index=False)
587+
summary_df.to_csv(invalid_tsv_file, sep="\t", index=False, na_rep="n/a")
588588

589589
with pytest.raises(Exception):
590590
bod.apply_tsv_changes(

cubids/tests/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ def _add_deletion(summary_tsv):
108108
"""
109109
df = pd.read_table(summary_tsv)
110110
df.loc[3, "MergeInto"] = 0
111-
df.to_csv(summary_tsv, sep="\t", index=False)
111+
df.to_csv(summary_tsv, sep="\t", index=False, na_rep="n/a")
112112
return df.loc[3, "KeyParamGroup"]
113113

114114

cubids/workflows.py

Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -127,34 +127,44 @@ def _link_or_copy(src_path, dst_path):
127127
if not os.path.exists(subject_folder_path):
128128
os.makedirs(subject_folder_path, exist_ok=True)
129129

130-
# Ensure participants.tsv is available in temp root and is a copy (not a link)
131-
# Always COPY (never link) to avoid modifying the original file when filtering
132-
participants_tsv_path = os.path.join(temporary_bids_dir, "participants.tsv")
133-
# Always remove existing file first in case it was linked in the earlier loop
134-
if os.path.exists(participants_tsv_path):
130+
# Ensure participants.tsv and participants.json are available in temp root
131+
# Always COPY (never link) to avoid modifying the original files when filtering
132+
participants_files = ["participants.tsv", "participants.json"]
133+
for filename in participants_files:
134+
dest_path = os.path.join(temporary_bids_dir, filename)
135+
# Always remove existing file first in case it was linked in the earlier loop
136+
if os.path.exists(dest_path):
137+
try:
138+
os.remove(dest_path)
139+
except Exception as e:
140+
logger.warning(
141+
f"Failed to remove existing file '{dest_path}': {e}. "
142+
"The file may be overwritten or cause conflicts."
143+
)
144+
# Try to find source file in the provided file list
135145
try:
136-
os.remove(participants_tsv_path)
137-
except Exception: # noqa: BLE001
138-
pass
139-
# Try to find a source participants.tsv in the provided file list
140-
try:
141-
source_participants_tsv_path = None
142-
for candidate_path in files_list:
143-
if os.path.basename(candidate_path) == "participants.tsv":
144-
source_participants_tsv_path = candidate_path
145-
break
146-
# If not in file list, try to get it from the original bids_dir
147-
if not source_participants_tsv_path and bids_dir:
148-
potential_path = os.path.join(bids_dir, "participants.tsv")
149-
if os.path.exists(potential_path):
150-
source_participants_tsv_path = potential_path
151-
if source_participants_tsv_path:
152-
# Always copy (not link) to protect the original file from modification
153-
shutil.copy2(source_participants_tsv_path, participants_tsv_path)
154-
except Exception: # noqa: BLE001
155-
pass
146+
source_path = None
147+
for candidate_path in files_list:
148+
if os.path.basename(candidate_path) == filename:
149+
source_path = candidate_path
150+
break
151+
# If not in file list, try to get it from the original bids_dir
152+
if not source_path and bids_dir:
153+
potential_path = os.path.join(bids_dir, filename)
154+
if os.path.exists(potential_path):
155+
source_path = potential_path
156+
if source_path:
157+
# Always copy (not link) to protect the original file from modification
158+
shutil.copy2(source_path, dest_path)
159+
except Exception as e:
160+
source_info = source_path if source_path else "unknown location"
161+
logger.warning(
162+
f"Failed to copy '{filename}' from '{source_info}' to '{dest_path}': {e}. "
163+
"The file may be missing or inaccessible."
164+
)
156165

157166
# If participants.tsv exists in the temp BIDS root, filter to current subject
167+
participants_tsv_path = os.path.join(temporary_bids_dir, "participants.tsv")
158168
if os.path.exists(participants_tsv_path):
159169
try:
160170
participants_table = pd.read_csv(participants_tsv_path, sep="\t")
@@ -166,10 +176,13 @@ def _link_or_copy(src_path, dst_path):
166176
participants_tsv_path,
167177
sep="\t",
168178
index=False,
179+
na_rep="n/a",
169180
)
170-
except Exception as e: # noqa: F841
171-
# Non-fatal: continue validation even if filtering fails
172-
pass
181+
except Exception as e:
182+
logger.warning(
183+
f"Failed to filter participants.tsv for subject {subject}: {e}. "
184+
"Continuing validation without filtering."
185+
)
173186

174187
# Run the validator
175188
call = build_validator_call(
@@ -271,7 +284,7 @@ def validate(
271284
else:
272285
val_tsv = str(bids_dir) + "/code/CuBIDS/" + str(output_prefix) + "_validation.tsv"
273286

274-
parsed.to_csv(val_tsv, sep="\t", index=False)
287+
parsed.to_csv(val_tsv, sep="\t", index=False, na_rep="n/a")
275288

276289
# build validation data dictionary json sidecar
277290
val_dict = get_val_dictionary()
@@ -363,7 +376,7 @@ def validate(
363376
else:
364377
val_tsv = str(bids_dir) + "/code/CuBIDS/" + str(output_prefix) + "_validation.tsv"
365378

366-
parsed.to_csv(val_tsv, sep="\t", index=False)
379+
parsed.to_csv(val_tsv, sep="\t", index=False, na_rep="n/a")
367380

368381
# build validation data dictionary json sidecar
369382
val_dict = get_val_dictionary()
@@ -487,6 +500,7 @@ def apply(
487500
edited_summary_tsv,
488501
files_tsv,
489502
new_tsv_prefix,
503+
n_cpus=1,
490504
):
491505
"""Apply the tsv changes.
492506
@@ -525,6 +539,7 @@ def apply(
525539
str(files_tsv),
526540
str(new_tsv_prefix),
527541
raise_on_error=False,
542+
n_cpus=n_cpus,
528543
)
529544

530545

docs/example.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,14 @@ We can execute ``cubids apply`` with the following command:
413413
414414
$ cubids apply BIDS_Dataset_DataLad v0_edited_summary.tsv v0_files.tsv v1 --use-datalad
415415
416+
.. note::
417+
For large datasets, you can speed up DataLad operations by using the ``--n-cpus`` flag
418+
to enable parallel jobs for ``datalad save`` and ``datalad run``. For example, to use 4 CPUs:
419+
420+
.. code-block:: console
421+
422+
$ cubids apply BIDS_Dataset_DataLad v0_edited_summary.tsv v0_files.tsv v1 --use-datalad --n-cpus 4
423+
416424
Checking our git log, we can see that our changes from apply have been saved.
417425

418426
.. image:: _static/screenshot_7.png

0 commit comments

Comments
 (0)