Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changes/11223.enhance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Default to cgroup (sysfs) stat collection on native Linux hosts, falling back to the Docker API only on linuxkit hosts or when a sysfs read fails.
Note: reported container memory usage may step down on hosts previously using `stats-type: docker`, because sysfs excludes inactive file cache (matching `docker stats`).
Block-I/O readings may also shift on cgroup v1 hosts, because the sysfs path reads `blkio.throttle.io_service_bytes` while the Docker API path reads `blkio_stats.io_service_bytes_recursive` (which sums across nested cgroups).
Dashboards or autoscaling thresholds tuned to the old values should be re-evaluated after upgrade.
10 changes: 6 additions & 4 deletions src/ai/backend/agent/config/unified.py
Original file line number Diff line number Diff line change
Expand Up @@ -1335,16 +1335,18 @@ class OverridableContainerConfig(BaseConfigSchema):
stats_type: Annotated[
StatModes | None,
Field(
default=StatModes.DOCKER,
default=None,
validation_alias=AliasChoices("stats-type", "stats_type"),
serialization_alias="stats-type",
),
BackendAIConfigMeta(
description=(
"Method for collecting container resource statistics. "
"'docker' uses Docker's stats API (most compatible). "
"'cgroup' reads from cgroup filesystem directly (more accurate, requires root). "
"'null' disables statistics collection."
"'cgroup' reads from cgroup filesystem directly (faster and more accurate, "
"requires root on Linux). "
"When unset, the agent auto-selects: cgroup on native Linux hosts "
"(with Docker API fallback on read failure) and docker elsewhere."
Comment on lines 1344 to +1349
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description says there are no changes to network/IO stat collection, but switching the default stats_type to auto-select cgroup on native Linux will cause MemoryPlugin.gather_container_measures() to use its sysfs implementation by default (which collects io/net via cgroup + /proc), rather than Docker stats API. If the intent is to keep network/IO coming from the Docker stats API by default, the mode selection likely needs to be split (CPU/memory via cgroup, net/IO via API) or the PR description should be updated to reflect the behavior change.

Copilot uses AI. Check for mistakes.
),
added_version="25.12.0",
example=ConfigExample(local="docker", prod="docker"),
Expand Down Expand Up @@ -1531,7 +1533,7 @@ def _validate_sandbox_type(cls, sandbox_type: ContainerSandboxType) -> Container

@field_validator("stats_type", mode="after")
@classmethod
def _validate_stats_type(cls, stats_type: StatModes) -> StatModes:
def _validate_stats_type(cls, stats_type: StatModes | None) -> StatModes | None:
if stats_type == StatModes.CGROUP:
if os.getuid() != 0:
raise ValueError(
Expand Down
72 changes: 59 additions & 13 deletions src/ai/backend/agent/docker/intrinsic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
from collections.abc import Collection, Iterable, Mapping, Sequence
from decimal import Decimal
from pathlib import Path
from typing import Any, cast
from typing import Any, assert_never, cast

import aiohttp
import psutil
from aiodocker.docker import Docker, DockerContainer
from aiodocker.exceptions import DockerError
from cachetools import LRUCache

from ai.backend.agent import __version__ # pants: no-infer-dep
from ai.backend.agent.alloc_map import AllocationStrategy
Expand Down Expand Up @@ -73,6 +74,33 @@
_CONTAINER_STAT_TIMEOUT: float = 2.0
_INVALID_PID: int = 0

# Tracks containers for which we have already logged a cgroup->Docker-API
# fallback warning, to avoid log spam on persistent read failures.
# The cache is used purely as a bounded set: only key membership matters and
# every stored value is None.
# Bounded to prevent unbounded growth on hosts with high container churn;
# entries are evicted on overflow, and once a key has been evicted the next
# fallback for that key warns again.
# Shared across CPUPlugin / MemoryPlugin since keys are namespaced via the `plugin:` prefix.
_CGROUP_FALLBACK_WARN_CACHE_SIZE = 1024
_cgroup_fallback_warned: LRUCache[str, None] = LRUCache(
    maxsize=_CGROUP_FALLBACK_WARN_CACHE_SIZE,
)


def _warn_cgroup_fallback_once(plugin: str, container_id: str) -> None:
    """Log the cgroup->Docker-API fallback warning at most once per key.

    The dedup key is ``"<plugin>:<container_id>"``, so the same container is
    tracked independently for each plugin.

    Args:
        plugin: Name of the calling stat plugin; it namespaces the dedup key
            and is the first field of the log message.
        container_id: Full container ID. The full value is used for the dedup
            key; only its first 7 characters are written to the log.
    """
    key = f"{plugin}:{container_id}"
    if key in _cgroup_fallback_warned:
        # A bare membership test is not counted as a "use" by cachetools'
        # LRUCache, so read the entry explicitly to refresh its recency.
        # Without this the cache evicts in insertion order, and a container
        # that keeps failing would be dropped (and warned about again) as
        # soon as enough newer keys have been inserted.
        _ = _cgroup_fallback_warned[key]
        return
    _cgroup_fallback_warned[key] = None
    log.warning(
        "{0}: cgroup sysfs read failed for container {1}; falling back to Docker API",
        plugin,
        container_id[:7],
    )
Comment on lines +77 to +97
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_cgroup_fallback_warned is a process-global set that only grows and is never pruned. On agents with high container churn this can become an unbounded memory sink over time. Consider bounding it (e.g., LRU/TTL cache) or clearing entries when containers are removed / after some time window.

Copilot uses AI. Check for mistakes.

Comment on lines +88 to +98
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_warn_cgroup_fallback_once() is meant to prevent log spam on persistent sysfs failures, but sysfs_impl() already logs a warning on every OSError before returning None. With the new fallback, persistent failures will still spam the existing sysfs warning (and add one more warning once). Consider moving the per-container warning solely to the fallback layer (or otherwise suppressing/reducing repeated sysfs warnings) so the anti-spam behavior is effective.

Copilot uses AI. Check for mistakes.

def _is_linuxkit(local_config: Mapping[str, Any]) -> bool:
return cast(str, local_config["agent"]["docker-mode"]) == "linuxkit"


# The list of pruned fstype when checking the filesystem usage statistics.
pruned_disk_types = frozenset([
"vfat",
Expand Down Expand Up @@ -306,12 +334,20 @@ async def api_impl(container_id: str) -> float | None:
cpu_usage = cast(float, nmget(ret, "cpu_stats.cpu_usage.total_usage", 0))
return cpu_usage / 1e6

if ctx.mode == StatModes.CGROUP:
impl = sysfs_impl
elif ctx.mode == StatModes.DOCKER:
impl = api_impl
else:
raise RuntimeError("should not reach here")
async def cgroup_first_impl(container_id: str) -> float | None:
    """Read CPU usage via sysfs first; on a ``None`` result, warn once and
    retry the same container through the Docker API."""
    sysfs_value = await sysfs_impl(container_id)
    if sysfs_value is not None:
        return sysfs_value
    # sysfs gave no reading for this container: fall back for this read only.
    _warn_cgroup_fallback_once("CPUPlugin", container_id)
    return await api_impl(container_id)

match ctx.mode:
case StatModes.CGROUP if not _is_linuxkit(self.local_config):
impl = cgroup_first_impl
case StatModes.CGROUP | StatModes.DOCKER:
impl = api_impl
case _:
assert_never(ctx.mode)

tasks = []
for cid in container_ids:
Expand Down Expand Up @@ -819,12 +855,22 @@ async def api_impl(
scratch_sz,
)

if ctx.mode == StatModes.CGROUP:
impl = sysfs_impl
elif ctx.mode == StatModes.DOCKER:
impl = api_impl
else:
raise RuntimeError("should not reach here")
async def cgroup_first_impl(
    container_id: str,
) -> tuple[int, int, int, int, int, int, int] | None:
    """Read the memory/IO/net tuple via sysfs first; on a ``None`` result,
    warn once and retry the same container through the Docker API."""
    sysfs_stats = await sysfs_impl(container_id)
    if sysfs_stats is not None:
        return sysfs_stats
    # sysfs gave no reading for this container: fall back for this read only.
    _warn_cgroup_fallback_once("MemoryPlugin", container_id)
    return await api_impl(container_id)

match ctx.mode:
case StatModes.CGROUP if not _is_linuxkit(self.local_config):
impl = cgroup_first_impl
case StatModes.CGROUP | StatModes.DOCKER:
impl = api_impl
case _:
assert_never(ctx.mode)

per_container_mem_used_bytes = {}
per_container_io_read_bytes = {}
Expand Down
109 changes: 109 additions & 0 deletions tests/unit/agent/test_docker_intrinsic.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@

import pytest

from ai.backend.agent.docker import intrinsic
from ai.backend.agent.docker.intrinsic import (
ContainerNetStat,
CPUPlugin,
MemoryPlugin,
_warn_cgroup_fallback_once,
read_proc_net_dev,
)
from ai.backend.agent.stats import StatModes
Expand Down Expand Up @@ -139,6 +141,40 @@ async def test_sysfs_mode_does_not_use_docker(
await cpu_plugin.gather_container_measures(cpu_cgroup_context, container_ids)
mock_docker_cls.assert_not_called()

async def test_cgroup_mode_falls_back_to_api_on_sysfs_failure(
    self,
    cpu_plugin: CPUPlugin,
    container_ids: list[str],
    cgroup_stat_context: MagicMock,
    mock_fetch_api_stats: MagicMock,
) -> None:
    """A failed sysfs read in CGROUP mode is answered by the Docker API for
    that read, rather than being silently reported as zero."""
    # Arrange: use a cgroup version that triggers "return None" in sysfs_impl.
    agent = cgroup_stat_context.agent
    agent.docker_info = {"CgroupVersion": "invalid"}
    agent.get_cgroup_path = MagicMock(return_value=MagicMock())

    # Act
    measurements = await cpu_plugin.gather_container_measures(
        cgroup_stat_context,
        container_ids,
    )

    # Assert: the API was consulted exactly once per container.
    assert mock_fetch_api_stats.call_count == len(container_ids)
    # api_impl returns cpu_usage = 1_000_000_000 ns / 1e6 = 1000 msec
    for container_id in container_ids:
        assert measurements[0].per_container[container_id].value == 1000

async def test_linuxkit_forces_api_even_in_cgroup_mode(
    self,
    container_ids: list[str],
    cpu_cgroup_context: MagicMock,
    mock_fetch_api_stats: MagicMock,
) -> None:
    """With docker-mode set to linuxkit, CGROUP mode still goes through the API."""
    # Build the plugin without running __init__, then set only the
    # attributes assigned by this test.
    linuxkit_plugin = CPUPlugin.__new__(CPUPlugin)
    linuxkit_plugin.local_config = {"agent": {"docker-mode": "linuxkit"}}
    linuxkit_plugin._docker = AsyncMock()

    await linuxkit_plugin.gather_container_measures(cpu_cgroup_context, container_ids)

    # One API stats fetch per container.
    assert mock_fetch_api_stats.call_count == len(container_ids)


class TestMemoryPluginDockerClientLifecycle(BaseDockerIntrinsicTest):
"""Tests for MemoryPlugin Docker client lifecycle management."""
Expand Down Expand Up @@ -243,6 +279,79 @@ async def test_sysfs_mode_uses_instance_docker_client(
await memory_plugin.gather_container_measures(memory_cgroup_context, container_ids)
mock_docker_cls.assert_not_called()

async def test_cgroup_mode_falls_back_to_api_on_sysfs_failure(
    self,
    memory_plugin: MemoryPlugin,
    container_ids: list[str],
    cgroup_stat_context: MagicMock,
    mock_fetch_api_stats: MagicMock,
) -> None:
    """A failed sysfs read in CGROUP mode is answered by the Docker API for
    that read, rather than being silently reported as zero."""
    # Arrange: use a cgroup version that triggers "return None" in sysfs_impl.
    agent = cgroup_stat_context.agent
    agent.get_cgroup_version = MagicMock(return_value="invalid")
    agent.get_cgroup_path = MagicMock(return_value=MagicMock())

    # Act
    measurements = await memory_plugin.gather_container_measures(
        cgroup_stat_context,
        container_ids,
    )

    # Assert: the API was consulted exactly once per container.
    assert mock_fetch_api_stats.call_count == len(container_ids)
    # api_impl returns mem_cur = 1024 * 1024 * 100 = 104857600 bytes
    for container_id in container_ids:
        assert measurements[0].per_container[container_id].value == 1024 * 1024 * 100

async def test_linuxkit_forces_api_even_in_cgroup_mode(
    self,
    container_ids: list[str],
    cgroup_stat_context: MagicMock,
    mock_fetch_api_stats: MagicMock,
) -> None:
    """With docker-mode set to linuxkit, CGROUP mode still goes through the API."""
    # Build the plugin without running __init__, then set only the
    # attributes assigned by this test.
    linuxkit_plugin = MemoryPlugin.__new__(MemoryPlugin)
    linuxkit_plugin.local_config = {"agent": {"docker-mode": "linuxkit"}}
    linuxkit_plugin._docker = AsyncMock()

    await linuxkit_plugin.gather_container_measures(cgroup_stat_context, container_ids)

    # One API stats fetch per container.
    assert mock_fetch_api_stats.call_count == len(container_ids)


class TestWarnCgroupFallbackOnce:
    """Checks that _warn_cgroup_fallback_once() warns once per (plugin, container) key."""

    @pytest.fixture(autouse=True)
    def _reset_warn_cache(self) -> Generator[None, None, None]:
        """Empty the module-level warn cache before and after every test so
        entries recorded by one test cannot suppress warnings in another."""
        intrinsic._cgroup_fallback_warned.clear()
        yield
        intrinsic._cgroup_fallback_warned.clear()

    def test_deduplicates_per_container(
        self,
        caplog: pytest.LogCaptureFixture,
    ) -> None:
        """Repeated calls for one (plugin, container_id) pair log a single
        warning, while a new container or a new plugin namespace warns again."""
        logger_name = "ai.backend.agent.docker.intrinsic"
        # (plugin, container_id, number of calls); every scenario must
        # produce exactly one WARNING record.
        scenarios = (
            # Same key three times in a row: only the first call logs.
            ("CPUPlugin", "container_abc", 3),
            # A different container should still warn.
            ("CPUPlugin", "container_xyz", 1),
            # Same container under a different plugin namespace should also warn once.
            ("MemoryPlugin", "container_abc", 1),
        )

        for index, (plugin, container_id, call_count) in enumerate(scenarios):
            if index > 0:
                # Drop the records captured by the previous scenario.
                caplog.clear()
            with caplog.at_level("WARNING", logger=logger_name):
                for _ in range(call_count):
                    _warn_cgroup_fallback_once(plugin, container_id)

            warnings_seen = [rec for rec in caplog.records if rec.levelname == "WARNING"]
            assert len(warnings_seen) == 1


class TestMemoryPluginContainerPidValidation(BaseDockerIntrinsicTest):
"""Tests for container PID validation before reading /proc/[pid]/net/dev."""
Expand Down
Loading