Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 5 additions & 25 deletions exca/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class BaseInfra(pydantic.BaseModel):
# general permission for folders and files
# use os.chmod / path.chmod compatible numbers, or None to deactivate
# eg: 0o777 for all rights to all users
permissions: int | str | None = 0o777
permissions: int | None = 0o777
# {folder} will be replaced by the class folder
# {user} by user id and %j by job id
logs: Path | str = "{folder}/logs/{user}/%j"
Expand Down Expand Up @@ -162,7 +162,7 @@ def _exclude_from_cls_uid(self) -> tp.List[str]:

def model_post_init(self, log__: tp.Any) -> None:
super().model_post_init(log__)
self._set_permissions(None) # set compatibility for permissions as string
self._permission_setter = utils.PermissionSetter(self.permissions)

def config(self, uid: bool = True, exclude_defaults: bool = False) -> ConfDict:
"""Exports the task configuration as a ConfigDict
Expand Down Expand Up @@ -215,10 +215,7 @@ def _check_configs(self, write: bool = True) -> None:
for name in ("uid", "full-uid", "config"):
fp = xpfolder / f"{name}.yaml"
if fp.exists():
try:
self._set_permissions(fp)
except (OSError, FileNotFoundError):
pass
self._permission_setter.set(fp)

def _factory(self) -> str:
cls = self._obj.__class__
Expand Down Expand Up @@ -295,8 +292,8 @@ def uid_folder(self, create: bool = False) -> Path | None:
if not create:
return folder
folder.mkdir(exist_ok=True, parents=True)
self._set_permissions(self.folder)
self._set_permissions(folder)
self._permission_setter.set(self.folder)
self._permission_setter.set(folder)
return folder

def iter_cached(self) -> tp.Iterable[pydantic.BaseModel]:
Expand All @@ -312,23 +309,6 @@ def iter_cached(self) -> tp.Iterable[pydantic.BaseModel]:
cfg = ConfDict.from_yaml(fp)
yield cls(**cfg)

def _set_permissions(self, path: str | Path | None) -> None:
if isinstance(self.permissions, str):
if not self.permissions:
self.permissions = None
elif self.permissions == "a+rwx":
self.permissions = 0o777
else:
raise ValueError(f"No compatibility for permissions {self.permissions}")
msg = "infra.permissions set to %s by compatibility mode"
logger.warning(msg, self.permissions)
if path is not None and self.permissions is not None:
try:
Path(path).chmod(self.permissions)
except Exception as e:
msg = f"Failed to set permission to {self.permissions} on '{path}'\n({e})"
logger.warning(msg)

def clone_obj(self, *args: tp.Dict[str, tp.Any], **kwargs: tp.Any) -> tp.Any:
"""Create a new decorated object by applying a diff config to the underlying object"""
if args:
Expand Down
21 changes: 6 additions & 15 deletions exca/cachedict.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,14 @@ def __init__(
permissions: int | None = 0o777,
) -> None:
self.folder = None if folder is None else Path(folder)
self.permissions = permissions
self._permission_setter = utils.PermissionSetter(permissions)
self.cache_type = cache_type
self._keep_in_ram = keep_in_ram
if self.folder is None and not keep_in_ram:
raise ValueError("At least folder or keep_in_ram should be activated")
if self.folder is not None:
self.folder.mkdir(exist_ok=True)
if self.permissions is not None:
try:
Path(self.folder).chmod(self.permissions)
except Exception as e:
msg = f"Failed to set permission to {self.permissions} on {self.folder}\n({e})"
logger.warning(msg)
self._permission_setter.set(self.folder)
# file cache access and RAM cache
self._ram_data: dict[str, X] = {}
self._key_info: dict[str, DumpInfo] = {}
Expand Down Expand Up @@ -308,8 +303,8 @@ def open(self) -> tp.Iterator[None]:
t = time.time() # make sure the modified time is updated:
os.utime(cd.folder, times=(t, t))
fp2 = self._info_filepath
if cd.permissions is not None and fp2 is not None and fp2.exists():
fp2.chmod(cd.permissions)
if fp2 is not None and fp2.exists():
cd._permission_setter.set(fp2)
self._exit_stack = None
self._info_filepath = None
self._info_handle = None
Expand Down Expand Up @@ -369,12 +364,8 @@ def __setitem__(self, key: str, value: X) -> None:
cd._jsonl_readers.setdefault(fp.name, JsonlReader(fp))._last = last
# reading will reload to in-memory cache if need be
# (since dumping may have loaded the underlying data, let's not keep it)
if cd.permissions is not None:
for fp in files:
try:
fp.chmod(cd.permissions)
except Exception: # pylint: disable=broad-except
pass # avoid issues in case of overlapping processes
for fp in files:
cd._permission_setter.set(fp)
os.utime(cd.folder) # make sure the modified time is updated


Expand Down
17 changes: 15 additions & 2 deletions exca/chain/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,22 @@ class Cache(Step):
folder: Path | None = None
mode: Mode = "cached"
cache_type: str | None = None
permissions: int | None = 0o777

# tracks if computation was already run (to avoid recomputing twice with same instance)
_computed: bool = False
_permission_setter: utils.PermissionSetter | None = None

def model_post_init(self, log__: tp.Any) -> None:
self._permission_setter = utils.PermissionSetter(self.permissions)

def _chain_folder(self) -> Path:
if self.folder is None:
raise RuntimeError("No folder provided")
folder = self.folder / self._chain_hash()
folder.mkdir(exist_ok=True, parents=True) # TODO permissions
folder.mkdir(exist_ok=True, parents=True)
if self._permission_setter is not None:
self._permission_setter.set(folder)
return folder

def _chain_hash(self) -> str:
Expand Down Expand Up @@ -110,7 +117,9 @@ def _cache_dict(self) -> exca.cachedict.CacheDict[tp.Any]:
if self.folder is None:
return exca.cachedict.CacheDict(folder=None, keep_in_ram=True)
folder = self._chain_folder() / "cache"
return exca.cachedict.CacheDict(folder=folder, cache_type=self.cache_type)
return exca.cachedict.CacheDict(
folder=folder, cache_type=self.cache_type, permissions=self.permissions
)

def _aligned_step(self) -> list[Step]:
return []
Expand Down Expand Up @@ -265,8 +274,12 @@ def forward(self, *params: tp.Any) -> tp.Any:
if pkl is not None and not isinstance(job, backends.ResultJob):
logger.debug("Dumping job into: %s", pkl.parent)
pkl.parent.mkdir(exist_ok=True, parents=True)
if self._permission_setter is not None:
self._permission_setter.set(pkl.parent)
with pkl.open("wb") as f:
pickle.dump(job, f)
if self._permission_setter is not None:
self._permission_setter.set(pkl)
self._computed = True # to ignore mode force/retry from now on
out = job.result()
if not isinstance(out, NoValue): # output was not cached
Expand Down
4 changes: 0 additions & 4 deletions exca/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,6 @@ def cache_dict(self) -> CacheDict[tp.Any]:
raise RuntimeError(f"Infra was not applied: {self!r}")
cache_type = imethod.cache_type
cache_path = self.uid_folder(create=True)
if isinstance(self.permissions, str):
self._set_permissions(None)
if isinstance(self.permissions, str):
raise RuntimeError("infra.permissions should have been an integer")
self._cache_dict = CacheDict(
folder=cache_path,
keep_in_ram=self.keep_in_ram,
Expand Down
43 changes: 35 additions & 8 deletions exca/steps/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,15 @@ def logs_folder(self) -> str:
"""Returns template string for submitit (with %j placeholder)."""
return str(self.step_folder / "logs" / "%j")

def ensure_folders(self) -> None:
def ensure_folders(
self, permission_setter: utils.PermissionSetter | None = None
) -> None:
"""Create necessary directories."""
self.cache_folder.mkdir(parents=True, exist_ok=True)
self.job_folder.mkdir(parents=True, exist_ok=True)
if permission_setter is not None:
permission_setter.set(self.cache_folder)
permission_setter.set(self.job_folder)

def clear_cache(self) -> None:
"""Clear cache and job folder for this item."""
Expand Down Expand Up @@ -146,15 +151,22 @@ def __init__(
func: tp.Callable[..., tp.Any],
paths: StepPaths,
cache_type: str | None,
permission_setter: utils.PermissionSetter | None = None,
):
self.func = func
self.paths = paths
self.cache_type = cache_type
self.permission_setter = permission_setter

def __call__(self, *args: tp.Any) -> None:
self.paths.ensure_folders() # Create folders before writing
self.paths.ensure_folders(self.permission_setter) # Create folders before writing
permissions = (
None if self.permission_setter is None else self.permission_setter.permissions
)
cd: exca.cachedict.CacheDict[tp.Any] = exca.cachedict.CacheDict(
folder=self.paths.cache_folder, cache_type=self.cache_type
folder=self.paths.cache_folder,
cache_type=self.cache_type,
permissions=permissions,
)
try:
result = self.func(*args)
Expand All @@ -163,6 +175,8 @@ def __call__(self, *args: tp.Any) -> None:
if not self.paths.error_pkl.exists():
with self.paths.error_pkl.open("wb") as f:
pickle.dump(e, f)
if self.permission_setter is not None:
self.permission_setter.set(self.paths.error_pkl)
raise
if self.paths.item_uid not in cd: # Only write if not already cached
with cd.writer() as w:
Expand All @@ -180,17 +194,22 @@ class Backend(exca.helpers.DiscriminatedModel, discriminator_key="backend"):

@classmethod
def _exclude_from_cls_uid(cls) -> list[str]:
return ["."] # force ignored in uid
return ["."]

folder: Path | None = None
cache_type: str | None = None
mode: ModeType = "cached"
keep_in_ram: bool = False
permissions: int | None = 0o777

_step: "Step" | None = None
_ram_cache: tp.Any = pydantic.PrivateAttr(default_factory=NoValue)
_paths: StepPaths | None = pydantic.PrivateAttr(default=None)
_checked_configs: bool = pydantic.PrivateAttr(default=False)
_permission_setter: utils.PermissionSetter = pydantic.PrivateAttr()

def model_post_init(self, log__: tp.Any) -> None:
self._permission_setter = utils.PermissionSetter(self.permissions)

def __eq__(self, other: tp.Any) -> bool:
"""Compare backends by model fields only, excluding _step to avoid recursion."""
Expand Down Expand Up @@ -280,10 +299,12 @@ def _check_configs(self, write: bool = True) -> None:
step = self._configured_step()
folder = self.paths.step_folder
folder.mkdir(exist_ok=True, parents=True)
self._permission_setter.set(folder)

# Use the full aligned chain as the config (list of steps)
# This ensures consistent configs whether written by chain or step
utils.ConfigDump(model=step._aligned_chain()).check_and_write(folder, write=write)
dump = utils.ConfigDump(model=step._aligned_chain())
dump.check_and_write(folder, write=write)
self._checked_configs = True

# =========================================================================
Expand All @@ -293,7 +314,9 @@ def _check_configs(self, write: bool = True) -> None:
def _cache_dict(self) -> "exca.cachedict.CacheDict[tp.Any]":
"""Get CacheDict for this step."""
return exca.cachedict.CacheDict(
folder=self.paths.cache_folder, cache_type=self.cache_type
folder=self.paths.cache_folder,
cache_type=self.cache_type,
permissions=self.permissions,
)

def has_cache(self) -> bool:
Expand Down Expand Up @@ -410,7 +433,9 @@ def run(self, func: tp.Callable[..., tp.Any], *args: tp.Any) -> tp.Any:
logger.debug("Recovering job: %s", self.paths.job_pkl)

if job is None:
wrapper = _CachingCall(func, self.paths, self.cache_type)
wrapper = _CachingCall(
func, self.paths, self.cache_type, self._permission_setter
)
job = self._submit(wrapper, *args)

job.result() # Wait (result is cached, not returned)
Expand Down Expand Up @@ -447,7 +472,9 @@ class _SubmititBackend(Backend):
_EXECUTOR_CLS: tp.ClassVar[tp.Type[submitit.Executor]]

def _submit(self, wrapper: _CachingCall, *args: tp.Any) -> tp.Any:
wrapper.paths.ensure_folders() # Create folders before writing job.pkl
wrapper.paths.ensure_folders(
self._permission_setter
) # Create folders before writing job.pkl
executor = self._EXECUTOR_CLS(folder=wrapper.paths.logs_folder)

# Get only submitit-specific fields (exclude Backend fields)
Expand Down
4 changes: 2 additions & 2 deletions exca/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ def job_array(
else:
executor.update_parameters(slurm_array_parallelism=max_workers)
executor.folder.mkdir(exist_ok=True, parents=True)
self._set_permissions(executor.folder)
self._permission_setter.set(executor.folder)
name = self.uid().split("/", maxsplit=1)[0]
# select jobs to run
statuses: dict[Status, list[TaskInfra]] = collections.defaultdict(list)
Expand Down Expand Up @@ -316,7 +316,7 @@ def _set_job(
with utils.temporary_save_path(job_path) as tmp:
with tmp.open("wb") as f:
pickle.dump(job, f)
self._set_permissions(job_path)
self._permission_setter.set(job_path)
# dump config
self._check_configs(write=True)
return job
Expand Down
6 changes: 3 additions & 3 deletions exca/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,14 +436,14 @@ class Whenever(Whatever): # type: ignore


def test_permissions(tmp_path: Path) -> None:
infra = Whatever(infra1={"permissions": "a+rwx"}).infra1 # type: ignore
infra = Whatever(infra1={"permissions": 0o777}).infra1 # type: ignore
fp = tmp_path / "test" / "whatever" / "text.txt"
fp.parent.mkdir(parents=True)
fp.touch()
before = fp.stat().st_mode
infra._set_permissions(fp)
infra._permission_setter.set(fp)
after = fp.stat().st_mode
assert after > before
assert after >= before # permissions may already be maxed out


class D2(pydantic.BaseModel):
Expand Down
40 changes: 40 additions & 0 deletions exca/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,3 +562,43 @@ def test_check_configs(tmp_path: Path) -> None:
utils.ConfigDump(model=models).check_and_write(folder2)
content = (folder2 / "uid.yaml").read_text("utf8")
assert content == "- x: 1\n- x: 2\n"


def test_permission_setter(tmp_path: Path) -> None:
# Test with permissions enabled
ps = utils.PermissionSetter(0o777)
folder = tmp_path / "test_folder"
folder.mkdir()
ps.set(folder)
assert oct(folder.stat().st_mode)[-3:] == "777"

# Test with file
fp = folder / "test.txt"
fp.write_text("hello")
ps.set(fp)
assert oct(fp.stat().st_mode)[-3:] == "777"

# Test with permissions disabled (None)
ps_none = utils.PermissionSetter(None)
folder2 = tmp_path / "test_folder2"
folder2.mkdir()
before_mode = folder2.stat().st_mode
ps_none.set(folder2) # should do nothing
assert folder2.stat().st_mode == before_mode


def test_permission_setter_recursive(tmp_path: Path) -> None:
ps = utils.PermissionSetter(0o777)

# Create nested structure
root = tmp_path / "root"
sub = root / "sub"
sub.mkdir(parents=True)
fp = sub / "file.txt"
fp.write_text("content")

# Set permissions recursively
ps.set(root, recursive=True)
assert oct(root.stat().st_mode)[-3:] == "777"
assert oct(sub.stat().st_mode)[-3:] == "777"
assert oct(fp.stat().st_mode)[-3:] == "777"
Loading
Loading