Merged

Commits
83 commits
bae18fa
handle parallel reading and writing
bruAristimunha Oct 11, 2025
e8670eb
updating the whats new file
bruAristimunha Oct 11, 2025
05c5776
playing the parallel
bruAristimunha Oct 11, 2025
3fb6f45
updating the whats new file
bruAristimunha Oct 11, 2025
900f39d
pre-commit
bruAristimunha Oct 11, 2025
4fd7176
changing the logic to not needed to increase the mne version
bruAristimunha Oct 11, 2025
ae8699a
updating the whats new
bruAristimunha Oct 11, 2025
b25480f
including two tests
bruAristimunha Oct 11, 2025
5e1b33d
more unify
bruAristimunha Oct 11, 2025
d029e06
improve test
bruAristimunha Oct 11, 2025
bfc7e51
first iteration of writing in parallel
bruAristimunha Oct 11, 2025
37062ab
updating the make parallel raw
bruAristimunha Oct 11, 2025
f14bfd9
today
bruAristimunha Oct 11, 2025
75def8a
updating the writing logic
bruAristimunha Oct 11, 2025
92ac60b
no longer joblib )=
bruAristimunha Oct 11, 2025
dc2f1a5
updating old names
bruAristimunha Oct 11, 2025
5772118
read only for now
bruAristimunha Oct 11, 2025
5f48fdb
Merge branch 'main' into parallel_writing
bruAristimunha Oct 13, 2025
8b9b463
Merge branch 'main' into parallel_writing
sappelhoff Oct 15, 2025
5ea0c3d
returning the write
bruAristimunha Oct 15, 2025
bf95f89
trying to not create multiple files with lock mechanism, should be in…
bruAristimunha Oct 15, 2025
d1734da
creating the test and fileio
bruAristimunha Oct 15, 2025
14cd999
updating the _fileio.py
bruAristimunha Oct 15, 2025
daf225a
creating the touch file
bruAristimunha Oct 15, 2025
2d5a7df
updating the fileio
bruAristimunha Oct 15, 2025
cd0060f
set with set
bruAristimunha Oct 15, 2025
400c1ad
first iteration
bruAristimunha Oct 15, 2025
f8d868b
updating the tests
bruAristimunha Oct 15, 2025
8089f2c
updating the _fileio.py
bruAristimunha Oct 15, 2025
b26af4d
Update doc/whats_new.rst
bruAristimunha Oct 15, 2025
fdcae55
Update mne_bids/_fileio.py
bruAristimunha Oct 15, 2025
0967ba9
Update mne_bids/_fileio.py
bruAristimunha Oct 15, 2025
48635b3
Update mne_bids/_fileio.py
bruAristimunha Oct 15, 2025
6fdc487
Apply suggestions from code review
bruAristimunha Oct 15, 2025
91c5194
updating the python script and trying again
bruAristimunha Oct 15, 2025
56f8588
updating the lock logic
bruAristimunha Oct 15, 2025
3ce7c9c
updating the write
bruAristimunha Oct 20, 2025
1308ebe
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 20, 2025
2db5fc8
updating the _fileio
bruAristimunha Oct 20, 2025
86bbc48
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 20, 2025
6461e14
pre-commit
bruAristimunha Oct 20, 2025
7cb039a
Merge branch 'parallel_writing' of https://github.com/bruAristimunha/…
bruAristimunha Oct 20, 2025
1c76cb6
changing the logic
bruAristimunha Oct 20, 2025
e554d0f
refactor: Unify _open_lock and _file_lock implementations
bruAristimunha Oct 20, 2025
6fb992e
removing the duplicate code
bruAristimunha Oct 20, 2025
bae71e8
reducing the timeout
bruAristimunha Oct 20, 2025
1930724
updating the test
bruAristimunha Oct 20, 2025
7ea77e3
Merge branch 'main' into parallel_writing
bruAristimunha Oct 20, 2025
6e15877
Merge branch 'main' into parallel_writing
bruAristimunha Oct 20, 2025
a5219ac
missing options
bruAristimunha Oct 20, 2025
96eaab3
old comments
bruAristimunha Oct 20, 2025
3bed03a
BUG: Need to pass args or kwargs if going to use
larsoner Oct 20, 2025
a550c4f
FIX: Add to full
larsoner Oct 20, 2025
0940e8e
Merge branch 'main' into parallel_writing
larsoner Oct 20, 2025
4ff4d55
FIX: Another
larsoner Oct 20, 2025
fb45128
Fix double-lock acquisition in _participants_tsv and _scans_tsv
bruAristimunha Oct 20, 2025
dc803df
Merge branch 'main' into parallel_writing
bruAristimunha Oct 25, 2025
0b78efe
removing the locked function
bruAristimunha Oct 25, 2025
67635e2
Merge branch 'parallel_writing' of https://github.com/bruAristimunha/…
bruAristimunha Oct 25, 2025
0197b64
trying something
bruAristimunha Oct 25, 2025
d0f7af6
including extra dep
bruAristimunha Oct 25, 2025
cdd7920
updating the whats new
bruAristimunha Oct 25, 2025
442b777
fixing the context
bruAristimunha Oct 25, 2025
282fa63
changing the cache system
bruAristimunha Oct 25, 2025
43586bd
more CI adjustment
bruAristimunha Oct 25, 2025
7cb1566
cleaning the lock
bruAristimunha Oct 25, 2025
44709bd
ugly solution....
bruAristimunha Oct 25, 2025
b07e92c
more mne style
bruAristimunha Oct 25, 2025
1ac7639
test coverage
bruAristimunha Oct 25, 2025
471e130
coverage and refactoring
bruAristimunha Oct 25, 2025
13f45d9
removing one test
bruAristimunha Oct 25, 2025
294f84a
Apply suggestion from @agramfort
bruAristimunha Oct 26, 2025
822d0c9
Apply suggestion from @agramfort
bruAristimunha Oct 26, 2025
1a36d1f
implementing suggestion part 1
bruAristimunha Oct 26, 2025
c7a08f4
implementing suggestion part 2
bruAristimunha Oct 26, 2025
7fd5fa8
Merge branch 'main' into parallel_writing
bruAristimunha Oct 27, 2025
bc7f3ba
Merge branch 'main' into parallel_writing
sappelhoff Oct 30, 2025
5aee828
increasing the delay and flatting the function
bruAristimunha Oct 31, 2025
b97c792
Merge branch 'parallel_writing' of https://github.com/bruAristimunha/…
bruAristimunha Oct 31, 2025
0dd8666
updating comments of the revision
bruAristimunha Oct 31, 2025
43e784e
Update mne_bids/tsv_handler.py
bruAristimunha Oct 31, 2025
9db1516
reducing the try scope
bruAristimunha Oct 31, 2025
ebce415
Merge branch 'parallel_writing' of https://github.com/bruAristimunha/…
bruAristimunha Oct 31, 2025
4 changes: 2 additions & 2 deletions .github/workflows/unit_tests.yml
@@ -212,7 +212,7 @@ jobs:
        uses: actions/cache@v4
        with:
          path: ${{ env.pythonLocation }}
-         key: test-2-${{ env.pythonLocation }}-${{ env.os }}-${{ hashFiles('pyproject.toml') }}
+         key: test-2-${{ env.pythonLocation }}-${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.mne-version }}-${{ hashFiles('pyproject.toml') }}

      - name: Install Python dependencies using pip
        # This would be nicer once this feature is implemented: https://github.com/pypa/pip/issues/11440
@@ -311,7 +311,7 @@ jobs:

      - uses: actions/cache@v4
        with:
-         key: testing_data-1-${{ env.TESTING_VERSION }}
+         key: testing_data-1-${{ env.TESTING_VERSION }}-${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.mne-version }}
          path: ~/mne_data
        name: 'Cache testing data'
1 change: 1 addition & 0 deletions doc/install.rst
@@ -26,6 +26,7 @@ Optional:
* ``pandas`` (>=1.3.2, for generating event statistics)
* ``edfio`` (>=0.4.10, for writing EDF data)
* ``defusedxml`` (for writing reading EGI MFF data and BrainVision montages)
+* ``filelock`` (for atomic file writing and parallel processing support)

We recommend installing ``mne-bids`` into an isolated Python environment,
for example created via ``conda``
2 changes: 2 additions & 0 deletions doc/whats_new.rst
@@ -37,6 +37,7 @@ Detailed list of changes
- :func:`mne_bids.write_raw_bids()` has a new parameter `electrodes_tsv_task` which allows adding the `task` entity to the `electrodes.tsv` filepath, by `Alex Lopez Marquez`_ (:gh:`1424`)
- Extended the configuration to recognise `motion` as a valid BIDS datatype by `Julius Welzel`_ (:gh:`1430`)
- Better control of verbosity in several functions, by `Bruno Aristimunha`_ (:gh:`1449`)
+- Added parallel reading and writing support to all ``open`` operations, using the ``_open_lock`` mechanism from MNE and a small wrapper that manages the locks, by `Bruno Aristimunha`_ (:gh:`1451`)
- :meth:`mne_bids.BIDSPath.match()` now short-circuits root directory scans when ``subject``, ``session``, or ``datatype`` entities are known, reducing lookup time on large datasets, by `Bruno Aristimunha`_ and `Maximilien Chaumon`_ (:gh:`1450`)

🧐 API and behavior changes
@@ -46,6 +47,7 @@

🛠 Requirements
^^^^^^^^^^^^^^^
+- Added ``filelock`` as a dependency for atomic file writing and parallel processing support, by `Bruno Aristimunha`_ (:gh:`1451`)

- None yet
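
For context, a minimal sketch (illustrative only, not part of the diff) of the kind of workload this change enables; ``recordings`` is a hypothetical list of ``(raw, bids_path)`` pairs, and ``joblib`` is the caller's choice of parallel backend:

    # Hedged sketch: write several recordings into one BIDS dataset in
    # parallel; the new file locks serialize access to shared sidecar files
    # such as participants.tsv and scans.tsv.
    from joblib import Parallel, delayed

    from mne_bids import write_raw_bids

    Parallel(n_jobs=4)(
        delayed(write_raw_bids)(raw, bids_path, overwrite=True)
        for raw, bids_path in recordings
    )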
328 changes: 328 additions & 0 deletions mne_bids/_fileio.py
@@ -0,0 +1,328 @@
"""File I/O helpers with file locking support."""

# Authors: The MNE-BIDS developers
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import contextlib
import inspect
import os
import threading
from contextlib import contextmanager
from pathlib import Path

from mne.utils import _soft_import, logger, warn

_LOCK_TIMEOUT_FALLBACK = 60.0
_env_lock_timeout = os.getenv("MNE_BIDS_FILELOCK_TIMEOUT", "")
try:
    DEFAULT_LOCK_TIMEOUT = (
        float(_env_lock_timeout) if _env_lock_timeout else _LOCK_TIMEOUT_FALLBACK
    )
    if DEFAULT_LOCK_TIMEOUT <= 0:
        raise ValueError
except ValueError:
    DEFAULT_LOCK_TIMEOUT = _LOCK_TIMEOUT_FALLBACK
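
# Note: the timeout can be tuned per run via the environment, e.g.
# ``MNE_BIDS_FILELOCK_TIMEOUT=120`` (seconds). Empty, non-numeric, or
# non-positive values fall back to the 60-second default above.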

_ACTIVE_LOCKS: dict[str, int] = {}
_ACTIVE_LOCKS_GUARD = threading.RLock()


def _canonical_lock_path(path: str | os.PathLike[str]) -> Path:
"""Return an absolute, normalised path without requiring it to exist."""
return Path(path).expanduser().resolve(strict=False)


@contextmanager
def _get_lock_context(path, timeout=None):
"""Get a file lock context for the given path.

Internal helper function that creates a FileLock if available,
or returns a nullcontext() as fallback.

Parameters
----------
path : str
The path to acquire a lock for.
timeout : float
Timeout in seconds for acquiring the lock. If ``None``, the value of
``DEFAULT_LOCK_TIMEOUT`` (default 60 seconds) is used. The timeout can
be overridden via the ``MNE_BIDS_FILELOCK_TIMEOUT`` environment
variable.

Yields
------
context : context manager
Either a FileLock or nullcontext.
"""
if timeout is None:
timeout = DEFAULT_LOCK_TIMEOUT

canonical_path = _canonical_lock_path(path)

filelock = _soft_import(
"filelock", purpose="parallel file I/O locking", strict=False
)

lock_path = canonical_path.with_name(f"{canonical_path.name}.lock")
lock_context = contextlib.nullcontext()

stack = "unknown"
try: # this should always work but let's be safe
# [0] = here
# [1] = contextlib __enter__
# [2] = _open_lock
# [3] = contextlib __enter__
# [4] = caller of _open_lock
where = inspect.stack()[4]
stack = f"{where.filename}:{where.lineno} {where.function}"
del where
except Exception:
pass
logger.debug(f"Lock: acquiring {canonical_path} from {stack}")

if filelock:
try:
# Ensure parent directory exists
Path(lock_path).parent.mkdir(parents=True, exist_ok=True)
lock_context = filelock.FileLock(
str(lock_path),
timeout=timeout,
)
except (OSError, TypeError):
# OSError: permission issues creating lock file
# TypeError: invalid timeout parameter
warn("Could not create lock. Proceeding without a lock.")
try:
yield lock_context
except Exception:
logger.debug(f"Lock: exception {canonical_path} from {stack}")
raise
finally:
logger.debug(f"Lock: released {canonical_path} from {stack}")


@contextmanager
def _open_lock(path, *args, lock_timeout=None, **kwargs):
"""Context manager that acquires a file lock with optional file opening.

If the `filelock` package is available, a lock is acquired on a lock file
based on the given path (by appending '.lock'). Lock files are left behind
to avoid race conditions during concurrent operations.

The lock is re-entrant per process: nested calls for the same ``path`` will
reuse the existing lock instead of attempting to acquire it again.

If file opening arguments (*args, **kwargs) are provided, the file is opened
in the specified mode. Otherwise, just the lock is acquired.

Parameters
----------
path : str
The path to acquire a lock for (and optionally open).
*args : tuple
Additional positional arguments forwarded to ``open``.
lock_timeout : float | None
Timeout in seconds for acquiring the lock. If ``None``, the default
timeout applies.
**kwargs : dict
Additional keyword arguments forwarded to ``open``.

Yields
------
fid : file object or None
File object if file opening args were provided, None otherwise.
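
    Examples
    --------
    A minimal usage sketch, where ``fname`` is any file path (when
    ``filelock`` is not installed, the lock degrades to a no-op and only the
    file is opened)::

        with _open_lock(fname, "w", encoding="utf-8") as fid:
            fid.write("data")

    Acquire only the lock, without opening the file::

        with _open_lock(fname):
            ...  # operate on ``fname`` while holding the lock

    Nested calls for the same path re-use the lock (re-entrant per process)::

        with _open_lock(fname):
            with _open_lock(fname, "r", encoding="utf-8") as fid:
                contents = fid.read()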
"""
canonical_path = _canonical_lock_path(path)
lock_key = str(canonical_path)

with _ACTIVE_LOCKS_GUARD:
lock_depth = _ACTIVE_LOCKS.get(lock_key, 0)
_ACTIVE_LOCKS[lock_key] = lock_depth + 1
is_reentrant = lock_depth > 0

try:
if is_reentrant:
if args or kwargs:
with open(canonical_path, *args, **kwargs) as fid:
yield fid
else:
yield None
return

# Increment multiprocess refcount before acquiring lock
_increment_lock_refcount(canonical_path)

with _get_lock_context(
canonical_path,
timeout=lock_timeout,
) as lock_context:
with lock_context:
if args or kwargs:
with open(canonical_path, *args, **kwargs) as fid:
yield fid
else:
yield None
finally:
with _ACTIVE_LOCKS_GUARD:
_ACTIVE_LOCKS[lock_key] -= 1
if _ACTIVE_LOCKS[lock_key] == 0:
del _ACTIVE_LOCKS[lock_key]

# Clean up lock files safely using reference counting across processes.
# Only clean up when this was the outermost lock (not re-entrant) and
# when the lock depth was 0 (meaning we actually acquired the lock).
if not is_reentrant and lock_depth == 0:
_decrement_and_cleanup_lock_file(canonical_path)


def _increment_lock_refcount(file_path):
"""Increment the multiprocess reference count for a lock file.

Parameters
----------
file_path : Path
The original file path (not the lock file path).
"""
file_path = Path(file_path)
refcount_path = file_path.parent / f"{file_path.name}.lock.refcount"
refcount_lock_path = Path(f"{refcount_path}.lock")

filelock = _soft_import("filelock", purpose="lock refcounting", strict=False)
if filelock is None:
return

refcount_lock = filelock.FileLock(str(refcount_lock_path), timeout=5.0)
try:
with refcount_lock:
# Read current refcount or initialize to 0
current_count = 0
if refcount_path.exists():
try:
current_count = int(refcount_path.read_text().strip())
except (ValueError, OSError):
logger.debug(f"Could not read refcount from {refcount_path}")

# Increment and write back
current_count += 1
try:
refcount_path.write_text(str(current_count))
except OSError as exp:
logger.debug(f"Could not write refcount to {refcount_path}: {exp}")
except TimeoutError:
# Another process is updating refcount concurrently
logger.debug(f"Timeout acquiring refcount lock for {file_path}")
except Exception as exp:
logger.debug(f"Error incrementing refcount for {file_path}: {exp}")
finally:
# Clean up the refcount lock file
try:
if refcount_lock_path.exists():
refcount_lock_path.unlink()
except OSError:
pass


def _decrement_and_cleanup_lock_file(file_path):
"""Decrement refcount and remove lock file if no longer in use.

Maintains a reference count in a .refcount file to track how many processes
are currently using or waiting for the lock. Only deletes the lock file when
the reference count reaches zero.

Parameters
----------
file_path : Path
The original file path (not the lock file path).
"""
from pathlib import Path

file_path = Path(file_path)
lock_path = file_path.parent / f"{file_path.name}.lock"
refcount_path = file_path.parent / f"{file_path.name}.lock.refcount"
refcount_lock_path = Path(f"{refcount_path}.lock")

# Early return if lock file doesn't exist
if not lock_path.exists():
# Clean up orphaned refcount file if it exists
try:
if refcount_path.exists():
refcount_path.unlink()
except OSError:
pass
return

filelock = _soft_import("filelock", purpose="lock refcounting", strict=False)
if filelock is None:
return

refcount_lock = filelock.FileLock(str(refcount_lock_path), timeout=5.0)
try:
with refcount_lock:
# Read current refcount or initialize to 0
current_count = 0
if refcount_path.exists():
try:
current_count = int(refcount_path.read_text().strip())
except (ValueError, OSError):
logger.debug(f"Could not read refcount from {refcount_path}")

# Decrement refcount, ensuring it doesn't go negative
current_count = max(0, current_count - 1)

if current_count == 0:
# No more processes using this lock, safe to delete
try:
lock_path.unlink()
except OSError as exp:
logger.debug(f"Could not remove lock file {lock_path}: {exp}")
try:
if refcount_path.exists():
refcount_path.unlink()
except OSError as exp:
logger.debug(f"Could not remove refcount {refcount_path}: {exp}")
else:
# Write back decremented count
try:
refcount_path.write_text(str(current_count))
except OSError as exp:
logger.debug(f"Could not write refcount to {refcount_path}: {exp}")
except TimeoutError:
# Another process is updating refcount, it will handle cleanup
logger.debug(f"Timeout acquiring refcount lock for {file_path}")
except Exception as exp:
logger.debug(f"Error decrementing refcount for {file_path}: {exp}")
finally:
# Clean up the refcount lock file
try:
if refcount_lock_path.exists():
refcount_lock_path.unlink()
except OSError:
pass


def cleanup_lock_files(root_path):
"""Remove lock files associated with a path or an entire tree.

Parameters
----------
root_path : str | Path
Root directory or file path used to derive lock file locations.
"""
root_path = Path(root_path)

if root_path.is_dir():
for lock_file in root_path.rglob("*.lock"):
try:
lock_file.unlink()
except OSError:
pass
return

lock_candidate = root_path.parent / f"{root_path.name}.lock"
if lock_candidate.exists():
try:
lock_candidate.unlink()
except OSError:
pass
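
For reference, a short usage sketch (not part of the diff; ``bids_root`` is a hypothetical dataset path) showing how the public helper can clear stale lock files after an interrupted parallel run:

    from mne_bids._fileio import cleanup_lock_files

    bids_root = "/data/my_bids_dataset"  # hypothetical path
    # Recursively remove any leftover *.lock files under the tree
    cleanup_lock_files(bids_root)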