Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions src/agentscope_runtime/sandbox/manager/heartbeat_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,11 @@ def mark_session_recycled(
if not model:
continue

# if already released, don't flip back
if model.state == ContainerState.RELEASED:
# if already in terminal state, don't flip back
if model.state in (
ContainerState.RELEASED,
ContainerState.REPLACED,
):
continue

model.state = ContainerState.RECYCLED
Expand Down Expand Up @@ -342,7 +345,8 @@ def needs_restore(self, session_ctx_id: str) -> bool:
"""Check whether any container in the session is marked for restore.

A session is considered needing restore if any bound container is in
``ContainerState.RECYCLED`` or has ``recycled_at`` set.
``ContainerState.RECYCLED`` (not REPLACED, as REPLACED containers
already have redirects set up).

Args:
session_ctx_id (`str`):
Expand All @@ -360,10 +364,8 @@ def needs_restore(self, session_ctx_id: str) -> bool:
model = self._load_container_model(cname)
if not model:
continue
if (
model.state == ContainerState.RECYCLED
or model.recycled_at is not None
):
# Only RECYCLED needs restore; REPLACED already has redirect
if model.state == ContainerState.RECYCLED:
return True
return False

Expand Down
101 changes: 88 additions & 13 deletions src/agentscope_runtime/sandbox/manager/sandbox_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ def cleanup(self):
if container_model.state in (
ContainerState.RELEASED,
ContainerState.RECYCLED,
ContainerState.REPLACED,
):
Comment thread
rayrayraykk marked this conversation as resolved.
continue

Expand All @@ -567,6 +568,7 @@ def cleanup(self):
if container_model.state in (
ContainerState.RELEASED,
ContainerState.RECYCLED,
ContainerState.REPLACED,
):
continue

Expand Down Expand Up @@ -1123,17 +1125,62 @@ async def get_status_async(self, *args, **kwargs):
return await asyncio.to_thread(self.get_status, *args, **kwargs)

@remote_wrapper()
def get_info(self, identity, _redirect_depth: int = 0):
    """
    Get container information by container_name or container_id.

    Automatically follows redirects if container is REPLACED: when the
    stored record is in ``ContainerState.REPLACED`` and carries a
    ``redirect_to`` that differs from the identity being looked up, the
    lookup continues with ``redirect_to`` instead.

    Args:
        identity: Container name or ID to look up
        _redirect_depth: Internal parameter to prevent infinite redirect
            loops; the number of redirects already followed before this
            call

    Returns:
        The record stored in ``container_mapping`` for the final
        (non-redirected) container. If that record exposes
        ``model_dump_json`` its JSON string is returned, otherwise the
        stored record is returned unchanged.

    Raises:
        RuntimeError: If container not found or redirect loop detected
    """
    # Upper bound on the number of redirects followed for one lookup.
    max_redirect_depth = 10

    depth = _redirect_depth
    current = identity
    # Identities already resolved in this call; seeing one twice means the
    # redirect chain is cyclic, so we can fail without exhausting the
    # depth budget.
    visited = set()

    # Follow the chain iteratively so each hop does not re-enter the
    # decorated method.
    while True:
        if depth > max_redirect_depth or current in visited:
            raise RuntimeError(
                f"Redirect loop detected for container {current}",
            )
        visited.add(current)

        # Try the identity as given first, then as a generated key.
        container_model = self.container_mapping.get(current)
        if container_model is None:
            container_model = self.container_mapping.get(
                self._generate_container_key(current),
            )
        if container_model is None:
            raise RuntimeError(f"No container found with id: {current}.")

        # The mapping may hand back a plain dict or a model-like object.
        if hasattr(container_model, "model_dump"):
            cm_dict = container_model.model_dump()
        else:
            cm_dict = container_model
        cm = ContainerModel(**cm_dict)

        redirect_target = cm.redirect_to
        if (
            cm.state != ContainerState.REPLACED
            or not redirect_target
            or redirect_target == current
        ):
            # Not replaced, or nothing usable to redirect to: this record
            # is the answer.
            break

        logger.debug(
            f"Container {current} is REPLACED, redirecting to "
            f"{redirect_target}",
        )
        current = redirect_target
        depth += 1

    # Return the container model as dict/json
    if hasattr(container_model, "model_dump_json"):
        return container_model.model_dump_json()

    return container_model

Expand Down Expand Up @@ -1447,14 +1494,32 @@ def restore_session(self, session_ctx_id: str) -> None:
# 3) heartbeat after restore (session-level)
self.update_heartbeat(session_ctx_id)

# 4) archive old recycled records so needs_restore becomes False
for old_name in recycled_old_names:
# 4) mark old recycled records as REPLACED with redirect info
# (keep them for client compatibility, don't delete)
for i, old_name in enumerate(recycled_old_names):
Comment thread
rayrayraykk marked this conversation as resolved.
try:
self.container_mapping.delete(old_name)
# Get the raw container data without following redirects
old_data = self.container_mapping.get(old_name)
if not old_data:
logger.warning(
f"restore_session: old container {old_name} not "
f"found in mapping",
)
continue

old_cm = ContainerModel(**old_data)
old_cm.state = ContainerState.REPLACED
old_cm.redirect_to = new_container_names[i]
old_cm.updated_at = time.time()
self.container_mapping.set(old_name, old_cm.model_dump())
logger.debug(
f"restore_session: marked {old_name} as REPLACED -> "
f"{new_container_names[i]}",
)
except Exception as e:
logger.warning(
f"restore_session: failed to delete old model"
f" {old_name}: {e}",
f"restore_session: failed to mark old container "
f"{old_name} as REPLACED: {e}",
)

def scan_heartbeat_once(self) -> dict:
Expand Down Expand Up @@ -1602,7 +1667,8 @@ def scan_pool_once(self) -> dict:

def scan_released_cleanup_once(self, max_delete: int = 200) -> dict:
"""
Delete container_mapping records whose state == RELEASED and expired.
Delete container_mapping records whose state is RELEASED or
REPLACED and expired.

TTL is config.released_key_ttl seconds. 0 disables cleanup.
"""
Expand Down Expand Up @@ -1635,17 +1701,26 @@ def scan_released_cleanup_once(self, max_delete: int = 200) -> dict:

cm = ContainerModel(**container_json)

if cm.state != ContainerState.RELEASED:
# Only cleanup RELEASED or REPLACED states
if cm.state not in (
ContainerState.RELEASED,
ContainerState.REPLACED,
):
result["skipped_not_released"] += 1
Comment thread
rayrayraykk marked this conversation as resolved.
Outdated
continue

released_at = cm.released_at or cm.updated_at or 0
if released_at <= 0:
# For RELEASED: use released_at; for REPLACED: use updated_at
cleanup_at = (
cm.released_at
if cm.state == ContainerState.RELEASED
else cm.updated_at
)
if not cleanup_at or cleanup_at <= 0:
# no timestamp -> treat as not expired
result["skipped_not_expired"] += 1
continue

if now - released_at <= ttl:
if now - cleanup_at <= ttl:
result["skipped_not_expired"] += 1
continue

Expand Down
7 changes: 7 additions & 0 deletions src/agentscope_runtime/sandbox/model/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class ContainerState(str, Enum):
WARM = "warm"
RUNNING = "running"
RECYCLED = "recycled"
REPLACED = "replaced"
ERROR = "error"
RELEASED = "released"

Expand Down Expand Up @@ -115,6 +116,12 @@ class ContainerModel(BaseModel):
description="Reason for recycle",
)

redirect_to: Optional[str] = Field(
default=None,
description="Container name to redirect to when this container is "
"REPLACED",
)

@model_validator(mode="after")
def _compat_and_defaults(self):
"""Compatibility layer for ContainerModel.
Expand Down
Loading