Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/11224.enhance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Stream container stats from Docker via a long-lived reader instead of polling `container.stats(stream=False)` per collection cycle.
37 changes: 37 additions & 0 deletions src/ai/backend/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1416,8 +1416,43 @@ async def _handle_start_event(self, ev: ContainerLifecycleEvent) -> None:
kernel_obj.state = KernelLifecycleStatus.RUNNING
if ev.container_id is not None:
kernel_obj.set_container_id(ev.container_id)
if ev.container_id is not None:
await self._notify_compute_plugins_container_started(ev.container_id)
log.info("Kernel {0} started", ev.kernel_id)

async def _notify_compute_plugins_container_started(self, container_id: ContainerId) -> None:
    """Tell every compute plugin that the container ``container_id`` started.

    Each entry of ``self.computers`` has its
    ``instance.notify_container_started()`` awaited in turn with the
    stringified container ID, giving plugins a chance to set up
    per-container state early (e.g. :class:`DockerStatsStreamer`).

    Any ``Exception`` raised by a plugin is logged as a warning and the
    loop continues with the next plugin, so a failing plugin never keeps
    the container from running.
    """
    cid_text = str(container_id)
    # Only a truncated ID goes into the log line; plugins get the full one.
    cid_for_log = cid_text[:13]
    for plugin_name, ctx in self.computers.items():
        try:
            await ctx.instance.notify_container_started(cid_text)
        except Exception as exc:
            # Best-effort notification: report the failure and move on.
            log.warning(
                "compute plugin {} notify_container_started failed (cid:{}): {!r}",
                plugin_name,
                cid_for_log,
                exc,
            )

async def _notify_compute_plugins_container_destroyed(self, container_id: ContainerId) -> None:
    """Tell every compute plugin that the container ``container_id`` was cleaned up.

    Each entry of ``self.computers`` has its
    ``instance.notify_container_destroyed()`` awaited in turn with the
    stringified container ID.

    Any ``Exception`` raised by a plugin is logged as a warning and the
    loop continues with the next plugin, so one failing plugin does not
    stop the remaining plugins from being notified.
    """
    cid_text = str(container_id)
    # Only a truncated ID goes into the log line; plugins get the full one.
    cid_for_log = cid_text[:13]
    for plugin_name, ctx in self.computers.items():
        try:
            await ctx.instance.notify_container_destroyed(cid_text)
        except Exception as exc:
            # Best-effort notification: report the failure and move on.
            log.warning(
                "compute plugin {} notify_container_destroyed failed (cid:{}): {!r}",
                plugin_name,
                cid_for_log,
                exc,
            )

async def _handle_destroy_event(self, ev: ContainerLifecycleEvent) -> None:
log.info(
"Handling destroy event for kernel {0} with container {1}",
Expand Down Expand Up @@ -1501,6 +1536,8 @@ async def _handle_clean_event(self, ev: ContainerLifecycleEvent) -> None:
# let the destruction task finish first
await destruction_task
del destruction_task
if ev.container_id is not None:
await self._notify_compute_plugins_container_destroyed(ev.container_id)
await self.stat_ctx.remove_kernel_metric(ev.kernel_id, ev.container_id)
async with self.registry_lock:
try:
Expand Down
Loading
Loading