Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/11222.enhance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Pin container memory to the CPU's NUMA node via `CpusetMems` when an allocation fits on a single node. Most effective with `affinity_policy=PREFER_SINGLE_NODE`; under the default `INTERLEAVED` policy, multi-core allocations will typically span nodes and fall back to the kernel's default placement.
40 changes: 35 additions & 5 deletions src/ai/backend/agent/docker/intrinsic.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,19 +426,49 @@ async def get_hooks(self, distro: str, arch: str) -> Sequence[Path]:
# TODO: move the sysconf hook in libbaihook.so here
return []

@staticmethod
def _resolve_node_local_mem(cores: list[int]) -> str | None:
"""Return the NUMA node id (as a string suitable for ``CpusetMems``) when
every core in ``cores`` is on the same node, otherwise ``None``.

Returns ``None`` when:
- NUMA is unsupported (non-Linux hosts, Linux without libnuma.so,
Docker Desktop, WSL, etc.) or the host exposes a single node —
otherwise ``libnuma.node_of_cpu`` would fall back to ``0`` and every
container would be pinned to ``CpusetMems="0"``.
- ``libnuma.node_of_cpu`` cannot resolve a core (returns a negative id).
- The allocation spans multiple NUMA nodes — in which case we
intentionally leave ``CpusetMems`` unset so Docker / the kernel
default NUMA memory placement policy can apply.
"""
if libnuma.num_nodes() <= 1:
return None
allocated_nodes: set[int] = set()
for core in cores:
node = libnuma.node_of_cpu(core)
if node < 0:
return None
allocated_nodes.add(node)
if len(allocated_nodes) != 1:
return None
return str(next(iter(allocated_nodes)))

async def generate_docker_args(
self,
docker: Docker,
device_alloc: Mapping[SlotName, Mapping[DeviceId, Decimal]],
) -> Mapping[str, Any]:
cores = [*map(int, device_alloc[SlotName("cpu")].keys())]
sorted_core_ids = [*map(str, sorted(cores))]
host_config: dict[str, Any] = {
"Cpus": len(cores),
"CpusetCpus": ",".join(sorted_core_ids),
}
cpuset_mems = self._resolve_node_local_mem(cores)
if cpuset_mems is not None:
host_config["CpusetMems"] = cpuset_mems
return {
"HostConfig": {
"Cpus": len(cores),
"CpusetCpus": ",".join(sorted_core_ids),
# 'CpusetMems': f'{resource_spec.numa_node}',
},
"HostConfig": host_config,
}
Comment thread
rapsealk marked this conversation as resolved.

async def restore_from_container(
Expand Down
5 changes: 4 additions & 1 deletion src/ai/backend/agent/dummy/intrinsic.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ async def generate_docker_args(
docker: aiodocker.docker.Docker,
device_alloc: DeviceAllocation,
) -> Mapping[str, Any]:
# The Docker backend pins ``CpusetMems`` to the allocation's NUMA node
# when the allocation is node-local. The dummy backend intentionally
# skips that because it never actually runs containers; the output is
# only inspected by tests that exercise the plumbing, not NUMA policy.
cores = [*map(int, device_alloc[SlotName("cpu")].keys())]
sorted_core_ids = [*map(str, sorted(cores))]
return {
Expand All @@ -145,7 +149,6 @@ async def generate_docker_args(
"CpuQuota": int(100_000 * len(cores)),
"Cpus": ",".join(sorted_core_ids),
"CpusetCpus": ",".join(sorted_core_ids),
# 'CpusetMems': f'{resource_spec.numa_node}',
},
}

Expand Down
6 changes: 5 additions & 1 deletion src/ai/backend/agent/kubernetes/intrinsic.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,11 @@ async def generate_docker_args(
device_alloc: Mapping[SlotName, Mapping[DeviceId, Decimal]],
) -> Mapping[str, Any]:
# This function might be needed later to apply fine-grained tuning for
# K8s resource allocation
# K8s resource allocation. NUMA memory pinning is not mirrored from the
# Docker backend because this backend currently does not emit
# per-container CPU/memory requests/limits; node-local placement on
# Kubernetes would require Guaranteed-QoS pod specs plus cluster-level
# Topology Manager configuration, which is out of scope here.
return {}

async def restore_from_container(
Expand Down
133 changes: 133 additions & 0 deletions tests/unit/agent/test_docker_intrinsic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from collections.abc import Generator
from contextlib import contextmanager
from dataclasses import dataclass
from decimal import Decimal
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
Expand All @@ -17,6 +18,30 @@
read_proc_net_dev,
)
from ai.backend.agent.stats import StatModes
from ai.backend.common.types import DeviceId, SlotName


@contextmanager
def _patched_libnuma(
core_to_node: dict[int, int],
num_nodes: int,
) -> Generator[None, None, None]:
"""Patch ``libnuma.num_nodes`` and ``libnuma.node_of_cpu`` used by
``CPUPlugin._resolve_node_local_mem``. ``node_of_cpu`` returns ``-1`` for
cores missing from the map, matching real libnuma's behavior when NUMA
info is unavailable.
"""
with (
patch(
"ai.backend.agent.docker.intrinsic.libnuma.num_nodes",
return_value=num_nodes,
),
patch(
"ai.backend.agent.docker.intrinsic.libnuma.node_of_cpu",
side_effect=lambda core: core_to_node.get(core, -1),
),
):
yield


class BaseDockerIntrinsicTest:
Expand Down Expand Up @@ -610,3 +635,111 @@ def test_raises_oserror_for_nonexistent_pid(self) -> None:
"""Raises OSError when /proc/[pid]/net/dev does not exist."""
with pytest.raises(OSError):
read_proc_net_dev(999999999)


@dataclass(frozen=True)
class _NumaScenario:
num_nodes: int
core_to_node: dict[int, int]
cores: list[int]
expected_cpuset_mems: str | None


_NUMA_SCENARIOS: dict[str, _NumaScenario] = {
# All allocated cores on node 0 → pin memory to "0".
"node_local_allocation": _NumaScenario(
num_nodes=2,
core_to_node={0: 0, 1: 0, 2: 1, 3: 1},
cores=[0, 1],
expected_cpuset_mems="0",
),
# Cores span nodes 0 and 1 → let the kernel default policy apply.
"multi_node_allocation": _NumaScenario(
num_nodes=2,
core_to_node={0: 0, 1: 0, 2: 1, 3: 1},
cores=[0, 2],
expected_cpuset_mems=None,
),
# libnuma can't resolve core 1 → side_effect returns -1 for unmapped cores.
"unknown_core": _NumaScenario(
num_nodes=2,
core_to_node={0: 0},
cores=[0, 1],
expected_cpuset_mems=None,
),
# libnuma reports -1 for a known core (NUMA info unavailable).
"negative_node_id": _NumaScenario(
num_nodes=2,
core_to_node={0: 0, 1: -1},
cores=[0, 1],
expected_cpuset_mems=None,
),
# Non-NUMA host (macOS, Docker Desktop, WSL, Linux w/o libnuma.so):
# num_nodes==1 must short-circuit before inspecting per-core nodes so
# containers are not unconditionally pinned to CpusetMems="0".
"non_numa_host": _NumaScenario(
num_nodes=1,
core_to_node={0: 0, 1: 0},
cores=[0, 1],
expected_cpuset_mems=None,
),
# Empty allocation → no cores to pin; must not produce CpusetMems="" or
# a spurious node id. Locks in the defensive behavior so a future
# refactor of _resolve_node_local_mem cannot regress it.
"empty_allocation": _NumaScenario(
num_nodes=2,
core_to_node={0: 0, 1: 0},
cores=[],
expected_cpuset_mems=None,
),
}


@pytest.fixture(params=list(_NUMA_SCENARIOS), ids=list(_NUMA_SCENARIOS))
def numa_scenario(request: pytest.FixtureRequest) -> _NumaScenario:
return _NUMA_SCENARIOS[request.param]


class TestCPUPluginResolveNodeLocalMem:
"""Unit tests for ``CPUPlugin._resolve_node_local_mem`` NUMA-locality logic."""

def test_returns_expected_cpuset_mems(self, numa_scenario: _NumaScenario) -> None:
with _patched_libnuma(numa_scenario.core_to_node, numa_scenario.num_nodes):
result = CPUPlugin._resolve_node_local_mem(numa_scenario.cores)

assert result == numa_scenario.expected_cpuset_mems


class TestCPUPluginGenerateDockerArgsNumaLocality:
"""Integration tests for ``CPUPlugin.generate_docker_args`` covering the
end-to-end wiring of ``_resolve_node_local_mem`` into ``HostConfig``.
"""

@pytest.fixture
def cpu_plugin(self) -> CPUPlugin:
return CPUPlugin.__new__(CPUPlugin)

@staticmethod
def _device_alloc(core_ids: list[int]) -> dict[SlotName, dict[DeviceId, Decimal]]:
return {
SlotName("cpu"): {DeviceId(str(cid)): Decimal("1") for cid in core_ids},
}

async def test_host_config_matches_scenario(
self,
cpu_plugin: CPUPlugin,
numa_scenario: _NumaScenario,
) -> None:
with _patched_libnuma(numa_scenario.core_to_node, numa_scenario.num_nodes):
result = await cpu_plugin.generate_docker_args(
AsyncMock(),
self._device_alloc(numa_scenario.cores),
)

host_config = result["HostConfig"]
assert host_config["Cpus"] == len(numa_scenario.cores)
assert host_config["CpusetCpus"] == ",".join(str(c) for c in sorted(numa_scenario.cores))
if numa_scenario.expected_cpuset_mems is None:
assert "CpusetMems" not in host_config
else:
assert host_config["CpusetMems"] == numa_scenario.expected_cpuset_mems
Loading