-
Notifications
You must be signed in to change notification settings - Fork 52
Description
Problem
Currently, spyglass's approach to opening files is to cache all open files, and
leave them open.
spyglass/src/spyglass/utils/nwb_helper_fn.py
Lines 15 to 39 in 67ca666
```python
# dict mapping file path to an open NWBHDF5IO object in read mode and its NWBFile
__open_nwb_files = dict()

# dict mapping NWB file path to config after it is loaded once
__configs = dict()

global invalid_electrode_index
invalid_electrode_index = 99999999


def _open_nwb_file(nwb_file_path, source="local"):
    """Open an NWB file, add to cache, return contents. Does not close file."""
    if source == "local":
        io = pynwb.NWBHDF5IO(path=nwb_file_path, mode="r", load_namespaces=True)
        nwbfile = io.read()
    elif source == "dandi":
        from ..common.common_dandi import DandiPath

        io, nwbfile = DandiPath().fetch_file_from_dandi(
            nwb_file_path=nwb_file_path
        )
    else:
        raise ValueError(f"Invalid open_nwb source: {source}")
    __open_nwb_files[nwb_file_path] = (io, nwbfile)
    return nwbfile
```
This runs into a handful of problems...
- OS limitations: unix ulimit cap on number of open files
- Resource limits: RAM caps, slowing down processes with files no longer needed
- Concurrency issues: User1 opens a file and completes operations without closing Python, so User2 cannot open the same file
Existing operations allow users to close all files
spyglass/src/spyglass/utils/nwb_helper_fn.py
Lines 190 to 194 in 67ca666
```python
def close_nwb_files():
    """Close all open NWB files."""
    for io, _ in __open_nwb_files.values():
        io.close()
    __open_nwb_files.clear()
```
But closing files selectively is not practical, because the cache is private and users cannot see which files it holds
Solutions
(1) Allow immediate close
PR #1193 allowed closing of a file immediately after opening
- Pro:
- Easy edit
- user-managed
- Con:
- Closes the most relevant file
- Files left open not readily closed
(2) Register open files to table
Add a registry of open files to spyglass, allowing users to see and close files
selectively.
- Pro:
- Users can see open files
- Users can close files selectively
- User access can be tracked and limited
- Con:
- More complex implementation
- Slows performance slightly due to table operations
(3) Context manager
Implement a context manager to handle file opening and closing automatically.
- Pro:
- Automatic resource management
- Reduces risk of leaving files open
- Con:
- Requires users to adapt to context manager usage
- Automatic closing may not fit all cases that would benefit from caching
(4) Resource management thread
Implement a background thread to monitor resource usage and close files
when limits are approached, to automatically manage resources. Register
files for closing at exit.
- Pro:
- Automatic resource management
- Reduces risk of resource exhaustion
- Con:
- Unexpected background operations
- More complex implementation
Combined solution
Drafted implementation combining (2), (3), and (4)
Example code
import atexit
import datajoint as dj
import threading
from datajoint.utils import time
from functools import cached_property
from spyglass.utils.nwb_helper_fn import open_nwbfile, close_nwbfile
class OpenFileRegistry(dj.Manual):
    """Registry table of open NWB files, one row per file path.

    Each row records which DataJoint user opened the file, when, and how
    many times it has been accessed through this registry. Open file
    objects are additionally cached in-process in ``_file_cache`` so
    repeated accesses reuse the same handle.
    """

    definition = """
    file_path: varchar(255) # Path to the open NWB file
    ---
    user : varchar(32) # DataJoint user who opened the file
    opened_at: timestamp # Timestamp when the file was opened
    access_count=1: int # Number of active accesses to the file
    """

    # Background monitor thread; stored on the class so it is started once
    # per process rather than once per table instance.
    _monitoring_thread = None
    # Maps file_path -> open NWB file object; shared by all instances.
    _file_cache = {}

    def access(self, file_path):
        """Return a context manager that opens ``file_path`` and closes it on exit."""
        # AccessContextManager builds its own registry handle and takes only
        # the path; passing ``self`` as well raised a TypeError.
        return AccessContextManager(file_path)

    @cached_property
    def user_dict(self):
        """Restriction dict selecting rows of the current DataJoint user."""
        return {'user': dj.config['database.user']}

    @classmethod
    def file_dict(cls, file_path):
        """Restriction dict selecting the row for ``file_path``."""
        return {'file_path': file_path}

    def update_entry(self, entry=None, file_path=None):
        """Increment ``access_count`` for one registered file.

        Parameters
        ----------
        entry : dict, optional
            Already-fetched row; must contain ``file_path`` and
            ``access_count``.
        file_path : str, optional
            Path whose row is fetched from the table. Takes precedence over
            ``entry`` when both are given.

        Raises
        ------
        ValueError
            If neither ``entry`` nor ``file_path`` is provided.
        """
        if not entry and not file_path:
            raise ValueError("Either entry or file_path must be provided.")
        if file_path:
            # fetch1() without attribute names already returns a dict and
            # does not accept an ``as_dict`` keyword.
            entry = (self & self.file_dict(file_path)).fetch1()
        # Send only the primary key and the changed attribute, and leave the
        # caller's dict unmodified.
        self.update1(
            dict(
                file_path=entry['file_path'],
                access_count=entry['access_count'] + 1,
            )
        )

    def open_file(self, file_path):
        """Open ``file_path``, registering it or bumping its access count.

        Returns the cached file object when this process already has the file
        open.

        Raises
        ------
        RuntimeError
            If the file is registered as open by a different user.
        """
        from datetime import datetime  # ``time.now()`` does not exist

        if OpenFileRegistry._monitoring_thread is None:
            # Assign on the class: an instance attribute would be lost with
            # the instance and a new thread started on every fresh instance.
            OpenFileRegistry._monitoring_thread = threading.Thread(
                target=monitor_memory_usage, daemon=True
            )
            OpenFileRegistry._monitoring_thread.start()

        if file_path in self._file_cache:
            self.update_entry(file_path=file_path)
            return self._file_cache[file_path]

        file_dict = self.file_dict(file_path)
        query = self & file_dict
        if not query:
            self.insert1(
                dict(self.user_dict, **file_dict, opened_at=datetime.now())
            )
        elif query.fetch1('user') == self.user_dict['user']:
            # fetch1 yields the scalar user name; fetch() would yield an array.
            self.update_entry(entry=query.fetch1())
        else:
            raise RuntimeError(f"File already open by {query.fetch1('user')}")

        nwbf = open_nwbfile(file_path)
        self._file_cache[file_path] = nwbf
        return nwbf

    def close_file(self, file_path):
        """Unregister ``file_path``, evict it from the cache, and close it.

        Does nothing when the file is neither registered nor cached.
        """
        query = self & self.file_dict(file_path)
        registered = bool(query)  # evaluate before the row is deleted
        # Evict the handle so a later open_file() cannot return a closed file.
        cached = self._file_cache.pop(file_path, None) is not None
        if registered:
            query.delete(safemode=False)
        if registered or cached:
            close_nwbfile(file_path)

    def close_all_files(self):
        """Close every file registered to the current user."""
        for file_path in (self & self.user_dict).fetch('file_path'):
            self.close_file(file_path)
        # The monitor thread is intentionally left running: threading.Thread
        # has no close() method, and the thread is a daemon, so it ends with
        # the interpreter.

    def close_unused_files(self, close_count=5, access_threshold=None):
        """Close up to ``close_count`` rarely accessed files, oldest first.

        Parameters
        ----------
        close_count : int
            Maximum number of files to close in this call.
        access_threshold : int, optional
            Files whose ``access_count`` is at or below this value are
            eligible. Defaults to the module-level ``FILE_ACCESS_THRESHOLD``
            (NOTE(review): that constant is not defined in the visible
            draft - confirm it exists).
        """
        if access_threshold is None:
            access_threshold = FILE_ACCESS_THRESHOLD
        # Restrict first, then fetch: ``self & self.user_dict.fetch(...)``
        # called .fetch on the dict because attribute access binds tighter
        # than ``&``.
        entries = (self & self.user_dict).fetch(
            as_dict=True, order_by='opened_at ASC'
        )
        closed_count = 0
        for entry in entries:
            if closed_count >= close_count:
                break
            if entry['access_count'] <= access_threshold:
                self.close_file(entry['file_path'])
                closed_count += 1
class AccessContextManager:
    """Open an NWB file through ``OpenFileRegistry`` for a ``with`` block.

    Entering opens the file via the registry and yields it; leaving closes
    it via the registry, whether or not the block raised.
    """

    def __init__(self, file_path):
        """Remember ``file_path`` and create the registry handle used later."""
        self.file_path = file_path
        self.registry = OpenFileRegistry()
        # Populated by __enter__; stays None until the context is entered.
        self.nwbfile = None

    def __enter__(self):
        """Open the file through the registry, keep a reference, return it."""
        opened = self.registry.open_file(self.file_path)
        self.nwbfile = opened
        return opened

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the file through the registry; exceptions are not suppressed."""
        self.registry.close_file(self.file_path)
        return None

    def file(self):
        """Return the file object from the last ``__enter__`` (or None)."""
        return self.nwbfile
def monitor_memory_usage(poll_interval=5.0):
    """Poll resource usage forever and close unused files near the limits.

    Intended as the target of a daemon thread: the loop never returns.

    Parameters
    ----------
    poll_interval : float
        Seconds to sleep between checks, so the loop does not spin a CPU
        core and does not hit the registry table continuously.

    Notes
    -----
    ``MAX_OPEN_FILES`` and ``MIN_AVAILABLE_MEMORY`` are read from module
    scope. NOTE(review): neither is defined in the visible draft - confirm
    they exist, and that ``MIN_AVAILABLE_MEMORY`` is expressed in bytes,
    since it is compared against ``psutil.virtual_memory().available``.
    """
    import time

    import psutil

    process = psutil.Process()
    while True:
        # Regular files currently held open by this process.
        file_count = len(process.open_files())
        # System memory available to new allocations, in bytes.
        mem_avail = psutil.virtual_memory().available
        if file_count > MAX_OPEN_FILES or mem_avail < MIN_AVAILABLE_MEMORY:
            OpenFileRegistry().close_unused_files()
        time.sleep(poll_interval)
# EXAMPLE: open a file through the registry for the duration of the block;
# the context manager closes it again when the block exits.
with OpenFileRegistry().access('/path/to/nwbfile.nwb') as nwbfile:
    # Perform operations on nwbfile
    pass