-
Notifications
You must be signed in to change notification settings - Fork 108
iris: use tmpfs for task workdirs, replace du with disk_usage #3696
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
9dc337d
ede2d1a
df11570
e0b6324
bbd6888
a925138
2850e03
2db0bdd
2c568c5
87fbe9e
1ef8d92
14a3b59
af32041
bf98e46
ef46478
21cce8c
ab30adb
4d08420
af45265
867ad72
447112b
b505e02
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,9 @@ | |
| import os | ||
| import re | ||
| import shlex | ||
| import shutil | ||
| import subprocess | ||
| import sys | ||
| import threading | ||
| import time | ||
| import uuid | ||
|
|
@@ -42,6 +44,7 @@ | |
| ContainerStats, | ||
| ContainerStatus, | ||
| ImageInfo, | ||
| WorkdirSpec, | ||
| ) | ||
| from iris.cluster.worker.worker_types import LogLine, TaskLogs | ||
| from iris.rpc import cluster_pb2 | ||
|
|
@@ -383,6 +386,15 @@ def stats(self) -> ContainerStats: | |
| return ContainerStats(memory_mb=0, cpu_percent=0, process_count=0, available=False) | ||
| return self._docker_stats(self._run_container_id) | ||
|
|
||
| def disk_usage_mb(self) -> int: | ||
| """Return used space in MB on the filesystem containing the workdir.""" | ||
| for host_path, container_path, _mode in self.config.mounts: | ||
| if container_path == self.config.workdir: | ||
| path = Path(host_path) | ||
| if path.exists(): | ||
| return int(shutil.disk_usage(path).used / (1024 * 1024)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
@rjpower — is this an issue? |
||
| return 0 | ||
|
|
||
| def profile(self, duration_seconds: int, profile_type: "cluster_pb2.ProfileType") -> bytes: | ||
| """Profile the running process using py-spy (CPU), memray (memory), or thread dump.""" | ||
| container_id = self._run_container_id | ||
|
|
@@ -482,11 +494,16 @@ def _profile_memory( | |
| self._docker_rm_files(container_id, [trace_path, output_path]) | ||
|
|
||
| def cleanup(self) -> None: | ||
| """Remove the run container and clean up resources.""" | ||
| """Remove the run container and clean up resources (including tmpfs mounts).""" | ||
| if self._run_container_id: | ||
| self._docker_remove(self._run_container_id) | ||
| self.runtime.untrack_container(self._run_container_id) | ||
| self._run_container_id = None | ||
| # Release any tmpfs backing storage for the workdir | ||
| for host_path, container_path, _mode in self.config.mounts: | ||
| if container_path == self.config.workdir: | ||
| self.runtime.release_tmpfs(Path(host_path)) | ||
| break | ||
|
|
||
| # ------------------------------------------------------------------------- | ||
| # Docker CLI helpers | ||
|
|
@@ -715,6 +732,7 @@ class DockerRuntime: | |
| def __init__(self) -> None: | ||
| self._handles: list[DockerContainerHandle] = [] | ||
| self._created_containers: set[str] = set() | ||
| self._tmpfs_mounts: set[Path] = set() | ||
| # Serializes `docker pull` per image tag so that concurrent task threads | ||
| # don't each trigger docker-credential-gcloud against the metadata server, | ||
| # which causes sporadic "no active account" errors under load. | ||
|
|
@@ -781,12 +799,47 @@ def stage_bundle( | |
| workdir: Path, | ||
| workdir_files: dict[str, bytes], | ||
| bundle_store: BundleStore, | ||
| workdir_spec: WorkdirSpec | None = None, | ||
| ) -> None: | ||
| """Stage bundle and workdir files on worker-local filesystem.""" | ||
| """Provision backing storage, then stage bundle and workdir files.""" | ||
| if workdir_spec and workdir_spec.disk_bytes > 0: | ||
| self._mount_tmpfs(workdir, workdir_spec.disk_bytes) | ||
| if bundle_id: | ||
| bundle_store.extract_bundle_to(bundle_id, workdir) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
This mounts tmpfs before [remainder of review comment truncated during extraction]. Useful? React with 👍 / 👎. |
||
| bundle_store.write_workdir_files(workdir, workdir_files) | ||
|
|
||
| def _mount_tmpfs(self, workdir: Path, disk_bytes: int) -> None: | ||
| if sys.platform != "linux": | ||
| raise RuntimeError("Docker workdir disk limits require Linux tmpfs mounts") | ||
| workdir.mkdir(parents=True, exist_ok=True) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: [remainder of review comment truncated during extraction] |
||
| if os.path.ismount(workdir): | ||
| logger.info("Workdir %s is already a mountpoint; reusing", workdir) | ||
| return | ||
| result = subprocess.run( | ||
| ["mount", "-t", "tmpfs", "-o", f"size={disk_bytes},nodev,nosuid", "tmpfs", str(workdir)], | ||
| capture_output=True, | ||
|
rjpower marked this conversation as resolved.
Outdated
|
||
| text=True, | ||
| check=False, | ||
| ) | ||
| if result.returncode != 0: | ||
| raise RuntimeError(f"Failed to mount tmpfs workdir {workdir}: {result.stderr.strip()}") | ||
| self._tmpfs_mounts.add(workdir) | ||
| logger.info("Mounted tmpfs workdir %s with size=%d bytes", workdir, disk_bytes) | ||
|
|
||
| def release_tmpfs(self, workdir: Path) -> None: | ||
| """Unmount a tmpfs workdir if it was mounted by this runtime.""" | ||
| if workdir not in self._tmpfs_mounts: | ||
| return | ||
| if not os.path.ismount(workdir): | ||
| self._tmpfs_mounts.discard(workdir) | ||
| return | ||
| result = subprocess.run(["umount", str(workdir)], capture_output=True, text=True, check=False) | ||
| if result.returncode != 0: | ||
| logger.warning("Failed to unmount tmpfs workdir %s: %s", workdir, result.stderr.strip()) | ||
| else: | ||
| logger.info("Unmounted tmpfs workdir %s", workdir) | ||
| self._tmpfs_mounts.discard(workdir) | ||
|
rjpower marked this conversation as resolved.
Outdated
|
||
|
|
||
| def track_container(self, container_id: str) -> None: | ||
| """Track a container ID for cleanup.""" | ||
| self._created_containers.add(container_id) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,14 +29,14 @@ | |
| ContainerPhase, | ||
| ContainerRuntime, | ||
| RuntimeLogReader, | ||
| WorkdirSpec, | ||
| ) | ||
| from iris.cluster.types import ( | ||
| JobName, | ||
| TaskAttempt as TaskAttemptIdentity, | ||
| is_task_finished, | ||
| ) | ||
| from iris.cluster.bundle import BundleStore | ||
| from iris.cluster.worker.env_probe import collect_workdir_size_mb | ||
| from iris.cluster.worker.port_allocator import PortAllocator | ||
| from iris.cluster.log_store import LogCursor, LogStore, task_log_key | ||
| from iris.logging import parse_log_level, str_to_log_level | ||
|
|
@@ -87,6 +87,7 @@ def _format_exit_error(exit_code: int | None, oom_killed: bool = False) -> str: | |
| # /dev/shm/iris into the worker container so this path is available on GCE VMs. | ||
| _TMPFS_DIR = Path("/dev/shm/iris") | ||
| _TMPFS_MIN_FREE_BYTES = 1 * 1024 * 1024 * 1024 # 1 GB | ||
| _DISK_CHECK_INTERVAL_SECONDS = 60.0 | ||
|
|
||
|
|
||
| def get_fast_io_dir(cache_dir: Path) -> Path: | ||
|
|
@@ -549,11 +550,14 @@ def _download_bundle(self) -> None: | |
| # to BundleStore.extract_bundle_to if long downloads become a problem.) | ||
|
|
||
| assert self.workdir is not None | ||
| disk_bytes = self.request.resources.disk_bytes if self.request.HasField("resources") else 0 | ||
| workdir_spec = WorkdirSpec(disk_bytes=disk_bytes, tmpfs=disk_bytes > 0) if disk_bytes > 0 else None | ||
| self._runtime.stage_bundle( | ||
| bundle_id=self.request.bundle_id, | ||
| workdir=self.workdir, | ||
| workdir_files=dict(self.request.entrypoint.workdir_files), | ||
| bundle_store=self._bundle_store, | ||
| workdir_spec=workdir_spec, | ||
| ) | ||
|
|
||
| logger.info( | ||
|
|
@@ -711,6 +715,7 @@ def _monitor_loop( | |
| log_reader: RuntimeLogReader, | ||
| deadline: Deadline | None, | ||
| ) -> None: | ||
| last_disk_check = 0.0 | ||
| while True: | ||
| if rule := chaos("worker.task_monitor"): | ||
| time.sleep(rule.delay_seconds) | ||
|
|
@@ -794,8 +799,10 @@ def _monitor_loop( | |
| if stats.memory_mb > self.peak_memory_mb: | ||
| self.peak_memory_mb = stats.memory_mb | ||
|
|
||
| if self.workdir: | ||
| self.disk_mb = collect_workdir_size_mb(self.workdir) | ||
| now = time.monotonic() | ||
| if now - last_disk_check >= _DISK_CHECK_INTERVAL_SECONDS: | ||
| self.disk_mb = handle.disk_usage_mb() | ||
| last_disk_check = now | ||
|
Comment on lines
+768
to
+770
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
This throttles disk sampling to every 60 seconds, but [remainder of review comment truncated during extraction]. Useful? React with 👍 / 👎. |
||
| except Exception: | ||
| logger.debug("Stats collection failed for task %s", self.task_id, exc_info=True) | ||
|
|
||
|
|
@@ -845,7 +852,7 @@ def _cleanup(self) -> None: | |
| except Exception as e: | ||
| logger.warning("Failed to release ports for task %s: %s", self.task_id, e) | ||
|
|
||
| # Remove working directory | ||
| # Remove working directory (handle.cleanup() already released backing storage) | ||
| if self.workdir and self.workdir.exists(): | ||
| try: | ||
| shutil.rmtree(self.workdir) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DockerRuntime now mounts tmpfs workdirs inside the worker process (
_mount_tmpfsinlib/iris/src/iris/cluster/runtime/docker.py), but the worker creates task containers via the host daemon through/var/run/docker.sock; with this plain cache bind (-v {{ cache_dir }}:{{ cache_dir }}) and no shared propagation, those inner mounts are not propagated to the host namespace. In the default bootstrap path, bundle staging and generated scripts can be written to the worker-only tmpfs while the task container sees the underlying host directory, leading to missing files under/appat runtime.Useful? React with 👍 / 👎.