Merged
83 commits
bae18fa
handle parallel reading and writing
bruAristimunha Oct 11, 2025
e8670eb
updating the whats new file
bruAristimunha Oct 11, 2025
05c5776
playing the parallel
bruAristimunha Oct 11, 2025
3fb6f45
updating the whats new file
bruAristimunha Oct 11, 2025
900f39d
pre-commit
bruAristimunha Oct 11, 2025
4fd7176
changing the logic to not needed to increase the mne version
bruAristimunha Oct 11, 2025
ae8699a
updating the whats new
bruAristimunha Oct 11, 2025
b25480f
including two tests
bruAristimunha Oct 11, 2025
5e1b33d
more unify
bruAristimunha Oct 11, 2025
d029e06
improve test
bruAristimunha Oct 11, 2025
bfc7e51
first iteration of writing in parallel
bruAristimunha Oct 11, 2025
37062ab
updating the make parallel raw
bruAristimunha Oct 11, 2025
f14bfd9
today
bruAristimunha Oct 11, 2025
75def8a
updating the writing logic
bruAristimunha Oct 11, 2025
92ac60b
no longer joblib )=
bruAristimunha Oct 11, 2025
dc2f1a5
updating old names
bruAristimunha Oct 11, 2025
5772118
read only for now
bruAristimunha Oct 11, 2025
5f48fdb
Merge branch 'main' into parallel_writing
bruAristimunha Oct 13, 2025
8b9b463
Merge branch 'main' into parallel_writing
sappelhoff Oct 15, 2025
5ea0c3d
returning the write
bruAristimunha Oct 15, 2025
bf95f89
trying to not create multiple files with lock mechanism, should be in…
bruAristimunha Oct 15, 2025
d1734da
creating the test and fileio
bruAristimunha Oct 15, 2025
14cd999
updating the _fileio.py
bruAristimunha Oct 15, 2025
daf225a
creating the touch file
bruAristimunha Oct 15, 2025
2d5a7df
updating the fileio
bruAristimunha Oct 15, 2025
cd0060f
set with set
bruAristimunha Oct 15, 2025
400c1ad
first iteration
bruAristimunha Oct 15, 2025
f8d868b
updating the tests
bruAristimunha Oct 15, 2025
8089f2c
updating the _fileio.py
bruAristimunha Oct 15, 2025
b26af4d
Update doc/whats_new.rst
bruAristimunha Oct 15, 2025
fdcae55
Update mne_bids/_fileio.py
bruAristimunha Oct 15, 2025
0967ba9
Update mne_bids/_fileio.py
bruAristimunha Oct 15, 2025
48635b3
Update mne_bids/_fileio.py
bruAristimunha Oct 15, 2025
6fdc487
Apply suggestions from code review
bruAristimunha Oct 15, 2025
91c5194
updating the python script and trying again
bruAristimunha Oct 15, 2025
56f8588
updating the lock logic
bruAristimunha Oct 15, 2025
3ce7c9c
updating the write
bruAristimunha Oct 20, 2025
1308ebe
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 20, 2025
2db5fc8
updating the _fileio
bruAristimunha Oct 20, 2025
86bbc48
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 20, 2025
6461e14
pre-commit
bruAristimunha Oct 20, 2025
7cb039a
Merge branch 'parallel_writing' of https://github.com/bruAristimunha/…
bruAristimunha Oct 20, 2025
1c76cb6
changing the logic
bruAristimunha Oct 20, 2025
e554d0f
refactor: Unify _open_lock and _file_lock implementations
bruAristimunha Oct 20, 2025
6fb992e
removing the duplicate code
bruAristimunha Oct 20, 2025
bae71e8
reducing the timeout
bruAristimunha Oct 20, 2025
1930724
updating the test
bruAristimunha Oct 20, 2025
7ea77e3
Merge branch 'main' into parallel_writing
bruAristimunha Oct 20, 2025
6e15877
Merge branch 'main' into parallel_writing
bruAristimunha Oct 20, 2025
a5219ac
missing options
bruAristimunha Oct 20, 2025
96eaab3
old comments
bruAristimunha Oct 20, 2025
3bed03a
BUG: Need to pass args or kwargs if going to use
larsoner Oct 20, 2025
a550c4f
FIX: Add to full
larsoner Oct 20, 2025
0940e8e
Merge branch 'main' into parallel_writing
larsoner Oct 20, 2025
4ff4d55
FIX: Another
larsoner Oct 20, 2025
fb45128
Fix double-lock acquisition in _participants_tsv and _scans_tsv
bruAristimunha Oct 20, 2025
dc803df
Merge branch 'main' into parallel_writing
bruAristimunha Oct 25, 2025
0b78efe
removing the locked function
bruAristimunha Oct 25, 2025
67635e2
Merge branch 'parallel_writing' of https://github.com/bruAristimunha/…
bruAristimunha Oct 25, 2025
0197b64
trying something
bruAristimunha Oct 25, 2025
d0f7af6
including extra dep
bruAristimunha Oct 25, 2025
cdd7920
updating the whats new
bruAristimunha Oct 25, 2025
442b777
fixing the context
bruAristimunha Oct 25, 2025
282fa63
changing the cache system
bruAristimunha Oct 25, 2025
43586bd
more CI adjustment
bruAristimunha Oct 25, 2025
7cb1566
cleaning the lock
bruAristimunha Oct 25, 2025
44709bd
ugly solution....
bruAristimunha Oct 25, 2025
b07e92c
more mne style
bruAristimunha Oct 25, 2025
1ac7639
test coverage
bruAristimunha Oct 25, 2025
471e130
coverage and refactoring
bruAristimunha Oct 25, 2025
13f45d9
removing one test
bruAristimunha Oct 25, 2025
294f84a
Apply suggestion from @agramfort
bruAristimunha Oct 26, 2025
822d0c9
Apply suggestion from @agramfort
bruAristimunha Oct 26, 2025
1a36d1f
implementing suggestion part 1
bruAristimunha Oct 26, 2025
c7a08f4
implementing suggestion part 2
bruAristimunha Oct 26, 2025
7fd5fa8
Merge branch 'main' into parallel_writing
bruAristimunha Oct 27, 2025
bc7f3ba
Merge branch 'main' into parallel_writing
sappelhoff Oct 30, 2025
5aee828
increasing the delay and flatting the function
bruAristimunha Oct 31, 2025
b97c792
Merge branch 'parallel_writing' of https://github.com/bruAristimunha/…
bruAristimunha Oct 31, 2025
0dd8666
updating comments of the revision
bruAristimunha Oct 31, 2025
43e784e
Update mne_bids/tsv_handler.py
bruAristimunha Oct 31, 2025
9db1516
reducing the try scope
bruAristimunha Oct 31, 2025
ebce415
Merge branch 'parallel_writing' of https://github.com/bruAristimunha/…
bruAristimunha Oct 31, 2025
1 change: 1 addition & 0 deletions doc/whats_new.rst
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ Detailed list of changes
- :func:`mne_bids.write_raw_bids()` has a new parameter `electrodes_tsv_task` which allows adding the `task` entity to the `electrodes.tsv` filepath, by `Alex Lopez Marquez`_ (:gh:`1424`)
- Extended the configuration to recognise `motion` as a valid BIDS datatype by `Julius Welzel`_ (:gh:`1430`)
- Better control of verbosity in several functions, by `Bruno Aristimunha`_ (:gh:`1449`)
- Added support for parallel reading and writing: all `open` operations now go through the `_open_lock` mechanism from MNE, with a small wrapper to manage the lock files, by `Bruno Aristimunha`_ (:gh:`1449`)

🧐 API and behavior changes
^^^^^^^^^^^^^^^^^^^^^^^^^^^
106 changes: 106 additions & 0 deletions mne_bids/_fileio.py
@@ -0,0 +1,106 @@
"""File I/O helpers with automatic cleanup of lock files."""

# Authors: The MNE-BIDS developers
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import contextlib
import os
from contextlib import contextmanager
from contextvars import ContextVar
from pathlib import Path

from mne.utils import _soft_import, warn


def _normalize_lock_path(path):
path = Path(path)
try:
return path.resolve()
except OSError:
return path.absolute()


_LOCKED_PATHS: ContextVar[tuple[Path, ...]] = ContextVar(
"_LOCKED_PATHS", default=tuple()
)


def _get_lock_context(path):
"""Return a context manager that locks ``path`` if possible."""
filelock = _soft_import(
"filelock", purpose="parallel file reading and writing", strict=False
)

lock_context = contextlib.nullcontext()
lock_path = Path(f"{os.fspath(path)}.lock")
have_lock = False

if filelock:
try:
lock_context = filelock.FileLock(lock_path, timeout=5)
lock_context.acquire()
have_lock = True
except TimeoutError:
warn(
"Could not acquire lock file after 5 seconds, consider deleting it "
f"if you know the corresponding file is usable:\n{lock_path}"
)
lock_context = contextlib.nullcontext()
except OSError:
warn(
"Could not create lock file due to insufficient permissions. "
"Proceeding without a lock."
)
lock_context = contextlib.nullcontext()

return lock_context, lock_path, have_lock


def _path_is_locked(path) -> bool:
"""Return True if ``path`` is currently locked via :func:`_file_lock`."""
normalized = _normalize_lock_path(path)
return normalized in _LOCKED_PATHS.get()


@contextmanager
def _open_lock(path, *args, **kwargs):
"""Open ``path`` under an exclusive lock and remove stale ``.lock`` files."""
if _path_is_locked(path):
with open(path, *args, **kwargs) as fid:
yield fid
return

lock_context, lock_path, have_lock = _get_lock_context(path)
try:
with lock_context, open(path, *args, **kwargs) as fid:
yield fid
finally:
if have_lock and lock_path.exists():
try:
lock_path.unlink()
except OSError:
pass


@contextmanager
def _file_lock(path):
"""Acquire a lock on ``path`` without opening the file."""
lock_context, lock_path, have_lock = _get_lock_context(path)
normalized = _normalize_lock_path(path)
token = None
if not _path_is_locked(normalized):
current = _LOCKED_PATHS.get()
token = _LOCKED_PATHS.set(current + (normalized,))
try:
with lock_context:
yield
finally:
if token is not None:
_LOCKED_PATHS.reset(token)
if have_lock and lock_path.exists():
try:
lock_path.unlink()
except OSError:
pass
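The acquire-use-cleanup pattern that `_open_lock` implements can be exercised with a stdlib-only sketch. This is a hedged illustration, not the PR's code: the helper name `open_locked` and the `O_CREAT | O_EXCL` sentinel are assumptions standing in for the soft-imported `filelock.FileLock`, but the behavior (bounded wait, proceed without a lock on permission errors, unlink the `.lock` file afterwards) mirrors the diff above:

```python
import contextlib
import os
import tempfile
import time
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def open_locked(path, *args, timeout=5.0, **kwargs):
    """Open ``path`` while holding an exclusive ``.lock`` sentinel file.

    Stdlib stand-in for ``filelock.FileLock``: creating a file with
    O_CREAT | O_EXCL is atomic, so only one process holds the sentinel.
    """
    lock_path = Path(f"{os.fspath(path)}.lock")
    have_lock = False
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            os.close(fd)
            have_lock = True
            break
        except FileExistsError:
            time.sleep(0.05)  # another process holds the lock; retry
        except OSError:
            break  # e.g. read-only directory: proceed without a lock
    try:
        with open(path, *args, **kwargs) as fid:
            yield fid
    finally:
        if have_lock:
            with contextlib.suppress(OSError):
                lock_path.unlink()  # clean up the sentinel


# usage: write then read back; the lock file is gone afterwards
tmpdir = Path(tempfile.mkdtemp())
target = tmpdir / "demo.txt"
with open_locked(target, "w", encoding="utf-8") as f:
    f.write("hello")
with open_locked(target, encoding="utf-8") as f:
    print(f.read())  # hello
print((tmpdir / "demo.txt.lock").exists())  # False
```

The `_file_lock` variant in the diff additionally records the locked path in a `ContextVar`, so a nested `_open_lock` on the same file skips re-acquisition instead of deadlocking on its own sentinel.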
32 changes: 17 additions & 15 deletions mne_bids/copyfiles.py
@@ -20,6 +20,7 @@
from mne.utils import logger, verbose
from scipy.io import loadmat, savemat

from mne_bids._fileio import _open_lock
from mne_bids.path import BIDSPath, _mkdir_p, _parse_ext
from mne_bids.utils import _check_anonymize, _get_mrk_meas_date, warn

@@ -63,7 +64,7 @@ def _get_brainvision_encoding(vhdr_file):
in the header.

"""
with open(vhdr_file, "rb") as ef:
with _open_lock(vhdr_file, "rb") as ef:
enc = ef.read()
if enc.find(b"Codepage=") != -1:
enc = enc[enc.find(b"Codepage=") + 9 :]
@@ -101,7 +102,7 @@ def _get_brainvision_paths(vhdr_path):
enc = _get_brainvision_encoding(vhdr_path)

# ..and read it
with open(vhdr_path, encoding=enc) as f:
with _open_lock(vhdr_path, encoding=enc) as f:
lines = f.readlines()

# Try to find data file .eeg/.dat
@@ -274,13 +275,14 @@ def copyfile_kit(src, dest, subject_id, session_id, task, run, _init_kwargs):
def _replace_file(fname, pattern, replace):
"""Overwrite file, replacing end of lines matching pattern with replace."""
new_content = []
for line in open(fname):
match = re.match(pattern, line)
if match:
line = match.group()[: -len(replace)] + replace + "\n"
new_content.append(line)

with open(fname, "w", encoding="utf-8") as fout:
with _open_lock(fname) as fin:
for line in fin:
match = re.match(pattern, line)
if match:
line = match.group()[: -len(replace)] + replace + "\n"
new_content.append(line)

with _open_lock(fname, "w", encoding="utf-8") as fout:
fout.writelines(new_content)
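For context on `_replace_file`: the `match.group()[: -len(replace)]` trick overwrites the final `len(replace)` characters of the matched text, so it behaves as expected when the old tail and the replacement have the same length. A small self-contained illustration (the helper name `replace_line_end` is made up for this sketch):

```python
import re


def replace_line_end(lines, pattern, replace):
    """Replace the last len(replace) characters of each matching line."""
    out = []
    for line in lines:
        match = re.match(pattern, line)
        if match:
            # overwrite the tail of the matched text with ``replace``
            line = match.group()[: -len(replace)] + replace + "\n"
        out.append(line)
    return out


lines = ["DataFile=old_name.eeg\n", "; a comment line\n"]
print(replace_line_end(lines, r"DataFile=.*", "new_name.eeg"))
# ['DataFile=new_name.eeg\n', '; a comment line\n']
```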


@@ -395,15 +397,15 @@ def copyfile_brainvision(vhdr_src, vhdr_dest, anonymize=None, verbose=None):
f"MarkerFile={basename_src}.vmrk",
]

with open(vhdr_src, encoding=enc) as fin:
with open(vhdr_dest, "w", encoding=enc) as fout:
with _open_lock(vhdr_src, encoding=enc) as fin:
with _open_lock(vhdr_dest, "w", encoding=enc) as fout:
for line in fin.readlines():
if line.strip() in search_lines:
line = line.replace(basename_src, basename_dest)
fout.write(line)

with open(vmrk_file_path, encoding=enc) as fin:
with open(fname_dest + ".vmrk", "w", encoding=enc) as fout:
with _open_lock(vmrk_file_path, encoding=enc) as fin:
with _open_lock(fname_dest + ".vmrk", "w", encoding=enc) as fout:
for line in fin.readlines():
if line.strip() in search_lines:
line = line.replace(basename_src, basename_dest)
@@ -513,7 +515,7 @@ def copyfile_edf(src, dest, anonymize=None, verbose=None):
raise ValueError(f"Unsupported file type ({ext_src})")

# Get subject info, recording info, and recording date
with open(dest, "rb") as f:
with _open_lock(dest, "rb") as f:
f.seek(8) # id_info field starts 8 bytes in
id_info = f.read(80).decode("ascii").rstrip()
rec_info = f.read(80).decode("ascii").rstrip()
@@ -540,7 +542,7 @@
else:
id_info = ["0", "X", "X", "X"]
rec_info = ["Startdate", start_date, "X", "mne-bids_anonymize", "X"]
with open(dest, "r+b") as f:
with _open_lock(dest, "r+b") as f:
f.seek(8) # id_info field starts 8 bytes in
f.write(bytes(" ".join(id_info).ljust(80), "ascii"))
f.write(bytes(" ".join(rec_info).ljust(80), "ascii"))
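The byte offsets here follow the EDF header layout: bytes 0–7 hold the format version, bytes 8–87 the patient identification, and bytes 88–167 the recording identification, each as space-padded ASCII. A hedged sketch of the in-place patch (the helper `patch_edf_ids` and the fabricated header are illustrative, not part of the PR):

```python
def patch_edf_ids(header: bytes, id_info: str, rec_info: str) -> bytes:
    """Overwrite the two 80-byte ASCII identification fields of an EDF header."""
    buf = bytearray(header)
    buf[8:88] = id_info.ljust(80).encode("ascii")     # patient identification
    buf[88:168] = rec_info.ljust(80).encode("ascii")  # recording identification
    return bytes(buf)


# fabricated 168-byte header prefix: version + patient + recording fields
fake = (
    b"0".ljust(8)
    + b"P-001 M 01-JAN-2000 Doe".ljust(80)
    + b"Startdate 01-JAN-2000".ljust(80)
)
anon = patch_edf_ids(fake, "0 X X X", "Startdate X X mne-bids_anonymize X")
print(anon[8:88].decode("ascii").rstrip())  # 0 X X X
```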
12 changes: 9 additions & 3 deletions mne_bids/dig.py
@@ -13,8 +13,14 @@
import numpy as np
from mne.io.constants import FIFF
from mne.io.pick import _picks_to_idx
from mne.utils import _check_option, _validate_type, get_subjects_dir, logger
from mne.utils import (
_check_option,
_validate_type,
get_subjects_dir,
logger,
)

from mne_bids._fileio import _open_lock
from mne_bids.config import (
ALLOWED_SPACES,
BIDS_COORD_FRAME_DESCRIPTIONS,
@@ -78,7 +84,7 @@
Handle reading the coordinate frame and coordinate unit
of each electrode.
"""
with open(coordsystem_fpath, encoding="utf-8-sig") as fin:
with _open_lock(coordsystem_fpath, encoding="utf-8-sig") as fin:
coordsystem_json = json.load(fin)

if datatype == "meg":
@@ -374,7 +380,7 @@ def _write_coordsystem_json(
# XXX: improve later when BIDS is updated
# check that there already exists a coordsystem.json
if Path(fname).exists() and not overwrite:
with open(fname, encoding="utf-8-sig") as fin:
with _open_lock(fname, encoding="utf-8-sig") as fin:
coordsystem_dict = json.load(fin)
if fid_json != coordsystem_dict:
raise RuntimeError(
5 changes: 3 additions & 2 deletions mne_bids/path.py
@@ -19,6 +19,7 @@
import numpy as np
from mne.utils import _check_fname, _validate_type, logger, verbose

from mne_bids._fileio import _open_lock
from mne_bids.config import (
ALLOWED_DATATYPE_EXTENSIONS,
ALLOWED_DATATYPES,
@@ -1224,7 +1225,7 @@ def find_empty_room(self, use_sidecar_only=False, *, verbose=None):
.update(datatype=None, suffix="meg")
.find_matching_sidecar(extension=".json")
)
with open(sidecar_fname, encoding="utf-8") as f:
with _open_lock(sidecar_fname, encoding="utf-8") as f:
sidecar_json = json.load(f)

if "AssociatedEmptyRoom" in sidecar_json:
@@ -1453,7 +1454,7 @@ def _print_lines_with_entry(file, entry, folder, is_tsv, line_numbers, outfile):
prints to the console, else a string is printed to.
"""
entry_lines = list()
with open(file, encoding="utf-8-sig") as fid:
with _open_lock(file, encoding="utf-8-sig") as fid:
if is_tsv: # format tsv files nicely
header = _truncate_tsv_line(fid.readline())
if line_numbers:
3 changes: 2 additions & 1 deletion mne_bids/read.py
@@ -17,6 +17,7 @@
from mne.transforms import apply_trans
from mne.utils import get_subjects_dir, logger

from mne_bids._fileio import _open_lock
from mne_bids.config import (
ALLOWED_DATATYPE_EXTENSIONS,
ANNOTATIONS_TO_KEEP,
@@ -442,7 +443,7 @@ def _handle_info_reading(sidecar_fname, raw):

Handle PowerLineFrequency of recording.
"""
with open(sidecar_fname, encoding="utf-8-sig") as fin:
with _open_lock(sidecar_fname, encoding="utf-8-sig") as fin:
sidecar_json = json.load(fin)

# read in the sidecar JSON's and raw object's line frequency
5 changes: 3 additions & 2 deletions mne_bids/report/_report.py
@@ -11,6 +11,7 @@
import numpy as np
from mne.utils import logger, verbose

from mne_bids._fileio import _open_lock
from mne_bids.config import ALLOWED_DATATYPES, DOI
from mne_bids.path import (
BIDSPath,
@@ -155,7 +156,7 @@ def _summarize_dataset(root):
return dict()

# read file and 'REQUIRED' components of it
with open(dataset_descrip_fpath, encoding="utf-8-sig") as fin:
with _open_lock(dataset_descrip_fpath, encoding="utf-8-sig") as fin:
dataset_description = json.load(fin)

# create dictionary to pass into template string
@@ -329,7 +330,7 @@ def _summarize_sidecar_json(root, scans_fpaths):
sidecar_fname = _find_matching_sidecar(
bids_path=bids_path, suffix=datatype, extension=".json"
)
with open(sidecar_fname, encoding="utf-8-sig") as fin:
with _open_lock(sidecar_fname, encoding="utf-8-sig") as fin:
sidecar_json = json.load(fin)

# aggregate metadata from each scan
15 changes: 11 additions & 4 deletions mne_bids/sidecar_updates.py
@@ -10,9 +10,16 @@
from mne.channels import DigMontage, make_dig_montage
from mne.io import read_fiducials
from mne.io.constants import FIFF
from mne.utils import _check_on_missing, _on_missing, _validate_type, logger, verbose
from mne.utils import (
_check_on_missing,
_on_missing,
_validate_type,
logger,
verbose,
)

from mne_bids import BIDSPath
from mne_bids._fileio import _open_lock
from mne_bids.utils import _write_json


@@ -103,14 +110,14 @@ def update_sidecar_json(bids_path, entries, verbose=None):
if isinstance(entries, dict):
sidecar_tmp = entries
else:
with open(entries) as tmp_f:
with _open_lock(entries) as tmp_f:
sidecar_tmp = json.load(tmp_f, object_pairs_hook=OrderedDict)

logger.debug(sidecar_tmp)
logger.debug(f"Updating {fpath}...")

# load in sidecar filepath
with open(fpath) as tmp_f:
with _open_lock(fpath) as tmp_f:
sidecar_json = json.load(tmp_f, object_pairs_hook=OrderedDict)

# update sidecar JSON file with the fields passed in
@@ -132,7 +139,7 @@ def _update_sidecar(sidecar_fname, key, val):
val : str
The corresponding value to change to in the sidecar JSON file.
"""
with open(sidecar_fname, encoding="utf-8-sig") as fin:
with _open_lock(sidecar_fname, encoding="utf-8-sig") as fin:
sidecar_json = json.load(fin)
sidecar_json[key] = val
_write_json(sidecar_fname, sidecar_json, overwrite=True)
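The read-modify-write cycle in `_update_sidecar` is exactly the shape that motivates the locking in this PR: two parallel writers that interleave between the `json.load` and the write-back will silently drop one update (last writer wins). A minimal unlocked sketch of that cycle (the helper name `update_key` and the example file are assumptions):

```python
import json
import tempfile
from pathlib import Path


def update_key(fname, key, val):
    """Read a sidecar JSON, set one key, and write it back (unlocked sketch)."""
    with open(fname, encoding="utf-8-sig") as fin:
        data = json.load(fin)
    data[key] = val  # a concurrent update_key interleaving here would be lost
    with open(fname, "w", encoding="utf-8") as fout:
        json.dump(data, fout, indent=4)


sidecar = Path(tempfile.mkdtemp()) / "sub-01_task-rest_eeg.json"
sidecar.write_text(json.dumps({"PowerLineFrequency": 50}), encoding="utf-8")
update_key(sidecar, "EEGReference", "Cz")
print(json.loads(sidecar.read_text(encoding="utf-8")))
# {'PowerLineFrequency': 50, 'EEGReference': 'Cz'}
```

Wrapping both the load and the dump in a single `_file_lock(fname)` makes the whole cycle atomic with respect to other processes using the same lock.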