diff --git a/lib/iris/examples/smoke.yaml b/lib/iris/examples/smoke.yaml index a9e5c85383..92a338ce89 100644 --- a/lib/iris/examples/smoke.yaml +++ b/lib/iris/examples/smoke.yaml @@ -2,7 +2,7 @@ # Usage: uv run pytest lib/iris/tests/e2e/test_smoke.py -m e2e --iris-config examples/smoke.yaml --iris-mode full -o "addopts=" platform: - label_prefix: smoke + label_prefix: smoke-ci gcp: project_id: hai-gcp-models diff --git a/lib/iris/src/iris/cluster/config.py b/lib/iris/src/iris/cluster/config.py index 4bf9f02a9e..15eb8d3c9b 100644 --- a/lib/iris/src/iris/cluster/config.py +++ b/lib/iris/src/iris/cluster/config.py @@ -48,7 +48,7 @@ ), worker=config_pb2.WorkerConfig( port=10001, - cache_dir="/var/cache/iris", + cache_dir="/dev/shm/iris", host="0.0.0.0", port_range="30000-40000", ), diff --git a/lib/iris/src/iris/cluster/controller/vm_lifecycle.py b/lib/iris/src/iris/cluster/controller/vm_lifecycle.py index 167a2432ef..b1231821af 100644 --- a/lib/iris/src/iris/cluster/controller/vm_lifecycle.py +++ b/lib/iris/src/iris/cluster/controller/vm_lifecycle.py @@ -398,7 +398,12 @@ def restart_controller( def stop_controller(platform: Platform, config: config_pb2.IrisClusterConfig) -> None: - """Find and terminate the controller VM.""" + """Find and terminate the controller VM. + + GCE instance deletion is synchronous (no --async), so the VM is fully gone + when this returns. This prevents the dying controller from writing stale + checkpoints after remote state is cleared. 
+ """ label_prefix = config.platform.label_prefix or "iris" vm = _discover_controller_vm(platform, label_prefix) if vm: diff --git a/lib/iris/src/iris/cluster/platform/bootstrap.py b/lib/iris/src/iris/cluster/platform/bootstrap.py index 7ddf9602cf..d171af1b4e 100644 --- a/lib/iris/src/iris/cluster/platform/bootstrap.py +++ b/lib/iris/src/iris/cluster/platform/bootstrap.py @@ -137,12 +137,6 @@ def replace_var(match: re.Match) -> str: # Create cache directory sudo mkdir -p {{ cache_dir }} -# Create tmpfs working directory for fast IO (uv sync, .venv creation). -# GCE persistent disks have very low IOPS on small volumes (~68 read, ~135 -# write for pd-standard), making dependency installation extremely slow. -# /dev/shm is tmpfs backed by RAM, providing memory-speed IOPS. -sudo mkdir -p /dev/shm/iris - echo "[iris-init] Phase: docker_pull" echo "[iris-init] Pulling image: {{ docker_image }}" @@ -184,7 +178,6 @@ def replace_var(match: re.Match) -> str: --network=host \\ --ulimit core=0:0 \\ -v {{ cache_dir }}:{{ cache_dir }} \\ - -v /dev/shm/iris:/dev/shm/iris \\ -v /var/run/docker.sock:/var/run/docker.sock \\ -v /etc/iris/worker_config.json:/etc/iris/worker_config.json:ro \\ {{ docker_image }} \\ diff --git a/lib/iris/src/iris/cluster/platform/gcp.py b/lib/iris/src/iris/cluster/platform/gcp.py index 4178990ffd..8e5ea3b935 100644 --- a/lib/iris/src/iris/cluster/platform/gcp.py +++ b/lib/iris/src/iris/cluster/platform/gcp.py @@ -373,7 +373,6 @@ def reboot(self) -> None: f"--project={self._project_id}", f"--zone={self._zone}", "--quiet", - "--async", ] logger.info("Rebooting GCE instance: %s", self._gce_vm_name) logger.info("gcloud command: %s", cmd) @@ -389,10 +388,8 @@ def terminate(self) -> None: f"--project={self._project_id}", f"--zone={self._zone}", "--quiet", - "--async", ] - logger.info("Deleting GCE instance (async): %s", self._gce_vm_name) - logger.info("gcloud command: %s", cmd) + logger.info("Deleting GCE instance: %s", self._gce_vm_name) result = 
subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: error = result.stderr.strip() @@ -725,9 +722,8 @@ def terminate(self) -> None: f"--project={self._project_id}", f"--zone={self._zone}", "--quiet", - "--async", ] - logger.info("Terminating VM slice (async): %s (vm=%s)", self._slice_id, self._vm_name) + logger.info("Terminating VM slice: %s (vm=%s)", self._slice_id, self._vm_name) logger.info("gcloud command: %s", cmd) result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: @@ -814,7 +810,6 @@ def _best_effort_delete_vm(self, vm_name: str, zone: str) -> None: """Try to delete a GCE VM that may have been partially created. Silently ignores "not found" errors (resource was never created). - Uses --async so the caller is not blocked waiting for deletion. """ cmd = [ "gcloud", @@ -825,9 +820,8 @@ def _best_effort_delete_vm(self, vm_name: str, zone: str) -> None: f"--zone={zone}", f"--project={self._project_id}", "--quiet", - "--async", ] - logger.info("Best-effort async cleanup of VM %s in %s", vm_name, zone) + logger.info("Best-effort cleanup of VM %s in %s", vm_name, zone) result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: error = result.stderr.strip() diff --git a/lib/iris/src/iris/cluster/platform/local.py b/lib/iris/src/iris/cluster/platform/local.py index 242b712878..527b5d2e00 100644 --- a/lib/iris/src/iris/cluster/platform/local.py +++ b/lib/iris/src/iris/cluster/platform/local.py @@ -410,7 +410,7 @@ def _create_slice_with_workers( db_path=self._cache_path / f"bundles-{worker_id}.sqlite3", controller_address=self._controller_address, ) - container_runtime = ProcessRuntime() + container_runtime = ProcessRuntime(cache_dir=self._cache_path / worker_id) worker_port = find_free_port() # Collect extra worker attributes from scale group config diff --git a/lib/iris/src/iris/cluster/runtime/docker.py b/lib/iris/src/iris/cluster/runtime/docker.py index 761573784a..39710ae9e6 
100644 --- a/lib/iris/src/iris/cluster/runtime/docker.py +++ b/lib/iris/src/iris/cluster/runtime/docker.py @@ -16,6 +16,7 @@ import os import re import shlex +import shutil import subprocess import threading import time @@ -42,6 +43,8 @@ ContainerStats, ContainerStatus, ImageInfo, + MountKind, + MountSpec, ) from iris.cluster.worker.worker_types import LogLine, TaskLogs from iris.rpc import cluster_pb2 @@ -72,6 +75,16 @@ def _is_docker_infra_error(stderr: str) -> bool: return any(p.lower() in stderr_lower for p in _INFRA_ERROR_PATTERNS) +@dataclass(frozen=True) +class ResolvedMount: + """A MountSpec resolved to concrete host and container paths for Docker.""" + + host_path: str + container_path: str + mode: str # "rw" or "ro" + kind: MountKind + + def _build_device_flags(config: ContainerConfig) -> list[str]: """Build Docker device flags based on resource configuration. @@ -104,17 +117,17 @@ def _build_device_flags(config: ContainerConfig) -> list[str]: return flags -def _detect_mount_user(mounts: list[tuple[str, str, str]]) -> str | None: +def _detect_mount_user(mounts: list[ResolvedMount]) -> str | None: """Detect user to run container as based on bind mount ownership. When bind-mounting directories owned by non-root users, the container must run as that user to have write access. Returns "uid:gid" for --user flag, or None to run as root. 
""" - for host_path, _container_path, mode in mounts: - if "w" not in mode: + for mount in mounts: + if "w" not in mount.mode: continue - path = Path(host_path) + path = Path(mount.host_path) if not path.exists(): continue stat = path.stat() @@ -234,6 +247,7 @@ class DockerContainerHandle: config: ContainerConfig runtime: "DockerRuntime" + _resolved_mounts: list[ResolvedMount] = field(default_factory=list, repr=False) _run_container_id: str | None = field(default=None, repr=False) @property @@ -312,9 +326,9 @@ def _generate_setup_script(self) -> str: def _write_setup_script(self, script: str) -> None: """Write the setup script to the workdir mount.""" - for host_path, container_path, _mode in self.config.mounts: - if container_path == "/app": - (Path(host_path) / "_setup_env.sh").write_text(script) + for rm in self._resolved_mounts: + if rm.container_path == "/app": + (Path(rm.host_path) / "_setup_env.sh").write_text(script) return raise RuntimeError("No /app mount found in config") @@ -356,9 +370,9 @@ def run(self) -> None: def _write_run_script(self, script: str) -> None: """Write the run script to the workdir mount.""" - for host_path, container_path, _mode in self.config.mounts: - if container_path == "/app": - (Path(host_path) / "_run.sh").write_text(script) + for rm in self._resolved_mounts: + if rm.container_path == "/app": + (Path(rm.host_path) / "_run.sh").write_text(script) return raise RuntimeError("No /app mount found in config") @@ -383,6 +397,15 @@ def stats(self) -> ContainerStats: return ContainerStats(memory_mb=0, cpu_percent=0, process_count=0, available=False) return self._docker_stats(self._run_container_id) + def disk_usage_mb(self) -> int: + """Return used space in MB on the filesystem containing the workdir.""" + for rm in self._resolved_mounts: + if rm.container_path == self.config.workdir: + path = Path(rm.host_path) + if path.exists(): + return int(shutil.disk_usage(path).used / (1024 * 1024)) + return 0 + def profile(self, 
duration_seconds: int, profile_type: "cluster_pb2.ProfileType") -> bytes: """Profile the running process using py-spy (CPU), memray (memory), or thread dump.""" container_id = self._run_container_id @@ -514,7 +537,7 @@ def _docker_create( ] # Run as the owner of bind-mounted directories - user_flag = _detect_mount_user(config.mounts) + user_flag = _detect_mount_user(self._resolved_mounts) if user_flag: cmd.extend(["--user", user_flag]) @@ -555,8 +578,8 @@ def _docker_create( cmd.extend(["-e", f"{k}={v}"]) # Mounts - for host, container, mode in config.mounts: - cmd.extend(["-v", f"{host}:{container}:{mode}"]) + for rm in self._resolved_mounts: + cmd.extend(["-v", f"{rm.host_path}:{rm.container_path}:{rm.mode}"]) cmd.append(config.image) cmd.extend(command) @@ -712,7 +735,8 @@ class DockerRuntime: Tracks all created containers for cleanup on shutdown. """ - def __init__(self) -> None: + def __init__(self, cache_dir: Path) -> None: + self._cache_dir = cache_dir self._handles: list[DockerContainerHandle] = [] self._created_containers: set[str] = set() # Serializes `docker pull` per image tag so that concurrent task threads @@ -764,16 +788,40 @@ def ensure_image(self, image: str) -> None: logger.info("Image %s pulled successfully", image) self._pulled_images.add(image) + def resolve_mounts(self, mounts: list[MountSpec], workdir_host_path: Path | None = None) -> list[ResolvedMount]: + """Convert semantic MountSpecs to ResolvedMount instances. + + Creates host directories as needed. WORKDIR uses the explicit host path + (created by task_attempt and bind-mounted into the container). + CACHE and TMPFS get dirs under cache_dir. 
+ """ + result: list[ResolvedMount] = [] + for mount in mounts: + mode = "ro" if mount.read_only else "rw" + if mount.kind == MountKind.WORKDIR: + if workdir_host_path is None: + raise RuntimeError("WORKDIR mount requires workdir_host_path") + result.append(ResolvedMount(str(workdir_host_path), mount.container_path, mode, mount.kind)) + elif mount.kind in (MountKind.TMPFS, MountKind.CACHE): + host_dir = self._cache_dir / mount.container_path.strip("/").replace("/", "-") + host_dir.mkdir(parents=True, exist_ok=True) + result.append(ResolvedMount(str(host_dir), mount.container_path, mode, mount.kind)) + return result + def create_container(self, config: ContainerConfig) -> DockerContainerHandle: """Create a container handle from config. The handle is not started - call handle.build() then handle.run() to execute the container. """ - handle = DockerContainerHandle(config=config, runtime=self) + resolved = self.resolve_mounts(config.mounts, workdir_host_path=config.workdir_host_path) + handle = DockerContainerHandle(config=config, runtime=self, _resolved_mounts=resolved) self._handles.append(handle) return handle + def prepare_workdir(self, workdir: Path, disk_bytes: int) -> None: + """No-op: workdirs live on cache_dir (/dev/shm/iris) which is already tmpfs.""" + def stage_bundle( self, *, diff --git a/lib/iris/src/iris/cluster/runtime/kubernetes.py b/lib/iris/src/iris/cluster/runtime/kubernetes.py index 05b0faf7fe..142df3fb14 100644 --- a/lib/iris/src/iris/cluster/runtime/kubernetes.py +++ b/lib/iris/src/iris/cluster/runtime/kubernetes.py @@ -37,6 +37,7 @@ ContainerPhase, ContainerStats, ContainerStatus, + MountKind, ) from iris.cluster.worker.worker_types import LogLine from iris.rpc import cluster_pb2 @@ -218,28 +219,39 @@ def run(self) -> None: mounts = [] volumes = [] - for i, (host_path, container_path, mode) in enumerate(self.config.mounts): - # /app is pod-local emptyDir so tasks can run on any node. 
- if container_path == self.config.workdir: - continue - + for i, mount in enumerate(self.config.mounts): volume_name = f"mount-{i}" - mounts.append( - { - "name": volume_name, - "mountPath": container_path, - "readOnly": "ro" in mode, - } - ) - volumes.append( - { - "name": volume_name, - "hostPath": {"path": host_path, "type": "DirectoryOrCreate"}, - } - ) - - mounts.append({"name": "workdir", "mountPath": self.config.workdir, "readOnly": False}) - volumes.append({"name": "workdir", "emptyDir": {}}) + if mount.kind in (MountKind.WORKDIR, MountKind.TMPFS): + empty_dir_spec: dict[str, str] = {} + if mount.size_bytes > 0: + empty_dir_spec["sizeLimit"] = str(mount.size_bytes) + mounts.append( + { + "name": volume_name, + "mountPath": mount.container_path, + "readOnly": mount.read_only, + } + ) + volumes.append( + { + "name": volume_name, + "emptyDir": empty_dir_spec, + } + ) + elif mount.kind == MountKind.CACHE: + mounts.append( + { + "name": volume_name, + "mountPath": mount.container_path, + "readOnly": mount.read_only, + } + ) + volumes.append( + { + "name": volume_name, + "hostPath": {"path": mount.container_path, "type": "DirectoryOrCreate"}, + } + ) workdir_files = dict(self.config.entrypoint.workdir_files) if workdir_files: @@ -277,10 +289,17 @@ def run(self) -> None: container["volumeMounts"] = mounts + # Find the workdir volume name for the init container + workdir_volume_name = None + for i, mount in enumerate(self.config.mounts): + if mount.kind == MountKind.WORKDIR: + workdir_volume_name = f"mount-{i}" + break + init_containers: list[dict[str, object]] = [] bundle_id = self.config.env.get("IRIS_BUNDLE_ID", "") if bundle_id or workdir_files: - stage_mounts = [{"name": "workdir", "mountPath": self.config.workdir, "readOnly": False}] + stage_mounts = [{"name": workdir_volume_name, "mountPath": self.config.workdir, "readOnly": False}] stage_env = [ {"name": "IRIS_WORKDIR", "value": self.config.workdir}, {"name": "IRIS_BUNDLE_ID", "value": bundle_id}, @@ 
-376,12 +395,14 @@ def run(self) -> None: self._workdir_configmap_name = None raise self._started = True + workdir_mount = next((m for m in self.config.mounts if m.kind == MountKind.WORKDIR), None) + workdir_size = workdir_mount.size_bytes if workdir_mount else 0 logger.info( "Started Kubernetes task pod %s (task_id=%s, gpus=%s, disk=%s, hostNetwork=%s)", self._pod_name, self.config.task_id, gpu_limits.get("nvidia.com/gpu", "0"), - f"{disk_gi}Gi" if disk_bytes else "default", + f"{workdir_size // (1024 * 1024 * 1024)}Gi" if workdir_size else "default", self.config.network_mode == "host", ) @@ -493,6 +514,10 @@ def stats(self) -> ContainerStats: available=True, ) + def disk_usage_mb(self) -> int: + """K8s workdir lives inside the pod; disk usage isn't observable from the worker.""" + return 0 + def profile(self, duration_seconds: int, profile_type: cluster_pb2.ProfileType) -> bytes: """Profile the running process using py-spy (CPU), memray (memory), or thread dump.""" if not self._pod_name: @@ -632,6 +657,9 @@ def create_container(self, config: ContainerConfig) -> KubernetesContainerHandle self._handles.append(handle) return handle + def prepare_workdir(self, workdir: Path, disk_bytes: int) -> None: + pass + def stage_bundle( self, *, diff --git a/lib/iris/src/iris/cluster/runtime/process.py b/lib/iris/src/iris/cluster/runtime/process.py index 16e92c0f78..6d40f10890 100644 --- a/lib/iris/src/iris/cluster/runtime/process.py +++ b/lib/iris/src/iris/cluster/runtime/process.py @@ -22,6 +22,7 @@ import logging import os import select +import shutil import signal import subprocess import sys @@ -48,6 +49,7 @@ ContainerPhase, ContainerStats, ContainerStatus, + MountKind, RuntimeLogReader, ) from iris.cluster.worker.worker_types import LogLine @@ -374,6 +376,25 @@ def read_all(self) -> list[LogLine]: return list(self._logs) +def _resolve_mount_map(config: ContainerConfig, cache_dir: Path | None = None) -> dict[str, str]: + """Build container_path -> host_path mapping for 
process runtime. + + WORKDIR mounts resolve to config.workdir_host_path (set by task_attempt). + CACHE/TMPFS mounts resolve to subdirectories under cache_dir, created on demand. + """ + result: dict[str, str] = {} + for mount in config.mounts: + if mount.kind == MountKind.WORKDIR: + if config.workdir_host_path: + result[mount.container_path] = str(config.workdir_host_path) + elif mount.kind in (MountKind.CACHE, MountKind.TMPFS): + if cache_dir: + host_dir = cache_dir / mount.container_path.strip("/").replace("/", "-") + host_dir.mkdir(parents=True, exist_ok=True) + result[mount.container_path] = str(host_dir) + return result + + @dataclass class ProcessContainerHandle: """Process implementation of ContainerHandle. @@ -410,7 +431,7 @@ def run(self) -> None: config = self.config # Remap container paths to host paths in env vars - mount_map = {container_path: host_path for host_path, container_path, _ in config.mounts} + mount_map = _resolve_mount_map(config, cache_dir=self.runtime._cache_dir) env = {**build_device_env_vars(config), **dict(config.env)} for key, value in env.items(): if value in mount_map: @@ -484,6 +505,12 @@ def stats(self) -> ContainerStats: available=memory_mb is not None, ) + def disk_usage_mb(self) -> int: + """Return used space in MB on the filesystem containing the workdir.""" + if self.config.workdir_host_path and self.config.workdir_host_path.exists(): + return int(shutil.disk_usage(self.config.workdir_host_path).used / (1024 * 1024)) + return 0 + def profile(self, duration_seconds: int, profile_type: cluster_pb2.ProfileType) -> bytes: """Profile the running process using py-spy (CPU), memray (memory), or thread dump.""" @@ -589,7 +616,8 @@ class ProcessRuntime: Creates ProcessContainerHandle instances with the build/run lifecycle. 
""" - def __init__(self): + def __init__(self, cache_dir: Path): + self._cache_dir = cache_dir self._handles: list[ProcessContainerHandle] = [] _active_runtimes.add(self) @@ -603,6 +631,9 @@ def create_container(self, config: ContainerConfig) -> ProcessContainerHandle: self._handles.append(handle) return handle + def prepare_workdir(self, workdir: Path, disk_bytes: int) -> None: + pass + def stage_bundle( self, *, diff --git a/lib/iris/src/iris/cluster/runtime/types.py b/lib/iris/src/iris/cluster/runtime/types.py index debba24432..b357715a8b 100644 --- a/lib/iris/src/iris/cluster/runtime/types.py +++ b/lib/iris/src/iris/cluster/runtime/types.py @@ -55,6 +55,20 @@ class ContainerPhase(StrEnum): STOPPED = "stopped" +class MountKind(StrEnum): + WORKDIR = "workdir" # task working directory (/app); tmpfs on Docker, emptyDir on K8s + TMPFS = "tmpfs" # volatile fast storage; tmpfs on Docker, emptyDir on K8s + CACHE = "cache" # persistent cross-task cache (uv, cargo); hostPath bind mount + + +@dataclass(frozen=True) +class MountSpec: + container_path: str + kind: MountKind = MountKind.CACHE + read_only: bool = False + size_bytes: int = 0 # 0 = no limit; tmpfs size / emptyDir sizeLimit + + @dataclass class ContainerConfig: """Configuration for running a container.""" @@ -65,8 +79,9 @@ class ContainerConfig: workdir: str = "/app" resources: cluster_pb2.ResourceSpecProto | None = None timeout_seconds: int | None = None - mounts: list[tuple[str, str, str]] = field(default_factory=list) # (host, container, mode) + mounts: list[MountSpec] = field(default_factory=list) network_mode: str = "host" # e.g. "host" for --network=host + workdir_host_path: Path | None = None task_id: str | None = None attempt_id: int | None = None job_id: str | None = None @@ -208,6 +223,14 @@ def stats(self) -> ContainerStats: """Get resource usage statistics.""" ... + def disk_usage_mb(self) -> int: + """Return disk usage in MB for this container's workdir. 
+ + Docker/Process: shutil.disk_usage on the host workdir path. + K8s: 0 (workdir lives inside the pod, not on the worker node). + """ + ... + def profile(self, duration_seconds: int, profile_type: cluster_pb2.ProfileType) -> bytes: """Profile the running process using py-spy (CPU), memray (memory), or thread dump. @@ -243,6 +266,14 @@ def create_container(self, config: ContainerConfig) -> ContainerHandle: """ ... + def prepare_workdir(self, workdir: Path, disk_bytes: int) -> None: + """Prepare the task workdir before bundle staging. + + Currently a no-op in all runtimes; retained as a hook for + per-task storage setup (e.g. quota enforcement). + """ + ... + def stage_bundle( self, *, diff --git a/lib/iris/src/iris/cluster/worker/env_probe.py b/lib/iris/src/iris/cluster/worker/env_probe.py index f498c68c78..23dd3a9192 100644 --- a/lib/iris/src/iris/cluster/worker/env_probe.py +++ b/lib/iris/src/iris/cluster/worker/env_probe.py @@ -186,27 +186,6 @@ def _get_disk_bytes() -> int: return 100 * 1024**3 # Default 100GB -def collect_workdir_size_mb(workdir: Path) -> int: - """Calculate workdir size in MB using du -sm.""" - if not workdir.exists(): - return 0 - - result = subprocess.run( - ["du", "-sm", str(workdir)], - capture_output=True, - text=True, - ) - - if result.returncode != 0: - return 0 - - # du -sm output format: "SIZE\tPATH" - output = result.stdout.strip() - size_str = output.split("\t")[0] - - return int(size_str) - - def _build_worker_attributes( *, accelerator_type: int, diff --git a/lib/iris/src/iris/cluster/worker/main.py b/lib/iris/src/iris/cluster/worker/main.py index 64fc28da38..7af95685f5 100644 --- a/lib/iris/src/iris/cluster/worker/main.py +++ b/lib/iris/src/iris/cluster/worker/main.py @@ -71,7 +71,7 @@ def resolve_image(image: str) -> str: if wc_proto.runtime == "kubernetes": container_runtime = KubernetesRuntime() else: - container_runtime = DockerRuntime() + container_runtime = DockerRuntime(cache_dir=config.cache_dir) worker = Worker(config, 
container_runtime=container_runtime) diff --git a/lib/iris/src/iris/cluster/worker/task_attempt.py b/lib/iris/src/iris/cluster/worker/task_attempt.py index 654a78523d..8ab6858183 100644 --- a/lib/iris/src/iris/cluster/worker/task_attempt.py +++ b/lib/iris/src/iris/cluster/worker/task_attempt.py @@ -9,7 +9,6 @@ import json import logging -import os import shutil import socket import threading @@ -29,6 +28,8 @@ ContainerPhase, ContainerRuntime, RuntimeLogReader, + MountKind, + MountSpec, ) from iris.cluster.types import ( JobName, @@ -36,7 +37,6 @@ is_task_finished, ) from iris.cluster.bundle import BundleStore -from iris.cluster.worker.env_probe import collect_workdir_size_mb from iris.cluster.worker.port_allocator import PortAllocator from iris.cluster.log_store import LogCursor, LogStore, task_log_key from iris.logging import parse_log_level, str_to_log_level @@ -81,32 +81,7 @@ def _format_exit_error(exit_code: int | None, oom_killed: bool = False) -> str: return f"Exit code: {exit_code}" -# GCE persistent disks (pd-standard) provide only ~68 read / ~135 write IOPS on -# small volumes, making uv sync extremely slow. /dev/shm is tmpfs backed by RAM -# and provides memory-speed IOPS. The bootstrap script bind-mounts -# /dev/shm/iris into the worker container so this path is available on GCE VMs. -_TMPFS_DIR = Path("/dev/shm/iris") -_TMPFS_MIN_FREE_BYTES = 1 * 1024 * 1024 * 1024 # 1 GB - - -def get_fast_io_dir(cache_dir: Path) -> Path: - """Return a fast IO directory for ephemeral task data. - - Prefers /dev/shm/iris (tmpfs) for memory-speed IOPS when available and has - sufficient free space. Falls back to *cache_dir* on persistent disk. 
- """ - try: - if _TMPFS_DIR.is_dir(): - stat = os.statvfs(_TMPFS_DIR) - free_bytes = stat.f_bavail * stat.f_frsize - if free_bytes >= _TMPFS_MIN_FREE_BYTES: - logger.info("Using tmpfs at %s for fast IO (%d MB free)", _TMPFS_DIR, free_bytes // (1024 * 1024)) - return _TMPFS_DIR - except OSError: - logger.warning("OSError checking tmpfs at %s, falling back to persistent disk", _TMPFS_DIR, exc_info=True) - else: - logger.warning("Fast IO (tmpfs) not available at %s, falling back to persistent disk", _TMPFS_DIR) - return cache_dir +_DISK_CHECK_INTERVAL_SECONDS = 60.0 class TaskCancelled(Exception): @@ -464,14 +439,16 @@ def _setup(self) -> None: allocated_ports = self._port_allocator.allocate(len(port_names)) if port_names else [] self.ports = dict(zip(port_names, allocated_ports, strict=True)) - # Resolve fast IO directory (tmpfs when available, else cache_dir) - self._fast_io_dir = get_fast_io_dir(self._cache_dir) - # Create task working directory with attempt isolation safe_task_id = self.task_id.to_safe_token() - self.workdir = self._fast_io_dir / "workdirs" / f"{safe_task_id}_attempt_{self.attempt_id}" + self.workdir = self._cache_dir / "workdirs" / f"{safe_task_id}_attempt_{self.attempt_id}" self.workdir.mkdir(parents=True, exist_ok=True) + # Mount tmpfs on workdir for quota enforcement (Docker only; no-op for process/k8s). + # Must happen before _download_bundle() so staged files land on the tmpfs. + disk_bytes = self.request.resources.disk_bytes if self.request.HasField("resources") else 0 + self._runtime.prepare_workdir(self.workdir, disk_bytes) + def run(self) -> None: """Execute the full task lifecycle. Intended to run in a background thread. @@ -614,20 +591,11 @@ def _create_container(self) -> None: assert self.workdir is not None job_id, _ = self.task_id.require_task() - # Pre-create cache mount directories so Docker doesn't create them as root. - # Use tmpfs-backed fast IO dir when available for better IOPS. 
- uv_cache = self._fast_io_dir / "uv" - cargo_cache = self._fast_io_dir / "cargo" - cargo_target = self._fast_io_dir / "cargo-target" - uv_cache.mkdir(parents=True, exist_ok=True) - cargo_cache.mkdir(parents=True, exist_ok=True) - cargo_target.mkdir(parents=True, exist_ok=True) - mounts = [ - (str(self.workdir), "/app", "rw"), - (str(uv_cache), "/uv/cache", "rw"), - (str(cargo_cache), "/root/.cargo/registry", "rw"), - (str(cargo_target), "/root/.cargo/target", "rw"), + MountSpec("/app", kind=MountKind.WORKDIR), + MountSpec("/uv/cache", kind=MountKind.CACHE), + MountSpec("/root/.cargo/registry", kind=MountKind.CACHE), + MountSpec("/root/.cargo/target", kind=MountKind.CACHE), ] config = ContainerConfig( @@ -637,6 +605,7 @@ def _create_container(self) -> None: resources=self.request.resources if self.request.HasField("resources") else None, timeout_seconds=timeout_seconds, mounts=mounts, + workdir_host_path=self.workdir, task_id=self.task_id.to_wire(), attempt_id=self.attempt_id, job_id=job_id.to_wire(), @@ -711,6 +680,7 @@ def _monitor_loop( log_reader: RuntimeLogReader, deadline: Deadline | None, ) -> None: + last_disk_check = 0.0 while True: if rule := chaos("worker.task_monitor"): time.sleep(rule.delay_seconds) @@ -794,8 +764,10 @@ def _monitor_loop( if stats.memory_mb > self.peak_memory_mb: self.peak_memory_mb = stats.memory_mb - if self.workdir: - self.disk_mb = collect_workdir_size_mb(self.workdir) + now = time.monotonic() + if now - last_disk_check >= _DISK_CHECK_INTERVAL_SECONDS: + self.disk_mb = handle.disk_usage_mb() + last_disk_check = now except Exception: logger.debug("Stats collection failed for task %s", self.task_id, exc_info=True) @@ -845,7 +817,7 @@ def _cleanup(self) -> None: except Exception as e: logger.warning("Failed to release ports for task %s: %s", self.task_id, e) - # Remove working directory + # Remove working directory (handle.cleanup() already released backing storage) if self.workdir and self.workdir.exists(): try: 
shutil.rmtree(self.workdir) diff --git a/lib/iris/src/iris/cluster/worker/worker.py b/lib/iris/src/iris/cluster/worker/worker.py index 9744f2701c..3413e59320 100644 --- a/lib/iris/src/iris/cluster/worker/worker.py +++ b/lib/iris/src/iris/cluster/worker/worker.py @@ -142,7 +142,7 @@ def __init__( controller_address=config.controller_address, max_items=100, ) - self._runtime = container_runtime or DockerRuntime() + self._runtime = container_runtime or DockerRuntime(cache_dir=self._cache_dir) self._port_allocator = port_allocator or PortAllocator(config.port_range) # Resolve worker metadata: explicit > environment_provider > hardware probe diff --git a/lib/iris/tests/cluster/runtime/test_docker_runtime.py b/lib/iris/tests/cluster/runtime/test_docker_runtime.py new file mode 100644 index 0000000000..9d84efea62 --- /dev/null +++ b/lib/iris/tests/cluster/runtime/test_docker_runtime.py @@ -0,0 +1,96 @@ +# Copyright The Marin Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for DockerRuntime resolve_mounts, prepare_workdir, and stage_bundle.""" + +import subprocess +from unittest.mock import Mock + +import pytest + +from iris.cluster.bundle import BundleStore +from iris.cluster.runtime.docker import DockerRuntime +from iris.cluster.runtime.types import MountKind, MountSpec + + +@pytest.fixture +def runtime(tmp_path): + return DockerRuntime(cache_dir=tmp_path / "cache") + + +@pytest.fixture +def mock_bundle_store(): + store = Mock(spec=BundleStore) + store.extract_bundle_to = Mock() + store.write_workdir_files = Mock() + return store + + +def test_resolve_mounts_workdir(monkeypatch, tmp_path, runtime): + """resolve_mounts resolves WORKDIR to the given host path.""" + calls: list[list[str]] = [] + + def fake_run(cmd, **kwargs): + calls.append(cmd) + return subprocess.CompletedProcess(args=cmd, returncode=0, stdout="", stderr="") + + monkeypatch.setattr("iris.cluster.runtime.docker.subprocess.run", fake_run) + + workdir = tmp_path / "task-workdir" + workdir.mkdir() 
+ mounts = [MountSpec(container_path="/app", kind=MountKind.WORKDIR, size_bytes=1024 * 1024 * 512)] + resolved = runtime.resolve_mounts(mounts, workdir_host_path=workdir) + + assert len(calls) == 0 + assert len(resolved) == 1 + assert resolved[0].host_path == str(workdir) + assert resolved[0].container_path == "/app" + assert resolved[0].kind == MountKind.WORKDIR + + +def test_resolve_mounts_cache_uses_cache_dir(tmp_path, runtime): + """CACHE mounts resolve to subdirectories under cache_dir.""" + mounts = [MountSpec(container_path="/root/.cache/uv", kind=MountKind.CACHE)] + resolved = runtime.resolve_mounts(mounts) + + assert len(resolved) == 1 + assert resolved[0].host_path.startswith(str(tmp_path / "cache")) + assert resolved[0].container_path == "/root/.cache/uv" + assert resolved[0].kind == MountKind.CACHE + + +def test_resolve_mounts_workdir_requires_host_path(tmp_path): + """WORKDIR mount without workdir_host_path raises RuntimeError.""" + runtime = DockerRuntime(cache_dir=tmp_path / "cache") + mounts = [MountSpec(container_path="/app", kind=MountKind.WORKDIR)] + with pytest.raises(RuntimeError, match="workdir_host_path"): + runtime.resolve_mounts(mounts) + + +def test_prepare_workdir_is_noop(tmp_path, runtime): + """prepare_workdir is a no-op since cache_dir is already on /dev/shm.""" + workdir = tmp_path / "task-workdir" + workdir.mkdir() + # Should return without doing anything + runtime.prepare_workdir(workdir, disk_bytes=1024 * 1024 * 512) + + +def test_stage_bundle(monkeypatch, tmp_path, runtime, mock_bundle_store): + """stage_bundle extracts bundle and writes workdir files.""" + calls: list = [] + monkeypatch.setattr( + "iris.cluster.runtime.docker.subprocess.run", + lambda cmd, **kw: calls.append(cmd) or subprocess.CompletedProcess(cmd, 0), + ) + + workdir = tmp_path / "w" + workdir.mkdir() + runtime.stage_bundle( + bundle_id="abc", + workdir=workdir, + workdir_files={}, + bundle_store=mock_bundle_store, + ) + assert len(calls) == 0 + 
mock_bundle_store.extract_bundle_to.assert_called_once_with("abc", workdir) + mock_bundle_store.write_workdir_files.assert_called_once_with(workdir, {}) diff --git a/lib/iris/tests/cluster/runtime/test_kubernetes_runtime.py b/lib/iris/tests/cluster/runtime/test_kubernetes_runtime.py index 79093d6056..1a5bdf5e24 100644 --- a/lib/iris/tests/cluster/runtime/test_kubernetes_runtime.py +++ b/lib/iris/tests/cluster/runtime/test_kubernetes_runtime.py @@ -20,7 +20,7 @@ from iris.cluster.bundle import BundleStore from iris.cluster.runtime.kubernetes import KubernetesRuntime -from iris.cluster.runtime.types import ContainerConfig, ContainerErrorKind, ContainerPhase +from iris.cluster.runtime.types import ContainerConfig, ContainerErrorKind, ContainerPhase, MountKind, MountSpec from iris.rpc import cluster_pb2 @@ -412,6 +412,48 @@ def fake_get_json(resource: str, name: str): assert status.phase == ContainerPhase.RUNNING +def test_disk_bytes_sets_emptydir_sizelimit(monkeypatch): + """When a WORKDIR MountSpec has size_bytes, the emptyDir volume should have a sizeLimit.""" + manifests = _capture_manifest(monkeypatch) + + device = cluster_pb2.DeviceConfig(gpu=cluster_pb2.GpuDevice(count=0)) + resources = cluster_pb2.ResourceSpecProto(device=device, disk_bytes=10 * 1024**3) + config = ContainerConfig( + image="ghcr.io/example/task:latest", + entrypoint=_make_entrypoint(["python", "-c", "print('ok')"]), + env={"FOO": "bar"}, + workdir="/app", + task_id="job/task/0", + network_mode="host", + resources=resources, + mounts=[MountSpec(container_path="/app", kind=MountKind.WORKDIR, size_bytes=10 * 1024**3)], + ) + + runtime = KubernetesRuntime(namespace="iris") + handle = runtime.create_container(config) + handle.run() + + pod = _pod_manifest(manifests) + workdir_vol = next(v for v in pod["spec"]["volumes"] if "emptyDir" in v) + assert "sizeLimit" in workdir_vol["emptyDir"] + assert workdir_vol["emptyDir"]["sizeLimit"] == str(10 * 1024**3) + + +def 
test_no_disk_bytes_emptydir_has_no_sizelimit(monkeypatch): + """When a WORKDIR MountSpec has size_bytes=0, the emptyDir volume should have no sizeLimit.""" + manifests = _capture_manifest(monkeypatch) + + config = _make_config() + config.mounts = [MountSpec(container_path="/app", kind=MountKind.WORKDIR, size_bytes=0)] + runtime = KubernetesRuntime(namespace="iris") + handle = runtime.create_container(config) + handle.run() + + pod = _pod_manifest(manifests) + workdir_vol = next(v for v in pod["spec"]["volumes"] if "emptyDir" in v) + assert workdir_vol["emptyDir"] == {} + + def _make_zip(entries: dict[str, bytes]) -> bytes: output = io.BytesIO() with zipfile.ZipFile(output, "w", compression=zipfile.ZIP_DEFLATED) as zf: diff --git a/lib/iris/tests/cluster/worker/conftest.py b/lib/iris/tests/cluster/worker/conftest.py index 5a062dd810..58a6461378 100644 --- a/lib/iris/tests/cluster/worker/conftest.py +++ b/lib/iris/tests/cluster/worker/conftest.py @@ -9,8 +9,8 @@ @pytest.fixture -def docker_runtime(): +def docker_runtime(tmp_path): """DockerRuntime that cleans up its own containers after the test.""" - rt = DockerRuntime() + rt = DockerRuntime(cache_dir=tmp_path / "cache") yield rt rt.cleanup() diff --git a/lib/iris/tests/cluster/worker/test_dashboard.py b/lib/iris/tests/cluster/worker/test_dashboard.py index 5057c1689c..06bc757509 100644 --- a/lib/iris/tests/cluster/worker/test_dashboard.py +++ b/lib/iris/tests/cluster/worker/test_dashboard.py @@ -50,6 +50,7 @@ def create_mock_container_handle(): handle.stop = Mock() handle.logs = Mock(return_value=[]) handle.stats = Mock(return_value=ContainerStats(memory_mb=100, cpu_percent=50, process_count=1, available=True)) + handle.disk_usage_mb = Mock(return_value=0) handle.cleanup = Mock() return handle diff --git a/lib/iris/tests/cluster/worker/test_env_probe.py b/lib/iris/tests/cluster/worker/test_env_probe.py index f3daed86a6..d09d40d9dc 100644 --- a/lib/iris/tests/cluster/worker/test_env_probe.py +++ 
b/lib/iris/tests/cluster/worker/test_env_probe.py @@ -363,6 +363,9 @@ def fake_read_net(): assert snapshot2.net_sent_bps == 2000 +# --- Network metrics --- + + def test_host_metrics_collector_network_graceful_on_non_linux(monkeypatch): """Network collection silently returns 0 on systems without /proc/net/dev.""" monkeypatch.setattr(env_probe, "_read_net_dev_bytes", lambda: (_ for _ in ()).throw(OSError("no /proc/net/dev"))) diff --git a/lib/iris/tests/cluster/worker/test_fast_io.py b/lib/iris/tests/cluster/worker/test_fast_io.py deleted file mode 100644 index 872c70a7b9..0000000000 --- a/lib/iris/tests/cluster/worker/test_fast_io.py +++ /dev/null @@ -1,74 +0,0 @@ -# Copyright The Marin Authors -# SPDX-License-Identifier: Apache-2.0 - -"""Tests for tmpfs-based fast IO directory selection.""" - -import os -from pathlib import Path -from unittest.mock import patch - -from iris.cluster.worker.task_attempt import get_fast_io_dir - - -def test_fast_io_dir_uses_tmpfs_when_available(tmp_path: Path) -> None: - """When tmpfs directory exists and has sufficient space, use it.""" - fake_tmpfs = tmp_path / "shm" / "iris" - fake_tmpfs.mkdir(parents=True) - - with ( - patch.object( - os, - "statvfs", - return_value=os.statvfs_result((4096, 4096, 1000000, 900000, 800000, 1000000, 900000, 800000, 0, 255)), - ), - patch("iris.cluster.worker.task_attempt._TMPFS_DIR", fake_tmpfs), - ): - result = get_fast_io_dir(tmp_path / "cache") - - assert result == fake_tmpfs - - -def test_fast_io_dir_falls_back_when_tmpfs_too_small(tmp_path: Path) -> None: - """When tmpfs has insufficient space, fall back to cache_dir.""" - fake_tmpfs = tmp_path / "shm" / "iris" - fake_tmpfs.mkdir(parents=True) - cache_dir = tmp_path / "cache" - - # f_bavail * f_frsize < _TMPFS_MIN_FREE_BYTES - with ( - patch.object( - os, - "statvfs", - return_value=os.statvfs_result((4096, 4096, 100, 50, 50, 100, 50, 50, 0, 255)), - ), - patch("iris.cluster.worker.task_attempt._TMPFS_DIR", fake_tmpfs), - ): - result = 
get_fast_io_dir(cache_dir) - - assert result == cache_dir - - -def test_fast_io_dir_falls_back_when_tmpfs_missing(tmp_path: Path) -> None: - """When tmpfs directory doesn't exist, fall back to cache_dir.""" - cache_dir = tmp_path / "cache" - nonexistent = tmp_path / "nonexistent" / "iris" - - with patch("iris.cluster.worker.task_attempt._TMPFS_DIR", nonexistent): - result = get_fast_io_dir(cache_dir) - - assert result == cache_dir - - -def test_fast_io_dir_falls_back_on_oserror(tmp_path: Path) -> None: - """When statvfs raises OSError, fall back to cache_dir.""" - fake_tmpfs = tmp_path / "shm" / "iris" - fake_tmpfs.mkdir(parents=True) - cache_dir = tmp_path / "cache" - - with ( - patch.object(os, "statvfs", side_effect=OSError("permission denied")), - patch("iris.cluster.worker.task_attempt._TMPFS_DIR", fake_tmpfs), - ): - result = get_fast_io_dir(cache_dir) - - assert result == cache_dir diff --git a/lib/iris/tests/cluster/worker/test_worker.py b/lib/iris/tests/cluster/worker/test_worker.py index 69ea7aa771..caea57034f 100644 --- a/lib/iris/tests/cluster/worker/test_worker.py +++ b/lib/iris/tests/cluster/worker/test_worker.py @@ -136,6 +136,7 @@ def status_side_effect(): handle.log_reader = Mock(return_value=log_reader_mock) handle.stats = Mock(return_value=ContainerStats(memory_mb=100, cpu_percent=50, process_count=5, available=True)) + handle.disk_usage_mb = Mock(return_value=0) handle.cleanup = Mock() return handle @@ -794,7 +795,7 @@ def test_bundle(tmp_path): @pytest.fixture def real_worker(cache_dir): """Create Worker with real components (not mocks).""" - runtime = DockerRuntime() + runtime = DockerRuntime(cache_dir=cache_dir) config = WorkerConfig( port=0, cache_dir=cache_dir, diff --git a/lib/iris/tests/e2e/_docker_cluster.py b/lib/iris/tests/e2e/_docker_cluster.py index cc97d25321..4b069f624d 100644 --- a/lib/iris/tests/e2e/_docker_cluster.py +++ b/lib/iris/tests/e2e/_docker_cluster.py @@ -154,7 +154,7 @@ def __enter__(self): 
controller_address=f"http://127.0.0.1:{self._controller_port}", max_items=10, ) - self._container_runtime = DockerRuntime() + self._container_runtime = DockerRuntime(cache_dir=cache_path) container_runtime = self._container_runtime for i in range(self._num_workers): diff --git a/lib/iris/tests/e2e/conftest.py b/lib/iris/tests/e2e/conftest.py index 469e76d6c1..784dcf554c 100644 --- a/lib/iris/tests/e2e/conftest.py +++ b/lib/iris/tests/e2e/conftest.py @@ -12,6 +12,7 @@ autouse fixture. """ +import fcntl import logging import os import shutil @@ -48,16 +49,28 @@ @pytest.fixture(scope="session", autouse=True) -def _ensure_dashboard_built(): - """Build dashboard assets once per session so dashboard tests have content to render.""" +def _ensure_dashboard_built(tmp_path_factory): + """Build dashboard assets once per session so dashboard tests have content to render. + + With pytest-xdist each worker gets its own session fixture, so all 8 workers + race to run ``npm ci`` in the same directory — corrupting node_modules. + A filelock serialises this so only one worker installs at a time. 
+ """ dashboard_dir = IRIS_ROOT / "dashboard" if not (dashboard_dir / "package.json").exists(): return if shutil.which("npm") is None: logging.getLogger(__name__).warning("npm not found, skipping dashboard build for tests") return - subprocess.run(["npm", "ci"], cwd=dashboard_dir, check=True, capture_output=True) - subprocess.run(["npm", "run", "build"], cwd=dashboard_dir, check=True, capture_output=True) + + lock_path = tmp_path_factory.getbasetemp().parent / "dashboard_build.lock" + with open(lock_path, "w") as lock_fd: + fcntl.flock(lock_fd, fcntl.LOCK_EX) + try: + subprocess.run(["npm", "ci"], cwd=dashboard_dir, check=True, capture_output=True) + subprocess.run(["npm", "run", "build"], cwd=dashboard_dir, check=True, capture_output=True) + finally: + fcntl.flock(lock_fd, fcntl.LOCK_UN) def pytest_addoption(parser): diff --git a/lib/iris/tests/e2e/test_smoke.py b/lib/iris/tests/e2e/test_smoke.py index b43687c985..43833a5646 100644 --- a/lib/iris/tests/e2e/test_smoke.py +++ b/lib/iris/tests/e2e/test_smoke.py @@ -157,7 +157,9 @@ def _cloud_smoke_cluster(config_path: str, mode: str, label_prefix: str | None = iris_config = IrisConfig(config) platform = iris_config.platform() - # Tear down any existing cluster for a clean slate + # Tear down any existing cluster for a clean slate. + # GCE controller deletion is synchronous, so the old controller is fully + # gone before we clear remote state (no stale checkpoint race). logger.info("Stopping any existing cluster...") try: platform.stop_all(config) @@ -874,7 +876,7 @@ def test_gpu_worker_metadata(tmp_path): ) worker = Worker( worker_config, - container_runtime=ProcessRuntime(), + container_runtime=ProcessRuntime(cache_dir=cache_dir), environment_provider=env_provider, threads=threads, )