diff --git a/.github/workflows/unit_test_sandbox.yml b/.github/workflows/unit_test_sandbox.yml index b9c96d80a..06a0d335c 100644 --- a/.github/workflows/unit_test_sandbox.yml +++ b/.github/workflows/unit_test_sandbox.yml @@ -86,7 +86,7 @@ jobs: - name: Run tests with coverage (default) run: | - coverage run -m pytest tests/sandbox/test_sandbox.py tests/sandbox/test_sandbox_service.py tests/sandbox/test_heartbeat.py + coverage run -m pytest tests/sandbox/test_sandbox.py tests/sandbox/test_sandbox_service.py tests/sandbox/test_heartbeat.py tests/sandbox/test_heartbeat_timeout_restore.py - name: Generate coverage report run: | diff --git a/README_zh.md b/README_zh.md index b2ffff637..a959a8632 100644 --- a/README_zh.md +++ b/README_zh.md @@ -14,7 +14,8 @@ [![GitHub Forks](https://img.shields.io/github/forks/agentscope-ai/agentscope-runtime?style=flat&logo=github&color=purple&label=Forks)](https://github.com/agentscope-ai/agentscope-runtime/network) [![Build Status](https://img.shields.io/badge/build-passing-brightgreen.svg?logo=githubactions&label=Build)](https://github.com/agentscope-ai/agentscope-runtime/actions) [![Cookbook](https://img.shields.io/badge/📚_Cookbook-English|中文-teal.svg)](https://runtime.agentscope.io) -[![DeepWiki](https://img.shields.io/badge/DeepWiki-agentscope--runtime-navy.svg?logo=data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACwAAAAyCAYAAAAnWDnqAAAAAXNSR0IArs4c6QAAA05JREFUaEPtmUtyEzEQhtWTQyQLHNak2AB7ZnyXZMEjXMGeK/AIi+QuHrMnbChYY7MIh8g01fJoopFb0uhhEqqcbWTp06/uv1saEDv4O3n3dV60RfP947Mm9/SQc0ICFQgzfc4CYZoTPAswgSJCCUJUnAAoRHOAUOcATwbmVLWdGoH//PB8mnKqScAhsD0kYP3j/Yt5LPQe2KvcXmGvRHcDnpxfL2zOYJ1mFwrryWTz0advv1Ut4CJgf5uhDuDj5eUcAUoahrdY/56ebRWeraTjMt/00Sh3UDtjgHtQNHwcRGOC98BJEAEymycmYcWwOprTgcB6VZ5JK5TAJ+fXGLBm3FDAmn6oPPjR4rKCAoJCal2eAiQp2x0vxTPB3ALO2CRkwmDy5WohzBDwSEFKRwPbknEggCPB/imwrycgxX2NzoMCHhPkDwqYMr9tRcP5qNrMZHkVnOjRMWwLCcr8ohBVb1OMjxLwGCvjTikrsBOiA6fNyCrm8V1rP93iVPpwaE+gO0SsWmPiXB+jikdf6SizrT5qKasx5j8ABbHpFTx+vFXp9EnYQmLx02h1QTTrl6eDqxLnGjporxl3NL3agEvXdT0WmEost648sQOYAeJS9Q7bfUVoMGnjo4AZdUMQku50McDcMWcBPvr0SzbTAFDfvJqwLzgxwATnCgnp4wDl6Aa+Ax283gghmj+vj7feE2KBBRMW3FzOpLOADl0Isb5587h/U4gGvkt5v60Z1VLG8BhYjbzRwyQZemwAd6cCR5/XFWLYZRIMpX39AR0tjaGGiGzLVyhse5C9RKC6ai42ppWPKiBagOvaYk8lO7DajerabOZP46Lby5wKjw1HCRx7p9sVMOWGzb/vA1hwiWc6jm3MvQDTogQkiqIhJV0nBQBTU+3okKCFDy9WwferkHjtxib7t3xIUQtHxnIwtx4mpg26/HfwVNVDb4oI9RHmx5WGelRVlrtiw43zboCLaxv46AZeB3IlTkwouebTr1y2NjSpHz68WNFjHvupy3q8TFn3Hos2IAk4Ju5dCo8B3wP7VPr/FGaKiG+T+v+TQqIrOqMTL1VdWV1DdmcbO8KXBz6esmYWYKPwDL5b5FA1a0hwapHiom0r/cKaoqr+27/XcrS5UwSMbQAAAABJRU5ErkJggg==)](https://deepwiki.com/agentscope-ai/agentscope-runtime)[![A2A](https://img.shields.io/badge/A2A-Agent_to_Agent-blue.svg?label=A2A)](https://a2a-protocol.org/) 
+[![DeepWiki](https://img.shields.io/badge/DeepWiki-Ask_Devin-navy.svg?logo=data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACwAAAAyCAYAAAAnWDnqAAAAAXNSR0IArs4c6QAAA05JREFUaEPtmUtyEzEQhtWTQyQLHNak2AB7ZnyXZMEjXMGeK/AIi+QuHrMnbChYY7MIh8g01fJoopFb0uhhEqqcbWTp06/uv1saEDv4O3n3dV60RfP947Mm9/SQc0ICFQgzfc4CYZoTPAswgSJCCUJUnAAoRHOAUOcATwbmVLWdGoH//PB8mnKqScAhsD0kYP3j/Yt5LPQe2KvcXmGvRHcDnpxfL2zOYJ1mFwrryWTz0advv1Ut4CJgf5uhDuDj5eUcAUoahrdY/56ebRWeraTjMt/00Sh3UDtjgHtQNHwcRGOC98BJEAEymycmYcWwOprTgcB6VZ5JK5TAJ+fXGLBm3FDAmn6oPPjR4rKCAoJCal2eAiQp2x0vxTPB3ALO2CRkwmDy5WohzBDwSEFKRwPbknEggCPB/imwrycgxX2NzoMCHhPkDwqYMr9tRcP5qNrMZHkVnOjRMWwLCcr8ohBVb1OMjxLwGCvjTikrsBOiA6fNyCrm8V1rP93iVPpwaE+gO0SsWmPiXB+jikdf6SizrT5qKasx5j8ABbHpFTx+vFXp9EnYQmLx02h1QTTrl6eDqxLnGjporxl3NL3agEvXdT0WmEost648sQOYAeJS9Q7bfUVoMGnjo4AZdUMQku50McDcMWcBPvr0SzbTAFDfvJqwLzgxwATnCgnp4wDl6Aa+Ax283gghmj+vj7feE2KBBRMW3FzOpLOADl0Isb5587h/U4gGvkt5v60Z1VLG8BhYjbzRwyQZemwAd6cCR5/XFWLYZRIMpX39AR0tjaGGiGzLVyhse5C9RKC6ai42ppWPKiBagOvaYk8lO7DajerabOZP46Lby5wKjw1HCRx7p9sVMOWGzb/vA1hwiWc6jm3MvQDTogQkiqIhJV0nBQBTU+3okKCFDy9WwferkHjtxib7t3xIUQtHxnIwtx4mpg26/HfwVNVDb4oI9RHmx5WGelRVlrtiw43zboCLaxv46AZeB3IlTkwouebTr1y2NjSpHz68WNFjHvupy3q8TFn3Hos2IAk4Ju5dCo8B3wP7VPr/FGaKiG+T+v+TQqIrOqMTL1VdWV1DdmcbO8KXBz6esmYWYKPwDL5b5FA1a0hwapHiom0r/cKaoqr+27/XcrS5UwSMbQAAAABJRU5ErkJggg==)](https://deepwiki.com/agentscope-ai/agentscope-runtime) +[![A2A](https://img.shields.io/badge/A2A-Agent_to_Agent-blue.svg?label=A2A)](https://a2a-protocol.org/) [![MCP](https://img.shields.io/badge/MCP-Model_Context_Protocol-purple.svg?logo=plug&label=MCP)](https://modelcontextprotocol.io/) [![Discord](https://img.shields.io/badge/Discord-Join_Us-blueviolet.svg?logo=discord)](https://discord.gg/eYMpfnkG8h) [![DingTalk](https://img.shields.io/badge/DingTalk-Join_Us-orange.svg)](https://qr.dingtalk.com/action/joingroup?code=v1,k1,OmDlBXpjW+I2vWjKDsjvI9dhcXjGZi3bQiojOq3dlDw=&_dt_no_comment=1&origin=11) diff --git a/cookbook/en/sandbox/advanced.md b/cookbook/en/sandbox/advanced.md index a5482161e..20b982d7d 100644 --- a/cookbook/en/sandbox/advanced.md +++ b/cookbook/en/sandbox/advanced.md @@ -121,8 +121,9 @@ KUBECONFIG_PATH= | `READONLY_MOUNTS` | Read-only directory mounts | `None` | A dictionary mapping **host paths** to **container paths**, mounted in **read-only** mode. Used to share files/configurations without allowing container writes. Example:
`{"\/Users\/alice\/data": "\/data"}` mounts the host's `/Users/alice/data` to `/data` inside the container as read-only. | | `PORT_RANGE` | Available port range | `[49152,59152]` | For service port allocation | | `HEARTBEAT_TIMEOUT` | Session heartbeat timeout (seconds) | `300` | If a `session_ctx_id` has no “touch” activities (e.g., list_tools/call_tool/check_health/add_mcp_servers) within this period, it is considered idle and can be reaped by the scanner. | -| `HEARTBEAT_SCAN_INTERVAL` | Heartbeat scan interval (seconds) | `0` | Interval for the background watcher to run the heartbeat scan. Set to `0` to disable the watcher (you may run `scan_heartbeat_once()` via an external cron instead). | | `HEARTBEAT_LOCK_TTL` | Distributed lock TTL for scan/reap (seconds) | `120` | In multi-instance deployments, used to ensure only one instance reaps a given `session_ctx_id` at a time. Should be larger than the typical reap duration; too small may cause duplicate reaping after lock expiry. | +| `WATCHER_SCAN_INTERVAL` | Background watcher scan interval (seconds) | `1` | Interval for the background watcher loop. The watcher performs: (1) session heartbeat scan/reap, (2) pre-warmed pool replenishment, and (3) cleanup of expired RELEASED container records. Set to `0` to disable the watcher (you may run the scan functions via an external cron instead). | +| `RELEASED_KEY_TTL` | TTL for RELEASED container records (seconds) | `3600` | Container records in `container_mapping` with state `RELEASED` will be deleted after this TTL to prevent unbounded key growth. Set to `0` to disable cleanup. | | `MAX_SANDBOX_INSTANCES` | Maximum sandbox instances (total container cap) | `0` | Limits the total number of sandbox instances (containers) the SandboxManager can create/keep. When the current container count reaches or exceeds this value, new creation requests are denied (e.g., returning `None` or raising an exception, depending on implementation). Values: • `0`: unlimited • `N>0`: at most `N` instances Examples: • `MAX_SANDBOX_INSTANCES=20` | ##### Backend Comparison diff --git a/cookbook/zh/sandbox/advanced.md b/cookbook/zh/sandbox/advanced.md index d09dfc7d9..f92942b84 100644 --- a/cookbook/zh/sandbox/advanced.md +++ b/cookbook/zh/sandbox/advanced.md @@ -110,20 +110,21 @@ KUBECONFIG_PATH= #### Runtime Manager 设置 -| Parameter | Description | Default | Notes | -| ------------------------- | ------------------------------- | -------------------------- | ------------------------------------------------------------ | -| `DEFAULT_SANDBOX_TYPE` | 默认沙箱类型(可多个) | `base` | 可以是单个类型,也可以是多个类型的列表,从而启用多个独立的沙箱预热池。合法取值包括 `base`、`filesystem`、`browser`、`gui` 等。
支持的写法:
• 单类型:`DEFAULT_SANDBOX_TYPE=base`
• 多类型(逗号分隔):`DEFAULT_SANDBOX_TYPE=base,gui`
• 多类型(JSON 列表):`DEFAULT_SANDBOX_TYPE=["base","gui"]`
每种类型都会维护自己独立的预热池。 | -| `POOL_SIZE` | 预热容器池大小 | `1` | 缓存的容器以实现更快启动。`POOL_SIZE` 参数控制预创建并缓存在就绪状态的容器数量。当用户请求新沙箱时,系统将首先尝试从这个预热池中分配,相比从零开始创建容器显著减少启动时间。例如,使用 `POOL_SIZE=10`,系统维护 10 个就绪容器,可以立即分配给新请求 | -| `AUTO_CLEANUP` | 自动容器清理 | `True` | 如果设置为 `True`,服务器关闭后将释放所有沙箱。 | -| `CONTAINER_PREFIX_KEY` | 容器名称前缀 | `agent-runtime-container-` | 用于标识 | -| `CONTAINER_DEPLOYMENT` | 容器运行时 | `docker` | 目前支持`docker`、`k8s`、`agentrun`, `fc`、`gvisor` | -| `DEFAULT_MOUNT_DIR` | 默认挂载目录 | `sessions_mount_dir` | 用于持久存储路径,存储`/workspace` 文件 | -| `READONLY_MOUNTS` | 只读目录挂载 | `None` | 一个字典,映射 **宿主机路径** → **容器路径**,以 **只读** 方式挂载。用于共享文件 / 配置,但禁止容器修改数据。示例:
`{"\/Users\/alice\/data": "\/data"}` 会把宿主机 `/Users/alice/data` 挂载到容器的 `/data`(只读)。 | -| `PORT_RANGE` | 可用端口范围 | `[49152,59152]` | 用于服务端口分配 | -| `HEARTBEAT_TIMEOUT` | 会话心跳超时时间(秒) | `300` | 当某个 `session_ctx_id` 在该时间内没有发生任何“触达事件”(如 list_tools/call_tool/check_health/add_mcp_servers),会被判定为闲置,可被扫描任务回收(reap)。 | -| `HEARTBEAT_SCAN_INTERVAL` | 心跳扫描间隔(秒) | `0` | 后台扫描任务的执行间隔。设为 `0` 表示禁用后台 watcher(可改用外部 cron 手动调用 `scan_heartbeat_once()`)。 | -| `HEARTBEAT_LOCK_TTL` | 心跳扫描/回收分布式锁 TTL(秒) | `120` | 多实例部署时用于互斥回收同一 `session_ctx_id` 的锁过期时间,避免重复回收。应大于一次回收的典型耗时;过小可能导致锁过期后被其他实例重复回收。 | -| `MAX_SANDBOX_INSTANCES` | 最大沙盒实例数(容器总数上限) | `0` | 用于限制 SandboxManager 可创建/维持的沙盒容器总数量。当当前容器数达到或超过该值时,新的创建请求会被拒绝(例如返回 `None` 或抛异常,取决于实现)。 取值说明: • `0`:不限制 • `N>0`:最多 `N` 个容器实例 示例: • `MAX_SANDBOX_INSTANCES=20` | +| Parameter | Description | Default | Notes | +| ----------------------- | ------------------------------- | -------------------------- | ------------------------------------------------------------ | +| `DEFAULT_SANDBOX_TYPE` | 默认沙箱类型(可多个) | `base` | 可以是单个类型,也可以是多个类型的列表,从而启用多个独立的沙箱预热池。合法取值包括 `base`、`filesystem`、`browser`、`gui` 等。
支持的写法:
• 单类型:`DEFAULT_SANDBOX_TYPE=base`
• 多类型(逗号分隔):`DEFAULT_SANDBOX_TYPE=base,gui`
• 多类型(JSON 列表):`DEFAULT_SANDBOX_TYPE=["base","gui"]`
每种类型都会维护自己独立的预热池。 | +| `POOL_SIZE` | 预热容器池大小 | `1` | 缓存的容器以实现更快启动。`POOL_SIZE` 参数控制预创建并缓存在就绪状态的容器数量。当用户请求新沙箱时,系统将首先尝试从这个预热池中分配,相比从零开始创建容器显著减少启动时间。例如,使用 `POOL_SIZE=10`,系统维护 10 个就绪容器,可以立即分配给新请求 | +| `AUTO_CLEANUP` | 自动容器清理 | `True` | 如果设置为 `True`,服务器关闭后将释放所有沙箱。 | +| `CONTAINER_PREFIX_KEY` | 容器名称前缀 | `agent-runtime-container-` | 用于标识 | +| `CONTAINER_DEPLOYMENT` | 容器运行时 | `docker` | 目前支持`docker`、`k8s`、`agentrun`, `fc`、`gvisor` | +| `DEFAULT_MOUNT_DIR` | 默认挂载目录 | `sessions_mount_dir` | 用于持久存储路径,存储`/workspace` 文件 | +| `READONLY_MOUNTS` | 只读目录挂载 | `None` | 一个字典,映射 **宿主机路径** → **容器路径**,以 **只读** 方式挂载。用于共享文件 / 配置,但禁止容器修改数据。示例:
`{"\/Users\/alice\/data": "\/data"}` 会把宿主机 `/Users/alice/data` 挂载到容器的 `/data`(只读)。 | +| `PORT_RANGE` | 可用端口范围 | `[49152,59152]` | 用于服务端口分配 | +| `HEARTBEAT_TIMEOUT` | 会话心跳超时时间(秒) | `300` | 当某个 `session_ctx_id` 在该时间内没有发生任何“触达事件”(如 list_tools/call_tool/check_health/add_mcp_servers),会被判定为闲置,可被扫描任务回收(reap)。 | +| `HEARTBEAT_LOCK_TTL` | 心跳扫描/回收分布式锁 TTL(秒) | `120` | 多实例部署时用于互斥回收同一 `session_ctx_id` 的锁过期时间,避免重复回收。应大于一次回收的典型耗时;过小可能导致锁过期后被其他实例重复回收。 | +| `WATCHER_SCAN_INTERVAL` | 后台 watcher 扫描间隔(秒) | `1` | 后台 watcher 主循环间隔。watcher 会执行: 1) heartbeat 扫描与回收(reap) 2) 预热池(pool)补齐 3) 过期的 `RELEASED` 容器记录清理 设为 `0` 表示禁用 watcher(也可以用外部 cron 定时调用相关 scan 函数)。 | +| `RELEASED_KEY_TTL` | RELEASED 容器记录保留时间(秒) | `3600` | `container_mapping` 中 `state=RELEASED` 的记录在超过该 TTL 后会被删除,防止键无限增长。设为 `0` 表示不清理。 | +| `MAX_SANDBOX_INSTANCES` | 最大沙盒实例数(容器总数上限) | `0` | 用于限制 SandboxManager 可创建/维持的沙盒容器总数量。当当前容器数达到或超过该值时,新的创建请求会被拒绝(例如返回 `None` 或抛异常,取决于实现)。 取值说明: • `0`:不限制 • `N>0`:最多 `N` 个容器实例 示例: • `MAX_SANDBOX_INSTANCES=20` | ##### 后端对比 diff --git a/src/agentscope_runtime/common/container_clients/boxlite_client.py b/src/agentscope_runtime/common/container_clients/boxlite_client.py index ce8d38858..574eed879 100644 --- a/src/agentscope_runtime/common/container_clients/boxlite_client.py +++ b/src/agentscope_runtime/common/container_clients/boxlite_client.py @@ -159,7 +159,10 @@ def create( # Convert environment dict to list of tuples env_list = [] if environment: - env_list = list(environment.items()) + env_list = [ + (str(k), "" if v is None else str(v)) + for k, v in environment.items() + ] # Convert volumes to BoxLite format volume_list = [] diff --git a/src/agentscope_runtime/engine/services/sandbox/sandbox_service.py b/src/agentscope_runtime/engine/services/sandbox/sandbox_service.py index c75487ba2..44ef3ed92 100644 --- a/src/agentscope_runtime/engine/services/sandbox/sandbox_service.py +++ b/src/agentscope_runtime/engine/services/sandbox/sandbox_service.py @@ -9,11 +9,41 @@ class SandboxService(ServiceWithLifecycleManager): - def __init__(self, base_url=None, bearer_token=None): + def __init__( + self, + base_url=None, + bearer_token=None, + drain_on_stop: bool = True, + ): + """ + Create a SandboxService. + + Args: + base_url: + Sandbox manager API base URL. If None, runs in embedded mode. + bearer_token: + Bearer token used to authenticate with the sandbox manager API. + drain_on_stop: + Whether to drain (release) all sandboxes associated with this + service instance when `stop()` is called. + + - True (default): `stop()` will iterate over all known session + mappings and release all non-AgentBay sandbox environments. + This helps prevent resource leaks when the service shuts + down. + - False: `stop()` will NOT release sessions/environments. Use + this when sandboxes are meant to outlive the service process + (e.g., managed elsewhere). + + Note: In embedded mode (`base_url is None`), `stop()` will + still call `manager_api.cleanup()` to tear down embedded + resources. 
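+
+            Example (a minimal sketch; the URL below is illustrative)::
+
+                service = SandboxService(
+                    base_url="http://localhost:8000",
+                    drain_on_stop=False,
+                )
+                await service.start()
+                ...
+                await service.stop()  # sandboxes outlive the service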
+        """
        self.manager_api = None
        self.base_url = base_url
        self.bearer_token = bearer_token
        self._health = False
+        self.drain_on_stop = drain_on_stop

    async def start(self) -> None:
        if self.manager_api is None:
@@ -29,14 +59,16 @@ async def stop(self) -> None:
            self._health = False
            return

-        session_keys = self.manager_api.list_session_keys()
-
-        if session_keys:
-            for session_ctx_id in session_keys:
-                env_ids = self.manager_api.get_session_mapping(session_ctx_id)
-                if env_ids:
-                    for env_id in env_ids:
-                        self.manager_api.release(env_id)
+        if self.drain_on_stop:
+            session_keys = self.manager_api.list_session_keys()
+            if session_keys:
+                for session_ctx_id in session_keys:
+                    env_ids = self.manager_api.get_session_mapping(
+                        session_ctx_id,
+                    )
+                    if env_ids:
+                        for env_id in env_ids:
+                            self.manager_api.release(env_id)

        if self.base_url is None:
            # Embedded mode
diff --git a/src/agentscope_runtime/sandbox/box/sandbox.py b/src/agentscope_runtime/sandbox/box/sandbox.py
index 1d00666cb..1e0dd083f 100644
--- a/src/agentscope_runtime/sandbox/box/sandbox.py
+++ b/src/agentscope_runtime/sandbox/box/sandbox.py
@@ -4,6 +4,8 @@
 import signal
 from typing import Any, Optional

+import shortuuid
+
 from ..enums import SandboxType
 from ..manager.sandbox_manager import SandboxManager
 from ..manager.server.app import get_config
@@ -154,17 +156,25 @@ class Sandbox(SandboxBase):
    def __enter__(self):
        # Create sandbox if sandbox_id not provided
        if self._sandbox_id is None:
+            short_uuid = shortuuid.ShortUUID().uuid()
+            session_ctx_id = str(short_uuid)
            if self.workspace_dir:
                # bypass pool when workspace_dir is set
-                self._sandbox_id = self.manager_api.create(
+                _id = self.manager_api.create(
                    sandbox_type=SandboxType(self.sandbox_type).value,
                    mount_dir=self.workspace_dir,
+                    # TODO: support binding a user-defined id
+                    meta={"session_ctx_id": session_ctx_id},
                )
            else:
-                self._sandbox_id = self.manager_api.create_from_pool(
+                _id = self.manager_api.create_from_pool(
                    sandbox_type=SandboxType(self.sandbox_type).value,
+                    # TODO: support binding a user-defined id
+                    meta={"session_ctx_id": session_ctx_id},
                )
+            self._sandbox_id = _id
+
        if self._sandbox_id is None:
            raise RuntimeError(
                "No sandbox available. This may happen if: "
@@ -217,18 +227,24 @@ def add_mcp_servers(self, server_configs: dict, overwrite=False):

class SandboxAsync(SandboxBase):
    async def __aenter__(self):
        if self._sandbox_id is None:
+            short_uuid = shortuuid.ShortUUID().uuid()
+            session_ctx_id = str(short_uuid)
            if self.workspace_dir:
-                self._sandbox_id = await self.manager_api.create_async(
+                _id = await self.manager_api.create_async(
                    sandbox_type=SandboxType(self.sandbox_type).value,
                    mount_dir=self.workspace_dir,
+                    # TODO: support binding a user-defined id
+                    meta={"session_ctx_id": session_ctx_id},
                )
            else:
-                self._sandbox_id = (
-                    await self.manager_api.create_from_pool_async(
-                        sandbox_type=SandboxType(self.sandbox_type).value,
-                    )
+                _id = await self.manager_api.create_from_pool_async(
+                    sandbox_type=SandboxType(self.sandbox_type).value,
+                    # TODO: support binding a user-defined id
+                    meta={"session_ctx_id": session_ctx_id},
                )
+            self._sandbox_id = _id
+
        if self._sandbox_id is None:
            raise RuntimeError("No sandbox available.")
        if self.embed_mode:
diff --git a/src/agentscope_runtime/sandbox/manager/heartbeat_mixin.py b/src/agentscope_runtime/sandbox/manager/heartbeat_mixin.py
index 816c30367..016ce2da7 100644
--- a/src/agentscope_runtime/sandbox/manager/heartbeat_mixin.py
+++ b/src/agentscope_runtime/sandbox/manager/heartbeat_mixin.py
@@ -3,26 +3,33 @@
 import inspect
 import time
 import secrets
-from typing import Optional
+from typing import Optional, List
 from functools import wraps
 import logging

 from redis.exceptions import ResponseError

-from ..model import ContainerModel
+from ..model import ContainerModel, ContainerState

 logger = logging.getLogger(__name__)


 def touch_session(identity_arg: str = "identity"):
-    """
-    Sugar decorator: update heartbeat for session_ctx_id derived from identity.
+    """Decorator factory that updates session heartbeat derived from identity.
+
+    This decorator extracts ``identity`` (or the argument named by
+    ``identity_arg``) from the wrapped function call, resolves
+    ``session_ctx_id``, updates heartbeat, and triggers restore when needed.
+
+    .. important:: Any exceptions raised during the touch process are ignored.
+
+    Args:
+        identity_arg (`str`):
+            The keyword/parameter name that carries the identity.

-    Requirements on self:
-    - get_session_ctx_id_by_identity(identity) -> Optional[str]
-    - update_heartbeat(session_ctx_id)
-    - needs_restore(session_ctx_id) -> bool
-    - restore_session(session_ctx_id)  # currently stubbed (pass)
+    Returns:
+        `callable`:
+            A decorator that wraps the target function (sync or async).
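+
+    Example (sketch; ``list_tools`` stands in for any wrapped method)::
+
+        @touch_session()
+        def list_tools(self, identity, **kwargs):
+            ...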
""" def decorator(func): @@ -44,7 +51,8 @@ async def async_wrapper(self, *args, **kwargs): if session_ctx_id: self.update_heartbeat(session_ctx_id) if self.needs_restore(session_ctx_id): - self.restore_session(session_ctx_id) + if hasattr(self, "restore_session"): + self.restore_session(session_ctx_id) except Exception as e: logger.debug(f"touch_session failed (ignored): {e}") @@ -68,7 +76,8 @@ def sync_wrapper(self, *args, **kwargs): if session_ctx_id: self.update_heartbeat(session_ctx_id) if self.needs_restore(session_ctx_id): - self.restore_session(session_ctx_id) + if hasattr(self, "restore_session"): + self.restore_session(session_ctx_id) except Exception as e: logger.debug(f"touch_session failed (ignored): {e}") @@ -80,19 +89,25 @@ def sync_wrapper(self, *args, **kwargs): class HeartbeatMixin: - """ - Mixin providing: - - heartbeat timestamp read/write - - recycled (restore-required) marker - - redis distributed lock for reaping - - Host class must provide: - - self.heartbeat_mapping, self.recycled_mapping - (Mapping-like with set/get/delete) - - self.get_info(identity) -> dict compatible with ContainerModel(**dict) - - self.config.redis_enabled (bool) - - self.config.heartbeat_lock_ttl (int) - - self.redis_client (redis client or None) + """Mixin that provides heartbeat, recycle markers, and a distributed lock. + + This mixin stores heartbeat timestamps and recycle markers in + ``ContainerModel`` records persisted through ``container_mapping``. + It also supports a Redis-based distributed lock for reaping/heartbeat + operations. + + .. important:: The host class must provide required attributes/methods. + + Host class requirements: + - ``self.container_mapping`` (Mapping-like with set/get/delete/scan) + - ``self.session_mapping`` (Mapping-like with set/get/delete/scan) + - ``self.get_info(identity) -> dict`` compatible with + ``ContainerModel(**dict)`` + - ``self.config.redis_enabled`` (`bool`) + - ``self.config.heartbeat_lock_ttl`` (`int`) + - ``self.redis_client`` (redis client or ``None``) + - ``self.restore_session(session_ctx_id)`` (optional, for restore) + """ _REDIS_RELEASE_LOCK_LUA = """if redis.call("GET", KEYS[1]) == ARGV[1] then @@ -102,102 +117,317 @@ class HeartbeatMixin: end """ + def _list_container_names_by_session( + self, + session_ctx_id: str, + ) -> List[str]: + """List container names bound to the given session context id. + + Args: + session_ctx_id (`str`): + The session context id. + + Returns: + `List[str]`: + A list of container names for the session, or an empty list. + """ + if not session_ctx_id: + return [] + # session_mapping stores container_name list + try: + return self.session_mapping.get(session_ctx_id) or [] + except Exception as e: + logger.warning( + f"_list_container_names_by_session " + f"failed for session_ctx_id={session_ctx_id}: {e}", + exc_info=True, + ) + return [] + + def _load_container_model(self, identity: str) -> Optional[ContainerModel]: + """Load a `ContainerModel` from storage by container identity. + + Args: + identity (`str`): + The container identity (typically container name). + + Returns: + `Optional[ContainerModel]`: + The loaded model, or ``None`` if it cannot be loaded. + """ + try: + info_dict = self.get_info(identity) + return ContainerModel(**info_dict) + except Exception as e: + logger.debug(f"_load_container_model failed for {identity}: {e}") + return None + + def _save_container_model(self, model: ContainerModel) -> None: + """Persist a `ContainerModel` back into ``container_mapping``. 
+ + Args: + model (`ContainerModel`): + The model to persist. + + Returns: + `None`: + No return value. + """ + # IMPORTANT: persist back into container_mapping + self.container_mapping.set(model.container_name, model.model_dump()) + # ---------- heartbeat ---------- def update_heartbeat( self, session_ctx_id: str, ts: Optional[float] = None, ) -> float: - """ - heartbeat_mapping[session_ctx_id] = last_active_timestamp - (unix seconds). + """Update heartbeat timestamp for all RUNNING containers of a session. + + The timestamp is written into ``ContainerModel.last_active_at`` and + ``updated_at`` is refreshed. + + Args: + session_ctx_id (`str`): + The session context id. + ts (`Optional[float]`, optional): + The timestamp to write. If ``None``, uses ``time.time()``. + + Returns: + `float`: + The timestamp that was written. """ if not session_ctx_id: raise ValueError("session_ctx_id is required") + ts = float(ts if ts is not None else time.time()) - self.heartbeat_mapping.set(session_ctx_id, ts) + now = time.time() + + container_names = self._list_container_names_by_session(session_ctx_id) + for cname in list(container_names): + model = self._load_container_model(cname) + if not model: + continue + + # only update heartbeat for RUNNING containers + if model.state != ContainerState.RUNNING: + continue + + model.last_active_at = ts + model.updated_at = now + + # keep session_ctx_id consistent (migration safety) + model.session_ctx_id = session_ctx_id + + self._save_container_model(model) + return ts def get_heartbeat(self, session_ctx_id: str) -> Optional[float]: - val = ( - self.heartbeat_mapping.get(session_ctx_id) - if session_ctx_id - else None - ) - return float(val) if val is not None else None + """Get session-level heartbeat as max(last_active_at) of RUNNING items. + + Args: + session_ctx_id (`str`): + The session context id. - def delete_heartbeat(self, session_ctx_id: str) -> None: - if session_ctx_id: - self.heartbeat_mapping.delete(session_ctx_id) + Returns: + `Optional[float]`: + The maximum heartbeat timestamp, or ``None`` if unavailable. + """ + if not session_ctx_id: + return None + + container_names = self._list_container_names_by_session(session_ctx_id) + last_vals = [] + for cname in list(container_names): + model = self._load_container_model(cname) + if not model: + continue + + if model.state != ContainerState.RUNNING: + continue + + if model.last_active_at is not None: + last_vals.append(float(model.last_active_at)) + + return max(last_vals) if last_vals else None # ---------- recycled marker ---------- def mark_session_recycled( self, session_ctx_id: str, ts: Optional[float] = None, + reason: str = "heartbeat_timeout", ) -> float: - """ - recycled_mapping[session_ctx_id] = recycled_timestamp (unix seconds). + """Mark all containers of a session as recycled. + + This only updates stored metadata; it does not stop/remove containers. + + Args: + session_ctx_id (`str`): + The session context id. + ts (`Optional[float]`, optional): + The recycle timestamp. If ``None``, uses ``time.time()``. + reason (`str`): + The recycle reason. + + Returns: + `float`: + The timestamp that was written. 
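+
+        Example (illustrative; the session id is made up)::
+
+            self.mark_session_recycled("ctx-123", reason="manual_reap")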
""" if not session_ctx_id: raise ValueError("session_ctx_id is required") + ts = float(ts if ts is not None else time.time()) - self.recycled_mapping.set(session_ctx_id, ts) + now = time.time() + + container_names = self._list_container_names_by_session(session_ctx_id) + for cname in list(container_names): + model = self._load_container_model(cname) + if not model: + continue + + # if already released, don't flip back + if model.state == ContainerState.RELEASED: + continue + + model.state = ContainerState.RECYCLED + model.recycled_at = ts + model.recycle_reason = reason + model.updated_at = now + + model.session_ctx_id = session_ctx_id + self._save_container_model(model) + return ts - def clear_session_recycled(self, session_ctx_id: str) -> None: - if session_ctx_id: - self.recycled_mapping.delete(session_ctx_id) + def clear_container_recycle_marker( + self, + identity: str, + *, + set_state: Optional[ContainerState] = None, + ) -> None: + """Clear recycle marker for a single container and set its state. + + This resets: + - ``recycled_at`` to ``None`` + - ``recycle_reason`` to ``None`` + + .. important:: This only updates the stored record; it does not manage + real container lifecycle and session mapping. + + Args: + identity (`str`): + The container identity. + set_state (`ContainerState`): + The state to set on the container record. + + Returns: + `None`: + No return value. + """ + model = self._load_container_model(identity) + if not model: + return + + model.recycled_at = None + model.recycle_reason = None + if set_state: + model.state = set_state + + model.updated_at = time.time() + self._save_container_model(model) def needs_restore(self, session_ctx_id: str) -> bool: + """Check whether any container in the session is marked for restore. + + A session is considered needing restore if any bound container is in + ``ContainerState.RECYCLED`` or has ``recycled_at`` set. + + Args: + session_ctx_id (`str`): + The session context id. + + Returns: + `bool`: + ``True`` if restore is needed, otherwise ``False``. + """ if not session_ctx_id: return False - return self.recycled_mapping.get(session_ctx_id) is not None - def restore_session(self, session_ctx_id: str) -> None: - """ - Stub for snapshot/restore phase. - Called when a session is marked recycled (needs_restore == True). - """ - logger.warning( - f"restore_session({session_ctx_id}) called but not implemented " - f"yet.", - ) - # NOTE: keep recycled mark for now, so future requests still - # indicate restore needed. If you prefer "warn once", uncomment next - # line: - # self.clear_session_recycled(session_ctx_id) + container_names = self._list_container_names_by_session(session_ctx_id) + for cname in list(container_names): + model = self._load_container_model(cname) + if not model: + continue + if ( + model.state == ContainerState.RECYCLED + or model.recycled_at is not None + ): + return True + return False # ---------- helpers ---------- def get_session_ctx_id_by_identity(self, identity: str) -> Optional[str]: - """ - Resolve session_ctx_id from a container identity. - Returns None if the container cannot be found (get_info raises - RuntimeError), which is an expected situation for recycled/removed - containers during heartbeat touches. + """Resolve ``session_ctx_id`` from a container identity. + + It prefers the top-level ``session_ctx_id`` field on `ContainerModel`, + and falls back to ``meta['session_ctx_id']`` for older payloads. + + Args: + identity (`str`): + The container identity. 
+ + Returns: + `Optional[str]`: + The resolved session context id, or ``None`` if not found. """ try: info_dict = self.get_info(identity) except RuntimeError as exc: - # Missing container is a normal condition during heartbeat checks. logger.debug( - "get_session_ctx_id_by_identity: container not found for " - "identity %s: %s", - identity, - exc, + f"get_session_ctx_id_by_identity: container not found for " + f"identity {identity}: {exc}", ) + return None + info = ContainerModel(**info_dict) + + # NEW: prefer top-level field + if info.session_ctx_id: + return info.session_ctx_id + + # fallback for older payloads return (info.meta or {}).get("session_ctx_id") # ---------- redis distributed lock ---------- def _heartbeat_lock_key(self, session_ctx_id: str) -> str: + """Build the Redis key used for heartbeat locking. + + Args: + session_ctx_id (`str`): + The session context id. + + Returns: + `str`: + The redis lock key. + """ return f"heartbeat_lock:{session_ctx_id}" def acquire_heartbeat_lock(self, session_ctx_id: str) -> Optional[str]: - """ - Returns lock token if acquired, else None. - In non-redis mode returns 'inmemory'. + """Acquire a heartbeat lock for a session. + + In Redis mode, it uses ``SET key token NX EX ttl``. + In non-Redis mode, it returns a fixed token ``"inmemory"``. + + Args: + session_ctx_id (`str`): + The session context id. + + Returns: + `Optional[str]`: + The lock token if acquired, otherwise ``None``. """ if not self.config.redis_enabled or self.redis_client is None: return "inmemory" @@ -213,6 +443,23 @@ def acquire_heartbeat_lock(self, session_ctx_id: str) -> Optional[str]: return token if ok else None def release_heartbeat_lock(self, session_ctx_id: str, token: str) -> bool: + """Release a heartbeat lock if the token matches. + + It uses a Lua script to ensure only the owner token can release the + lock. + If Redis does not support ``EVAL``, it falls back to a GET+DEL check. + + Args: + session_ctx_id (`str`): + The session context id. + token (`str`): + The lock token returned by `acquire_heartbeat_lock`. + + Returns: + `bool`: + ``True`` if the lock was released (or non-Redis mode), else + ``False``. + """ if not self.config.redis_enabled or self.redis_client is None: return True diff --git a/src/agentscope_runtime/sandbox/manager/sandbox_manager.py b/src/agentscope_runtime/sandbox/manager/sandbox_manager.py index 3dad5b967..29e196b78 100644 --- a/src/agentscope_runtime/sandbox/manager/sandbox_manager.py +++ b/src/agentscope_runtime/sandbox/manager/sandbox_manager.py @@ -33,6 +33,7 @@ ) from ..model import ( ContainerModel, + ContainerState, SandboxManagerEnvConfig, ) from ..registry import SandboxRegistry @@ -221,24 +222,6 @@ def __init__( self.redis_client, prefix="session_mapping", ) - # Heartbeat mapping: - # Key: session_ctx_id (str) - # Value: last_active_timestamp (float, unix seconds) - # Used to determine whether a session is idle and should be reaped. - self.heartbeat_mapping = RedisMapping( - self.redis_client, - prefix="heartbeat_mapping", - ) - # Recycled/restore-required mapping: - # Key: session_ctx_id (str) - # Value: recycled_timestamp (float, unix seconds) or truthy - # marker - # Set when a session is reaped. Next user request should trigger - # "restore session" flow (stubbed in this iteration). 
- self.recycled_mapping = RedisMapping( - self.redis_client, - prefix="recycled_mapping", - ) # Init multi sand box pool for t in self.default_type: @@ -248,9 +231,6 @@ def __init__( self.redis_client = None self.container_mapping = InMemoryMapping() self.session_mapping = InMemoryMapping() - # See comments in Redis branch for semantics. - self.heartbeat_mapping = InMemoryMapping() - self.recycled_mapping = InMemoryMapping() # Init multi sand box pool for t in self.default_type: @@ -277,16 +257,9 @@ def __init__( else: self.storage = LocalStorage() - if self.pool_size > 0: - self._init_container_pool() - - self.heartbeat_timeout = self.config.heartbeat_timeout - self.heartbeat_scan_interval = self.config.heartbeat_scan_interval - self.heartbeat_lock_ttl = self.config.heartbeat_lock_ttl - - self._heartbeat_stop_event = threading.Event() - self._heartbeat_thread = None - self._heartbeat_thread_lock = threading.Lock() + self._watcher_stop_event = threading.Event() + self._watcher_thread = None + self._watcher_thread_lock = threading.Lock() logger.debug(str(config)) @@ -297,7 +270,7 @@ def __enter__(self): ) # local mode: watcher starts if self.http_session is None: - self.start_heartbeat_watcher() + self.start_watcher() return self @@ -305,7 +278,7 @@ def __exit__(self, exc_type, exc_value, traceback): logger.debug( "Exiting SandboxManager context (sync). Cleaning up resources.", ) - self.stop_heartbeat_watcher() + self.stop_watcher() self.cleanup() @@ -334,7 +307,7 @@ async def __aenter__(self): ) # local mode: watcher starts if self.http_session is None: - self.start_heartbeat_watcher() + self.start_watcher() return self @@ -342,7 +315,7 @@ async def __aexit__(self, exc_type, exc_value, tb): logger.debug( "Exiting SandboxManager context (async). Cleaning up resources.", ) - self.stop_heartbeat_watcher() + self.stop_watcher() await self.cleanup_async() @@ -361,6 +334,7 @@ async def __aexit__(self, exc_type, exc_value, tb): logger.warning(f"Error closing httpx_client: {e}") def _generate_container_key(self, session_id): + # TODO: refactor this and mapping, use sandbox_id as identity return f"{self.prefix}{session_id}" def _make_request(self, method: str, endpoint: str, data: dict): @@ -463,126 +437,140 @@ async def _make_request_async( return response.json() - def _init_container_pool(self): - """ - Init runtime pool - """ - for t in self.default_type: - queue = self.pool_queues[t] - while queue.size() < self.pool_size: - try: - container_name = self.create(sandbox_type=t.value) - container_model = self.container_mapping.get( - container_name, - ) - if container_model: - # Check the pool size again to avoid race condition - if queue.size() < self.pool_size: - queue.enqueue(container_model) - else: - # The pool size has reached the limit - self.release(container_name) - break - else: - logger.error("Failed to create container for pool") - break - except Exception as e: - logger.error(f"Error initializing runtime pool: {e}") - break - - def start_heartbeat_watcher(self) -> bool: + def start_watcher(self) -> bool: """ Start background heartbeat scanning thread. Default: not started automatically. Caller must invoke explicitly. - If heartbeat_scan_interval == 0 => disabled, returns False. + If watcher_scan_interval == 0 => disabled, returns False. 
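+        Safe to call repeatedly: a second call while the watcher thread is
+        alive is a no-op that returns True.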
""" - interval = int(self.config.heartbeat_scan_interval) + interval = int(self.config.watcher_scan_interval) if interval <= 0: logger.info( - "heartbeat watcher disabled (heartbeat_scan_interval <= 0)", + "Watcher disabled (watcher_scan_interval <= 0)", ) return False - with self._heartbeat_thread_lock: - if self._heartbeat_thread and self._heartbeat_thread.is_alive(): + with self._watcher_thread_lock: + if self._watcher_thread and self._watcher_thread.is_alive(): return True # already running - self._heartbeat_stop_event.clear() + self._watcher_stop_event.clear() def _loop(): - logger.info(f"heartbeat watcher started, interval={interval}s") - while not self._heartbeat_stop_event.is_set(): + logger.info(f"Watcher started, interval={interval}s") + while not self._watcher_stop_event.is_set(): try: - metrics = self.scan_heartbeat_once() - logger.debug(f"heartbeat scan metrics: {metrics}") + hb = self.scan_heartbeat_once() + pool = self.scan_pool_once() + gc = self.scan_released_cleanup_once() + + logger.debug( + "watcher metrics: " + f"heartbeat={hb}, pool={pool}, released_gc={gc}", + ) except Exception as e: - logger.warning(f"heartbeat watcher loop error: {e}") + logger.warning(f"Watcher loop error: {e}") logger.debug(traceback.format_exc()) # wait with stop support - self._heartbeat_stop_event.wait(interval) + self._watcher_stop_event.wait(interval) - logger.info("heartbeat watcher stopped") + logger.info("Watcher stopped") t = threading.Thread( target=_loop, - name="heartbeat-watcher", + name="watcher", daemon=True, ) - self._heartbeat_thread = t + self._watcher_thread = t t.start() return True - def stop_heartbeat_watcher(self, join_timeout: float = 5.0) -> None: + def stop_watcher(self, join_timeout: float = 5.0) -> None: """ Stop background watcher thread (if running). """ - with self._heartbeat_thread_lock: - self._heartbeat_stop_event.set() - t = self._heartbeat_thread + with self._watcher_thread_lock: + self._watcher_stop_event.set() + t = self._watcher_thread if t and t.is_alive(): t.join(timeout=join_timeout) - with self._heartbeat_thread_lock: - if self._heartbeat_thread is t: - self._heartbeat_thread = None + with self._watcher_thread_lock: + if self._watcher_thread is t: + self._watcher_thread = None @remote_wrapper() def cleanup(self): - logger.debug( - "Cleaning up resources.", - ) + """ + Destroy all non-terminal containers managed by this SandboxManager. + + Behavior (local mode): + - Dequeues and destroys containers from the warm pool (WARM/RUNNING). + - Scans container_mapping and destroys any remaining non-terminal + containers. + - Does NOT delete ContainerModel records from container_mapping; + instead it relies on release() to mark them as terminal (RELEASED). + - Skips containers already in terminal states: RELEASED / RECYCLED. + + Notes: + - Uses container_name as identity to avoid ambiguity with session_id. + - Pool containers (WARM) are also destroyed (per current policy). 
+ """ + logger.debug("Cleaning up resources.") - # Clean up pool first + # Clean up pool first (destroy warm/running containers; skip + # terminal states) for queue in self.pool_queues.values(): try: while queue.size() > 0: container_json = queue.dequeue() - if container_json: - container_model = ContainerModel(**container_json) - logger.debug( - f"Destroy container" - f" {container_model.container_id}", - ) - self.release(container_model.session_id) + if not container_json: + continue + + container_model = ContainerModel(**container_json) + + # Terminal states: already cleaned logically + if container_model.state in ( + ContainerState.RELEASED, + ContainerState.RECYCLED, + ): + continue + + logger.debug( + f"Destroy pool container" + f" {container_model.container_id} " + f"({container_model.container_name})", + ) + # Use container_name to avoid ambiguity + self.release(container_model.container_name) except Exception as e: logger.error(f"Error cleaning up runtime pool: {e}") - # Clean up rest container + # Clean up remaining containers in mapping for key in self.container_mapping.scan(self.prefix): try: container_json = self.container_mapping.get(key) - if container_json: - container_model = ContainerModel(**container_json) - logger.debug( - f"Destroy container {container_model.container_id}", - ) - self.release(container_model.session_id) - except Exception as e: - logger.error( - f"Error cleaning up container {key}: {e}", + if not container_json: + continue + + container_model = ContainerModel(**container_json) + + # Terminal states: already cleaned logically + if container_model.state in ( + ContainerState.RELEASED, + ContainerState.RECYCLED, + ): + continue + + logger.debug( + f"Destroy container {container_model.container_id} " + f"({container_model.container_name})", ) + self.release(container_model.container_name) + except Exception as e: + logger.error(f"Error cleaning up container {key}: {e}") @remote_wrapper_async() async def cleanup_async(self, *args, **kwargs): @@ -600,113 +588,104 @@ def create_from_pool(self, sandbox_type=None, meta: Optional[Dict] = None): queue = self.pool_queues[sandbox_type] - cnt = 0 - try: - while True: - if cnt > self.pool_size: - raise RuntimeError( - "No container available in pool after check the pool.", - ) - cnt += 1 - - # Add a new one to container - container_name = self.create(sandbox_type=sandbox_type) - new_container_model = self.container_mapping.get( - container_name, - ) - - if new_container_model: - queue.enqueue( - new_container_model, - ) - - container_json = queue.dequeue() + def _bind_meta(container_model: ContainerModel): + if not meta: + return - if not container_json: - raise RuntimeError( - "No container available in pool.", - ) - - container_model = ContainerModel(**container_json) - - # Add meta field to container - if meta and not container_model.meta: - container_model.meta = meta - self.container_mapping.set( - container_model.container_name, - container_model.model_dump(), - ) + session_ctx_id = meta.get("session_ctx_id") - # Update session mapping + first heartbeat - # (only when session_ctx_id exists) - session_ctx_id = meta.get("session_ctx_id") - if session_ctx_id: - env_ids = ( - self.session_mapping.get( - session_ctx_id, - ) - or [] - ) - if container_model.container_name not in env_ids: - env_ids.append(container_model.container_name) + container_model.meta = meta + container_model.session_ctx_id = session_ctx_id + container_model.state = ( + ContainerState.RUNNING + if session_ctx_id + else ContainerState.WARM + ) 
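+            # A container bound to a session_ctx_id is RUNNING; without a
+            # session it stays WARM as pool inventory.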
+            container_model.recycled_at = None
+            container_model.recycle_reason = None
+            container_model.updated_at = time.time()

-            # Treat "allocated from pool to a session" as first
-            # activity: ensure heartbeat is updated before the
-            # session mapping is persisted, so we never expose a
-            # session->container binding without a fresh heartbeat.
-            self.update_heartbeat(session_ctx_id)
+            # persist first
+            self.container_mapping.set(
+                container_model.container_name,
+                container_model.model_dump(),
+            )

-            # If this session was previously reaped,
-            # clear restore-required marker before persisting the
-            # updated session mapping.
-            self.clear_session_recycled(session_ctx_id)
+            # session mapping + first heartbeat only when session_ctx_id exists
+            if session_ctx_id:
+                env_ids = self.session_mapping.get(session_ctx_id) or []
+                if container_model.container_name not in env_ids:
+                    env_ids.append(container_model.container_name)

-                self.session_mapping.set(session_ctx_id, env_ids)
+                self.session_mapping.set(session_ctx_id, env_ids)

-            logger.debug(
-                f"Retrieved container from pool:"
-                f" {container_model.session_id}",
+                self.clear_container_recycle_marker(
+                    container_model.container_name,
+                    set_state=ContainerState.RUNNING,
                )
+                self.update_heartbeat(session_ctx_id)
+
+        try:
+            # 1) Try dequeue first
+            container_json = queue.dequeue()
+            if container_json:
+                container_model = ContainerModel(**container_json)

+                # version check
                if (
                    container_model.version
-                    != SandboxRegistry.get_image_by_type(
-                        sandbox_type,
-                    )
+                    != SandboxRegistry.get_image_by_type(sandbox_type)
                ):
                    logger.warning(
                        f"Container {container_model.session_id} outdated, "
-                        f"trying next one in pool",
+                        "dropping it",
                    )
-                    self.release(container_model.session_id)
-                    continue
+                    self.release(container_model.container_name)
+                    container_json = None
+                else:
+                    # inspect + status check
+                    if (
+                        self.client.inspect(
+                            container_model.container_id,
+                        )
+                        is None
+                    ):
+                        logger.warning(
+                            f"Container {container_model.container_id} not "
+                            f"found, dropping it",
+                        )
+                        self.release(container_model.container_name)
+                        container_json = None
+                    else:
+                        status = self.client.get_status(
+                            container_model.container_id,
+                        )
+                        if status != "running":
+                            logger.warning(
+                                f"Container {container_model.container_id} "
+                                f"not running ({status}), dropping it",
+                            )
+                            self.release(container_model.container_name)
+                            container_json = None

-            if self.client.inspect(container_model.container_id) is None:
-                logger.warning(
-                    f"Container {container_model.container_id} not found "
-                    f"or unexpected error happens.",
+            # if still valid, bind meta and return
+            if container_json:
+                _bind_meta(container_model)
+                logger.debug(
+                    f"Retrieved container from pool:"
+                    f" {container_model.session_id}",
                )
-                continue
-
-            if (
-                self.client.get_status(container_model.container_id)
-                == "running"
-            ):
                return container_model.container_name
-            else:
-                logger.error(
-                    f"Container {container_model.container_id} is not "
-                    f"running. Trying next one in pool.",
-                )
-                # Destroy the stopped container
-                self.release(container_model.session_id)
+
+            # 2) Pool empty or invalid -> create a new one and return
+            return self.create(sandbox_type=sandbox_type.value, meta=meta)
        except Exception as e:
            logger.warning(
                "Error getting container from pool, create a new one.",
            )
            logger.debug(f"{e}: {traceback.format_exc()}")
-            return self.create()
+            return self.create(sandbox_type=sandbox_type.value, meta=meta)

    @remote_wrapper_async()
    async def create_from_pool_async(self, *args, **kwargs):
@@ -726,14 +705,23 @@ def create(
        try:
            limit = self.config.max_sandbox_instances
            if limit > 0:
-                # TODO: Avoid SCAN+len(list(...)) here; maintain an atomic
-                # Redis counter (INCR/DECR or Lua) for O(1) instance limit
-                # checks.
-                current = len(list(self.container_mapping.scan(self.prefix)))
-                if current >= limit:
-                    raise RuntimeError(
-                        f"Max sandbox instances reached: {current}/{limit}",
-                    )
+                # Count only ACTIVE containers; exclude terminal states
+                active_states = {
+                    ContainerState.WARM,
+                    ContainerState.RUNNING,
+                }
+                current = 0
+                for key in self.container_mapping.scan(self.prefix):
+                    try:
+                        container_json = self.container_mapping.get(key)
+                        if not container_json:
+                            continue
+                        cm = ContainerModel(**container_json)
+                        if cm.state in active_states:
+                            current += 1
+                    except Exception:
+                        # ignore broken records
+                        continue
        except RuntimeError as e:
            logger.warning(str(e))
            return None
@@ -742,6 +730,10 @@
            logger.exception("Failed to check sandbox instance limit")
            return None

+        session_ctx_id = None
+        if meta and meta.get("session_ctx_id"):
+            session_ctx_id = meta["session_ctx_id"]
+
        if sandbox_type is not None:
            target_sandbox_type = SandboxType(sandbox_type)
        else:
@@ -885,6 +877,12 @@ def create(
            version=image,
            meta=meta or {},
            timeout=config.timeout,
+            sandbox_type=target_sandbox_type.value,
+            session_ctx_id=session_ctx_id,
+            state=ContainerState.RUNNING
+            if session_ctx_id
+            else ContainerState.WARM,
+            updated_at=time.time(),
        )

        # Register in mapping
@@ -912,7 +910,10 @@
            self.update_heartbeat(session_ctx_id)

            # Session is now alive again; clear restore-required marker
-            self.clear_session_recycled(session_ctx_id)
+            self.clear_container_recycle_marker(
+                container_model.container_name,
+                set_state=ContainerState.RUNNING,
+            )

        logger.debug(
            f"Created container {container_name}"
@@ -949,11 +950,11 @@ def release(self, identity):

        container_info = ContainerModel(**container_json)

-        # remove key in mapping before we remove container
-        self.container_mapping.delete(container_json.get("container_name"))
+        # remove session key in mapping
+        session_ctx_id = container_info.session_ctx_id or (
+            container_info.meta or {}
+        ).get("session_ctx_id")

-        # remove key in mapping
-        session_ctx_id = container_info.meta.get("session_ctx_id")
        if session_ctx_id:
            env_ids = self.session_mapping.get(session_ctx_id) or []
            env_ids = [
@@ -967,23 +968,41 @@
                # last container of this session is gone;
                # keep state consistent
                self.session_mapping.delete(session_ctx_id)
-                try:
-                    self.delete_heartbeat(session_ctx_id)
-                except Exception as e:
-                    logger.debug(
-                        f"delete_heartbeat failed for {session_ctx_id}:"
-                        f" {e}",
-                    )
-                try:
-                    self.clear_session_recycled(session_ctx_id)
-                except Exception as e:
-                    logger.debug(
-                        f"clear_session_recycled failed for"
-                        f" {session_ctx_id}: {e}",
-                    )

-        self.client.stop(container_info.container_id, timeout=1)
-        self.client.remove(container_info.container_id, force=True)
+        # Mark released (do NOT delete mapping) in model
+        now = time.time()
+        container_info.state = ContainerState.RELEASED
+        container_info.released_at = now
+        container_info.updated_at = now
+        container_info.recycled_at = None
+        container_info.recycle_reason = None
+
+        # Unbind session in model
+        container_info.session_ctx_id = None
+        if container_info.meta is None:
+            container_info.meta = {}
+        container_info.meta.pop("session_ctx_id", None)
+
+        self.container_mapping.set(
+            container_info.container_name,
+            container_info.model_dump(),
+        )
+
+        try:
+            self.client.stop(container_info.container_id, timeout=1)
+        except Exception as e:
+            logger.debug(
+                f"release stop ignored for"
+                f" {container_info.container_id}: {e}",
+            )
+
+        try:
+            self.client.remove(container_info.container_id, force=True)
+        except Exception as e:
+            logger.debug(
+                f"release remove ignored for"
+                f" {container_info.container_id}: {e}",
+            )

        logger.debug(f"Container for {identity} destroyed.")

@@ -1250,39 +1269,177 @@ def reap_session(
        try:
            env_ids = self.get_session_mapping(session_ctx_id) or []

-            # (virtual hook) snapshot/save state before releasing - not
-            # implemented yet
-            # self.save_session_snapshot(
-            #     session_ctx_id,
-            #     env_ids,
-            #     reason=reason,
-            # )
-
            for container_name in list(env_ids):
+                now = time.time()
                try:
-                    self.release(container_name)
+                    info = ContainerModel(**self.get_info(container_name))
+
+                    # stop/remove actual container
+                    try:
+                        self.client.stop(info.container_id, timeout=1)
+                    except Exception as e:
+                        logger.debug(
+                            f"Failed to stop container "
+                            f"{info.container_id}: {e}",
+                        )
+                    try:
+                        self.client.remove(info.container_id, force=True)
+                    except Exception as e:
+                        logger.debug(
+                            f"Failed to remove container "
+                            f"{info.container_id}: {e}",
+                        )
+
+                    # upload storage if needed
+                    if info.mount_dir and info.storage_path:
+                        try:
+                            self.storage.upload_folder(
+                                info.mount_dir,
+                                info.storage_path,
+                            )
+                        except Exception as e:
+                            logger.warning(
+                                f"upload_folder failed for {container_name}:"
+                                f" {e}",
+                            )
+
+                    # mark recycled, keep model
+                    info.state = ContainerState.RECYCLED
+                    info.recycled_at = now
+                    info.recycle_reason = reason
+                    info.updated_at = now
+
+                    # keep session_ctx_id for restore
+                    info.session_ctx_id = session_ctx_id
+                    if info.meta is None:
+                        info.meta = {}
+                    info.meta["session_ctx_id"] = session_ctx_id
+
+                    self.container_mapping.set(
+                        info.container_name,
+                        info.model_dump(),
+                    )
+
                except Exception as e:
                    logger.warning(
-                        f"Failed to release container {container_name} for "
+                        f"Failed to recycle container {container_name} for "
                        f"session {session_ctx_id}: {e}",
                    )

-            # Mark session as recycled -> next request should go restore
-            # flow (stub)
-            self.mark_session_recycled(session_ctx_id)
-
-            # Heartbeat no longer meaningful after reap
-            self.delete_heartbeat(session_ctx_id)
-
-            # Ensure mapping is cleared even if some releases failed
-            self.session_mapping.delete(session_ctx_id)
-
            return True
        except Exception as e:
            logger.warning(f"Failed to reap session {session_ctx_id}: {e}")
            logger.debug(traceback.format_exc())
            return False

+    def restore_session(self, session_ctx_id: str) -> None:
+        """
+        Restore ALL recycled sandboxes (containers) for a session.
+
+        For each container record with state==RECYCLED in
+        session_mapping[session_ctx_id]:
+        - If mount_dir is empty -> allocate from pool
+          (prefer same sandbox_type).
+        - If mount_dir exists -> create a new container with that
+          mount_dir/storage_path.
+        - Bind new container to this session and mark RUNNING.
+        - Archive the old recycled record (mark RELEASED).
+
+        After restore:
+        - session_mapping[session_ctx_id] will be replaced with the list of
+          NEW running containers.
+        """
+        env_ids = self.get_session_mapping(session_ctx_id) or []
+        if not env_ids:
+            return
+
+        new_container_names: list[str] = []
+        recycled_old_names: list[str] = []
+
+        # 1) restore each recycled container
+        for old_name in list(env_ids):
+            try:
+                old = ContainerModel(**self.get_info(old_name))
+            except Exception:
+                continue
+
+            if old.state != ContainerState.RECYCLED:
+                # keep non-recycled entries as-is (optional). In practice
+                # env_ids should be recycled only.
+                continue
+
+            sandbox_type = old.sandbox_type or self.default_type[0].value
+            meta = {
+                "session_ctx_id": session_ctx_id,
+            }
+
+            # allocate new container
+            if not old.mount_dir:
+                new_name = self.create_from_pool(
+                    sandbox_type=sandbox_type,
+                    meta=meta,
+                )
+            else:
+                new_name = self.create(
+                    sandbox_type=sandbox_type,
+                    meta=meta,
+                    mount_dir=old.mount_dir,
+                    storage_path=old.storage_path,
+                )
+
+            if not new_name:
+                logger.warning(
+                    f"restore_session: failed to restore container {old_name} "
+                    f"for session {session_ctx_id}",
+                )
+                continue
+
+            recycled_old_names.append(old_name)
+            new_container_names.append(new_name)
+
+            # ensure new container is marked RUNNING + bound
+            try:
+                new_cm = ContainerModel(**self.get_info(new_name))
+                now = time.time()
+                new_cm.state = ContainerState.RUNNING
+                new_cm.session_ctx_id = session_ctx_id
+                if new_cm.meta is None:
+                    new_cm.meta = {}
+                new_cm.meta["session_ctx_id"] = session_ctx_id
+                new_cm.meta["sandbox_type"] = sandbox_type
+                new_cm.recycled_at = None
+                new_cm.recycle_reason = None
+                new_cm.updated_at = now
+                self.container_mapping.set(
+                    new_cm.container_name,
+                    new_cm.model_dump(),
+                )
+            except Exception as e:
+                logger.warning(
+                    f"restore_session: failed to mark new container running:"
+                    f" {e}",
+                )
+
+        if not new_container_names:
+            # nothing restored
+            return
+
+        # 2) switch session mapping to restored running containers
+        self.session_mapping.set(session_ctx_id, new_container_names)
+
+        # 3) heartbeat after restore (session-level)
+        self.update_heartbeat(session_ctx_id)
+
+        # 4) archive old recycled records so needs_restore becomes False
+        for old_name in recycled_old_names:
+            try:
+                self.container_mapping.delete(old_name)
+            except Exception as e:
+                logger.warning(
+                    f"restore_session: failed to delete old model"
+                    f" {old_name}: {e}",
+                )
+
    def scan_heartbeat_once(self) -> dict:
        """
        Scan all session_ctx_id in session_mapping and reap those idle
@@ -1295,6 +1452,7 @@ def scan_heartbeat_once(self) -> dict:
            "scanned_sessions": 0,
            "reaped_sessions": 0,
            "skipped_no_heartbeat": 0,
+            "skipped_no_running_containers": 0,
            "skipped_lock_busy": 0,
            "skipped_not_idle_after_double_check": 0,
            "errors": 0,
@@ -1303,6 +1461,24 @@ def scan_heartbeat_once(self) -> dict:
        for session_ctx_id in list(self.session_mapping.scan()):
            result["scanned_sessions"] += 1

+            has_running = False
+            try:
+                env_ids = self.get_session_mapping(session_ctx_id) or []
+                for cname in list(env_ids):
+                    try:
+                        cm = ContainerModel(**self.get_info(cname))
+                    except Exception:
+                        continue
+                    if cm.state == ContainerState.RUNNING:
+                        has_running = True
+                        break
+            except Exception:
+                has_running = False
+
+            if not has_running:
+                result["skipped_no_running_containers"] += 1
+                continue
+
            last_active = self.get_heartbeat(session_ctx_id)
            if last_active is None:
                result["skipped_no_heartbeat"] += 1
@@ -1347,8 +1523,123 @@ def scan_heartbeat_once(self) -> dict:

        return result

-    async def scan_heartbeat_once_async(self) -> dict:
+    def scan_pool_once(self) -> dict:
+        """
+        Replenish warm pool for each sandbox_type up to pool_size.
+
+        Note:
+        - No distributed lock by design (multi-instance may overfill slightly).
+        - Pool containers are WARM (no session_ctx_id).
        """
-        Async convenience wrapper (internal use). Not a remote API.
+        result = {
+            "types": 0,
+            "created": 0,
+            "enqueued": 0,
+            "failed_create": 0,
+            "skipped_pool_disabled": 0,
+        }
+
+        if self.pool_size <= 0:
+            result["skipped_pool_disabled"] = 1
+            return result
+
+        for t in self.default_type:
+            result["types"] += 1
+            queue = self.pool_queues.get(t)
+            if queue is None:
+                continue
+
+            try:
+                need = int(self.pool_size - queue.size())
+            except Exception:
+                # if queue.size() fails for any reason, skip this type
+                continue
+
+            if need <= 0:
+                continue
+
+            for _ in range(need):
+                try:
+                    # create a WARM container (no session_ctx_id)
+                    container_name = self.create(
+                        sandbox_type=t.value,
+                        meta=None,
+                    )
+                    if not container_name:
+                        result["failed_create"] += 1
+                        continue
+
+                    cm_json = self.container_mapping.get(container_name)
+                    if not cm_json:
+                        result["failed_create"] += 1
+                        continue
+
+                    queue.enqueue(cm_json)
+                    result["created"] += 1
+                    result["enqueued"] += 1
+                except Exception:
+                    result["failed_create"] += 1
+                    logger.debug(traceback.format_exc())
+
+        return result
+
+    def scan_released_cleanup_once(self, max_delete: int = 200) -> dict:
        """
-        return await asyncio.to_thread(self.scan_heartbeat_once)
+        Delete container_mapping records whose state == RELEASED and expired.
+
+        TTL is config.released_key_ttl seconds. 0 disables cleanup.
+        """
+        ttl = int(getattr(self.config, "released_key_ttl", 0))
+        result = {
+            "ttl": ttl,
+            "scanned": 0,
+            "deleted": 0,
+            "skipped_ttl_disabled": 0,
+            "skipped_not_expired": 0,
+            "skipped_not_released": 0,
+            "errors": 0,
+        }
+
+        if ttl <= 0:
+            result["skipped_ttl_disabled"] = 1
+            return result
+
+        now = time.time()
+
+        for key in self.container_mapping.scan(self.prefix):
+            if result["deleted"] >= max_delete:
+                break
+
+            result["scanned"] += 1
+            try:
+                container_json = self.container_mapping.get(key)
+                if not container_json:
+                    continue
+
+                cm = ContainerModel(**container_json)
+
+                if cm.state != ContainerState.RELEASED:
+                    result["skipped_not_released"] += 1
+                    continue
+
+                released_at = cm.released_at or cm.updated_at or 0
+                if released_at <= 0:
+                    # no timestamp -> treat as not expired
+                    result["skipped_not_expired"] += 1
+                    continue
+
+                if now - released_at <= ttl:
+                    result["skipped_not_expired"] += 1
+                    continue
+
+                self.container_mapping.delete(cm.container_name)
+                result["deleted"] += 1
+
+            except Exception as e:
+                result["errors"] += 1
+                logger.debug(
+                    f"scan_released_cleanup_once: {e},"
+                    f" {traceback.format_exc()}",
+                )
+
+        return result
diff --git a/src/agentscope_runtime/sandbox/manager/server/app.py b/src/agentscope_runtime/sandbox/manager/server/app.py
index 6e093737a..4e41326d6 100644
--- a/src/agentscope_runtime/sandbox/manager/server/app.py
+++ b/src/agentscope_runtime/sandbox/manager/server/app.py
@@ -105,8 +105,9 @@ def get_config() -> SandboxManagerEnvConfig:
        fc_log_project=settings.FC_LOG_PROJECT,
        fc_log_store=settings.FC_LOG_STORE,
        heartbeat_timeout=settings.HEARTBEAT_TIMEOUT,
-        heartbeat_scan_interval=settings.HEARTBEAT_SCAN_INTERVAL,
        heartbeat_lock_ttl=settings.HEARTBEAT_LOCK_TTL,
+        watcher_scan_interval=settings.WATCHER_SCAN_INTERVAL,
+        released_key_ttl=settings.RELEASED_KEY_TTL,
        max_sandbox_instances=settings.MAX_SANDBOX_INSTANCES,
    )
    return _config
@@ -212,7 +213,7 @@ async def startup_event():
diff --git a/src/agentscope_runtime/sandbox/manager/server/app.py b/src/agentscope_runtime/sandbox/manager/server/app.py
index 6e093737a..4e41326d6 100644
--- a/src/agentscope_runtime/sandbox/manager/server/app.py
+++ b/src/agentscope_runtime/sandbox/manager/server/app.py
@@ -105,8 +105,9 @@ def get_config() -> SandboxManagerEnvConfig:
             fc_log_project=settings.FC_LOG_PROJECT,
             fc_log_store=settings.FC_LOG_STORE,
             heartbeat_timeout=settings.HEARTBEAT_TIMEOUT,
-            heartbeat_scan_interval=settings.HEARTBEAT_SCAN_INTERVAL,
             heartbeat_lock_ttl=settings.HEARTBEAT_LOCK_TTL,
+            watcher_scan_interval=settings.WATCHER_SCAN_INTERVAL,
+            released_key_ttl=settings.RELEASED_KEY_TTL,
             max_sandbox_instances=settings.MAX_SANDBOX_INSTANCES,
         )
     return _config
@@ -212,7 +213,7 @@ async def startup_event():
     register_routes(app, _sandbox_manager)
 
     # Start heartbeat watcher on server side
-    _sandbox_manager.start_heartbeat_watcher()
+    _sandbox_manager.start_watcher()
 
 
 @app.on_event("shutdown")
@@ -224,7 +225,7 @@ async def shutdown_event():
         return
 
     # stop watcher first
-    _sandbox_manager.stop_heartbeat_watcher()
+    _sandbox_manager.stop_watcher()
 
     if settings.AUTO_CLEANUP:
         _sandbox_manager.cleanup()
@@ -368,13 +369,6 @@ def setup_logging(log_level: str):
 
     level = level_mapping.get(log_level.upper(), logging.INFO)
 
-    # Reconfigure logging
-    logging.basicConfig(
-        level=level,
-        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
-        force=True,  # This will reconfigure existing loggers
-    )
-
     # Update the logger for this module
     global logger
     logger.setLevel(level)
diff --git a/src/agentscope_runtime/sandbox/manager/server/config.py b/src/agentscope_runtime/sandbox/manager/server/config.py
index ecf87b0dc..e6a56278b 100644
--- a/src/agentscope_runtime/sandbox/manager/server/config.py
+++ b/src/agentscope_runtime/sandbox/manager/server/config.py
@@ -101,13 +101,13 @@ class Settings(BaseSettings):
 
     # Heartbeat related
    HEARTBEAT_TIMEOUT: int = 300
-    HEARTBEAT_SCAN_INTERVAL: int = 0  # 0 to disable heartbeat check
     HEARTBEAT_LOCK_TTL: int = 120
+    WATCHER_SCAN_INTERVAL: int = 1  # 0 to disable watcher
+    RELEASED_KEY_TTL: int = 3600
 
     MAX_SANDBOX_INSTANCES: int = 0  # 0 means unlimited
 
     model_config = ConfigDict(
-        case_sensitive=True,
         extra="allow",
     )
diff --git a/src/agentscope_runtime/sandbox/model/__init__.py b/src/agentscope_runtime/sandbox/model/__init__.py
index 25eb5f555..65873aa8a 100644
--- a/src/agentscope_runtime/sandbox/model/__init__.py
+++ b/src/agentscope_runtime/sandbox/model/__init__.py
@@ -1,8 +1,9 @@
 # -*- coding: utf-8 -*-
-from .container import ContainerModel
+from .container import ContainerModel, ContainerState
 from .manager_config import SandboxManagerEnvConfig
 
 __all__ = [
     "ContainerModel",
+    "ContainerState",
     "SandboxManagerEnvConfig",
 ]
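Before the model changes below, a minimal sketch of how the renamed knobs line up when constructing the config directly. Values are illustrative and simply mirror the defaults in the `Settings` class above; the required fields follow what the rewritten tests pass:

```python
from agentscope_runtime.sandbox.model import SandboxManagerEnvConfig

# Illustrative values only; defaults match the Settings above.
config = SandboxManagerEnvConfig(
    redis_enabled=False,
    file_system="local",
    container_deployment="docker",
    pool_size=0,
    default_mount_dir="sessions_mount_dir",
    heartbeat_timeout=300,    # idle seconds before a session is reaped
    heartbeat_lock_ttl=120,   # distributed reap-lock TTL in seconds
    watcher_scan_interval=1,  # watcher loop period; 0 disables the watcher
    released_key_ttl=3600,    # keep RELEASED records for one hour
)
```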
Usually " + "SandboxType.value.", + ) + + state: ContainerState = Field( + default=ContainerState.RUNNING, + description="Lifecycle state", + ) + + # Pull session_ctx_id up from meta for easier indexing/logic + session_ctx_id: Optional[str] = Field( + default=None, + description="Bound session context id " + "(copied from meta['session_ctx_id'] for compatibility)", + ) + + # Heartbeat timestamp (unix seconds) + last_active_at: Optional[float] = Field( + default=None, + description="Last activity timestamp (unix seconds)", + ) + + # Recycle/release timestamps + recycled_at: Optional[float] = Field( + default=None, + description="Recycled timestamp (unix seconds)", + ) + released_at: Optional[float] = Field( + default=None, + description="Released timestamp (unix seconds)", + ) + updated_at: Optional[float] = Field( + default=None, + description="Last model update timestamp (unix seconds)", + ) + + recycle_reason: Optional[str] = Field( + default=None, + description="Reason for recycle", + ) + + @model_validator(mode="after") + def _compat_and_defaults(self): + """Compatibility layer for ContainerModel. + + This validator ensures backward compatibility and default value + population: + - Reads session_ctx_id from meta if not provided + - Writes session_ctx_id back to meta for old code compatibility + - Ensures updated_at is always populated + + Returns: + `ContainerModel`: + The validated model instance + """ + # normalize meta + if self.meta is None: + self.meta = {} + + # meta -> session_ctx_id + if not self.session_ctx_id: + v = self.meta.get("session_ctx_id") + if v: + self.session_ctx_id = v + + # session_ctx_id -> meta + if self.session_ctx_id: + self.meta["session_ctx_id"] = copy.deepcopy(self.session_ctx_id) + + # default updated_at + if self.updated_at is None: + self.updated_at = time.time() + + return self diff --git a/src/agentscope_runtime/sandbox/model/manager_config.py b/src/agentscope_runtime/sandbox/model/manager_config.py index 79404cf98..8f528bc89 100644 --- a/src/agentscope_runtime/sandbox/model/manager_config.py +++ b/src/agentscope_runtime/sandbox/model/manager_config.py @@ -254,16 +254,28 @@ class SandboxManagerEnvConfig(BaseModel): description="Idle timeout in seconds before session is reaped.", gt=0, ) - heartbeat_scan_interval: int = Field( - default=0, - description="Heartbeat scan interval in seconds. 0 disables scanning.", - ge=0, - ) heartbeat_lock_ttl: int = Field( default=120, description="Redis distributed lock TTL in seconds for reaping.", gt=0, ) + watcher_scan_interval: int = Field( + default=1, + description=( + "Background watcher loop interval in seconds. " + "0 disables watcher. Watcher includes heartbeat scan, pool " + "replenish, and released-record cleanup." + ), + ge=0, + ) + released_key_ttl: int = Field( + default=3600, + description=( + "TTL in seconds for keeping RELEASED container records in " + "container_mapping. 0 disables cleanup." + ), + ge=0, + ) max_sandbox_instances: int = Field( default=0, description="Maximum number of sandbox instances allowed. 
" diff --git a/tests/sandbox/test_heartbeat.py b/tests/sandbox/test_heartbeat.py index acb27acad..e3c38d422 100644 --- a/tests/sandbox/test_heartbeat.py +++ b/tests/sandbox/test_heartbeat.py @@ -1,364 +1,258 @@ # -*- coding: utf-8 -*- -# pylint: disable=unused-argument, redefined-outer-name +# pylint: disable=unused-argument,protected-access,redefined-outer-name import time -import threading - import pytest -import fakeredis -from agentscope_runtime.common.collections import InMemoryMapping, RedisMapping -from agentscope_runtime.sandbox.manager.heartbeat_mixin import ( - HeartbeatMixin, - touch_session, +from agentscope_runtime.sandbox.manager.sandbox_manager import SandboxManager +from agentscope_runtime.sandbox.model import ( + SandboxManagerEnvConfig, + ContainerModel, + ContainerState, ) +from agentscope_runtime.sandbox.enums import SandboxType -class _FakeConfig: - def __init__( - self, - redis_enabled: bool = False, - heartbeat_timeout: int = 1, - heartbeat_scan_interval: int = 0, - heartbeat_lock_ttl: int = 3, - ): - self.redis_enabled = redis_enabled - self.heartbeat_timeout = heartbeat_timeout - self.heartbeat_scan_interval = heartbeat_scan_interval - self.heartbeat_lock_ttl = heartbeat_lock_ttl - - -class FakeManager(HeartbeatMixin): +class StubContainerClient: """ - One class supports both: - - in-memory (old tests) - - redis (new tests) + Stub docker client: + - create/start/stop/remove/inspect/get_status are enough for SandboxManager """ - def __init__( + def __init__(self): + self._by_id = {} # id -> dict + self._by_name = {} # name -> id + self._next = 1 + self.stopped = [] + self.removed = [] + + def create( self, - mode: str = "memory", - redis_client=None, - prefix: str = "t:", + image, + name, + ports, + volumes, + environment, + runtime_config=None, ): - mode = (mode or "memory").lower() - if mode not in ("memory", "redis"): - raise ValueError("mode must be 'memory' or 'redis'") - - self.config = _FakeConfig( - redis_enabled=(mode == "redis"), - heartbeat_timeout=1, - heartbeat_scan_interval=0, - heartbeat_lock_ttl=3, - ) - - self.reaped_sessions = [] - - if mode == "redis": - if redis_client is None: - raise ValueError("redis_client is required for redis mode") - self.redis_client = redis_client - self.heartbeat_mapping = RedisMapping( - redis_client, - prefix=prefix + "hb", - ) - self.recycled_mapping = RedisMapping( - redis_client, - prefix=prefix + "rc", - ) - self.session_mapping = RedisMapping( - redis_client, - prefix=prefix + "sm", - ) - self.container_mapping = RedisMapping( - redis_client, - prefix=prefix + "cm", - ) - else: - self.redis_client = None - self.heartbeat_mapping = InMemoryMapping() - self.recycled_mapping = InMemoryMapping() - self.session_mapping = InMemoryMapping() - self.container_mapping = InMemoryMapping() - - # --- minimal APIs required by mixin/decorator --- - def get_info(self, identity): - obj = self.container_mapping.get(identity) - if obj is None: - raise RuntimeError(f"container not found: {identity}") - return obj - - def create_for_session(self, identity: str, session_ctx_id: str): - # simulate session creation by setting up container and - # session mappings compatible with get_session_ctx_id_by_identity() - self.container_mapping.set( - identity, - {"meta": {"session_ctx_id": session_ctx_id}}, - ) - env_ids = self.session_mapping.get(session_ctx_id) or [] - if identity not in env_ids: - env_ids.append(identity) - self.session_mapping.set(session_ctx_id, env_ids) - - # mimic step-7 behavior - self.update_heartbeat(session_ctx_id) - 
self.clear_session_recycled(session_ctx_id) - - def list_session_keys(self): - return list(self.session_mapping.scan()) - - def get_session_mapping(self, session_ctx_id: str): - return self.session_mapping.get(session_ctx_id) or [] - - def reap_session( - self, - session_ctx_id: str, - reason: str = "heartbeat_timeout", - ) -> bool: - # minimal reap side effects - self.reaped_sessions.append((session_ctx_id, reason)) - self.mark_session_recycled(session_ctx_id) - self.delete_heartbeat(session_ctx_id) - self.session_mapping.delete(session_ctx_id) - return True - - def scan_heartbeat_once(self): - now = time.time() - timeout = int(self.config.heartbeat_timeout) - - for session_ctx_id in self.list_session_keys(): - last_active = self.get_heartbeat(session_ctx_id) - if last_active is None: - continue - if now - last_active <= timeout: - continue - - token = self.acquire_heartbeat_lock(session_ctx_id) - if not token: - continue - try: - last_active2 = self.get_heartbeat(session_ctx_id) - if last_active2 is None: - continue - if time.time() - last_active2 <= timeout: - continue - self.reap_session(session_ctx_id, reason="heartbeat_timeout") - finally: - self.release_heartbeat_lock(session_ctx_id, token) - - @touch_session(identity_arg="identity") - def ping(self, identity: str): - return True - - -def test_heartbeat_inmemory_basic(): - mgr = FakeManager() - session_ctx_id = "s1" - identity = "c1" - - mgr.create_for_session(identity=identity, session_ctx_id=session_ctx_id) - - t0 = mgr.get_heartbeat(session_ctx_id) - assert t0 is not None - assert mgr.needs_restore(session_ctx_id) is False - - # touch via decorator updates heartbeat - time.sleep(0.01) - mgr.ping(identity=identity) - t1 = mgr.get_heartbeat(session_ctx_id) - assert t1 is not None - assert t1 >= t0 + cid = f"cid-{self._next}" + self._next += 1 + self._by_id[cid] = {"name": name, "status": "running"} + self._by_name[name] = cid + # return: _id, ports, ip, *rest + return cid, [18080 + self._next], "127.0.0.1", "http" - # wait until timeout -> scan should reap - time.sleep(mgr.config.heartbeat_timeout + 0.1) - mgr.scan_heartbeat_once() + def inspect(self, identity): + cid = self._by_name.get(identity, identity) + return self._by_id.get(cid) - assert mgr.get_heartbeat(session_ctx_id) is None - assert mgr.needs_restore(session_ctx_id) is True - assert (session_ctx_id, "heartbeat_timeout") in mgr.reaped_sessions - assert session_ctx_id not in mgr.list_session_keys() + def get_status(self, identity): + cid = self._by_name.get(identity, identity) + obj = self._by_id.get(cid) + return None if obj is None else obj["status"] + def start(self, container_id): + if container_id in self._by_id: + self._by_id[container_id]["status"] = "running" -def test_heartbeat_watcher_inmemory_real_manager(monkeypatch): - """ - Test watcher thread itself (in-memory mode): - - create writes heartbeat - - watcher reaps after timeout automatically - """ - from agentscope_runtime.sandbox.manager.sandbox_manager import ( + def stop(self, container_id, timeout=1): + if container_id in self._by_id: + self._by_id[container_id]["status"] = "exited" + self.stopped.append(container_id) + + def remove(self, container_id, force=True): + self.removed.append(container_id) + obj = self._by_id.pop(container_id, None) + if obj: + name = obj["name"] + self._by_name.pop(name, None) + + +class StubRuntimeClient: + def check_health(self): + return {"status": "ok"} + + def list_tools(self, tool_type=None, **kwargs): + return [{"name": "dummy"}] + + def call_tool(self, tool_name=None, 
arguments=None): + return {"tool": tool_name, "ok": True} + + def add_mcp_servers(self, server_configs, overwrite=False): + return {"ok": True, "count": len(server_configs)} + + +@pytest.fixture() +def mgr(monkeypatch): + # 1) stub ContainerClientFactory.create_client + stub_cc = StubContainerClient() + from agentscope_runtime.common.container_clients import ( + ContainerClientFactory, + ) + + monkeypatch.setattr( + ContainerClientFactory, + "create_client", + lambda *args, **kwargs: stub_cc, + raising=True, + ) + + # 2) make storage operations no-op + from agentscope_runtime.sandbox.manager.storage import LocalStorage + + monkeypatch.setattr( + LocalStorage, + "upload_folder", + lambda *a, **k: None, + raising=True, + ) + monkeypatch.setattr( + LocalStorage, + "download_folder", + lambda *a, **k: None, + raising=True, + ) + + # 3) make runtime http client no-op (so touch_session can run) + monkeypatch.setattr( SandboxManager, + "_establish_connection", + lambda self, identity: StubRuntimeClient(), + raising=True, ) - from agentscope_runtime.sandbox.model import SandboxManagerEnvConfig cfg = SandboxManagerEnvConfig( redis_enabled=False, file_system="local", - container_deployment="docker", # won't be used due to monkeypatch + container_deployment="docker", pool_size=0, default_mount_dir="sessions_mount_dir", heartbeat_timeout=1, - heartbeat_scan_interval=1, # enable watcher - heartbeat_lock_ttl=3, + watcher_scan_interval=0, + heartbeat_lock_ttl=2, ) - mgr = SandboxManager(config=cfg) - - session_ctx_id = "watcher_session_ctx" - container_name = "watcher_container" - - # fake create - def _fake_create( - self, - sandbox_type=None, - mount_dir=None, - storage_path=None, - environment=None, - meta=None, - ): - meta = meta or {} - model_dict = { - "session_id": "sid1", - "container_id": "cid1", - "container_name": container_name, - "url": "http://127.0.0.1:1", - "ports": [1], - "mount_dir": "", - "storage_path": "", - "runtime_token": "token", - "version": "fake", - "meta": meta, - "timeout": 0, - } - self.container_mapping.set(container_name, model_dict) - - if meta and meta.get("session_ctx_id"): - scid = meta["session_ctx_id"] - env_ids = self.session_mapping.get(scid) or [] - if container_name not in env_ids: - env_ids.append(container_name) - self.session_mapping.set(scid, env_ids) - self.update_heartbeat(scid) - self.clear_session_recycled(scid) - - return container_name - - # fake release - def _fake_release(self, identity): - c = self.container_mapping.get(identity) - if c is None: - return True - self.container_mapping.delete(identity) - meta = c.get("meta") or {} - scid = meta.get("session_ctx_id") - if scid: - self.session_mapping.delete(scid) - return True - - monkeypatch.setattr(SandboxManager, "create", _fake_create, raising=True) - monkeypatch.setattr(SandboxManager, "release", _fake_release, raising=True) + m = SandboxManager(config=cfg, default_type=SandboxType.BASE) + # expose stub client for assertions about stop/remove calls + m._stub_cc = stub_cc try: - mgr.create(meta={"session_ctx_id": session_ctx_id}) - assert mgr.get_heartbeat(session_ctx_id) is not None - - mgr.start_heartbeat_watcher() - - # wait (polling) for: timeout + at least one scan interval, or until - # the heartbeat has been cleared by the watcher - max_wait = cfg.heartbeat_timeout + cfg.heartbeat_scan_interval + 0.5 - start_time = time.time() - while time.time() - start_time < max_wait: - if mgr.get_heartbeat(session_ctx_id) is None: - break - time.sleep(0.1) - - assert mgr.get_heartbeat(session_ctx_id) is 
None - assert mgr.needs_restore(session_ctx_id) is True - assert session_ctx_id not in mgr.list_session_keys() + yield m finally: - mgr.stop_heartbeat_watcher() - - -@pytest.fixture() -def fake_redis(): - return fakeredis.FakeRedis(decode_responses=True) - + # avoid leftovers + m.cleanup() + + +def _force_expire_session( + mgr: SandboxManager, + session_ctx_id: str, + seconds_ago: float = 100, +): + # Set last_active_at of all RUNNING containers in this session to a very + # old timestamp + for cname in mgr.get_session_mapping(session_ctx_id): + cm = ContainerModel(**mgr.get_info(cname)) + cm.last_active_at = time.time() - seconds_ago + cm.updated_at = time.time() + mgr.container_mapping.set(cm.container_name, cm.model_dump()) + + +def test_heartbeat_reap_then_touch_auto_restore_flow(mgr: SandboxManager): + session = "sess-1" + + # 1) Create a session-bound container (real SandboxManager.create logic) + cname = mgr.create( + sandbox_type=SandboxType.BASE, + meta={"session_ctx_id": session}, + ) + assert cname is not None -def test_redis_lock_token_semantics_and_ttl(fake_redis): - mgr = FakeManager(mode="redis", redis_client=fake_redis, prefix="lock:") + cm = ContainerModel(**mgr.get_info(cname)) + assert cm.state == ContainerState.RUNNING + assert cm.session_ctx_id == session + assert mgr.get_heartbeat(session) is not None + assert mgr.needs_restore(session) is False - session = "s_lock" - token1 = mgr.acquire_heartbeat_lock(session) - assert token1, "first acquire should succeed" + # 2) touch_session: calling check_health will update_heartbeat + hb0 = mgr.get_heartbeat(session) + time.sleep(0.01) + mgr.check_health(cname) + hb1 = mgr.get_heartbeat(session) + assert hb1 >= hb0 - token2 = mgr.acquire_heartbeat_lock(session) - assert not token2, "second acquire should fail while lock held" + # 3) Expire heartbeat + scan => triggers reap_session (stop/remove + + # mark RECYCLED) + _force_expire_session(mgr, session, seconds_ago=100) + metrics = mgr.scan_heartbeat_once() + assert metrics["reaped_sessions"] == 1 - # wrong token should not release - mgr.release_heartbeat_lock(session, token="WRONG_TOKEN") - token3 = mgr.acquire_heartbeat_lock(session) - assert not token3, "lock should still be held after wrong-token release" + # old container should be stopped/removed + assert mgr._stub_cc.stopped, "reap should call stop" + assert mgr._stub_cc.removed, "reap should call remove" - # correct token releases - mgr.release_heartbeat_lock(session, token1) - token4 = mgr.acquire_heartbeat_lock(session) - assert token4, "lock should be acquirable after correct release" + # should require restore + assert mgr.needs_restore(session) is True - # ttl expiry allows re-acquire (don't release token4) - time.sleep(mgr.config.heartbeat_lock_ttl + 0.2) - token5 = mgr.acquire_heartbeat_lock(session) - assert token5, "lock should expire after ttl" + # 4) touch_session again: should auto restore_session (decorator logic) + # here identity=old cname; get_info can still load the model (still + # in container_mapping) + mgr.check_health(cname) + assert mgr.needs_restore(session) is False -def test_redis_mapping_roundtrip_for_heartbeat_and_recycled(fake_redis): - mgr = FakeManager(mode="redis", redis_client=fake_redis, prefix="state:") - session = "s_state" - identity = "c_state" + # 5) after restore, session_mapping should point to a "new container" + new_list = mgr.get_session_mapping(session) + assert new_list, "session_mapping should exist after restore" + assert ( + new_list[0] != cname + ), "restored container 
should be a new container name" - mgr.create_for_session(identity=identity, session_ctx_id=session) + new_cm = ContainerModel(**mgr.get_info(new_list[0])) + assert new_cm.state == ContainerState.RUNNING + assert new_cm.session_ctx_id == session - # heartbeat written + # new container should also be touchable + mgr.list_tools(new_list[0]) assert mgr.get_heartbeat(session) is not None - # recycled cleared - assert mgr.needs_restore(session) is False - # mark/clear recycled in redis - mgr.mark_session_recycled(session) - assert mgr.needs_restore(session) is True - mgr.clear_session_recycled(session) - assert mgr.needs_restore(session) is False - - # delete heartbeat in redis - mgr.delete_heartbeat(session) - assert mgr.get_heartbeat(session) is None +def test_scan_branches_no_running_and_no_heartbeat(mgr: SandboxManager): + # skipped_no_running_containers: session mapping contains only WARM + # containers + s1 = "s_no_running" + mgr.session_mapping.set(s1, ["c_warm"]) + mgr.container_mapping.set( + "c_warm", + ContainerModel( + session_id="sid", + container_id="cid", + container_name="c_warm", + url="http://127.0.0.1:1", + ports=[1], + mount_dir="", + storage_path="", + runtime_token="t", + version="fake", + meta={"session_ctx_id": s1}, + timeout=0, + sandbox_type="base", + session_ctx_id=s1, + state=ContainerState.WARM, + updated_at=time.time(), + ).model_dump(), + ) -def test_multi_instance_scan_race_only_one_reaps(fake_redis): - """ - Two FakeManager instances share same - redis/prefix ->simulate multi-instance. Only one should reap due to - distributed lock. - """ - prefix = "race:" - mgr1 = FakeManager(mode="redis", redis_client=fake_redis, prefix=prefix) - mgr2 = FakeManager(mode="redis", redis_client=fake_redis, prefix=prefix) - - session = "s_race" - - # make session visible to scan - mgr1.session_mapping.set(session, ["c1", "c2"]) - # make it expired - mgr1.heartbeat_mapping.set(session, time.time() - 100) - - t1 = threading.Thread(target=mgr1.scan_heartbeat_once) - t2 = threading.Thread(target=mgr2.scan_heartbeat_once) - t1.start() - t2.start() - t1.join() - t2.join() - - total_reaped = len(mgr1.reaped_sessions) + len(mgr2.reaped_sessions) - assert total_reaped == 1 - - # state after reap - assert mgr1.get_heartbeat(session) is None - assert mgr1.needs_restore(session) is True - assert session not in mgr1.list_session_keys() + # skipped_no_heartbeat: RUNNING but last_active_at=None + s2 = "s_no_hb" + mgr.session_mapping.set(s2, ["c_nohb"]) + cm = ContainerModel(**mgr.container_mapping.get("c_warm")) + cm.container_name = "c_nohb" + cm.session_id = "sid2" + cm.container_id = "cid2" + cm.state = ContainerState.RUNNING + cm.session_ctx_id = s2 + cm.meta = {"session_ctx_id": s2} + cm.last_active_at = None + mgr.container_mapping.set("c_nohb", cm.model_dump()) + + metrics = mgr.scan_heartbeat_once() + assert metrics["skipped_no_running_containers"] >= 1 + assert metrics["skipped_no_heartbeat"] >= 1 diff --git a/tests/sandbox/test_heartbeat_timeout_restore.py b/tests/sandbox/test_heartbeat_timeout_restore.py new file mode 100644 index 000000000..51fa232d3 --- /dev/null +++ b/tests/sandbox/test_heartbeat_timeout_restore.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- +""" +Integration test: heartbeat timeout -> reap -> touch -> restore -> +run_shell_command works again. + +This test verifies: +1) A sandbox container bound to a session_ctx_id can execute run_shell_command. +2) After heartbeat timeout, the session is reaped and the container is + marked RECYCLED. 
+3) When the user "comes back" (touch heartbeat), + restore_session() can bring up a new container. +4) The restored container is RUNNING and can execute run_shell_command. + +Run: + pytest -q test_heartbeat_timeout_restore.py +""" + +import time +import pytest + +from agentscope_runtime.sandbox.manager.server.app import get_config +from agentscope_runtime.sandbox.manager.sandbox_manager import SandboxManager +from agentscope_runtime.sandbox.enums import SandboxType +from agentscope_runtime.sandbox.model import ContainerModel, ContainerState + + +def test_heartbeat_reap_then_restore_run_shell(): + # Prepare manager config for a fast heartbeat/reap cycle + config = get_config() + config.allow_mount_dir = True + config.redis_enabled = False + + # Keep timeouts small so the test finishes quickly + config.heartbeat_timeout = 30 # seconds of inactivity to trigger reap + config.watcher_scan_interval = 3 # scan interval in seconds + + session_ctx_id = f"hb-restore-{int(time.time())}" + meta = {"session_ctx_id": session_ctx_id} + + with SandboxManager(config=config, default_type=SandboxType.BASE) as mgr: + # Start heartbeat watcher explicitly (not started automatically) + mgr.start_watcher() + + # Create a session-bound sandbox (required for heartbeat tracking) + old_name = mgr.create_from_pool( + sandbox_type=SandboxType.BASE.value, + meta=meta, + ) or mgr.create(sandbox_type=SandboxType.BASE.value, meta=meta) + assert old_name, "Failed to create sandbox container" + + # 1) Old container should be able to run a shell command + r0 = mgr.call_tool( + old_name, + "run_shell_command", + {"command": "echo old-ok"}, + ) + assert "old-ok" in str(r0) + + # 2) Wait long enough for heartbeat timeout + watcher reap + time.sleep( + config.heartbeat_timeout + config.watcher_scan_interval + 5, + ) + + # The old container model should be marked as RECYCLED + old_cm = ContainerModel(**mgr.get_info(old_name)) + assert old_cm.state == ContainerState.RECYCLED + assert old_cm.recycle_reason == "heartbeat_timeout" + + # Old container should not be usable anymore (it was stopped/removed) + with pytest.raises(Exception): + mgr.call_tool( + old_name, + "run_shell_command", + {"command": "echo should-fail"}, + ) + + # 3) User comes back: touch heartbeat + restore the session + mgr.update_heartbeat(session_ctx_id) + mgr.restore_session(session_ctx_id) + + # 4) Session mapping should now contain restored container(s) + env_ids = mgr.get_session_mapping(session_ctx_id) + assert env_ids, "Expected restored container(s), got empty mapping" + + new_name = env_ids[0] + new_cm = ContainerModel(**mgr.get_info(new_name)) + assert new_cm.state == ContainerState.RUNNING + + # Restored container should be able to run a shell command again + r1 = mgr.call_tool( + new_name, + "run_shell_command", + {"command": "echo new-ok"}, + ) + assert "new-ok" in str(r1)
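Finally, a small sketch of the `ContainerModel` compatibility validator in isolation. Field values are made up; the required fields follow the tests above:

```python
import time

from agentscope_runtime.sandbox.model import ContainerModel, ContainerState

# Old-style record: session_ctx_id lives only inside meta.
cm = ContainerModel(
    session_id="sid",
    container_id="cid",
    container_name="c1",
    url="http://127.0.0.1:1",
    ports=[1],
    mount_dir="",
    storage_path="",
    runtime_token="t",
    version="fake",
    meta={"session_ctx_id": "sess-1"},
    timeout=0,
)

# _compat_and_defaults lifts meta["session_ctx_id"] onto the model,
# mirrors it back into meta, and fills updated_at.
assert cm.session_ctx_id == "sess-1"
assert cm.meta["session_ctx_id"] == "sess-1"
assert cm.state == ContainerState.RUNNING  # default lifecycle state
assert cm.updated_at is not None and cm.updated_at <= time.time()
```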