Commit 0197b64 ("trying something")
Parent: 67635e2

3 files changed: 101 additions, 42 deletions

mne_bids/_fileio.py
Lines changed: 98 additions & 39 deletions
@@ -7,14 +7,35 @@

 import contextlib
 import inspect
+import os
+import threading
 from contextlib import contextmanager
 from pathlib import Path

 from mne.utils import _soft_import, logger, warn

+_LOCK_TIMEOUT_FALLBACK = 60.0
+_env_lock_timeout = os.getenv("MNE_BIDS_FILELOCK_TIMEOUT", "")
+try:
+    DEFAULT_LOCK_TIMEOUT = (
+        float(_env_lock_timeout) if _env_lock_timeout else _LOCK_TIMEOUT_FALLBACK
+    )
+    if DEFAULT_LOCK_TIMEOUT <= 0:
+        raise ValueError
+except ValueError:
+    DEFAULT_LOCK_TIMEOUT = _LOCK_TIMEOUT_FALLBACK
+
+_ACTIVE_LOCKS: dict[str, int] = {}
+_ACTIVE_LOCKS_GUARD = threading.RLock()
+
+
+def _canonical_lock_path(path: str | os.PathLike[str]) -> Path:
+    """Return an absolute, normalised path without requiring it to exist."""
+    return Path(path).expanduser().resolve(strict=False)
+

 @contextmanager
-def _get_lock_context(path, timeout=5):
+def _get_lock_context(path, timeout=None):
     """Get a file lock context for the given path.

     Internal helper function that creates a FileLock if available,
@@ -25,19 +46,27 @@ def _get_lock_context(path, timeout=5):
     path : str
         The path to acquire a lock for.
     timeout : float
-        Timeout in seconds for acquiring the lock (default 60).
+        Timeout in seconds for acquiring the lock. If ``None``, the value of
+        ``DEFAULT_LOCK_TIMEOUT`` (default 60 seconds) is used. The timeout can
+        be overridden via the ``MNE_BIDS_FILELOCK_TIMEOUT`` environment
+        variable.

     Yields
     ------
     context : context manager
         Either a FileLock or nullcontext.
     """
+    if timeout is None:
+        timeout = DEFAULT_LOCK_TIMEOUT
+
+    canonical_path = _canonical_lock_path(path)
+
     filelock = _soft_import(
         "filelock", purpose="parallel file I/O locking", strict=False
     )

-    lock_path = f"{path}.lock"
-    lock_context = contextlib.nullcontext(enter_result=path)
+    lock_path = canonical_path.with_name(f"{canonical_path.name}.lock")
+    lock_context = contextlib.nullcontext()

     stack = "unknown"
     try:  # this should always work but let's be safe
@@ -51,88 +80,118 @@ def _get_lock_context(path, timeout=5):
         del where
     except Exception:
         pass
-    logger.debug(f"Lock: acquiring {path} from {stack}")
+    logger.debug(f"Lock: acquiring {canonical_path} from {stack}")

     if filelock:
         try:
             # Ensure parent directory exists
             Path(lock_path).parent.mkdir(parents=True, exist_ok=True)
-            lock_context = filelock.FileLock(lock_path, timeout=timeout)
+            lock_context = filelock.FileLock(
+                str(lock_path),
+                timeout=timeout,
+            )
         except (OSError, TypeError):
             # OSError: permission issues creating lock file
             # TypeError: invalid timeout parameter
             warn("Could not create lock. Proceeding without a lock.")
     try:
         yield lock_context
     except Exception:
-        logger.debug(f"Lock: exception {path} from {stack}")
+        logger.debug(f"Lock: exception {canonical_path} from {stack}")
         raise
     finally:
-        logger.debug(f"Lock: released {path} from {stack}")
+        logger.debug(f"Lock: released {canonical_path} from {stack}")


 @contextmanager
-def _open_lock(path, *args, **kwargs):
+def _open_lock(path, *args, lock_timeout=None, **kwargs):
     """Context manager that acquires a file lock with optional file opening.

     If the `filelock` package is available, a lock is acquired on a lock file
     based on the given path (by appending '.lock'). Lock files are left behind
     to avoid race conditions during concurrent operations.

+    The lock is re-entrant per process: nested calls for the same ``path`` will
+    reuse the existing lock instead of attempting to acquire it again.
+
     If file opening arguments (*args, **kwargs) are provided, the file is opened
     in the specified mode. Otherwise, just the lock is acquired.

     Parameters
     ----------
     path : str
         The path to acquire a lock for (and optionally open).
-    *args, **kwargs : optional
-        Additional arguments and keyword arguments to be passed to the
-        `open` function. If provided, the file will be opened. If empty,
-        only the lock will be acquired.
+    *args : tuple
+        Additional positional arguments forwarded to ``open``.
+    lock_timeout : float | None
+        Timeout in seconds for acquiring the lock. If ``None``, the default
+        timeout applies.
+    **kwargs : dict
+        Additional keyword arguments forwarded to ``open``.

     Yields
     ------
     fid : file object or None
         File object if file opening args were provided, None otherwise.
-
     """
-    with _get_lock_context(path, timeout=5) as lock_context:
-        try:
-            with lock_context:
-                if args or kwargs:
-                    # File opening arguments provided - open the file
-                    with open(path, *args, **kwargs) as fid:
-                        yield fid
-                else:
-                    # No file opening arguments - just yield None
-                    yield None
-        finally:
-            # Lock files are left behind to avoid race conditions with concurrent
-            # processes. They should be cleaned up explicitly after all parallel
-            # operations complete via cleanup_lock_files().
-            cleanup_lock_files(path)
+    canonical_path = _canonical_lock_path(path)
+    lock_key = str(canonical_path)

+    with _ACTIVE_LOCKS_GUARD:
+        lock_depth = _ACTIVE_LOCKS.get(lock_key, 0)
+        _ACTIVE_LOCKS[lock_key] = lock_depth + 1
+        is_reentrant = lock_depth > 0
+
+    try:
+        if is_reentrant:
+            if args or kwargs:
+                with open(canonical_path, *args, **kwargs) as fid:
+                    yield fid
+            else:
+                yield None
+            return
+
+        with _get_lock_context(
+            canonical_path,
+            timeout=lock_timeout,
+        ) as lock_context:
+            try:
+                with lock_context:
+                    if args or kwargs:
+                        with open(canonical_path, *args, **kwargs) as fid:
+                            yield fid
+                    else:
+                        yield None
+            finally:
+                cleanup_lock_files(canonical_path)
+    finally:
+        with _ACTIVE_LOCKS_GUARD:
+            _ACTIVE_LOCKS[lock_key] -= 1
+            if _ACTIVE_LOCKS[lock_key] == 0:
+                del _ACTIVE_LOCKS[lock_key]

-def cleanup_lock_files(root_path):
-    """Remove all .lock files in a directory tree.

-    This function should be called after parallel operations complete to clean up
-    lock files that may have been left behind.
+def cleanup_lock_files(root_path):
+    """Remove lock files associated with a path or an entire tree.

     Parameters
     ----------
     root_path : str | Path
-        Root directory to search for .lock files.
+        Root directory or file path used to derive lock file locations.
     """
     root_path = Path(root_path)
-    if not root_path.exists():
+
+    if root_path.is_dir():
+        for lock_file in root_path.rglob("*.lock"):
+            try:
+                lock_file.unlink()
+            except OSError:
+                pass
         return

-    # Find and remove all .lock files
-    for lock_file in root_path.rglob("*.lock"):
+    lock_candidate = root_path.parent / f"{root_path.name}.lock"
+    if lock_candidate.exists():
         try:
-            lock_file.unlink()
+            lock_candidate.unlink()
         except OSError:
-            # If we can't remove it, skip it (might be in use by another process)
             pass
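
A quick usage sketch (not part of this commit) may help when reviewing the re-entrancy and timeout changes above. The BIDS root and file path below are hypothetical placeholders, and the environment variable must be set before mne_bids._fileio is first imported, since DEFAULT_LOCK_TIMEOUT is resolved at import time:

    import os

    # Override the default 60 s lock timeout; non-numeric or non-positive
    # values fall back to _LOCK_TIMEOUT_FALLBACK. Must be set before the
    # module is imported because the value is read at import time.
    os.environ["MNE_BIDS_FILELOCK_TIMEOUT"] = "10"

    from mne_bids._fileio import _open_lock, cleanup_lock_files

    os.makedirs("/tmp/bids_root", exist_ok=True)
    path = "/tmp/bids_root/participants.tsv"  # hypothetical placeholder

    # Outer call: acquires "<path>.lock" if the optional `filelock` package
    # is installed, and opens the file because open() arguments were given.
    with _open_lock(path, "w", encoding="utf-8", lock_timeout=5) as fid:
        fid.write("participant_id\tage\n")

        # Nested call on the same path is re-entrant within this process:
        # the depth counter in _ACTIVE_LOCKS is already > 0, so no second
        # acquisition is attempted; with no open() args it yields None.
        with _open_lock(path) as handle:
            assert handle is None

    # After all parallel operations finish, any stale *.lock files under a
    # directory tree can still be swept up explicitly.
    cleanup_lock_files("/tmp/bids_root")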

mne_bids/tests/test_write.py
Lines changed: 1 addition & 1 deletion

@@ -106,7 +106,7 @@ def _make_parallel_raw(subject, *, seed=None):
     rng = np.random.default_rng(rng_seed)
     info = mne.create_info(["MEG0113"], 100, ch_types="mag")
     data = rng.standard_normal((1, 100)) * 1e-12
-    raw = mne.io.RawArray(data, info)
+    raw = mne.io.RawArray(data, info, verbose=False)
     raw.set_meas_date(datetime(2020, 1, 1, tzinfo=timezone.utc))
     raw.info["line_freq"] = 60
     raw.info["subject_info"] = {

mne_bids/write.py
Lines changed: 2 additions & 2 deletions

@@ -637,7 +637,7 @@ def _participants_tsv(raw, subject_id, fname, overwrite=False):
     if existing_participants:
         data = _combine_rows(orig_data, data, "participant_id")

-    _write_tsv(fname, data)
+    _write_tsv(fname, data, overwrite=True)


 def _participants_json(fname, overwrite=False):
@@ -819,7 +819,7 @@ def _scans_tsv(raw, raw_fname, fname, keep_source, overwrite=False):
         # otherwise add the new data
         data = _combine_rows(orig_data, data, "filename")

-    _write_tsv(fname, data)
+    _write_tsv(fname, data, overwrite=True)


 def _load_image(image, name="image"):
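
For context on the overwrite=True change: when participants.tsv or scans.tsv already exists, the new rows are merged with the old ones via _combine_rows, and the merged table then has to replace the existing file, which is what passing overwrite=True to _write_tsv permits. A rough sketch of that read-merge-rewrite pattern (plain Python for illustration, not mne_bids' actual _write_tsv) is:

    import csv
    from pathlib import Path

    def merge_and_rewrite(fname, new_rows, key):
        """Illustrative only: merge new rows into an existing TSV by a key column."""
        fname = Path(fname)
        rows = {}
        if fname.exists():
            with open(fname, newline="") as fid:
                for row in csv.DictReader(fid, delimiter="\t"):
                    rows[row[key]] = row
        for row in new_rows:
            rows[row[key]] = row  # new data replaces entries with the same key
        if not rows:
            return
        fieldnames = list(next(iter(rows.values())).keys())
        # Rewriting in place corresponds to _write_tsv(..., overwrite=True):
        # the file already exists, so the writer must be allowed to replace it.
        with open(fname, "w", newline="") as fid:
            writer = csv.DictWriter(fid, fieldnames=fieldnames, delimiter="\t")
            writer.writeheader()
            writer.writerows(rows.values())

    merge_and_rewrite(
        "participants.tsv",
        [{"participant_id": "sub-01", "age": "23"}],
        key="participant_id",
    )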
