Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/11234.enhance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Share a single `DockerStatsStreamer` across CPU/Memory intrinsic plugins so each container opens one persistent Docker stats stream instead of two.
46 changes: 15 additions & 31 deletions src/ai/backend/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1417,41 +1417,25 @@ async def _handle_start_event(self, ev: ContainerLifecycleEvent) -> None:
if ev.container_id is not None:
kernel_obj.set_container_id(ev.container_id)
if ev.container_id is not None:
await self._notify_compute_plugins_container_started(ev.container_id)
await self._on_container_started(ev.container_id)
log.info("Kernel {0} started", ev.kernel_id)

async def _notify_compute_plugins_container_started(self, container_id: ContainerId) -> None:
"""Notify all compute plugins that ``container_id`` has started.
async def _on_container_started(self, container_id: ContainerId) -> None:
"""Hook for subclasses to react to a container transitioning to RUNNING.

Plugins may use this to eagerly initialise per-container state
(e.g. :class:`DockerStatsStreamer`). Plugin failures are logged but
never prevent the container from running.
The default implementation is a no-op. Concrete agents (e.g. the Docker
agent) override this to start per-container resources such as a
long-lived stats stream reader.
"""
short_cid = str(container_id)[:13]
for device_name, computer_ctx in self.computers.items():
try:
await computer_ctx.instance.notify_container_started(str(container_id))
except Exception as e:
log.warning(
"compute plugin {} notify_container_started failed (cid:{}): {!r}",
device_name,
short_cid,
e,
)
return

async def _notify_compute_plugins_container_destroyed(self, container_id: ContainerId) -> None:
"""Notify all compute plugins that ``container_id`` has been cleaned up."""
short_cid = str(container_id)[:13]
for device_name, computer_ctx in self.computers.items():
try:
await computer_ctx.instance.notify_container_destroyed(str(container_id))
except Exception as e:
log.warning(
"compute plugin {} notify_container_destroyed failed (cid:{}): {!r}",
device_name,
short_cid,
e,
)
async def _on_container_destroyed(self, container_id: ContainerId) -> None:
"""Hook for subclasses to react to a container being cleaned up.

The default implementation is a no-op. Concrete agents (e.g. the Docker
agent) override this to release per-container resources.
"""
return

async def _handle_destroy_event(self, ev: ContainerLifecycleEvent) -> None:
log.info(
Expand Down Expand Up @@ -1537,7 +1521,7 @@ async def _handle_clean_event(self, ev: ContainerLifecycleEvent) -> None:
await destruction_task
del destruction_task
if ev.container_id is not None:
await self._notify_compute_plugins_container_destroyed(ev.container_id)
await self._on_container_destroyed(ev.container_id)
await self.stat_ctx.remove_kernel_metric(ev.kernel_id, ev.container_id)
async with self.registry_lock:
try:
Expand Down
44 changes: 41 additions & 3 deletions src/ai/backend/agent/docker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@
ScanImagesResult,
)
from ai.backend.agent.config.unified import AgentUnifiedConfig, ContainerSandboxType, ScratchType
from ai.backend.agent.docker.intrinsic import (
DockerStatsStreamer,
)
from ai.backend.agent.etcd import AgentEtcdClientView
from ai.backend.agent.exception import (
ContainerCreationError,
Expand Down Expand Up @@ -1422,6 +1425,7 @@ class DockerAgent(AbstractAgent[DockerKernel, DockerKernelCreationContext]):
docker_ptask_group: aiotools.PersistentTaskGroup
gwbridge_subnet: str | None
checked_invalid_images: set[str]
_stats_streamer: DockerStatsStreamer

network_plugin_ctx: NetworkPluginContext

Expand Down Expand Up @@ -1517,6 +1521,28 @@ async def __ainit__(self) -> None:
)
self.docker_info = docker_info
await self._kernel_recovery_adapter.adapt_recovery_data()

# Shared Docker client: used by legacy accelerator plugins and by the stats streamer created below
self.docker = Docker()

# Single DockerStatsStreamer shared across intrinsic compute plugins.
# Keeps one long-lived ``container.stats(stream=True)`` reader per
# container instead of one per plugin, cutting open connections to
# dockerd in half for CPU + Memory plugins.
#
# NOTE: the streamer MUST be created and attached to plugins BEFORE
# ``super().__ainit__()`` runs. ``AbstractAgent.__ainit__`` calls
# ``scan_running_kernels()`` and then starts the lifecycle handler
# task (``process_lifecycle_events``); on warm restart that pipeline
# can fire container-start events which land in
# ``_on_container_started`` -> ``self._stats_streamer.start(...)``
# before any code AFTER ``super().__ainit__()`` gets to run.
self._stats_streamer = DockerStatsStreamer(self.docker)
for computer_ctx in self.computers.values():
instance = computer_ctx.instance
if hasattr(instance, "attach_stats_streamer"):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We prefer to avoid dynamic access patterns such as hasattr whenever possible.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point — removed the hasattr probe in 7cf076a. Moved attach_stats_streamer onto AbstractComputePlugin with a no-op default so non-Docker plugins (K8s, Dummy, third-party accelerators) inherit it safely, and DockerAgent.__ainit__ now calls it unconditionally on every compute plugin.

instance.attach_stats_streamer(self._stats_streamer)

await super().__ainit__()
try:
async with Docker() as docker:
Expand Down Expand Up @@ -1556,9 +1582,6 @@ async def __ainit__(self) -> None:
self.monitor_docker_task = asyncio.create_task(self.monitor_docker_events())
self.docker_ptask_group = aiotools.PersistentTaskGroup()

# For legacy accelerator plugins
self.docker = Docker()

self.network_plugin_ctx = NetworkPluginContext(
self.etcd, self.local_config.model_dump(by_alias=True)
)
Expand All @@ -1576,6 +1599,13 @@ async def shutdown(self, stop_signal: signal.Signals) -> None:
if self.docker_ptask_group is not None:
await self.docker_ptask_group.shutdown()

# Close the shared stats streamer before the underlying Docker client
# so in-flight reader tasks can cleanly drain their stream iterators.
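# ``_stats_streamer`` is declared non-Optional and is assigned in
# ``__ainit__`` just before ``super().__ainit__()`` is awaited. Earlier
# awaits in ``__ainit__`` (e.g. ``adapt_recovery_data()``) run before that
# assignment, so the attribute exists only once ``__ainit__`` has
# progressed past that point.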
await self._stats_streamer.close()

try:
await super().shutdown(stop_signal)
finally:
Expand All @@ -1587,6 +1617,14 @@ async def shutdown(self, stop_signal: signal.Signals) -> None:
if self.docker:
await self.docker.close()

@override
async def _on_container_started(self, container_id: ContainerId) -> None:
self._stats_streamer.start(str(container_id))

@override
async def _on_container_destroyed(self, container_id: ContainerId) -> None:
await self._stats_streamer.stop(str(container_id))

@override
async def _load_kernel_registry_from_recovery(self) -> MutableMapping[KernelId, AbstractKernel]:
return await self._kernel_recovery.load_kernel_registry()
Expand Down
26 changes: 10 additions & 16 deletions src/ai/backend/agent/docker/intrinsic.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,21 +356,18 @@ class CPUPlugin(AbstractComputePlugin):

async def init(self, context: Any | None = None) -> None:
self._docker = Docker()
# TODO(#11232): Consolidate per-plugin streamer into a single shared instance owned by the agent.
self._stats_streamer = DockerStatsStreamer(self._docker)

async def cleanup(self) -> None:
await self._stats_streamer.close()
await self._docker.close()

async def update_plugin_config(self, new_plugin_config: Mapping[str, Any]) -> None:
pass

async def notify_container_started(self, container_id: str) -> None:
self._stats_streamer.start(container_id)

async def notify_container_destroyed(self, container_id: str) -> None:
await self._stats_streamer.stop(container_id)
def attach_stats_streamer(self, streamer: DockerStatsStreamer) -> None:
"""Attach the agent-owned :class:`DockerStatsStreamer` used for reading
per-container stats. Called once by :class:`DockerAgent` after plugin
init so the streamer is shared across intrinsic plugins."""
self._stats_streamer = streamer

async def list_devices(self) -> Collection[CPUDevice]:
cores = await libnuma.get_available_cores()
Expand Down Expand Up @@ -689,21 +686,18 @@ class MemoryPlugin(AbstractComputePlugin):

async def init(self, context: Any | None = None) -> None:
self._docker = Docker()
# TODO(#11232): Consolidate per-plugin streamer into a single shared instance owned by the agent.
self._stats_streamer = DockerStatsStreamer(self._docker)

async def cleanup(self) -> None:
await self._stats_streamer.close()
await self._docker.close()

async def update_plugin_config(self, new_plugin_config: Mapping[str, Any]) -> None:
pass

async def notify_container_started(self, container_id: str) -> None:
self._stats_streamer.start(container_id)

async def notify_container_destroyed(self, container_id: str) -> None:
await self._stats_streamer.stop(container_id)
def attach_stats_streamer(self, streamer: DockerStatsStreamer) -> None:
"""Attach the agent-owned :class:`DockerStatsStreamer` used for reading
per-container stats. Called once by :class:`DockerAgent` after plugin
init so the streamer is shared across intrinsic plugins."""
self._stats_streamer = streamer

async def list_devices(self) -> Collection[MemoryDevice]:
memory_size = psutil.virtual_memory().total
Expand Down
19 changes: 0 additions & 19 deletions src/ai/backend/agent/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,25 +451,6 @@ async def generate_resource_data(self, device_alloc: DeviceAllocation) -> Mappin
"""
return {}

async def notify_container_started(self, container_id: str) -> None:
"""
Lifecycle hook invoked by the agent when a container transitions to RUNNING.

Subclasses may override this to eagerly spin up per-container resources
(e.g. start a long-lived stats stream reader) instead of relying on
lazy initialisation from the next stat collection cycle. Default: no-op.
"""
return

async def notify_container_destroyed(self, container_id: str) -> None:
"""
Lifecycle hook invoked by the agent when a container is being cleaned up.

Subclasses may override this to release per-container resources
(e.g. cancel the long-lived stats stream reader task). Default: no-op.
"""
return

@abstractmethod
async def restore_from_container(
self,
Expand Down
Loading
Loading