Skip to content

Commit dda704d

Browse files
rapsealkclaude
andcommitted
refactor(agent): Flag streamer follow-ups and cover 404 cleanup path
- Add TODO(#11223) notes on CPU/Memory api_impl paths: after sysfs-first lands, the Docker stream on these plugins is only needed for network and blkio (which neither consumes), so the stream should migrate to a network/IO consumer. - Add TODO(#11232) at the two per-plugin DockerStatsStreamer instantiation sites flagging the shared-streamer refactor. - Add a test covering the DockerError 404 branch of _read_stream: reader exits cleanly on container-gone without spinning retries. Refs #11219 Refs #11223 Refs #11224 Refs #11232 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 339d6a8 commit dda704d

2 files changed

Lines changed: 61 additions & 0 deletions

File tree

src/ai/backend/agent/docker/intrinsic.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ class CPUPlugin(AbstractComputePlugin):
356356

357357
async def init(self, context: Any | None = None) -> None:
358358
self._docker = Docker()
359+
# TODO(#11232): Consolidate per-plugin streamer into a single shared instance owned by the agent.
359360
self._stats_streamer = DockerStatsStreamer(self._docker)
360361

361362
async def cleanup(self) -> None:
@@ -466,6 +467,7 @@ async def sysfs_impl(container_id: str) -> float | None:
466467
return None
467468
return cpu_used
468469

470+
# TODO(#11223): After sysfs-first lands, CPU is sourced from sysfs and this stream becomes redundant here; migrate to a network/IO consumer.
469471
async def api_impl(container_id: str) -> float | None:
470472
ret = self._stats_streamer.get_latest(container_id)
471473
if ret is None:
@@ -687,6 +689,7 @@ class MemoryPlugin(AbstractComputePlugin):
687689

688690
async def init(self, context: Any | None = None) -> None:
689691
self._docker = Docker()
692+
# TODO(#11232): Consolidate per-plugin streamer into a single shared instance owned by the agent.
690693
self._stats_streamer = DockerStatsStreamer(self._docker)
691694

692695
async def cleanup(self) -> None:
@@ -958,6 +961,7 @@ async def sysfs_impl(
958961
scratch_sz,
959962
)
960963

964+
# TODO(#11223): After sysfs-first lands, memory is sourced from sysfs and this stream becomes redundant here; migrate to a network/IO consumer.
961965
async def api_impl(
962966
container_id: str,
963967
) -> tuple[int, int, int, int, int, int, int] | None:

tests/unit/agent/test_docker_intrinsic.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import logging
45
from collections.abc import Generator
56
from contextlib import contextmanager
67
from dataclasses import dataclass
@@ -10,6 +11,7 @@
1011

1112
import aiohttp
1213
import pytest
14+
from aiodocker.exceptions import DockerError
1315

1416
from ai.backend.agent.agent import AbstractAgent
1517
from ai.backend.agent.docker.intrinsic import (
@@ -820,6 +822,61 @@ def fake_container_cls(docker: Any, id: str) -> _FakeDockerContainer:
820822
assert task.done()
821823
assert task.cancelled() or task.exception() is None
822824

825+
async def test_reader_exits_cleanly_on_container_gone_404(
826+
self,
827+
caplog: pytest.LogCaptureFixture,
828+
) -> None:
829+
"""DockerError 404 from the stats stream means the container is gone:
830+
the reader must exit cleanly (no retry spin, no warning-level log)."""
831+
832+
call_count = 0
833+
834+
async def frames(_cid: str) -> Any:
835+
nonlocal call_count
836+
call_count += 1
837+
# Raise on the very first iteration to simulate an already-gone
838+
# container. A no-op ``yield`` in an unreachable branch keeps this
839+
# function an async generator without tripping ``unreachable`` mypy.
840+
if call_count < 0:
841+
yield {}
842+
raise DockerError(404, {"message": "No such container"})
843+
844+
def fake_container_cls(docker: Any, id: str) -> _FakeDockerContainer:
845+
return _FakeDockerContainer(frames, id)
846+
847+
with (
848+
patch(
849+
"ai.backend.agent.docker.intrinsic.DockerContainer",
850+
side_effect=fake_container_cls,
851+
),
852+
caplog.at_level(logging.DEBUG, logger="ai.backend.agent.docker.intrinsic"),
853+
):
854+
streamer = DockerStatsStreamer(AsyncMock())
855+
streamer.start("cid_gone")
856+
task = streamer._tasks["cid_gone"]
857+
# The reader must exit on its own; 404 must NOT spin in the retry loop.
858+
await asyncio.wait_for(task, timeout=2.0)
859+
assert task.done()
860+
assert not task.cancelled()
861+
assert task.exception() is None
862+
# Only one stream-open attempt; no reconnect/retry on 404.
863+
assert call_count == 1
864+
# ``get_latest`` may re-spawn a reader as a safety net, but the
865+
# cached sample was dropped in the reader's finally block and the
866+
# respawned reader will hit the same 404 path, so the return value
867+
# is still ``None``.
868+
assert streamer._latest.get("cid_gone") is None
869+
await streamer.close()
870+
# 404 is a debug-level path; container-gone is not an error.
871+
intrinsic_records = [
872+
r for r in caplog.records if r.name == "ai.backend.agent.docker.intrinsic"
873+
]
874+
for record in intrinsic_records:
875+
assert record.levelno < logging.WARNING, (
876+
f"unexpected warning-or-above log: {record.levelname} {record.getMessage()}"
877+
)
878+
assert streamer.get_latest("cid_gone") is None
879+
823880

824881
class _AgentStub:
825882
"""Minimal stand-in exposing only the attributes the notify helpers read."""

0 commit comments

Comments
 (0)