|
10 | 10 |
|
11 | 11 | import pytest |
12 | 12 |
|
| 13 | +from ai.backend.agent.docker import intrinsic |
13 | 14 | from ai.backend.agent.docker.intrinsic import ( |
14 | 15 | ContainerNetStat, |
15 | 16 | CPUPlugin, |
16 | 17 | MemoryPlugin, |
| 18 | + _warn_cgroup_fallback_once, |
17 | 19 | read_proc_net_dev, |
18 | 20 | ) |
19 | 21 | from ai.backend.agent.stats import StatModes |
@@ -277,6 +279,101 @@ async def test_sysfs_mode_uses_instance_docker_client( |
277 | 279 | await memory_plugin.gather_container_measures(memory_cgroup_context, container_ids) |
278 | 280 | mock_docker_cls.assert_not_called() |
279 | 281 |
|
| 282 | + async def test_cgroup_mode_falls_back_to_api_on_sysfs_failure( |
| 283 | + self, |
| 284 | + memory_plugin: MemoryPlugin, |
| 285 | + container_ids: list[str], |
| 286 | + cgroup_stat_context: MagicMock, |
| 287 | + mock_fetch_api_stats: MagicMock, |
| 288 | + ) -> None: |
| 289 | + """When sysfs read fails in CGROUP mode, the Docker API is used |
| 290 | + as a per-read fallback instead of silently returning zero.""" |
| 291 | + # Arrange: cgroup version that triggers "return None" in sysfs_impl. |
| 292 | + cgroup_stat_context.agent.get_cgroup_version = MagicMock(return_value="invalid") |
| 293 | + cgroup_stat_context.agent.get_cgroup_path = MagicMock(return_value=MagicMock()) |
| 294 | + |
| 295 | + results = await memory_plugin.gather_container_measures(cgroup_stat_context, container_ids) |
| 296 | + |
| 297 | + assert mock_fetch_api_stats.call_count == len(container_ids) |
| 298 | + # api_impl returns mem_cur = 1024 * 1024 * 100 = 104857600 bytes |
| 299 | + for cid in container_ids: |
| 300 | + assert results[0].per_container[cid].value == 1024 * 1024 * 100 |
| 301 | + |
| 302 | + async def test_linuxkit_forces_api_even_in_cgroup_mode( |
| 303 | + self, |
| 304 | + container_ids: list[str], |
| 305 | + cgroup_stat_context: MagicMock, |
| 306 | + mock_fetch_api_stats: MagicMock, |
| 307 | + ) -> None: |
| 308 | + """On linuxkit hosts the API path is used even when mode is CGROUP.""" |
| 309 | + plugin = MemoryPlugin.__new__(MemoryPlugin) |
| 310 | + plugin.local_config = {"agent": {"docker-mode": "linuxkit"}} |
| 311 | + plugin._docker = AsyncMock() |
| 312 | + |
| 313 | + await plugin.gather_container_measures(cgroup_stat_context, container_ids) |
| 314 | + assert mock_fetch_api_stats.call_count == len(container_ids) |
| 315 | + |
| 316 | + |
class TestWarnCgroupFallbackOnce:
    """Exercise the dedup and bounded-cache semantics of _warn_cgroup_fallback_once()."""

    # Logger whose level is lowered to WARNING while the helper is called.
    _LOGGER_NAME = "ai.backend.agent.docker.intrinsic"

    @staticmethod
    def _count_warnings(caplog: pytest.LogCaptureFixture) -> int:
        """Return the number of captured records whose level name is WARNING."""
        return sum(1 for record in caplog.records if record.levelname == "WARNING")

    @pytest.fixture(autouse=True)
    def _reset_warn_cache(self) -> Generator[None, None, None]:
        """Clear the module-level warn cache before and after each test so that
        state does not bleed from one test into the next."""
        intrinsic._cgroup_fallback_warned.clear()
        yield
        intrinsic._cgroup_fallback_warned.clear()

    def test_deduplicates_per_container(
        self,
        caplog: pytest.LogCaptureFixture,
    ) -> None:
        """Repeated calls for one (plugin, container_id) pair log exactly once;
        a pair that has not been seen yet still logs."""
        with caplog.at_level("WARNING", logger=self._LOGGER_NAME):
            for _ in range(3):
                _warn_cgroup_fallback_once("CPUPlugin", "container_abc")

        assert self._count_warnings(caplog) == 1

        # Each of these pairs is new: first a different container, then the
        # original container under a different plugin namespace.
        unseen_pairs = (
            ("CPUPlugin", "container_xyz"),
            ("MemoryPlugin", "container_abc"),
        )
        for plugin_name, container_id in unseen_pairs:
            caplog.clear()
            with caplog.at_level("WARNING", logger=self._LOGGER_NAME):
                _warn_cgroup_fallback_once(plugin_name, container_id)
            assert self._count_warnings(caplog) == 1

    def test_evicts_beyond_limit(
        self,
        caplog: pytest.LogCaptureFixture,
    ) -> None:
        """Overflowing the bounded cache evicts the oldest entry, so a
        previously-seen container may warn a second time."""
        cache_size = intrinsic._CGROUP_FALLBACK_WARN_CACHE_SIZE
        evicted_cid = "first_container"

        # Call order: the initial warn, then `cache_size` distinct fillers that
        # push the first entry out, then the first container once more.
        call_sequence = [
            evicted_cid,
            *(f"filler_{i}" for i in range(cache_size)),
            evicted_cid,
        ]

        with caplog.at_level("WARNING", logger=self._LOGGER_NAME):
            for container_id in call_sequence:
                _warn_cgroup_fallback_once("CPUPlugin", container_id)

        # 1 (first) + cache_size (fillers) + 1 (re-warn of first)
        assert self._count_warnings(caplog) == cache_size + 2
| 376 | + |
280 | 377 |
|
281 | 378 | class TestMemoryPluginContainerPidValidation(BaseDockerIntrinsicTest): |
282 | 379 | """Tests for container PID validation before reading /proc/[pid]/net/dev.""" |
|
0 commit comments