[MRG] Parallel writing of the object #1451
Merged
Commits (83 total; changes shown from 71 commits)
bae18fa  handle parallel reading and writing (bruAristimunha)
e8670eb  updating the whats new file (bruAristimunha)
05c5776  playing the parallel (bruAristimunha)
3fb6f45  updating the whats new file (bruAristimunha)
900f39d  pre-commit (bruAristimunha)
4fd7176  changing the logic to not needed to increase the mne version (bruAristimunha)
ae8699a  updating the whats new (bruAristimunha)
b25480f  including two tests (bruAristimunha)
5e1b33d  more unify (bruAristimunha)
d029e06  improve test (bruAristimunha)
bfc7e51  first iteration of writing in parallel (bruAristimunha)
37062ab  updating the make parallel raw (bruAristimunha)
f14bfd9  today (bruAristimunha)
75def8a  updating the writing logic (bruAristimunha)
92ac60b  no longer joblib )= (bruAristimunha)
dc2f1a5  updating old names (bruAristimunha)
5772118  read only for now (bruAristimunha)
5f48fdb  Merge branch 'main' into parallel_writing (bruAristimunha)
8b9b463  Merge branch 'main' into parallel_writing (sappelhoff)
5ea0c3d  returning the write (bruAristimunha)
bf95f89  trying to not create multiple files with lock mechanism, should be in… (bruAristimunha)
d1734da  creating the test and fileio (bruAristimunha)
14cd999  updating the _fileio.py (bruAristimunha)
daf225a  creating the touch file (bruAristimunha)
2d5a7df  updating the fileio (bruAristimunha)
cd0060f  set with set (bruAristimunha)
400c1ad  first iteration (bruAristimunha)
f8d868b  updating the tests (bruAristimunha)
8089f2c  updating the _fileio.py (bruAristimunha)
b26af4d  Update doc/whats_new.rst (bruAristimunha)
fdcae55  Update mne_bids/_fileio.py (bruAristimunha)
0967ba9  Update mne_bids/_fileio.py (bruAristimunha)
48635b3  Update mne_bids/_fileio.py (bruAristimunha)
6fdc487  Apply suggestions from code review (bruAristimunha)
91c5194  updating the python script and trying again (bruAristimunha)
56f8588  updating the lock logic (bruAristimunha)
3ce7c9c  updating the write (bruAristimunha)
1308ebe  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
2db5fc8  updating the _fileio (bruAristimunha)
86bbc48  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
6461e14  pre-commit (bruAristimunha)
7cb039a  Merge branch 'parallel_writing' of https://github.com/bruAristimunha/… (bruAristimunha)
1c76cb6  changing the logic (bruAristimunha)
e554d0f  refactor: Unify _open_lock and _file_lock implementations (bruAristimunha)
6fb992e  removing the duplicate code (bruAristimunha)
bae71e8  reducing the timeout (bruAristimunha)
1930724  updating the test (bruAristimunha)
7ea77e3  Merge branch 'main' into parallel_writing (bruAristimunha)
6e15877  Merge branch 'main' into parallel_writing (bruAristimunha)
a5219ac  missing options (bruAristimunha)
96eaab3  old comments (bruAristimunha)
3bed03a  BUG: Need to pass args or kwargs if going to use (larsoner)
a550c4f  FIX: Add to full (larsoner)
0940e8e  Merge branch 'main' into parallel_writing (larsoner)
4ff4d55  FIX: Another (larsoner)
fb45128  Fix double-lock acquisition in _participants_tsv and _scans_tsv (bruAristimunha)
dc803df  Merge branch 'main' into parallel_writing (bruAristimunha)
0b78efe  removing the locked function (bruAristimunha)
67635e2  Merge branch 'parallel_writing' of https://github.com/bruAristimunha/… (bruAristimunha)
0197b64  trying something (bruAristimunha)
d0f7af6  including extra dep (bruAristimunha)
cdd7920  updating the whats new (bruAristimunha)
442b777  fixing the context (bruAristimunha)
282fa63  changing the cache system (bruAristimunha)
43586bd  more CI adjustment (bruAristimunha)
7cb1566  cleaning the lock (bruAristimunha)
44709bd  ugly solution.... (bruAristimunha)
b07e92c  more mne style (bruAristimunha)
1ac7639  test coverage (bruAristimunha)
471e130  coverage and refactoring (bruAristimunha)
13f45d9  removing one test (bruAristimunha)
294f84a  Apply suggestion from @agramfort (bruAristimunha)
822d0c9  Apply suggestion from @agramfort (bruAristimunha)
1a36d1f  implementing suggestion part 1 (bruAristimunha)
c7a08f4  implementing suggestion part 2 (bruAristimunha)
7fd5fa8  Merge branch 'main' into parallel_writing (bruAristimunha)
bc7f3ba  Merge branch 'main' into parallel_writing (sappelhoff)
5aee828  increasing the delay and flatting the function (bruAristimunha)
b97c792  Merge branch 'parallel_writing' of https://github.com/bruAristimunha/… (bruAristimunha)
0dd8666  updating comments of the revision (bruAristimunha)
43e784e  Update mne_bids/tsv_handler.py (bruAristimunha)
9db1516  reducing the try scope (bruAristimunha)
ebce415  Merge branch 'parallel_writing' of https://github.com/bruAristimunha/… (bruAristimunha)
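The commit history above traces the PR's goal: making concurrent writes into a single BIDS dataset safe. As a hedged sketch of the target scenario (the process-pool driver, the `recordings` mapping, the task name, and the paths are illustrative, not part of this diff; the PR's own tests are the authoritative reference):

```python
from concurrent.futures import ProcessPoolExecutor

import mne
from mne_bids import BIDSPath, write_raw_bids

BIDS_ROOT = "/data/bids_dataset"  # illustrative path


def write_one(subject, raw_path):
    """Convert one recording; shared sidecars are protected by the new locks."""
    raw = mne.io.read_raw(raw_path)
    bids_path = BIDSPath(subject=subject, task="rest", root=BIDS_ROOT)
    write_raw_bids(raw, bids_path, overwrite=True, verbose=False)


if __name__ == "__main__":  # guard required for process pools on some platforms
    # `recordings` is assumed to map subject labels to raw file paths on disk.
    with ProcessPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(write_one, sub, p) for sub, p in recordings.items()]
        for fut in futures:
            fut.result()  # re-raise any worker exception
```

Shared sidecar files such as `participants.tsv` and `*_scans.tsv` are the state that concurrent writers previously clobbered; the `_fileio.py` module below is what serializes access to them.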
mne_bids/_fileio.py (new file, +328 lines)
```python
"""File I/O helpers with file locking support."""

# Authors: The MNE-BIDS developers
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import contextlib
import inspect
import os
import threading
from contextlib import contextmanager
from pathlib import Path

from mne.utils import _soft_import, logger, warn

_LOCK_TIMEOUT_FALLBACK = 60.0
_env_lock_timeout = os.getenv("MNE_BIDS_FILELOCK_TIMEOUT", "")
try:
    DEFAULT_LOCK_TIMEOUT = (
        float(_env_lock_timeout) if _env_lock_timeout else _LOCK_TIMEOUT_FALLBACK
    )
    if DEFAULT_LOCK_TIMEOUT <= 0:
        raise ValueError
except ValueError:
    DEFAULT_LOCK_TIMEOUT = _LOCK_TIMEOUT_FALLBACK

_ACTIVE_LOCKS: dict[str, int] = {}
_ACTIVE_LOCKS_GUARD = threading.RLock()
```
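Since the environment variable is read at import time, a timeout override has to be set before the module is first imported. A minimal sketch (assuming only that the module is importable as `mne_bids._fileio`, per the diff header):

```python
import os

# Set before the first import; the value is parsed once at import time.
os.environ["MNE_BIDS_FILELOCK_TIMEOUT"] = "120"

from mne_bids import _fileio

print(_fileio.DEFAULT_LOCK_TIMEOUT)  # 120.0
# Invalid or non-positive values silently fall back to the 60-second default.
```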
```python
def _canonical_lock_path(path: str | os.PathLike[str]) -> Path:
    """Return an absolute, normalised path without requiring it to exist."""
    return Path(path).expanduser().resolve(strict=False)


@contextmanager
def _get_lock_context(path, timeout=None):
    """Get a file lock context for the given path.

    Internal helper function that creates a FileLock if available,
    or returns a nullcontext() as fallback.

    Parameters
    ----------
    path : str
        The path to acquire a lock for.
    timeout : float
        Timeout in seconds for acquiring the lock. If ``None``, the value of
        ``DEFAULT_LOCK_TIMEOUT`` (default 60 seconds) is used. The timeout can
        be overridden via the ``MNE_BIDS_FILELOCK_TIMEOUT`` environment
        variable.

    Yields
    ------
    context : context manager
        Either a FileLock or nullcontext.
    """
    if timeout is None:
        timeout = DEFAULT_LOCK_TIMEOUT

    canonical_path = _canonical_lock_path(path)

    filelock = _soft_import(
        "filelock", purpose="parallel file I/O locking", strict=False
    )

    lock_path = canonical_path.with_name(f"{canonical_path.name}.lock")
    lock_context = contextlib.nullcontext()

    stack = "unknown"
    try:  # this should always work but let's be safe
        # [0] = here
        # [1] = contextlib __enter__
        # [2] = _open_lock
        # [3] = contextlib __enter__
        # [4] = caller of _open_lock
        where = inspect.stack()[4]
        stack = f"{where.filename}:{where.lineno} {where.function}"
        del where
    except Exception:
        pass
    logger.debug(f"Lock: acquiring {canonical_path} from {stack}")

    if filelock:
        try:
            # Ensure the parent directory exists
            Path(lock_path).parent.mkdir(parents=True, exist_ok=True)
            lock_context = filelock.FileLock(
                str(lock_path),
                timeout=timeout,
            )
        except (OSError, TypeError):
            # OSError: permission issues creating the lock file
            # TypeError: invalid timeout parameter
            warn("Could not create lock. Proceeding without a lock.")
    try:
        yield lock_context
    except Exception:
        logger.debug(f"Lock: exception {canonical_path} from {stack}")
        raise
    finally:
        logger.debug(f"Lock: released {canonical_path} from {stack}")
```
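Note that `_get_lock_context` yields the lock object rather than entering it, so the caller enters it in a second step. A hedged sketch of that two-step usage (a private helper, shown purely for illustration):

```python
from mne_bids._fileio import _get_lock_context

with _get_lock_context("participants.tsv", timeout=5.0) as lock:
    with lock:  # FileLock if `filelock` is installed, otherwise a nullcontext
        ...  # critical section: read-modify-write the file
```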
```python
@contextmanager
def _open_lock(path, *args, lock_timeout=None, **kwargs):
    """Context manager that acquires a file lock with optional file opening.

    If the `filelock` package is available, a lock is acquired on a lock file
    based on the given path (by appending '.lock'). Lock files are left behind
    to avoid race conditions during concurrent operations.

    The lock is re-entrant per process: nested calls for the same ``path`` will
    reuse the existing lock instead of attempting to acquire it again.

    If file opening arguments (*args, **kwargs) are provided, the file is opened
    in the specified mode. Otherwise, just the lock is acquired.

    Parameters
    ----------
    path : str
        The path to acquire a lock for (and optionally open).
    *args : tuple
        Additional positional arguments forwarded to ``open``.
    lock_timeout : float | None
        Timeout in seconds for acquiring the lock. If ``None``, the default
        timeout applies.
    **kwargs : dict
        Additional keyword arguments forwarded to ``open``.

    Yields
    ------
    fid : file object or None
        File object if file opening args were provided, None otherwise.
    """
    canonical_path = _canonical_lock_path(path)
    lock_key = str(canonical_path)

    with _ACTIVE_LOCKS_GUARD:
        lock_depth = _ACTIVE_LOCKS.get(lock_key, 0)
        _ACTIVE_LOCKS[lock_key] = lock_depth + 1
        is_reentrant = lock_depth > 0

    try:
        if is_reentrant:
            if args or kwargs:
                with open(canonical_path, *args, **kwargs) as fid:
                    yield fid
            else:
                yield None
            return

        # Increment the multiprocess refcount before acquiring the lock
        _increment_lock_refcount(canonical_path)

        with _get_lock_context(
            canonical_path,
            timeout=lock_timeout,
        ) as lock_context:
            with lock_context:
                if args or kwargs:
                    with open(canonical_path, *args, **kwargs) as fid:
                        yield fid
                else:
                    yield None
    finally:
        with _ACTIVE_LOCKS_GUARD:
            _ACTIVE_LOCKS[lock_key] -= 1
            if _ACTIVE_LOCKS[lock_key] == 0:
                del _ACTIVE_LOCKS[lock_key]

        # Clean up lock files safely using reference counting across processes.
        # Only clean up when this was the outermost lock (not re-entrant) and
        # when the lock depth was 0 (meaning we actually acquired the lock).
        if not is_reentrant and lock_depth == 0:
            _decrement_and_cleanup_lock_file(canonical_path)
```
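A hedged usage sketch of `_open_lock` (a private API, so subject to change): the same context manager serves both as a bare lock and as a locked `open`, exactly as the docstring describes:

```python
from mne_bids._fileio import _open_lock

# Lock only: no open() arguments, so the manager yields None.
with _open_lock("participants.tsv"):
    ...  # section that is exclusive across processes

# Lock plus open: the extra arguments are forwarded to open().
with _open_lock("participants.tsv", "a", encoding="utf-8") as fid:
    fid.write("sub-02\tn/a\n")

# Re-entrant within a process: a nested call reuses the outer lock.
with _open_lock("participants.tsv"):
    with _open_lock("participants.tsv", encoding="utf-8") as fid:
        contents = fid.read()
```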
```python
def _increment_lock_refcount(file_path):
    """Increment the multiprocess reference count for a lock file.

    Parameters
    ----------
    file_path : Path
        The original file path (not the lock file path).
    """
    file_path = Path(file_path)
    refcount_path = file_path.parent / f"{file_path.name}.lock.refcount"
    refcount_lock_path = Path(f"{refcount_path}.lock")

    filelock = _soft_import("filelock", purpose="lock refcounting", strict=False)
    if filelock is None:
        return

    refcount_lock = filelock.FileLock(str(refcount_lock_path), timeout=5.0)
    try:
        with refcount_lock:
            # Read the current refcount or initialize it to 0
            current_count = 0
            if refcount_path.exists():
                try:
                    current_count = int(refcount_path.read_text().strip())
                except (ValueError, OSError):
                    logger.debug(f"Could not read refcount from {refcount_path}")

            # Increment and write back
            current_count += 1
            try:
                refcount_path.write_text(str(current_count))
            except OSError as exp:
                logger.debug(f"Could not write refcount to {refcount_path}: {exp}")
    except TimeoutError:
        # Another process is updating the refcount concurrently
        logger.debug(f"Timeout acquiring refcount lock for {file_path}")
    except Exception as exp:
        logger.debug(f"Error incrementing refcount for {file_path}: {exp}")
    finally:
        # Clean up the refcount lock file
        try:
            if refcount_lock_path.exists():
                refcount_lock_path.unlink()
        except OSError:
            pass
```
```python
def _decrement_and_cleanup_lock_file(file_path):
    """Decrement the refcount and remove the lock file if no longer in use.

    Maintains a reference count in a .refcount file to track how many processes
    are currently using or waiting for the lock, and only deletes the lock file
    when the reference count reaches zero.

    Parameters
    ----------
    file_path : Path
        The original file path (not the lock file path).
    """
    file_path = Path(file_path)
    lock_path = file_path.parent / f"{file_path.name}.lock"
    refcount_path = file_path.parent / f"{file_path.name}.lock.refcount"
    refcount_lock_path = Path(f"{refcount_path}.lock")

    # Early return if the lock file doesn't exist
    if not lock_path.exists():
        # Clean up an orphaned refcount file if one exists
        try:
            if refcount_path.exists():
                refcount_path.unlink()
        except OSError:
            pass
        return

    filelock = _soft_import("filelock", purpose="lock refcounting", strict=False)
    if filelock is None:
        return

    refcount_lock = filelock.FileLock(str(refcount_lock_path), timeout=5.0)
    try:
        with refcount_lock:
            # Read the current refcount or initialize it to 0
            current_count = 0
            if refcount_path.exists():
                try:
                    current_count = int(refcount_path.read_text().strip())
                except (ValueError, OSError):
                    logger.debug(f"Could not read refcount from {refcount_path}")

            # Decrement the refcount, ensuring it doesn't go negative
            current_count = max(0, current_count - 1)

            if current_count == 0:
                # No more processes are using this lock, so it is safe to delete
                try:
                    lock_path.unlink()
                except OSError as exp:
                    logger.debug(f"Could not remove lock file {lock_path}: {exp}")
                try:
                    if refcount_path.exists():
                        refcount_path.unlink()
                except OSError as exp:
                    logger.debug(f"Could not remove refcount {refcount_path}: {exp}")
            else:
                # Write back the decremented count
                try:
                    refcount_path.write_text(str(current_count))
                except OSError as exp:
                    logger.debug(f"Could not write refcount to {refcount_path}: {exp}")
    except TimeoutError:
        # Another process is updating the refcount; it will handle cleanup
        logger.debug(f"Timeout acquiring refcount lock for {file_path}")
    except Exception as exp:
        logger.debug(f"Error decrementing refcount for {file_path}: {exp}")
    finally:
        # Clean up the refcount lock file
        try:
            if refcount_lock_path.exists():
                refcount_lock_path.unlink()
        except OSError:
            pass
```
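Taken together, the two refcount helpers give each lock file a cross-process lifecycle. A hedged sketch of what appears on disk (requires the optional `filelock` dependency; paths are illustrative):

```python
from pathlib import Path

from mne_bids._fileio import _open_lock

target = Path("/tmp/bids/participants.tsv")
target.parent.mkdir(parents=True, exist_ok=True)

with _open_lock(target, "w", encoding="utf-8") as fid:
    fid.write("participant_id\nsub-01\n")
    # While the lock is held, sidecar files sit next to the target:
    #   participants.tsv.lock           - the filelock itself
    #   participants.tsv.lock.refcount  - cross-process reference count
# On exit the refcount drops to zero and both sidecars are removed.
```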
```python
def cleanup_lock_files(root_path):
    """Remove lock files associated with a path or an entire tree.

    Parameters
    ----------
    root_path : str | Path
        Root directory or file path used to derive lock file locations.
    """
    root_path = Path(root_path)

    if root_path.is_dir():
        for lock_file in root_path.rglob("*.lock"):
            try:
                lock_file.unlink()
            except OSError:
                pass
        return

    lock_candidate = root_path.parent / f"{root_path.name}.lock"
    if lock_candidate.exists():
        try:
            lock_candidate.unlink()
        except OSError:
            pass
```
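Because lock files can outlive a crashed run, `cleanup_lock_files` (the one public name in the module) sweeps them up after the fact. A brief usage sketch (the dataset paths are illustrative):

```python
from mne_bids._fileio import cleanup_lock_files

# Remove every stale *.lock file under a dataset tree...
cleanup_lock_files("/data/bids_dataset")

# ...or only the lock belonging to a single file.
cleanup_lock_files("/data/bids_dataset/participants.tsv")
```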