Skip to content

Commit 586a2aa

Browse files
rapsealk and claude committed
refactor(agent): Reuse the shared aiodocker client in DockerKernel methods
Follow-up to #11226: route DockerKernel's get_logs/commit/download_file/download_single through the agent-owned Docker client instead of opening a throwaway connection per call. commit() previously used a bare Docker() without closing_async; that leak is also fixed. Closes #11227. Refs #11216. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 75877e5 commit 586a2aa

7 files changed

Lines changed: 191 additions & 157 deletions

File tree

changes/11227.enhance.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Route DockerKernel's get_logs/commit/download_file/download_single through the agent-owned shared aiodocker client instead of opening a throwaway connection per call.

src/ai/backend/agent/docker/agent.py

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ def __init__(
316316
agent_sockpath: Path,
317317
resource_lock: asyncio.Lock,
318318
network_plugin_ctx: NetworkPluginContext,
319+
docker: Docker,
319320
restarting: bool = False,
320321
cluster_ssh_port_mapping: ClusterSSHPortMapping | None = None,
321322
gwbridge_subnet: str | None = None,
@@ -351,6 +352,7 @@ def __init__(
351352
self.gwbridge_subnet = gwbridge_subnet
352353

353354
self.network_plugin_ctx = network_plugin_ctx
355+
self._docker = docker
354356

355357
def _kernel_resource_spec_read(self, filename: Path | str) -> KernelResourceSpec:
356358
filepath = Path(filename)
@@ -1010,6 +1012,7 @@ def chown_idfile(uid: int | None, gid: int | None) -> None:
10101012
resource_spec=resource_spec,
10111013
environ=environ,
10121014
data={},
1015+
docker=self._docker,
10131016
)
10141017

10151018
@property
@@ -1482,47 +1485,49 @@ def __init__(
14821485
)
14831486

14841487
async def __ainit__(self) -> None:
1485-
async with closing_async(Docker()) as docker:
1486-
docker_host = ""
1487-
match docker.connector:
1488-
case aiohttp.TCPConnector():
1489-
if docker.docker_host is None:
1490-
raise InvalidArgumentError("docker_host is not set for TCP connector")
1491-
docker_host = docker.docker_host
1492-
case aiohttp.NamedPipeConnector() | aiohttp.UnixConnector() as connector:
1493-
docker_host = connector.path
1494-
case _:
1495-
docker_host = "(unknown)"
1496-
log.info("accessing the local Docker daemon via {}", docker_host)
1497-
docker_version = await docker.version()
1498-
log.info(
1499-
"running with Docker {0} with API {1}",
1500-
docker_version["Version"],
1501-
docker_version["ApiVersion"],
1502-
)
1503-
kernel_version = docker_version["KernelVersion"]
1504-
if "linuxkit" in kernel_version:
1505-
self.local_config.agent.docker_mode = "linuxkit"
1506-
else:
1507-
self.local_config.agent.docker_mode = "native"
1508-
docker_info = await docker.system.info()
1509-
docker_info = dict(docker_info)
1510-
# Assume cgroup v1 if CgroupVersion key is absent
1511-
if "CgroupVersion" not in docker_info:
1512-
docker_info["CgroupVersion"] = "1"
1513-
log.info(
1514-
"Cgroup Driver: {0}, Cgroup Version: {1}",
1515-
docker_info["CgroupDriver"],
1516-
docker_info["CgroupVersion"],
1517-
)
1518-
self.docker_info = docker_info
1488+
# Long-lived shared aiodocker client; must be available before kernel recovery
1489+
# loads any DockerKernel instances via `super().__ainit__()`.
1490+
self.docker = Docker()
1491+
docker = self.docker
1492+
docker_host = ""
1493+
match docker.connector:
1494+
case aiohttp.TCPConnector():
1495+
if docker.docker_host is None:
1496+
raise InvalidArgumentError("docker_host is not set for TCP connector")
1497+
docker_host = docker.docker_host
1498+
case aiohttp.NamedPipeConnector() | aiohttp.UnixConnector() as connector:
1499+
docker_host = connector.path
1500+
case _:
1501+
docker_host = "(unknown)"
1502+
log.info("accessing the local Docker daemon via {}", docker_host)
1503+
docker_version = await docker.version()
1504+
log.info(
1505+
"running with Docker {0} with API {1}",
1506+
docker_version["Version"],
1507+
docker_version["ApiVersion"],
1508+
)
1509+
kernel_version = docker_version["KernelVersion"]
1510+
if "linuxkit" in kernel_version:
1511+
self.local_config.agent.docker_mode = "linuxkit"
1512+
else:
1513+
self.local_config.agent.docker_mode = "native"
1514+
docker_info = await docker.system.info()
1515+
docker_info = dict(docker_info)
1516+
# Assume cgroup v1 if CgroupVersion key is absent
1517+
if "CgroupVersion" not in docker_info:
1518+
docker_info["CgroupVersion"] = "1"
1519+
log.info(
1520+
"Cgroup Driver: {0}, Cgroup Version: {1}",
1521+
docker_info["CgroupDriver"],
1522+
docker_info["CgroupVersion"],
1523+
)
1524+
self.docker_info = docker_info
15191525
await self._kernel_recovery_adapter.adapt_recovery_data()
15201526
await super().__ainit__()
15211527
try:
1522-
async with Docker() as docker:
1523-
gwbridge = await docker.networks.get("docker_gwbridge")
1524-
gwbridge_info = await gwbridge.show()
1525-
self.gwbridge_subnet = gwbridge_info["IPAM"]["Config"][0]["Subnet"]
1528+
gwbridge = await self.docker.networks.get("docker_gwbridge")
1529+
gwbridge_info = await gwbridge.show()
1530+
self.gwbridge_subnet = gwbridge_info["IPAM"]["Config"][0]["Subnet"]
15261531
except (DockerError, KeyError, IndexError):
15271532
self.gwbridge_subnet = None
15281533
ipc_base_path = self.local_config.agent.ipc_base_path
@@ -1556,9 +1561,6 @@ async def __ainit__(self) -> None:
15561561
self.monitor_docker_task = asyncio.create_task(self.monitor_docker_events())
15571562
self.docker_ptask_group = aiotools.PersistentTaskGroup()
15581563

1559-
# For legacy accelerator plugins
1560-
self.docker = Docker()
1561-
15621564
self.network_plugin_ctx = NetworkPluginContext(
15631565
self.etcd, self.local_config.model_dump(by_alias=True)
15641566
)
@@ -1589,7 +1591,11 @@ async def shutdown(self, stop_signal: signal.Signals) -> None:
15891591

15901592
@override
15911593
async def _load_kernel_registry_from_recovery(self) -> MutableMapping[KernelId, AbstractKernel]:
1592-
return await self._kernel_recovery.load_kernel_registry()
1594+
registry = await self._kernel_recovery.load_kernel_registry()
1595+
for kernel_obj in registry.values():
1596+
if isinstance(kernel_obj, DockerKernel):
1597+
kernel_obj.attach_docker(self.docker)
1598+
return registry
15931599

15941600
@override
15951601
async def _write_kernel_registry_to_recovery(
@@ -1999,6 +2005,7 @@ async def init_kernel_context(
19992005
self.agent_sockpath,
20002006
self.resource_lock,
20012007
self.network_plugin_ctx,
2008+
self.docker,
20022009
restarting=restarting,
20032010
cluster_ssh_port_mapping=cluster_ssh_port_mapping,
20042011
gwbridge_subnet=self.gwbridge_subnet,

0 commit comments

Comments
 (0)