refactor(BA-5859): Stream container stats from Docker instead of polling#11224
rapsealk wants to merge 6 commits into
Callers of the Docker API stats path (CPUPlugin and MemoryPlugin in DOCKER mode) now read the most recent sample from an in-memory cache kept up-to-date by a long-lived `container.stats(stream=True)` reader per container, instead of issuing a fresh `stats(stream=False)` HTTP round-trip every collection cycle. This cuts per-container stat latency from ~100-500 ms to a memory read and removes the 2 s per-container timeout as a serial-blocker.

Readers self-terminate when the upstream iterator ends or a connection error occurs; a subsequent lookup respawns the reader if the container reappears. Full close is tied to plugin cleanup.

Closes #11219

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pull request overview
This PR improves Docker-mode container CPU/memory metric collection by switching from per-cycle container.stats(stream=False) polling to an in-process cache populated by long-lived stats(stream=True) readers, reducing per-container latency and avoiding serial blocking on timeouts.
Changes:
- Add `DockerStatsStreamer` to maintain a per-container streamed stats reader and cache the latest sample in memory.
- Update `CPUPlugin` and `MemoryPlugin` to read stats from the streamer cache instead of calling `fetch_api_stats()` each cycle.
- Update unit tests to inject a mocked/prewarmed stats streamer into the intrinsic plugins.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/ai/backend/agent/docker/intrinsic.py | Introduces DockerStatsStreamer, wires it into CPU/Memory intrinsic plugins, and refactors the API stats path to read from cache. |
| tests/unit/agent/test_docker_intrinsic.py | Adjusts fixtures to provide a mocked DockerStatsStreamer so plugin tests don't depend on live Docker stats calls. |
| changes/11219.enhance.md | Adds a changelog entry describing the streamed stats approach. |
```python
                e,
            )
        finally:
            self._latest.pop(container_id, None)
```
DockerStatsStreamer._read_stream() removes _latest in finally, but it never removes the completed task from _tasks. If containers churn, _tasks will grow unbounded with done tasks (and possibly their exception tracebacks), which is effectively a memory leak. Consider popping container_id from _tasks in finally (or attaching a task.add_done_callback to self-clean), and/or calling stop() when the stream ends.
```suggestion
            self._latest.pop(container_id, None)
            current_task = asyncio.current_task()
            if current_task is not None and self._tasks.get(container_id) is current_task:
                self._tasks.pop(container_id, None)
```
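The `task.add_done_callback` alternative mentioned in the comment could look roughly like the sketch below. `TaskRegistry` and `spawn` are hypothetical names that only model the `_tasks` bookkeeping; they are not part of the PR. The key point is that the callback drops the entry only when it still refers to the task that just finished.

```python
import asyncio
from collections.abc import Coroutine
from typing import Any


class TaskRegistry:
    """Hypothetical stand-in for the `_tasks` dict kept by the streamer."""

    def __init__(self) -> None:
        self._tasks: dict[str, asyncio.Task[None]] = {}

    def spawn(self, key: str, coro: Coroutine[Any, Any, None]) -> asyncio.Task[None]:
        task = asyncio.create_task(coro)
        self._tasks[key] = task

        def _forget(done: asyncio.Task[None]) -> None:
            # Only drop the entry if it still points at this task; a newer
            # reader registered under the same key must not be evicted.
            if self._tasks.get(key) is done:
                del self._tasks[key]
            # Retrieve any exception so asyncio does not log it as unretrieved.
            if not done.cancelled():
                done.exception()

        task.add_done_callback(_forget)
        return task


async def demo() -> int:
    registry = TaskRegistry()

    async def short_lived() -> None:
        await asyncio.sleep(0)

    task = registry.spawn("abc123", short_lived())
    await task
    await asyncio.sleep(0)  # give the loop a turn to run the done callback
    return len(registry._tasks)
```

Compared with popping inside `finally`, the callback also fires when the task is cancelled before its body ever starts running, at the cost of one extra closure per task.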
```python
    async def init(self, context: Any | None = None) -> None:
        self._docker = Docker()
        self._stats_streamer = DockerStatsStreamer(self._docker)

    async def cleanup(self) -> None:
        await self._stats_streamer.close()
        await self._docker.close()
```
CPUPlugin creates its own DockerStatsStreamer (and its own Docker client). Since MemoryPlugin does the same, this will open two long-lived stats(stream=True) connections per container (one per plugin), which can negate the intended resource savings and put unnecessary load on dockerd for hosts with many containers. Consider sharing a single streamer (and ideally a single Docker client) across intrinsic plugins, e.g., via an agent-level/shared context or a module-level singleton keyed by the underlying Docker client.
```python
    async def init(self, context: Any | None = None) -> None:
        self._docker = Docker()
        self._stats_streamer = DockerStatsStreamer(self._docker)

    async def cleanup(self) -> None:
        await self._stats_streamer.close()
        await self._docker.close()
```
MemoryPlugin also instantiates its own DockerStatsStreamer, which (together with CPUPlugin’s streamer) results in duplicate per-container stats streams. To avoid 2× open connections per container, consider sharing a single streamer across intrinsic plugins (and/or reusing a shared Docker client).
```diff
@@ -149,7 +155,7 @@ async def fetch_api_stats(container: DockerContainer) -> dict[str, Any] | None:
         )
         return None
     else:
-        entry = {"read": "0001-01-01"}
+        entry: dict[str, Any] = {"read": "0001-01-01"}
         # aiodocker 0.16 or later returns a list of dict, even when not streaming.
         match ret:
             case list() if ret:
@@ -164,9 +170,102 @@ async def fetch_api_stats(container: DockerContainer) -> dict[str, Any] | None:
                     ret,
                 )
                 return None
-    if entry["read"].startswith("0001-01-01") or entry["preread"].startswith("0001-01-01"):
+    return _validate_stats_entry(entry)
```
fetch_api_stats() no longer appears to be used anywhere in the Docker agent path (call sites were replaced by DockerStatsStreamer.get_latest()), and repository-wide search shows no remaining references. Keeping this unused function increases maintenance surface and can confuse future changes; consider removing it (or clearly documenting it as a legacy fallback) and updating tests that still patch it.
```python
class DockerStatsStreamer:
    """
    Maintains one long-lived `container.stats(stream=True)` reader per container and
    exposes the most recent decoded sample from an in-memory cache.

    Callers read the cached sample via :meth:`get_latest` instead of issuing a new
    HTTP round-trip every collection cycle. The first call for a previously-unseen
    container lazily spawns the reader; no sample is returned until dockerd has
    emitted at least one frame.

    Readers self-terminate when the upstream iterator ends (container removed) or
    raises a connection error. A subsequent :meth:`get_latest` call starts a fresh
    reader if the container reappears.

    TODO(#11219): Wire start/stop into the agent's container-lifecycle hooks
    (_handle_start_event / _handle_clean_event) to avoid relying on lazy startup.
    """

    _docker: Docker
    _latest: dict[str, dict[str, Any]]
    _tasks: dict[str, asyncio.Task[None]]
    _closed: bool

    def __init__(self, docker: Docker) -> None:
        self._docker = docker
        self._latest = {}
        self._tasks = {}
        self._closed = False

    def get_latest(self, container_id: str) -> dict[str, Any] | None:
        """Return the most recent cached sample for `container_id`, starting a
        reader if none is running. Returns None until the first frame arrives."""
        if self._closed:
            return None
        task = self._tasks.get(container_id)
        if task is None or task.done():
            self._tasks[container_id] = asyncio.create_task(
                self._read_stream(container_id),
                name=f"docker-stats-stream:{container_id[:7]}",
            )
        return self._latest.get(container_id)
```
DockerStatsStreamer introduces new lifecycle/caching behavior (lazy task spawn, stream termination handling, close() cancellation), but the unit tests only mock it rather than exercising its real behavior. Adding focused tests for get_latest()/close() (and for stream termination cleaning up internal state like _tasks) would help prevent regressions and catch resource-leak scenarios early.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…andling

Addresses review blockers on PR #11224:
- Eager start/stop of stream tasks from container create/destroy events instead of lazy-on-first-call, eliminating the cold-start cycle that returned None for CPU/memory on newly-created containers.
- Re-raise asyncio.CancelledError from the stream reader; log other exceptions with container id; stop swallowing BackendAIError silently.
- Reconnect with bounded exponential backoff on ClientConnectionError.
- Ensure close() cancels all in-flight stream tasks.
- Rename _CONTAINER_STAT_TIMEOUT to _CONTAINER_INSPECT_TIMEOUT to match its current usage.
- Add lifecycle/reconnect/shutdown tests.

Refs #11219
Refs #11224

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
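The reconnect and cancellation behaviour listed in this commit message can be sketched as follows. This is an illustration under stated assumptions, not the PR's code: `read_with_backoff`, the delay values, and the attempt limit are all hypothetical, and the builtin `ConnectionError` stands in for aiohttp's `ClientConnectionError` to keep the example dependency-free.

```python
import asyncio
from collections.abc import AsyncIterator, Callable
from typing import Any


async def read_with_backoff(
    open_stream: Callable[[], AsyncIterator[dict[str, Any]]],
    on_sample: Callable[[dict[str, Any]], None],
    *,
    base_delay: float = 0.5,  # assumed values, not taken from the PR
    max_delay: float = 30.0,
    max_attempts: int = 5,
) -> None:
    delay = base_delay
    attempts = 0
    while attempts < max_attempts:
        try:
            async for sample in open_stream():
                on_sample(sample)
                # A healthy frame resets the backoff state.
                delay, attempts = base_delay, 0
            return  # upstream iterator ended normally: stop reading
        except asyncio.CancelledError:
            # Spelled out for clarity: cancellation must always propagate.
            raise
        except (ConnectionError, TimeoutError):
            attempts += 1
            await asyncio.sleep(delay)
            delay = min(delay * 2, max_delay)  # exponential growth, capped


async def demo() -> tuple[list[int], int]:
    calls = 0
    seen: list[int] = []

    async def flaky() -> AsyncIterator[dict[str, Any]]:
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("dockerd unavailable")
        yield {"seq": calls}

    await read_with_backoff(
        flaky, lambda s: seen.append(s["seq"]), base_delay=0.001, max_delay=0.004
    )
    return seen, calls
```

In the demo the first two connection attempts fail, the third delivers one frame, and the reader then exits because the stream ends.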
Review summary from an independent pass + blocker resolutions pushed to this branch (commit

Verdict: ship with changes; blockers resolved.

Tests added (9 new):

Coordination note: if #11223 (sysfs-first) merges first, the Docker stream is only needed for network/IO fields, so streaming all fields on every container is wasteful. Worth a follow-up to narrow the streamed fields, but out of scope here.

Deferred:
- Drop DockerStatsStreamer.wait_for_first_sample and its backing first-sample events/constant/test. It was an opt-in helper with no production caller.
- Drop the orphaned fetch_api_stats + mock_fetch_api_stats fixture; both api_impl paths now go through self._stats_streamer.get_latest(). Kubernetes keeps its own copy.
- Delete ContainerStatsStreamError: zero raise-sites, only instantiated to harvest an error_code for a log line. Replace with a plain log.error.

Refs #11219
Refs #11224

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Add TODO(#11223) notes on CPU/Memory api_impl paths: after sysfs-first lands, the Docker stream on these plugins is only needed for network and blkio (which neither consumes), so the stream should migrate to a network/IO consumer.
- Add TODO(#11232) at the two per-plugin DockerStatsStreamer instantiation sites flagging the shared-streamer refactor.
- Add a test covering the DockerError 404 branch of _read_stream: reader exits cleanly on container-gone without spinning retries.

Refs #11219
Refs #11223
Refs #11224
Refs #11232

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
For cumulative counters, if the Docker client in DockerStatsStreamer fails for an extended period, the cached latest sample stops advancing and the same value is collected on every cycle. The whole accumulated delta then lands in the first cycle after recovery, which can show up as a spike.
Closes #11219 (BA-5859)
Refs #11216
Summary
- `DockerStatsStreamer`: a long-lived reader keyed by container ID. Callers read the most recent sample from memory instead of `container.stats(stream=False)` per collection cycle.
- `start()`/`stop()` wired through container-start / container-clean events on the agent (no lazy-on-first-get race).
- Reconnect with bounded exponential backoff on `ClientConnectionError`/`TimeoutError`. `CancelledError` re-raised; non-transient `DockerError` (e.g. 404 for container-gone) exits cleanly without retry spin.
- Rename `_CONTAINER_STAT_TIMEOUT` → `_CONTAINER_INSPECT_TIMEOUT` to reflect its remaining scope.

Why
Cuts API-path stat latency from 100-500 ms per container to a memory read, and removes the 2s per-container timeout as a serial-blocker. Part of epic #11216.
Known trade-offs (tracked as follow-ups)
- `CPUPlugin` and `MemoryPlugin` each instantiate their own `DockerStatsStreamer` → 2 streams per container. Consolidation tracked as #11232 (Share a single DockerStatsStreamer across CPU/Memory intrinsic plugins) / PR #11234 (refactor(BA-5861): Share a single DockerStatsStreamer across CPU/Memory plugins).
- After #11223 (sysfs-first) lands, the CPU/Memory `api_impl` paths will be sysfs-only and the stream migrates to network/IO consumers. `TODO(#11223)` comments mark the affected paths.

Test plan
- `pants test tests/unit/agent::` passes (lifecycle, reconnect, shutdown, 404 cleanup)
- … `get_latest()` returns `None`
- `docker restart` mid-session; confirm reconnect kicks in and stats resume