Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/11219.enhance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Stream container stats from Docker via a long-lived reader instead of polling `container.stats(stream=False)` per collection cycle.
125 changes: 110 additions & 15 deletions src/ai/backend/agent/docker/intrinsic.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ def read_netns_net_dev(ns_path: Path) -> ContainerNetStat:
return _parse_proc_net_dev(content)


def _validate_stats_entry(entry: dict[str, Any]) -> dict[str, Any] | None:
if entry["read"].startswith("0001-01-01") or entry["preread"].startswith("0001-01-01"):
return None
return entry


async def fetch_api_stats(container: DockerContainer) -> dict[str, Any] | None:
short_cid = container.id[:7]
try:
Expand All @@ -149,7 +155,7 @@ async def fetch_api_stats(container: DockerContainer) -> dict[str, Any] | None:
)
return None
else:
entry = {"read": "0001-01-01"}
entry: dict[str, Any] = {"read": "0001-01-01"}
# aiodocker 0.16 or later returns a list of dict, even when not streaming.
match ret:
case list() if ret:
Expand All @@ -164,9 +170,102 @@ async def fetch_api_stats(container: DockerContainer) -> dict[str, Any] | None:
ret,
)
return None
if entry["read"].startswith("0001-01-01") or entry["preread"].startswith("0001-01-01"):
return _validate_stats_entry(entry)


class DockerStatsStreamer:
    """
    Maintains one long-lived `container.stats(stream=True)` reader per container and
    exposes the most recent decoded sample from an in-memory cache.

    Callers read the cached sample via :meth:`get_latest` instead of issuing a new
    HTTP round-trip every collection cycle. The first call for a previously-unseen
    container lazily spawns the reader; no sample is returned until dockerd has
    emitted at least one frame.

    Readers self-terminate when the upstream iterator ends (container removed) or
    raises a connection error. A subsequent :meth:`get_latest` call starts a fresh
    reader if the container reappears.

    TODO(#11219): Wire start/stop into the agent's container-lifecycle hooks
    (_handle_start_event / _handle_clean_event) to avoid relying on lazy startup.
    """

    # Docker client handed to every DockerContainer the readers create.
    _docker: Docker
    # Container ID -> most recent validated stats frame.
    _latest: dict[str, dict[str, Any]]
    # Container ID -> reader task registered for that container.
    _tasks: dict[str, asyncio.Task[None]]
    # Set by close(); once True, get_latest() neither spawns readers nor returns samples.
    _closed: bool

    def __init__(self, docker: Docker) -> None:
        """Create an idle streamer on top of `docker`; no reader is started yet."""
        self._docker = docker
        self._latest = {}
        self._tasks = {}
        self._closed = False

    def get_latest(self, container_id: str) -> dict[str, Any] | None:
        """
        Return the most recent cached sample for `container_id`, starting a
        reader if none is running.

        Returns None when the streamer has been closed, and also until the reader
        has cached its first valid frame. Because a reader may be spawned with
        `asyncio.create_task()`, this must be called while an event loop is running.
        """
        if self._closed:
            return None
        task = self._tasks.get(container_id)
        # Spawn when the container is unseen or its previous reader has finished.
        if task is None or task.done():
            self._tasks[container_id] = asyncio.create_task(
                self._read_stream(container_id),
                name=f"docker-stats-stream:{container_id[:7]}",
            )
        return self._latest.get(container_id)
Comment on lines +146 to +207
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DockerStatsStreamer introduces new lifecycle/caching behavior (lazy task spawn, stream termination handling, close() cancellation), but the unit tests only mock it rather than exercising its real behavior. Adding focused tests for get_latest()/close() (and for stream termination cleaning up internal state like _tasks) would help prevent regressions and catch resource-leak scenarios early.

Copilot uses AI. Check for mistakes.

async def stop(self, container_id: str) -> None:
    """
    Forget `container_id`: drop its cached sample and shut down its reader task.

    A reader that is still running is cancelled and awaited; whatever it raises
    while finishing (its own cancellation included) is deliberately discarded.
    Unknown container IDs and already-finished readers are a no-op beyond the
    bookkeeping removal.
    """
    self._latest.pop(container_id, None)
    reader = self._tasks.pop(container_id, None)
    if reader is None or reader.done():
        return
    reader.cancel()
    try:
        await reader
    except (asyncio.CancelledError, Exception):
        pass

async def close(self) -> None:
    """
    Shut the streamer down: mark it closed, empty both caches, stop all readers.

    Every unfinished reader is cancelled first, and only then are the readers
    awaited one by one. Anything a reader raises while being awaited (its own
    cancellation included) is deliberately discarded.
    """
    self._closed = True
    readers = [*self._tasks.values()]
    self._tasks.clear()
    self._latest.clear()
    for reader in readers:
        if reader.done():
            continue
        reader.cancel()
    # Await every reader, finished ones included, so a stored exception is
    # consumed here rather than left on the task object.
    for reader in readers:
        try:
            await reader
        except (asyncio.CancelledError, Exception):
            pass

async def _read_stream(self, container_id: str) -> None:
    """
    Reader-task body: consume the stats stream of `container_id` and cache each
    valid frame as that container's latest sample.

    Termination handling:

    * Cancellation is re-raised untouched.
    * A RuntimeError whose message mentions a closed event loop or a closed
      session ends the reader silently; any other RuntimeError is logged as a
      warning.
    * DockerError / aiohttp.ClientError end the reader with a debug log.
    * Any other exception propagates out of the task.

    However the reader ends, the cached sample is dropped and the reader removes
    its own entry from `_tasks`, so finished tasks do not pile up as containers
    come and go.
    """
    short_cid = container_id[:7]
    container = DockerContainer(self._docker, id=container_id)
    try:
        async for frame in container.stats(stream=True):
            validated = _validate_stats_entry(frame)
            if validated is not None:
                self._latest[container_id] = validated
    except asyncio.CancelledError:
        raise
    except RuntimeError as e:
        msg = str(e.args[0]).lower() if e.args else ""
        if "event loop is closed" in msg or "session is closed" in msg:
            return
        log.warning(
            "stats stream stopped unexpectedly (cid:{}): {!r}",
            short_cid,
            e,
        )
    except (DockerError, aiohttp.ClientError) as e:
        log.debug(
            "stats stream ended (cid:{}): {!r}",
            short_cid,
            e,
        )
    finally:
        self._latest.pop(container_id, None)
        # Deregister only if the registered task is still this one: stop()/close()
        # may already have removed it, and a newer reader may have taken the slot.
        current_task = asyncio.current_task()
        if current_task is not None and self._tasks.get(container_id) is current_task:
            self._tasks.pop(container_id, None)
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DockerStatsStreamer._read_stream() removes _latest in finally, but it never removes the completed task from _tasks. If containers churn, _tasks will grow unbounded with done tasks (and possibly their exception tracebacks), which is effectively a memory leak. Consider popping container_id from _tasks in finally (or attaching a task.add_done_callback to self-clean), and/or calling stop() when the stream ends.

Suggested change
self._latest.pop(container_id, None)
self._latest.pop(container_id, None)
current_task = asyncio.current_task()
if current_task is not None and self._tasks.get(container_id) is current_task:
self._tasks.pop(container_id, None)

Copilot uses AI. Check for mistakes.


# Pseudo-plugins for intrinsic devices (CPU and the main memory)
Expand All @@ -189,11 +288,14 @@ class CPUPlugin(AbstractComputePlugin):
]

_docker: Docker
_stats_streamer: DockerStatsStreamer

async def init(self, context: Any | None = None) -> None:
    """
    Create this plugin's Docker client and a stats streamer bound to that client.

    `context` is accepted but not used here.
    """
    client = Docker()
    self._docker = client
    self._stats_streamer = DockerStatsStreamer(client)

async def cleanup(self) -> None:
    """
    Release the plugin's Docker resources: close the stats streamer, then the client.

    The client is closed in a `finally` clause so that it is released even when
    closing the streamer raises; the streamer's error still propagates.
    """
    try:
        await self._stats_streamer.close()
    finally:
        await self._docker.close()
Comment on lines 355 to 362
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CPUPlugin creates its own DockerStatsStreamer (and its own Docker client). Since MemoryPlugin does the same, this will open two long-lived stats(stream=True) connections per container (one per plugin), which can negate the intended resource savings and put unnecessary load on dockerd for hosts with many containers. Consider sharing a single streamer (and ideally a single Docker client) across intrinsic plugins, e.g., via an agent-level/shared context or a module-level singleton keyed by the underlying Docker client.

Copilot uses AI. Check for mistakes.

async def update_plugin_config(self, new_plugin_config: Mapping[str, Any]) -> None:
Expand Down Expand Up @@ -295,12 +397,7 @@ async def sysfs_impl(container_id: str) -> float | None:
return cpu_used

async def api_impl(container_id: str) -> float | None:
container = DockerContainer(self._docker, id=container_id)
try:
async with asyncio.timeout(_CONTAINER_STAT_TIMEOUT):
ret = await fetch_api_stats(container)
except TimeoutError:
return None
ret = self._stats_streamer.get_latest(container_id)
if ret is None:
return None
cpu_usage = cast(float, nmget(ret, "cpu_stats.cpu_usage.total_usage", 0))
Expand Down Expand Up @@ -516,11 +613,14 @@ class MemoryPlugin(AbstractComputePlugin):
]

_docker: Docker
_stats_streamer: DockerStatsStreamer

async def init(self, context: Any | None = None) -> None:
    """
    Create this plugin's Docker client and a stats streamer bound to that client.

    `context` is accepted but not used here.
    """
    client = Docker()
    self._docker = client
    self._stats_streamer = DockerStatsStreamer(client)

async def cleanup(self) -> None:
    """
    Release the plugin's Docker resources: close the stats streamer, then the client.

    The client is closed in a `finally` clause so that it is released even when
    closing the streamer raises; the streamer's error still propagates.
    """
    try:
        await self._stats_streamer.close()
    finally:
        await self._docker.close()
Comment on lines 688 to 695
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MemoryPlugin also instantiates its own DockerStatsStreamer, which (together with CPUPlugin’s streamer) results in duplicate per-container stats streams. To avoid 2× open connections per container, consider sharing a single streamer across intrinsic plugins (and/or reusing a shared Docker client).

Copilot uses AI. Check for mistakes.

async def update_plugin_config(self, new_plugin_config: Mapping[str, Any]) -> None:
Expand Down Expand Up @@ -785,12 +885,7 @@ async def sysfs_impl(
async def api_impl(
container_id: str,
) -> tuple[int, int, int, int, int, int, int] | None:
container = DockerContainer(self._docker, id=container_id)
try:
async with asyncio.timeout(_CONTAINER_STAT_TIMEOUT):
ret = await fetch_api_stats(container)
except TimeoutError:
return None
ret = self._stats_streamer.get_latest(container_id)
if ret is None:
return None
mem_cur_bytes = nmget(ret, "memory_stats.usage", 0)
Expand Down
21 changes: 17 additions & 4 deletions tests/unit/agent/test_docker_intrinsic.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ai.backend.agent.docker.intrinsic import (
ContainerNetStat,
CPUPlugin,
DockerStatsStreamer,
MemoryPlugin,
read_proc_net_dev,
)
Expand Down Expand Up @@ -76,15 +77,24 @@ def mock_fetch_api_stats(
) as mock:
yield mock

def _make_prewarmed_streamer(
    self,
    sample: dict[str, Any] | None,
) -> MagicMock:
    """
    Build a stand-in for DockerStatsStreamer whose `get_latest()` always
    returns `sample` (which may be None to imitate "no frame cached yet").
    """
    latest_stub = MagicMock(return_value=sample)
    fake_streamer = MagicMock(spec=DockerStatsStreamer)
    fake_streamer.get_latest = latest_stub
    return fake_streamer


class TestCPUPluginDockerClientLifecycle(BaseDockerIntrinsicTest):
"""Tests for CPUPlugin Docker client lifecycle management."""

@pytest.fixture
def cpu_plugin(self, docker_stats_response: dict[str, Any]) -> CPUPlugin:
    """
    Provide a CPUPlugin built without running its constructor, wired with an
    AsyncMock Docker client and a streamer stub that serves `docker_stats_response`.
    """
    plugin = CPUPlugin.__new__(CPUPlugin)
    plugin.local_config = {"agent": {"docker-mode": "default"}}
    plugin._docker = AsyncMock()
    plugin._stats_streamer = self._make_prewarmed_streamer(docker_stats_response)
    return plugin

@pytest.fixture
Expand Down Expand Up @@ -144,10 +154,11 @@ class TestMemoryPluginDockerClientLifecycle(BaseDockerIntrinsicTest):
"""Tests for MemoryPlugin Docker client lifecycle management."""

@pytest.fixture
def memory_plugin(self, docker_stats_response: dict[str, Any]) -> MemoryPlugin:
    """
    Provide a MemoryPlugin built without running its constructor, wired with an
    AsyncMock Docker client and a streamer stub that serves `docker_stats_response`.
    """
    plugin = MemoryPlugin.__new__(MemoryPlugin)
    plugin.local_config = {"agent": {"docker-mode": "default"}}
    plugin._docker = AsyncMock()
    plugin._stats_streamer = self._make_prewarmed_streamer(docker_stats_response)
    return plugin

@pytest.fixture
Expand Down Expand Up @@ -248,10 +259,11 @@ class TestMemoryPluginContainerPidValidation(BaseDockerIntrinsicTest):
"""Tests for container PID validation before reading /proc/[pid]/net/dev."""

@pytest.fixture
def memory_plugin(self, docker_stats_response: dict[str, Any]) -> MemoryPlugin:
    """
    Provide a MemoryPlugin built without running its constructor, wired with an
    AsyncMock Docker client and a streamer stub that serves `docker_stats_response`.
    """
    plugin = MemoryPlugin.__new__(MemoryPlugin)
    plugin.local_config = {"agent": {"docker-mode": "default"}}
    plugin._docker = AsyncMock()
    plugin._stats_streamer = self._make_prewarmed_streamer(docker_stats_response)
    return plugin

@contextmanager
Expand Down Expand Up @@ -387,10 +399,11 @@ class TestMemoryPluginSysfsTimeoutAndErrorIsolation(BaseDockerIntrinsicTest):
"""Tests for timeout protection and error isolation in MemoryPlugin sysfs_impl."""

@pytest.fixture
def memory_plugin(self, docker_stats_response: dict[str, Any]) -> MemoryPlugin:
    """
    Provide a MemoryPlugin built without running its constructor, wired with an
    AsyncMock Docker client and a streamer stub that serves `docker_stats_response`.
    """
    plugin = MemoryPlugin.__new__(MemoryPlugin)
    plugin.local_config = {"agent": {"docker-mode": "default"}}
    plugin._docker = AsyncMock()
    plugin._stats_streamer = self._make_prewarmed_streamer(docker_stats_response)
    return plugin

@pytest.fixture
Expand Down
Loading