diff --git a/changes/11222.enhance.md b/changes/11222.enhance.md new file mode 100644 index 00000000000..a750a344139 --- /dev/null +++ b/changes/11222.enhance.md @@ -0,0 +1 @@ +Pin container memory to the CPU's NUMA node via `CpusetMems` when an allocation fits on a single node. Most effective with `affinity_policy=PREFER_SINGLE_NODE`; under the default `INTERLEAVED` policy, multi-core allocations will typically span nodes and fall back to the kernel's default placement. diff --git a/src/ai/backend/agent/docker/intrinsic.py b/src/ai/backend/agent/docker/intrinsic.py index e49ed0589d9..95744e8d166 100644 --- a/src/ai/backend/agent/docker/intrinsic.py +++ b/src/ai/backend/agent/docker/intrinsic.py @@ -426,6 +426,33 @@ async def get_hooks(self, distro: str, arch: str) -> Sequence[Path]: # TODO: move the sysconf hook in libbaihook.so here return [] + @staticmethod + def _resolve_node_local_mem(cores: list[int]) -> str | None: + """Return the NUMA node id (as a string suitable for ``CpusetMems``) when + every core in ``cores`` is on the same node, otherwise ``None``. + + Returns ``None`` when: + - NUMA is unsupported (non-Linux hosts, Linux without libnuma.so, + Docker Desktop, WSL, etc.) or the host exposes a single node — + otherwise ``libnuma.node_of_cpu`` would fall back to ``0`` and every + container would be pinned to ``CpusetMems="0"``. + - ``libnuma.node_of_cpu`` cannot resolve a core (returns a negative id). + - The allocation spans multiple NUMA nodes — in which case we + intentionally leave ``CpusetMems`` unset so Docker / the kernel + default NUMA memory placement policy can apply. + """ + if libnuma.num_nodes() <= 1: + return None + allocated_nodes: set[int] = set() + for core in cores: + node = libnuma.node_of_cpu(core) + if node < 0: + return None + allocated_nodes.add(node) + if len(allocated_nodes) != 1: + return None + return str(next(iter(allocated_nodes))) + async def generate_docker_args( self, docker: Docker, @@ -433,12 +460,15 @@ async def generate_docker_args( ) -> Mapping[str, Any]: cores = [*map(int, device_alloc[SlotName("cpu")].keys())] sorted_core_ids = [*map(str, sorted(cores))] + host_config: dict[str, Any] = { + "Cpus": len(cores), + "CpusetCpus": ",".join(sorted_core_ids), + } + cpuset_mems = self._resolve_node_local_mem(cores) + if cpuset_mems is not None: + host_config["CpusetMems"] = cpuset_mems return { - "HostConfig": { - "Cpus": len(cores), - "CpusetCpus": ",".join(sorted_core_ids), - # 'CpusetMems': f'{resource_spec.numa_node}', - }, + "HostConfig": host_config, } async def restore_from_container( diff --git a/src/ai/backend/agent/dummy/intrinsic.py b/src/ai/backend/agent/dummy/intrinsic.py index 9a5576ac7d6..e48af2ff6cc 100644 --- a/src/ai/backend/agent/dummy/intrinsic.py +++ b/src/ai/backend/agent/dummy/intrinsic.py @@ -137,6 +137,10 @@ async def generate_docker_args( docker: aiodocker.docker.Docker, device_alloc: DeviceAllocation, ) -> Mapping[str, Any]: + # The Docker backend pins ``CpusetMems`` to the allocation's NUMA node + # when the allocation is node-local. The dummy backend intentionally + # skips that because it never actually runs containers; the output is + # only inspected by tests that exercise the plumbing, not NUMA policy. cores = [*map(int, device_alloc[SlotName("cpu")].keys())] sorted_core_ids = [*map(str, sorted(cores))] return { @@ -145,7 +149,6 @@ async def generate_docker_args( "CpuQuota": int(100_000 * len(cores)), "Cpus": ",".join(sorted_core_ids), "CpusetCpus": ",".join(sorted_core_ids), - # 'CpusetMems': f'{resource_spec.numa_node}', }, } diff --git a/src/ai/backend/agent/kubernetes/intrinsic.py b/src/ai/backend/agent/kubernetes/intrinsic.py index 32c49525678..e71364611c4 100644 --- a/src/ai/backend/agent/kubernetes/intrinsic.py +++ b/src/ai/backend/agent/kubernetes/intrinsic.py @@ -191,7 +191,11 @@ async def generate_docker_args( device_alloc: Mapping[SlotName, Mapping[DeviceId, Decimal]], ) -> Mapping[str, Any]: # This function might be needed later to apply fine-grained tuning for - # K8s resource allocation + # K8s resource allocation. NUMA memory pinning is not mirrored from the + # Docker backend because this backend currently does not emit + # per-container CPU/memory requests/limits; node-local placement on + # Kubernetes would require Guaranteed-QoS pod specs plus cluster-level + # Topology Manager configuration, which is out of scope here. return {} async def restore_from_container( diff --git a/tests/unit/agent/test_docker_intrinsic.py b/tests/unit/agent/test_docker_intrinsic.py index e28f3816330..8933e7a93a1 100644 --- a/tests/unit/agent/test_docker_intrinsic.py +++ b/tests/unit/agent/test_docker_intrinsic.py @@ -4,6 +4,7 @@ from collections.abc import Generator from contextlib import contextmanager from dataclasses import dataclass +from decimal import Decimal from pathlib import Path from typing import Any from unittest.mock import AsyncMock, MagicMock, patch @@ -17,6 +18,30 @@ read_proc_net_dev, ) from ai.backend.agent.stats import StatModes +from ai.backend.common.types import DeviceId, SlotName + + +@contextmanager +def _patched_libnuma( + core_to_node: dict[int, int], + num_nodes: int, +) -> Generator[None, None, None]: + """Patch ``libnuma.num_nodes`` and ``libnuma.node_of_cpu`` used by + ``CPUPlugin._resolve_node_local_mem``. ``node_of_cpu`` returns ``-1`` for + cores missing from the map, matching real libnuma's behavior when NUMA + info is unavailable. + """ + with ( + patch( + "ai.backend.agent.docker.intrinsic.libnuma.num_nodes", + return_value=num_nodes, + ), + patch( + "ai.backend.agent.docker.intrinsic.libnuma.node_of_cpu", + side_effect=lambda core: core_to_node.get(core, -1), + ), + ): + yield class BaseDockerIntrinsicTest: @@ -610,3 +635,111 @@ def test_raises_oserror_for_nonexistent_pid(self) -> None: """Raises OSError when /proc/[pid]/net/dev does not exist.""" with pytest.raises(OSError): read_proc_net_dev(999999999) + + +@dataclass(frozen=True) +class _NumaScenario: + num_nodes: int + core_to_node: dict[int, int] + cores: list[int] + expected_cpuset_mems: str | None + + +_NUMA_SCENARIOS: dict[str, _NumaScenario] = { + # All allocated cores on node 0 → pin memory to "0". + "node_local_allocation": _NumaScenario( + num_nodes=2, + core_to_node={0: 0, 1: 0, 2: 1, 3: 1}, + cores=[0, 1], + expected_cpuset_mems="0", + ), + # Cores span nodes 0 and 1 → let the kernel default policy apply. + "multi_node_allocation": _NumaScenario( + num_nodes=2, + core_to_node={0: 0, 1: 0, 2: 1, 3: 1}, + cores=[0, 2], + expected_cpuset_mems=None, + ), + # libnuma can't resolve core 1 → side_effect returns -1 for unmapped cores. + "unknown_core": _NumaScenario( + num_nodes=2, + core_to_node={0: 0}, + cores=[0, 1], + expected_cpuset_mems=None, + ), + # libnuma reports -1 for a known core (NUMA info unavailable). + "negative_node_id": _NumaScenario( + num_nodes=2, + core_to_node={0: 0, 1: -1}, + cores=[0, 1], + expected_cpuset_mems=None, + ), + # Non-NUMA host (macOS, Docker Desktop, WSL, Linux w/o libnuma.so): + # num_nodes==1 must short-circuit before inspecting per-core nodes so + # containers are not unconditionally pinned to CpusetMems="0". + "non_numa_host": _NumaScenario( + num_nodes=1, + core_to_node={0: 0, 1: 0}, + cores=[0, 1], + expected_cpuset_mems=None, + ), + # Empty allocation → no cores to pin; must not produce CpusetMems="" or + # a spurious node id. Locks in the defensive behavior so a future + # refactor of _resolve_node_local_mem cannot regress it. + "empty_allocation": _NumaScenario( + num_nodes=2, + core_to_node={0: 0, 1: 0}, + cores=[], + expected_cpuset_mems=None, + ), +} + + +@pytest.fixture(params=list(_NUMA_SCENARIOS), ids=list(_NUMA_SCENARIOS)) +def numa_scenario(request: pytest.FixtureRequest) -> _NumaScenario: + return _NUMA_SCENARIOS[request.param] + + +class TestCPUPluginResolveNodeLocalMem: + """Unit tests for ``CPUPlugin._resolve_node_local_mem`` NUMA-locality logic.""" + + def test_returns_expected_cpuset_mems(self, numa_scenario: _NumaScenario) -> None: + with _patched_libnuma(numa_scenario.core_to_node, numa_scenario.num_nodes): + result = CPUPlugin._resolve_node_local_mem(numa_scenario.cores) + + assert result == numa_scenario.expected_cpuset_mems + + +class TestCPUPluginGenerateDockerArgsNumaLocality: + """Integration tests for ``CPUPlugin.generate_docker_args`` covering the + end-to-end wiring of ``_resolve_node_local_mem`` into ``HostConfig``. + """ + + @pytest.fixture + def cpu_plugin(self) -> CPUPlugin: + return CPUPlugin.__new__(CPUPlugin) + + @staticmethod + def _device_alloc(core_ids: list[int]) -> dict[SlotName, dict[DeviceId, Decimal]]: + return { + SlotName("cpu"): {DeviceId(str(cid)): Decimal("1") for cid in core_ids}, + } + + async def test_host_config_matches_scenario( + self, + cpu_plugin: CPUPlugin, + numa_scenario: _NumaScenario, + ) -> None: + with _patched_libnuma(numa_scenario.core_to_node, numa_scenario.num_nodes): + result = await cpu_plugin.generate_docker_args( + AsyncMock(), + self._device_alloc(numa_scenario.cores), + ) + + host_config = result["HostConfig"] + assert host_config["Cpus"] == len(numa_scenario.cores) + assert host_config["CpusetCpus"] == ",".join(str(c) for c in sorted(numa_scenario.cores)) + if numa_scenario.expected_cpuset_mems is None: + assert "CpusetMems" not in host_config + else: + assert host_config["CpusetMems"] == numa_scenario.expected_cpuset_mems