Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions matrix/app_server/app_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,13 @@ class ApplicationStatus(str, Enum):
return "RUNNING"
except:
return "NOT_STARTED"

def app_cleanup(self, app_name: str) -> str:
"""A helper function to cleanup for stateful services, eg containers maybe be dangling due to exception etc."""
from matrix.client.container_client import ContainerClient

metadata = self.get_app_metadata(app_name)
assert metadata["app_type"] == "container"
base_url = metadata["endpoints"]["head"]
client = ContainerClient(base_url)
return run_async(client.release_all_containers())
196 changes: 137 additions & 59 deletions matrix/app_server/container/container_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@
import random
import shlex
import subprocess
import threading
import time
import uuid
from functools import partial
from typing import Any, Dict, Optional

import ray
from fastapi import FastAPI, HTTPException
from ray import serve
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

from matrix.utils.ray import ACTOR_NAME_SPACE, get_matrix_actors, get_ray_head_node
from matrix.utils.ray import ACTOR_NAME_SPACE, get_ray_head_node, ray_get_async

"""
ContainerDeployment has several replicas controlled by user.
Expand All @@ -43,6 +46,9 @@ def __init__(self):
self.containers: Dict[str, str] = {}

def register_actor(self, owner_id: str, handle, actor_id: str):
print(
f"register {actor_id} {owner_id}"
) # note: logger.info does not print anything
self.actors[actor_id] = {"handle": handle, "owner": owner_id}
return actor_id

Expand Down Expand Up @@ -75,20 +81,26 @@ def acquire(
# randomly select one
aid, info = random.choice(available)
self.containers[container_id] = aid
print(f"acquire {container_id} {aid}")
return aid, info["handle"]
else:
return None, None

def release(self, container_id: str):
print(f"release {container_id}")
self.containers.pop(container_id, None)
return True

def list_actors(self):
return {
"actors": {aid: info["owner"] for aid, info in self.actors.items()},
"actors": self.actors,
"containers": self.containers,
}

def release_all_containers(self):
print(f"Release all containers")
self.containers.clear()

def cleanup_replica(self, replica_id: str):
"""
Cleanup all actors owned by this replica.
Expand Down Expand Up @@ -135,7 +147,7 @@ def execute(
cwd: str = "",
env: dict[str, str] = None,
forward_env: list[str] = None,
timeout_secs: int | None = None,
timeout: int | None = None,
) -> dict[str, Any]:
"""Run a command inside the running instance."""
if self.config is None:
Expand All @@ -162,7 +174,7 @@ def execute(
result = subprocess.run(
cmd,
text=True,
timeout=timeout_secs,
timeout=timeout,
encoding="utf-8",
errors="replace",
stdout=subprocess.PIPE,
Expand Down Expand Up @@ -215,26 +227,46 @@ def __init__(self, num_containers_per_replica: int = 32):
self.num_containers_per_replica = num_containers_per_replica

# create local non-detached actors and register them
self.local_actors = [] # actor ids hex owned by this replica
self.local_actors: list[Any] = [] # actor ids hex owned by this replica
# Cancellation event
self._stop_event = threading.Event()

# Start background thread
self._thread = threading.Thread(target=self._launch_actors, daemon=True)
self._thread.start()

def _launch_actors(self):
"""Create actors with infinite retry until stopped."""
for _ in range(self.num_containers_per_replica):
if self._stop_event.is_set():
break
actor_handle = ContainerActor.remote() # type: ignore[attr-defined]
actor_id = ray.get(actor_handle.get_id.remote())
ray.get(
self.registry.register_actor.remote(
self.replica_id, actor_handle, actor_id
)
)
self.local_actors.append(actor_handle)
while not self._stop_event.is_set():
try:
# Use non-blocking wait with timeout
ready, _ = ray.wait(
[actor_handle.get_id.remote()],
timeout=2, # short timeout for interruptibility
)

async def _ray_get(self, ref):
# helper to await ray.get without blocking the async loop
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, ray.get, ref)
if ready:
actor_id = ray.get(ready[0])
# Register actor
ray.get(
self.registry.register_actor.remote(
self.replica_id, actor_handle, actor_id
)
)
self.local_actors.append(actor_handle)
break # move to next actor
except Exception as e:
# Could log or add exponential backoff
time.sleep(1)

@app.post("/acquire")
async def acquire_container(self, payload: Dict):
"""
payload: {"timeout_s": 5, "executable": "apptainer", "image": "docker://ubuntu:22.04", "run_args": []}
payload: {"timeout": 5, "executable": "apptainer", "image": "docker://ubuntu:22.04", "run_args": []}
returns {"container_id": ...}
"""
image = payload.get("image")
Expand All @@ -246,66 +278,76 @@ async def acquire_container(self, payload: Dict):
container_id = payload.get("container_id", None)
assert container_id is None, "container_id unexpected"
container_id = f"container-{uuid.uuid4().hex[:8]}"
timeout_s = float(payload.get("timeout_s", 5.0))
timeout = payload.get("timeout")

start = asyncio.get_event_loop().time()
while True:
_actor_id, handle = await self._ray_get(
self.registry.acquire.remote(container_id)
)
if handle is not None:
try:
await self._ray_get(
handle.start_container.remote(
executable=executable,
image=image,
run_args=run_args,
container_id=container_id,
)
_actor_id, handle = await ray_get_async(
self.registry.acquire.remote(container_id)
)
if handle is not None:
try:
await ray_get_async(
handle.start_container.remote(
executable=executable,
image=image,
run_args=run_args,
container_id=container_id,
timeout=timeout,
)
return {"container_id": container_id}
except Exception as e:
# actor probably died or failed - do a cleanup of that actor in registry
await self._ray_get(handle.cleanup.remote())
await self._ray_get(self.registry.release.remote(container_id))

raise HTTPException(
status_code=500, detail=f"Failed to start_container: {e}"
)
return {"container_id": container_id}
except Exception as e:
# actor probably died or failed - do a cleanup of that actor in registry
try:
await ray_get_async(
[
handle.cleanup.remote(),
self.registry.release.remote(container_id),
]
)
except Exception:
pass

# none available
if asyncio.get_event_loop().time() - start > timeout_s:
raise HTTPException(
status_code=503, detail="No available containers, wait then retry."
status_code=500, detail=f"Failed to start_container: {e}"
)
await asyncio.sleep(1)
raise HTTPException(
status_code=503, detail=f"Containers are not available, please retry later"
)

@app.post("/release")
async def release_container(self, payload: Dict):
"""
payload: {"container_id": "..."}
return {"container_id": container_id}
"""
container_id = payload.get("container_id")
if not container_id:
raise HTTPException(status_code=400, detail="container_id required")
return {"container_id": container_id}

# lookup actor for container
handle = await self._ray_get(
handle = await ray_get_async(
self.registry.get_container_handle.remote(container_id)
)
if handle is None:
raise HTTPException(
status_code=404, detail=f"bad container id {container_id}"
)
await self._ray_get(handle.cleanup.remote())

await self._ray_get(self.registry.release.remote(container_id))
return {"status": "ok", "container_id": container_id}
try:
await ray_get_async(
[handle.cleanup.remote(), self.registry.release.remote(container_id)]
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"container_id {container_id} release failed: {repr(e)}",
)
return {"container_id": container_id}

@app.post("/execute")
async def execute(self, payload: Dict):
"""
payload: {"container_id": "...", "cmd": "..."}
return :{"returncode": bash status code, "output": stdout}
"""
container_id = payload.get("container_id")
cmd = payload.get("cmd")
Expand All @@ -314,9 +356,10 @@ async def execute(self, payload: Dict):
cwd = payload.get("cwd")
env = payload.get("env")
forward_env = payload.get("forward_env")
timeout = payload.get("timeout", 30)

# lookup actor for container
handle = await self._ray_get(
handle = await ray_get_async(
self.registry.get_container_handle.remote(container_id)
)
if handle is None:
Expand All @@ -326,26 +369,61 @@ async def execute(self, payload: Dict):

# call the actor.execute remotely; await result
try:
return await self._ray_get(
handle.execute.remote(cmd, cwd=cwd, env=env, forward_env=forward_env)
return await ray_get_async(
handle.execute.remote(
cmd, cwd=cwd, env=env, forward_env=forward_env, timeout=timeout
)
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"actor execution failed: {e}")
raise HTTPException(
status_code=500, detail=f"actor execution failed: {repr(e)}"
)

@app.get("/status")
async def status(self):
info = await self._ray_get(self.registry.list_actors.remote())
return info
info = await ray_get_async(self.registry.list_actors.remote())
return {
"actors": {aid: info["owner"] for aid, info in info["actors"].items()},
"containers": info["containers"],
}

@app.post("/release_all")
async def release_all_containers(self, payload: Dict):
"""
remove all live containers
return {"container_ids": list(container_id)}
"""
actors_containers = await ray_get_async(self.registry.list_actors.remote())
actors = actors_containers["actors"]
containers = actors_containers["containers"]
handles = [
actors[aid]["handle"] for aid in containers.values() if aid in actors
]
try:
await ray_get_async(
[handle.cleanup.remote() for handle in handles]
+ [self.registry.release_all_containers.remote()]
)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"release_all_containers failed: {repr(e)}"
)
return {"container_ids": list(containers.keys())}

def __del__(self):
"""Clean up this replica when it's destroyed"""
# This might not work reliably in all shutdown scenarios
# Signal background thread to stop, but don't wait
self._stop_event.set()

try:
tasks = []
tasks.append(self.registry.cleanup_replica.remote(self.replica_id))
for handle in self.local_actors:
tasks.append(handle.cleanup.remote())
ray.get(tasks)
try:
tasks.append(handle.cleanup.remote())
except Exception:
pass
ray.get(tasks, raise_on_error=False)
except Exception:
# Ignore all exceptions during cleanup
pass
Expand Down
7 changes: 3 additions & 4 deletions matrix/app_server/deploy_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,12 @@
import_path: matrix.app_server.container.container_deployment:build_app
runtime_env: {}
args:
num_containers_per_replica: {{ app.num_containers_per_replica }}
num_containers_per_replica: {{ app.max_ongoing_requests }}
deployments:
- name: ContainerDeployment
max_ongoing_requests: 32
max_ongoing_requests: {{ app.max_ongoing_requests }}
autoscaling_config:
target_ongoing_requests: 32
target_ongoing_requests: {{ (app.max_ongoing_requests * 0.8) | int }}
min_replicas: {{ app.min_replica }}
max_replicas: {{ app.max_replica }}
{% elif app.app_type == 'hello' %}
Expand Down Expand Up @@ -384,7 +384,6 @@ def get_yaml_for_deployment(
default_params: Dict[str, Union[str, int]] = {
"name": "container",
"max_ongoing_requests": 32,
"num_containers_per_replica": 32,
}
app.update({k: v for k, v in default_params.items() if k not in app})
yaml_str += Template(other_app_template).render(app=app)
Expand Down
Loading