-
Notifications
You must be signed in to change notification settings - Fork 108
iris: use tmpfs for task workdirs, replace du with disk_usage #3696
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
9dc337d
ede2d1a
df11570
e0b6324
bbd6888
a925138
2850e03
2db0bdd
2c568c5
87fbe9e
1ef8d92
14a3b59
af32041
bf98e46
ef46478
21cce8c
ab30adb
4d08420
af45265
867ad72
447112b
b505e02
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,9 @@ | |
| import os | ||
| import re | ||
| import shlex | ||
| import shutil | ||
| import subprocess | ||
| import sys | ||
| import threading | ||
| import time | ||
| import uuid | ||
|
|
@@ -42,13 +44,39 @@ | |
| ContainerStats, | ||
| ContainerStatus, | ||
| ImageInfo, | ||
| MountKind, | ||
| MountSpec, | ||
| ) | ||
| from iris.cluster.worker.worker_types import LogLine, TaskLogs | ||
| from iris.rpc import cluster_pb2 | ||
| from iris.time_utils import Timestamp | ||
|
|
||
logger = logging.getLogger(__name__)


_TMPFS_DIR = Path("/dev/shm/iris")
_TMPFS_MIN_FREE_BYTES = 1 * 1024 * 1024 * 1024  # 1 GB


def get_fast_io_dir(cache_dir: Path) -> Path:
    """Return a fast IO directory for ephemeral task data.

    Prefers /dev/shm/iris (tmpfs) for memory-speed IOPS when it exists and
    has sufficient free space; otherwise falls back to *cache_dir* on
    persistent disk.
    """
    try:
        if _TMPFS_DIR.is_dir():
            vfs = os.statvfs(_TMPFS_DIR)
            # f_bavail: blocks available to unprivileged callers.
            free = vfs.f_bavail * vfs.f_frsize
            if free >= _TMPFS_MIN_FREE_BYTES:
                logger.info("Using tmpfs at %s for fast IO (%d MB free)", _TMPFS_DIR, free // (1024 * 1024))
                return _TMPFS_DIR
    except OSError:
        # statvfs/is_dir can fail on unusual mounts; prefer a working
        # fallback over failing the caller.
        logger.warning("OSError checking tmpfs at %s, falling back to persistent disk", _TMPFS_DIR, exc_info=True)
        return cache_dir
    # tmpfs directory missing, or free space below the threshold.
    logger.warning("Fast IO (tmpfs) not available at %s, falling back to persistent disk", _TMPFS_DIR)
    return cache_dir
|
|
||
|
|
||
| # Substrings that indicate a docker/registry infrastructure problem rather than | ||
| # a user-code error. Checked case-insensitively against stderr from docker | ||
| # create/start/pull. | ||
|
|
@@ -72,6 +100,19 @@ def _is_docker_infra_error(stderr: str) -> bool: | |
| return any(p.lower() in stderr_lower for p in _INFRA_ERROR_PATTERNS) | ||
|
|
||
|
|
||
# Default tmpfs size for a task workdir when the MountSpec does not request
# an explicit size (used when size_bytes <= 0).
DEFAULT_WORKDIR_DISK_BYTES = 10 * 1024 * 1024 * 1024  # 10 GB


@dataclass(frozen=True)
class ResolvedMount:
    """A MountSpec resolved to concrete host and container paths for Docker."""

    host_path: str  # absolute directory on the worker host (bind source)
    container_path: str  # mount target inside the container
    mode: str  # "rw" or "ro"
    kind: MountKind  # original semantic kind (e.g. WORKDIR / TMPFS / CACHE)
|
|
||
|
|
||
| def _build_device_flags(config: ContainerConfig) -> list[str]: | ||
| """Build Docker device flags based on resource configuration. | ||
|
|
||
|
|
@@ -104,17 +145,17 @@ def _build_device_flags(config: ContainerConfig) -> list[str]: | |
| return flags | ||
|
|
||
|
|
||
| def _detect_mount_user(mounts: list[tuple[str, str, str]]) -> str | None: | ||
| def _detect_mount_user(mounts: list[ResolvedMount]) -> str | None: | ||
| """Detect user to run container as based on bind mount ownership. | ||
|
|
||
| When bind-mounting directories owned by non-root users, the container | ||
| must run as that user to have write access. Returns "uid:gid" for | ||
| --user flag, or None to run as root. | ||
| """ | ||
| for host_path, _container_path, mode in mounts: | ||
| if "w" not in mode: | ||
| for mount in mounts: | ||
| if "w" not in mount.mode: | ||
| continue | ||
| path = Path(host_path) | ||
| path = Path(mount.host_path) | ||
| if not path.exists(): | ||
| continue | ||
| stat = path.stat() | ||
|
|
@@ -234,6 +275,7 @@ class DockerContainerHandle: | |
|
|
||
| config: ContainerConfig | ||
| runtime: "DockerRuntime" | ||
| _resolved_mounts: list[ResolvedMount] = field(default_factory=list, repr=False) | ||
| _run_container_id: str | None = field(default=None, repr=False) | ||
|
|
||
| @property | ||
|
|
@@ -312,9 +354,9 @@ def _generate_setup_script(self) -> str: | |
|
|
||
| def _write_setup_script(self, script: str) -> None: | ||
| """Write the setup script to the workdir mount.""" | ||
| for host_path, container_path, _mode in self.config.mounts: | ||
| if container_path == "/app": | ||
| (Path(host_path) / "_setup_env.sh").write_text(script) | ||
| for rm in self._resolved_mounts: | ||
| if rm.container_path == "/app": | ||
| (Path(rm.host_path) / "_setup_env.sh").write_text(script) | ||
| return | ||
| raise RuntimeError("No /app mount found in config") | ||
|
|
||
|
|
@@ -356,9 +398,9 @@ def run(self) -> None: | |
|
|
||
| def _write_run_script(self, script: str) -> None: | ||
| """Write the run script to the workdir mount.""" | ||
| for host_path, container_path, _mode in self.config.mounts: | ||
| if container_path == "/app": | ||
| (Path(host_path) / "_run.sh").write_text(script) | ||
| for rm in self._resolved_mounts: | ||
| if rm.container_path == "/app": | ||
| (Path(rm.host_path) / "_run.sh").write_text(script) | ||
| return | ||
| raise RuntimeError("No /app mount found in config") | ||
|
|
||
|
|
@@ -383,6 +425,15 @@ def stats(self) -> ContainerStats: | |
| return ContainerStats(memory_mb=0, cpu_percent=0, process_count=0, available=False) | ||
| return self._docker_stats(self._run_container_id) | ||
|
|
||
def disk_usage_mb(self) -> int:
    """Return used space in MB on the filesystem containing the workdir.

    NOTE(review): shutil.disk_usage reports usage of the *whole* filesystem,
    which equals the workdir's usage only when the workdir is its own tmpfs
    mount; if the mount fell back to a shared disk this measures that disk —
    confirm this is acceptable for callers.
    """
    target = self.config.workdir
    for mount in self._resolved_mounts:
        if mount.container_path != target:
            continue
        host = Path(mount.host_path)
        if host.exists():
            return int(shutil.disk_usage(host).used / (1024 * 1024))
    # No existing workdir mount found.
    return 0
|
|
||
| def profile(self, duration_seconds: int, profile_type: "cluster_pb2.ProfileType") -> bytes: | ||
| """Profile the running process using py-spy (CPU), memray (memory), or thread dump.""" | ||
| container_id = self._run_container_id | ||
|
|
@@ -482,11 +533,15 @@ def _profile_memory( | |
| self._docker_rm_files(container_id, [trace_path, output_path]) | ||
|
|
||
def cleanup(self) -> None:
    """Remove the run container and clean up resources (including tmpfs mounts)."""
    container_id = self._run_container_id
    if container_id:
        self._docker_remove(container_id)
        self.runtime.untrack_container(container_id)
        self._run_container_id = None
    # Hand RAM-backed (WORKDIR / TMPFS) mounts back to the runtime so the
    # tmpfs memory is actually freed.
    for mount in self._resolved_mounts:
        if mount.kind in (MountKind.WORKDIR, MountKind.TMPFS):
            self.runtime.release_tmpfs(Path(mount.host_path))
|
|
||
| # ------------------------------------------------------------------------- | ||
| # Docker CLI helpers | ||
|
|
@@ -514,7 +569,7 @@ def _docker_create( | |
| ] | ||
|
|
||
| # Run as the owner of bind-mounted directories | ||
| user_flag = _detect_mount_user(config.mounts) | ||
| user_flag = _detect_mount_user(self._resolved_mounts) | ||
| if user_flag: | ||
| cmd.extend(["--user", user_flag]) | ||
|
|
||
|
|
@@ -555,8 +610,8 @@ def _docker_create( | |
| cmd.extend(["-e", f"{k}={v}"]) | ||
|
|
||
| # Mounts | ||
| for host, container, mode in config.mounts: | ||
| cmd.extend(["-v", f"{host}:{container}:{mode}"]) | ||
| for rm in self._resolved_mounts: | ||
| cmd.extend(["-v", f"{rm.host_path}:{rm.container_path}:{rm.mode}"]) | ||
|
|
||
| cmd.append(config.image) | ||
| cmd.extend(command) | ||
|
|
@@ -712,9 +767,15 @@ class DockerRuntime: | |
| Tracks all created containers for cleanup on shutdown. | ||
| """ | ||
|
|
||
| def __init__(self) -> None: | ||
| def __init__(self, cache_dir: Path) -> None: | ||
| self._cache_dir = cache_dir | ||
| self._fast_io_dir = get_fast_io_dir(cache_dir) | ||
| self._handles: list[DockerContainerHandle] = [] | ||
| self._created_containers: set[str] = set() | ||
| self._tmpfs_mounts: set[Path] = set() | ||
| # Serializes tmpfs mount/unmount to avoid concurrent `mount` failures | ||
| # under high task parallelism (e.g. 200 tasks mounting under /dev/shm). | ||
| self._mount_lock = threading.Lock() | ||
| # Serializes `docker pull` per image tag so that concurrent task threads | ||
| # don't each trigger docker-credential-gcloud against the metadata server, | ||
| # which causes sporadic "no active account" errors under load. | ||
|
|
@@ -764,13 +825,36 @@ def ensure_image(self, image: str) -> None: | |
| logger.info("Image %s pulled successfully", image) | ||
| self._pulled_images.add(image) | ||
|
|
||
def resolve_mounts(self, mounts: list[MountSpec], workdir_host_path: Path | None = None) -> list[ResolvedMount]:
    """Convert semantic MountSpecs to ResolvedMount instances.

    Creates host directories as needed. WORKDIR uses the explicit host path
    (created by task_attempt) and mounts tmpfs on it for isolation.
    CACHE and TMPFS get dirs under fast_io_dir.

    Raises:
        RuntimeError: if a WORKDIR mount is requested without workdir_host_path.
    """
    resolved: list[ResolvedMount] = []
    for spec in mounts:
        access = "ro" if spec.read_only else "rw"
        if spec.kind == MountKind.WORKDIR:
            if workdir_host_path is None:
                raise RuntimeError("WORKDIR mount requires workdir_host_path")
            tmpfs_bytes = spec.size_bytes if spec.size_bytes > 0 else DEFAULT_WORKDIR_DISK_BYTES
            self._mount_tmpfs(workdir_host_path, tmpfs_bytes)
            resolved.append(ResolvedMount(str(workdir_host_path), spec.container_path, access, spec.kind))
        elif spec.kind in (MountKind.TMPFS, MountKind.CACHE):
            # Flatten the container path into a single fast-IO dir name,
            # e.g. /var/tmp -> var-tmp.
            slug = spec.container_path.strip("/").replace("/", "-")
            host_dir = self._fast_io_dir / slug
            host_dir.mkdir(parents=True, exist_ok=True)
            resolved.append(ResolvedMount(str(host_dir), spec.container_path, access, spec.kind))
        # Other kinds (if any) are intentionally passed over unchanged.
    return resolved
|
|
||
def create_container(self, config: ContainerConfig) -> DockerContainerHandle:
    """Create a container handle from config.

    The handle is not started - call handle.build() then handle.run()
    to execute the container.
    """
    mounts = self.resolve_mounts(config.mounts, workdir_host_path=config.workdir_host_path)
    handle = DockerContainerHandle(config=config, runtime=self, _resolved_mounts=mounts)
    self._handles.append(handle)
    return handle
|
|
||
|
|
@@ -787,6 +871,53 @@ def stage_bundle( | |
| bundle_store.extract_bundle_to(bundle_id, workdir) | ||
| bundle_store.write_workdir_files(workdir, workdir_files) | ||
|
|
||
def _mount_tmpfs(self, workdir: Path, disk_bytes: int) -> None:
    """Mount a size-limited tmpfs at *workdir* (Linux only).

    Args:
        workdir: host directory to mount tmpfs on (created if missing).
        disk_bytes: tmpfs size limit passed to mount -o size=.

    Raises:
        RuntimeError: on non-Linux platforms, or when the mount fails and
            the target did not end up mounted anyway.
    """
    if sys.platform != "linux":
        raise RuntimeError("Docker workdir disk limits require Linux tmpfs mounts")
    workdir.mkdir(parents=True, exist_ok=True)
    with self._mount_lock:
        if os.path.ismount(workdir):
            logger.info("Workdir %s is already a mountpoint; reusing", workdir)
            # Fix: track the reused mount too. Previously a pre-existing
            # mount (e.g. left by a crashed worker) was reused but never
            # added to _tmpfs_mounts, so release_tmpfs() skipped it and the
            # RAM-backed mount leaked.
            self._tmpfs_mounts.add(workdir)
            return
        result = subprocess.run(
            ["mount", "-t", "tmpfs", "-o", f"size={disk_bytes},nodev,nosuid", "tmpfs", str(workdir)],
            capture_output=True,
            text=True,
            check=False,
        )
        if result.returncode != 0:
            # Under high concurrency the mount utility can spuriously fail
            # even for distinct paths; if the target is now mounted, accept it.
            if os.path.ismount(workdir):
                logger.warning(
                    "mount command failed but %s is now a mountpoint; treating as success (stderr: %s)",
                    workdir,
                    result.stderr.strip(),
                )
            else:
                raise RuntimeError(f"Failed to mount tmpfs workdir {workdir}: {result.stderr.strip()}")
        self._tmpfs_mounts.add(workdir)
        logger.info("Mounted tmpfs workdir %s with size=%d bytes", workdir, disk_bytes)
|
|
||
def release_tmpfs(self, workdir: Path) -> None:
    """Unmount a tmpfs workdir if it was mounted by this runtime.

    On umount failure the path stays in ``_tmpfs_mounts`` so a later
    cleanup pass can retry, preventing leaked RAM-backed mounts.
    """
    with self._mount_lock:
        if workdir not in self._tmpfs_mounts:
            # Not ours to release.
            return
        if not os.path.ismount(workdir):
            # Nothing mounted any more; just drop the tracking entry.
            self._tmpfs_mounts.discard(workdir)
            return
        result = subprocess.run(["umount", str(workdir)], capture_output=True, text=True, check=False)
        if result.returncode == 0:
            logger.info("Unmounted tmpfs workdir %s", workdir)
            self._tmpfs_mounts.discard(workdir)
        else:
            # Keep it tracked so a later cleanup pass retries the umount.
            logger.warning("Failed to unmount tmpfs workdir %s: %s", workdir, result.stderr.strip())
|
|
||
| def track_container(self, container_id: str) -> None: | ||
| """Track a container ID for cleanup.""" | ||
| self._created_containers.add(container_id) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DockerRuntime now mounts tmpfs workdirs inside the worker process (
_mount_tmpfsinlib/iris/src/iris/cluster/runtime/docker.py), but the worker creates task containers via the host daemon through/var/run/docker.sock; with this plain cache bind (-v {{ cache_dir }}:{{ cache_dir }}) and no shared propagation, those inner mounts are not propagated to the host namespace. In the default bootstrap path, bundle staging and generated scripts can be written to the worker-only tmpfs while the task container sees the underlying host directory, leading to missing files under/appat runtime.Useful? React with 👍 / 👎.