Skip to content
Merged
2 changes: 1 addition & 1 deletion .github/workflows/unit_test_sandbox.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ jobs:

- name: Run tests with coverage (default)
run: |
coverage run -m pytest tests/sandbox/test_sandbox.py tests/sandbox/test_sandbox_service.py tests/sandbox/test_heartbeat.py
coverage run -m pytest tests/sandbox/test_sandbox.py tests/sandbox/test_sandbox_service.py tests/sandbox/test_heartbeat.py tests/sandbox/test_heartbeat_timeout_restore.py

- name: Generate coverage report
run: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ def create(
# Convert environment dict to list of tuples
env_list = []
if environment:
env_list = list(environment.items())
env_list = [
(str(k), "" if v is None else str(v))
for k, v in environment.items()
]
Comment thread
rayrayraykk marked this conversation as resolved.

# Convert volumes to BoxLite format
volume_list = []
Expand Down
30 changes: 23 additions & 7 deletions src/agentscope_runtime/sandbox/box/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import signal
from typing import Any, Optional

import shortuuid

from ..enums import SandboxType
from ..manager.sandbox_manager import SandboxManager
from ..manager.server.app import get_config
Expand Down Expand Up @@ -154,17 +156,25 @@ class Sandbox(SandboxBase):
def __enter__(self):
# Create sandbox if sandbox_id not provided
if self._sandbox_id is None:
short_uuid = shortuuid.ShortUUID().uuid()
session_ctx_id = str(short_uuid)
if self.workspace_dir:
# bypass pool when workspace_dir is set
self._sandbox_id = self.manager_api.create(
_id = self.manager_api.create(
sandbox_type=SandboxType(self.sandbox_type).value,
mount_dir=self.workspace_dir,
# TODO: support bind self-define id
meta={"session_ctx_id": session_ctx_id},
)
else:
self._sandbox_id = self.manager_api.create_from_pool(
_id = self.manager_api.create_from_pool(
sandbox_type=SandboxType(self.sandbox_type).value,
# TODO: support bind self-define id
meta={"session_ctx_id": session_ctx_id},
)

self._sandbox_id = _id

if self._sandbox_id is None:
raise RuntimeError(
"No sandbox available. This may happen if: "
Expand Down Expand Up @@ -217,18 +227,24 @@ def add_mcp_servers(self, server_configs: dict, overwrite=False):
class SandboxAsync(SandboxBase):
async def __aenter__(self):
if self._sandbox_id is None:
short_uuid = shortuuid.ShortUUID().uuid()
session_ctx_id = str(short_uuid)
if self.workspace_dir:
self._sandbox_id = await self.manager_api.create_async(
_id = await self.manager_api.create_async(
sandbox_type=SandboxType(self.sandbox_type).value,
mount_dir=self.workspace_dir,
# TODO: support bind self-define id
meta={"session_ctx_id": session_ctx_id},
)
else:
self._sandbox_id = (
await self.manager_api.create_from_pool_async(
sandbox_type=SandboxType(self.sandbox_type).value,
)
_id = await self.manager_api.create_from_pool_async(
sandbox_type=SandboxType(self.sandbox_type).value,
# TODO: support bind self-define id
meta={"session_ctx_id": session_ctx_id},
)

self._sandbox_id = _id

if self._sandbox_id is None:
raise RuntimeError("No sandbox available.")
if self.embed_mode:
Expand Down
191 changes: 145 additions & 46 deletions src/agentscope_runtime/sandbox/manager/heartbeat_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
import inspect
import time
import secrets
from typing import Optional
from typing import Optional, List
from functools import wraps

import logging
from redis.exceptions import ResponseError

from ..model import ContainerModel
from ..model import ContainerModel, ContainerState

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -44,7 +44,8 @@ async def async_wrapper(self, *args, **kwargs):
if session_ctx_id:
self.update_heartbeat(session_ctx_id)
if self.needs_restore(session_ctx_id):
self.restore_session(session_ctx_id)
if hasattr(self, "restore_session"):
self.restore_session(session_ctx_id)
except Exception as e:
logger.debug(f"touch_session failed (ignored): {e}")

Expand All @@ -68,7 +69,8 @@ def sync_wrapper(self, *args, **kwargs):
if session_ctx_id:
self.update_heartbeat(session_ctx_id)
if self.needs_restore(session_ctx_id):
self.restore_session(session_ctx_id)
if hasattr(self, "restore_session"):
self.restore_session(session_ctx_id)
except Exception as e:
logger.debug(f"touch_session failed (ignored): {e}")

Expand All @@ -82,17 +84,20 @@ def sync_wrapper(self, *args, **kwargs):
class HeartbeatMixin:
"""
Mixin providing:
- heartbeat timestamp read/write
- recycled (restore-required) marker
- heartbeat timestamp read/write (stored on
ContainerModel.last_active_at)
- recycled (restore-required) marker (stored on
ContainerModel.state/recycled_at)
- redis distributed lock for reaping

Host class must provide:
- self.heartbeat_mapping, self.recycled_mapping
(Mapping-like with set/get/delete)
- self.container_mapping (Mapping-like with set/get/delete/scan)
- self.session_mapping (Mapping-like with set/get/delete/scan)
- self.get_info(identity) -> dict compatible with ContainerModel(**dict)
- self.config.redis_enabled (bool)
- self.config.heartbeat_lock_ttl (int)
- self.redis_client (redis client or None)
- self.restore_session (for restore session)
"""

_REDIS_RELEASE_LOCK_LUA = """if redis.call("GET", KEYS[1]) == ARGV[1] then
Expand All @@ -102,92 +107,186 @@ class HeartbeatMixin:
end
"""

def _list_container_names_by_session(
self,
session_ctx_id: str,
) -> List[str]:
if not session_ctx_id:
return []
# session_mapping stores container_name list
try:
return self.session_mapping.get(session_ctx_id) or []
except Exception:
return []
Comment thread
rayrayraykk marked this conversation as resolved.
Comment thread
rayrayraykk marked this conversation as resolved.

def _load_container_model(self, identity: str) -> Optional[ContainerModel]:
try:
info_dict = self.get_info(identity)
return ContainerModel(**info_dict)
except Exception as e:
logger.debug(f"_load_container_model failed for {identity}: {e}")
return None
Comment thread
rayrayraykk marked this conversation as resolved.
Comment thread
rayrayraykk marked this conversation as resolved.

def _save_container_model(self, model: ContainerModel) -> None:
# IMPORTANT: persist back into container_mapping
self.container_mapping.set(model.container_name, model.model_dump())
Comment thread
rayrayraykk marked this conversation as resolved.
Comment thread
rayrayraykk marked this conversation as resolved.

# ---------- heartbeat ----------
def update_heartbeat(
self,
session_ctx_id: str,
ts: Optional[float] = None,
) -> float:
"""
heartbeat_mapping[session_ctx_id] = last_active_timestamp
(unix seconds).
Update heartbeat timestamp onto ALL containers bound to session_ctx_id.
Returns the timestamp written.
"""
if not session_ctx_id:
raise ValueError("session_ctx_id is required")

ts = float(ts if ts is not None else time.time())
self.heartbeat_mapping.set(session_ctx_id, ts)
now = time.time()

container_names = self._list_container_names_by_session(session_ctx_id)
for cname in list(container_names):
model = self._load_container_model(cname)
if not model:
continue

# only update heartbeat for RUNNING containers
if model.state != ContainerState.RUNNING:
continue

model.last_active_at = ts
model.updated_at = now

# keep session_ctx_id consistent (migration safety)
model.session_ctx_id = session_ctx_id

self._save_container_model(model)
Comment thread
rayrayraykk marked this conversation as resolved.

return ts

def get_heartbeat(self, session_ctx_id: str) -> Optional[float]:
val = (
self.heartbeat_mapping.get(session_ctx_id)
if session_ctx_id
else None
)
return float(val) if val is not None else None
"""
Return session-level heartbeat = max(last_active_at) among bound
containers.
"""
if not session_ctx_id:
return None

container_names = self._list_container_names_by_session(session_ctx_id)
last_vals = []
for cname in list(container_names):
model = self._load_container_model(cname)
if not model:
continue

def delete_heartbeat(self, session_ctx_id: str) -> None:
if session_ctx_id:
self.heartbeat_mapping.delete(session_ctx_id)
if model.state != ContainerState.RUNNING:
continue

if model.last_active_at is not None:
last_vals.append(float(model.last_active_at))

return max(last_vals) if last_vals else None

# ---------- recycled marker ----------
def mark_session_recycled(
self,
session_ctx_id: str,
ts: Optional[float] = None,
reason: str = "heartbeat_timeout",
) -> float:
"""
recycled_mapping[session_ctx_id] = recycled_timestamp (unix seconds).
Mark ALL containers bound to session_ctx_id as recycled.
(Does not stop/remove containers here; reap_session will do that.)
"""
if not session_ctx_id:
raise ValueError("session_ctx_id is required")

ts = float(ts if ts is not None else time.time())
self.recycled_mapping.set(session_ctx_id, ts)
now = time.time()

container_names = self._list_container_names_by_session(session_ctx_id)
for cname in list(container_names):
model = self._load_container_model(cname)
if not model:
continue

# if already released, don't flip back
if model.state == ContainerState.RELEASED:
continue

model.state = ContainerState.RECYCLED
model.recycled_at = ts
model.recycle_reason = reason
model.updated_at = now

model.session_ctx_id = session_ctx_id
self._save_container_model(model)

return ts

def clear_session_recycled(self, session_ctx_id: str) -> None:
if session_ctx_id:
self.recycled_mapping.delete(session_ctx_id)
"""
Clear recycled marker on containers (if any) for this session.
Usually called when session is allocated a new running container.
"""
if not session_ctx_id:
return

now = time.time()
container_names = self._list_container_names_by_session(session_ctx_id)
for cname in list(container_names):
model = self._load_container_model(cname)
if not model:
continue
if model.state == ContainerState.RECYCLED:
model.state = ContainerState.RUNNING
model.recycled_at = None
model.recycle_reason = None
model.updated_at = now
model.session_ctx_id = session_ctx_id
self._save_container_model(model)

def needs_restore(self, session_ctx_id: str) -> bool:
if not session_ctx_id:
return False
return self.recycled_mapping.get(session_ctx_id) is not None

def restore_session(self, session_ctx_id: str) -> None:
"""
Stub for snapshot/restore phase.
Called when a session is marked recycled (needs_restore == True).
"""
logger.warning(
f"restore_session({session_ctx_id}) called but not implemented "
f"yet.",
)
# NOTE: keep recycled mark for now, so future requests still
# indicate restore needed. If you prefer "warn once", uncomment next
# line:
# self.clear_session_recycled(session_ctx_id)
container_names = self._list_container_names_by_session(session_ctx_id)
for cname in list(container_names):
model = self._load_container_model(cname)
if not model:
continue
if (
model.state == ContainerState.RECYCLED
or model.recycled_at is not None
):
return True
return False
Comment thread
rayrayraykk marked this conversation as resolved.

# ---------- helpers ----------
def get_session_ctx_id_by_identity(self, identity: str) -> Optional[str]:
"""
Resolve session_ctx_id from a container identity.
Returns None if the container cannot be found (get_info raises
RuntimeError), which is an expected situation for recycled/removed
containers during heartbeat touches.
"""
try:
info_dict = self.get_info(identity)
except RuntimeError as exc:
# Missing container is a normal condition during heartbeat checks.
logger.debug(
"get_session_ctx_id_by_identity: container not found for "
"identity %s: %s",
identity,
exc,
f"get_session_ctx_id_by_identity: container not found for "
f"identity {identity}: {exc}",
)

return None

info = ContainerModel(**info_dict)

# NEW: prefer top-level field
if info.session_ctx_id:
return info.session_ctx_id

# fallback for older payloads
return (info.meta or {}).get("session_ctx_id")
Comment thread
rayrayraykk marked this conversation as resolved.

# ---------- redis distributed lock ----------
Expand Down
Loading
Loading