diff --git a/changes/11223.enhance.md b/changes/11223.enhance.md new file mode 100644 index 00000000000..e0b2a41ae98 --- /dev/null +++ b/changes/11223.enhance.md @@ -0,0 +1,4 @@ +Default to cgroup (sysfs) stat collection on native Linux hosts, falling back to the Docker API only on linuxkit or read failure. +Note: reported container memory usage may step down on hosts previously using `stats-type: docker`, because sysfs excludes inactive file cache (matching `docker stats`). +Block-I/O readings may also shift on cgroup v1 hosts, because the sysfs path reads `blkio.throttle.io_service_bytes` while the Docker API path reads `blkio_stats.io_service_bytes_recursive` (which sums across nested cgroups). +Dashboards or autoscaling thresholds tuned to the old values should be re-evaluated after upgrade. diff --git a/src/ai/backend/agent/config/unified.py b/src/ai/backend/agent/config/unified.py index 8e24177e4eb..e9bc2d5eba4 100644 --- a/src/ai/backend/agent/config/unified.py +++ b/src/ai/backend/agent/config/unified.py @@ -1335,7 +1335,7 @@ class OverridableContainerConfig(BaseConfigSchema): stats_type: Annotated[ StatModes | None, Field( - default=StatModes.DOCKER, + default=None, validation_alias=AliasChoices("stats-type", "stats_type"), serialization_alias="stats-type", ), @@ -1343,8 +1343,10 @@ class OverridableContainerConfig(BaseConfigSchema): description=( "Method for collecting container resource statistics. " "'docker' uses Docker's stats API (most compatible). " - "'cgroup' reads from cgroup filesystem directly (more accurate, requires root). " - "'null' disables statistics collection." + "'cgroup' reads from cgroup filesystem directly (faster and more accurate, " + "requires root on Linux). " + "When unset, the agent auto-selects: cgroup on native Linux hosts " + "(with Docker API fallback on read failure) and docker elsewhere." 
), added_version="25.12.0", example=ConfigExample(local="docker", prod="docker"), @@ -1531,7 +1533,7 @@ def _validate_sandbox_type(cls, sandbox_type: ContainerSandboxType) -> Container @field_validator("stats_type", mode="after") @classmethod - def _validate_stats_type(cls, stats_type: StatModes) -> StatModes: + def _validate_stats_type(cls, stats_type: StatModes | None) -> StatModes | None: if stats_type == StatModes.CGROUP: if os.getuid() != 0: raise ValueError( diff --git a/src/ai/backend/agent/docker/intrinsic.py b/src/ai/backend/agent/docker/intrinsic.py index e49ed0589d9..097f7423995 100644 --- a/src/ai/backend/agent/docker/intrinsic.py +++ b/src/ai/backend/agent/docker/intrinsic.py @@ -8,12 +8,13 @@ from collections.abc import Collection, Iterable, Mapping, Sequence from decimal import Decimal from pathlib import Path -from typing import Any, cast +from typing import Any, assert_never, cast import aiohttp import psutil from aiodocker.docker import Docker, DockerContainer from aiodocker.exceptions import DockerError +from cachetools import LRUCache from ai.backend.agent import __version__ # pants: no-infer-dep from ai.backend.agent.alloc_map import AllocationStrategy @@ -73,6 +74,33 @@ _CONTAINER_STAT_TIMEOUT: float = 2.0 _INVALID_PID: int = 0 +# Tracks containers for which we have already logged a cgroup->Docker-API +# fallback warning, to avoid log spam on persistent read failures. +# Bounded LRU cache to prevent unbounded growth on hosts with high container churn; +# oldest entries are evicted on overflow, so a still-failing container may warn again. +# Shared across CPUPlugin / MemoryPlugin since keys are namespaced via the `plugin:` prefix.
+_CGROUP_FALLBACK_WARN_CACHE_SIZE = 1024 +_cgroup_fallback_warned: LRUCache[str, None] = LRUCache( + maxsize=_CGROUP_FALLBACK_WARN_CACHE_SIZE, +) + + +def _warn_cgroup_fallback_once(plugin: str, container_id: str) -> None: + key = f"{plugin}:{container_id}" + if key in _cgroup_fallback_warned: + return + _cgroup_fallback_warned[key] = None + log.warning( + "{0}: cgroup sysfs read failed for container {1}; falling back to Docker API", + plugin, + container_id[:7], + ) + + +def _is_linuxkit(local_config: Mapping[str, Any]) -> bool: + return cast(str, local_config["agent"]["docker-mode"]) == "linuxkit" + + # The list of pruned fstype when checking the filesystem usage statistics. pruned_disk_types = frozenset([ "vfat", @@ -306,12 +334,20 @@ async def api_impl(container_id: str) -> float | None: cpu_usage = cast(float, nmget(ret, "cpu_stats.cpu_usage.total_usage", 0)) return cpu_usage / 1e6 - if ctx.mode == StatModes.CGROUP: - impl = sysfs_impl - elif ctx.mode == StatModes.DOCKER: - impl = api_impl - else: - raise RuntimeError("should not reach here") + async def cgroup_first_impl(container_id: str) -> float | None: + cpu_used = await sysfs_impl(container_id) + if cpu_used is None: + _warn_cgroup_fallback_once("CPUPlugin", container_id) + return await api_impl(container_id) + return cpu_used + + match ctx.mode: + case StatModes.CGROUP if not _is_linuxkit(self.local_config): + impl = cgroup_first_impl + case StatModes.CGROUP | StatModes.DOCKER: + impl = api_impl + case _: + assert_never(ctx.mode) tasks = [] for cid in container_ids: @@ -819,12 +855,22 @@ async def api_impl( scratch_sz, ) - if ctx.mode == StatModes.CGROUP: - impl = sysfs_impl - elif ctx.mode == StatModes.DOCKER: - impl = api_impl - else: - raise RuntimeError("should not reach here") + async def cgroup_first_impl( + container_id: str, + ) -> tuple[int, int, int, int, int, int, int] | None: + result = await sysfs_impl(container_id) + if result is None: + _warn_cgroup_fallback_once("MemoryPlugin", 
container_id) + return await api_impl(container_id) + return result + + match ctx.mode: + case StatModes.CGROUP if not _is_linuxkit(self.local_config): + impl = cgroup_first_impl + case StatModes.CGROUP | StatModes.DOCKER: + impl = api_impl + case _: + assert_never(ctx.mode) per_container_mem_used_bytes = {} per_container_io_read_bytes = {} diff --git a/tests/unit/agent/test_docker_intrinsic.py b/tests/unit/agent/test_docker_intrinsic.py index e28f3816330..2bae5f6f0de 100644 --- a/tests/unit/agent/test_docker_intrinsic.py +++ b/tests/unit/agent/test_docker_intrinsic.py @@ -10,10 +10,12 @@ import pytest +from ai.backend.agent.docker import intrinsic from ai.backend.agent.docker.intrinsic import ( ContainerNetStat, CPUPlugin, MemoryPlugin, + _warn_cgroup_fallback_once, read_proc_net_dev, ) from ai.backend.agent.stats import StatModes @@ -139,6 +141,40 @@ async def test_sysfs_mode_does_not_use_docker( await cpu_plugin.gather_container_measures(cpu_cgroup_context, container_ids) mock_docker_cls.assert_not_called() + async def test_cgroup_mode_falls_back_to_api_on_sysfs_failure( + self, + cpu_plugin: CPUPlugin, + container_ids: list[str], + cgroup_stat_context: MagicMock, + mock_fetch_api_stats: MagicMock, + ) -> None: + """When sysfs read fails in CGROUP mode, the Docker API is used + as a per-read fallback instead of silently returning zero.""" + # Arrange: cgroup version that triggers "return None" in sysfs_impl. 
+ cgroup_stat_context.agent.docker_info = {"CgroupVersion": "invalid"} + cgroup_stat_context.agent.get_cgroup_path = MagicMock(return_value=MagicMock()) + + results = await cpu_plugin.gather_container_measures(cgroup_stat_context, container_ids) + + assert mock_fetch_api_stats.call_count == len(container_ids) + # api_impl returns cpu_usage = 1_000_000_000 ns / 1e6 = 1000 msec + for cid in container_ids: + assert results[0].per_container[cid].value == 1000 + + async def test_linuxkit_forces_api_even_in_cgroup_mode( + self, + container_ids: list[str], + cpu_cgroup_context: MagicMock, + mock_fetch_api_stats: MagicMock, + ) -> None: + """On linuxkit hosts the API path is used even when mode is CGROUP.""" + plugin = CPUPlugin.__new__(CPUPlugin) + plugin.local_config = {"agent": {"docker-mode": "linuxkit"}} + plugin._docker = AsyncMock() + + await plugin.gather_container_measures(cpu_cgroup_context, container_ids) + assert mock_fetch_api_stats.call_count == len(container_ids) + class TestMemoryPluginDockerClientLifecycle(BaseDockerIntrinsicTest): """Tests for MemoryPlugin Docker client lifecycle management.""" @@ -243,6 +279,79 @@ async def test_sysfs_mode_uses_instance_docker_client( await memory_plugin.gather_container_measures(memory_cgroup_context, container_ids) mock_docker_cls.assert_not_called() + async def test_cgroup_mode_falls_back_to_api_on_sysfs_failure( + self, + memory_plugin: MemoryPlugin, + container_ids: list[str], + cgroup_stat_context: MagicMock, + mock_fetch_api_stats: MagicMock, + ) -> None: + """When sysfs read fails in CGROUP mode, the Docker API is used + as a per-read fallback instead of silently returning zero.""" + # Arrange: cgroup version that triggers "return None" in sysfs_impl. 
+ cgroup_stat_context.agent.get_cgroup_version = MagicMock(return_value="invalid") + cgroup_stat_context.agent.get_cgroup_path = MagicMock(return_value=MagicMock()) + + results = await memory_plugin.gather_container_measures(cgroup_stat_context, container_ids) + + assert mock_fetch_api_stats.call_count == len(container_ids) + # api_impl returns mem_cur = 1024 * 1024 * 100 = 104857600 bytes + for cid in container_ids: + assert results[0].per_container[cid].value == 1024 * 1024 * 100 + + async def test_linuxkit_forces_api_even_in_cgroup_mode( + self, + container_ids: list[str], + cgroup_stat_context: MagicMock, + mock_fetch_api_stats: MagicMock, + ) -> None: + """On linuxkit hosts the API path is used even when mode is CGROUP.""" + plugin = MemoryPlugin.__new__(MemoryPlugin) + plugin.local_config = {"agent": {"docker-mode": "linuxkit"}} + plugin._docker = AsyncMock() + + await plugin.gather_container_measures(cgroup_stat_context, container_ids) + assert mock_fetch_api_stats.call_count == len(container_ids) + + +class TestWarnCgroupFallbackOnce: + """Tests for _warn_cgroup_fallback_once() per-(plugin, container) warning dedup.""" + + @pytest.fixture(autouse=True) + def _reset_warn_cache(self) -> Generator[None, None, None]: + """Reset the module-level warn cache between tests to avoid bleed-through.""" + intrinsic._cgroup_fallback_warned.clear() + yield + intrinsic._cgroup_fallback_warned.clear() + + def test_deduplicates_per_container( + self, + caplog: pytest.LogCaptureFixture, + ) -> None: + """The helper logs once per (plugin, container_id) and stays silent on + subsequent calls for the same container.""" + with caplog.at_level("WARNING", logger="ai.backend.agent.docker.intrinsic"): + _warn_cgroup_fallback_once("CPUPlugin", "container_abc") + _warn_cgroup_fallback_once("CPUPlugin", "container_abc") + _warn_cgroup_fallback_once("CPUPlugin", "container_abc") + + warn_records = [r for r in caplog.records if r.levelname == "WARNING"] + assert len(warn_records) == 1 + + #
A different container should still warn. + caplog.clear() + with caplog.at_level("WARNING", logger="ai.backend.agent.docker.intrinsic"): + _warn_cgroup_fallback_once("CPUPlugin", "container_xyz") + warn_records = [r for r in caplog.records if r.levelname == "WARNING"] + assert len(warn_records) == 1 + + # Same container under a different plugin namespace should also warn once. + caplog.clear() + with caplog.at_level("WARNING", logger="ai.backend.agent.docker.intrinsic"): + _warn_cgroup_fallback_once("MemoryPlugin", "container_abc") + warn_records = [r for r in caplog.records if r.levelname == "WARNING"] + assert len(warn_records) == 1 + class TestMemoryPluginContainerPidValidation(BaseDockerIntrinsicTest): """Tests for container PID validation before reading /proc/[pid]/net/dev."""