276 changes: 276 additions & 0 deletions affine/lium_backend.py
@@ -0,0 +1,276 @@
"""
Lium-backed model serving for Affine sampling.
This module provisions short-lived GPU pods on Lium, starts a minimal
OpenAI-compatible HTTP server inside the pod for a requested model,
and exposes the server via a local SSH tunnel so existing evaluation
code can use a base_url like http://127.0.0.1:<port>/v1 unchanged.
"""
from __future__ import annotations

import atexit
import os
import random
import shlex
Copilot AI Nov 13, 2025
Import of 'shlex' is not used.

Suggested change
import shlex

import socket
import subprocess
import threading
import time
from dataclasses import dataclass
from typing import Dict, Optional

from lium_sdk import Lium, ExecutorInfo, PodInfo


_LOCK = threading.Lock()
_MODEL_TO_SERVER: Dict[str, "ModelServer"] = {}
_SSH_TUNNELS: Dict[int, subprocess.Popen] = {}
_LIUM = Lium()
Copilot AI Nov 13, 2025
The module-level _LIUM instance is initialized without checking if the LIUM_API_KEY environment variable is set. This will likely fail at runtime when Lium mode is enabled but the API key is missing. Consider either:

  1. Lazy initialization (create the instance only when needed)
  2. Early validation with a clear error message

Example for lazy initialization:

_LIUM: Optional[Lium] = None

def _get_lium() -> Lium:
    global _LIUM
    if _LIUM is None:
        if not os.getenv("LIUM_API_KEY"):
            raise ValueError("LIUM_API_KEY environment variable is required when AFFINE_USE_LIUM=1")
        _LIUM = Lium()
    return _LIUM



def _pick_free_port(start: int = 30000, end: int = 45000) -> int:
"""Return an available localhost TCP port."""
for _ in range(50):
port = random.randint(start, end)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind(("127.0.0.1", port))
return port
except OSError:
continue
# last resort: let the OS pick an ephemeral port
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]


def _choose_executor() -> ExecutorInfo:
"""Choose a Lium executor, preferring GPU types from AFFINE_LIUM_GPU."""
prefs = (os.getenv("AFFINE_LIUM_GPU") or "").split(",")
prefs = [p.strip().upper() for p in prefs if p.strip()]
exs = _LIUM.ls()
if prefs:
for p in prefs:
for e in exs:
if e.gpu_type.upper().startswith(p):
return e
# Prefer docker-in-docker for easier server setups
for e in exs:
if getattr(e, "docker_in_docker", False):
return e
return exs[0]
Comment on lines +47 to +61
Copilot AI Nov 13, 2025
The executor list could be empty, which would cause an IndexError on line 61 (return exs[0]). Add validation to handle this case:

if not exs:
    raise RuntimeError("No Lium executors available")
# Prefer docker-in-docker for easier server setups
for e in exs:
    if getattr(e, "docker_in_docker", False):
        return e
return exs[0]



def _build_server_py() -> str:
"""Inline FastAPI server exposing minimal OpenAI-compatible endpoints."""
return r'''
import os, uvicorn, torch
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModelForSeq2SeqLM
MODEL_ID = os.environ.get("AFFINE_MODEL_ID") or "google/flan-t5-small"
MAX_TOK = int(os.environ.get("AFFINE_MAX_NEW_TOKENS", "64"))
TEMPERATURE = float(os.environ.get("AFFINE_TEMPERATURE", "0.7"))
tok = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True)
model = None
try:
    model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_ID, trust_remote_code=True)
except Exception:
    model = AutoModelForCausalLM.from_pretrained(MODEL_ID, trust_remote_code=True)
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)
if tok.pad_token is None:
    tok.pad_token = tok.eos_token
app = FastAPI()
class ChatMessage(BaseModel):
    role: str
    content: str
class ChatRequest(BaseModel):
    model: Optional[str] = None
    messages: List[ChatMessage]
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
class CompRequest(BaseModel):
    model: Optional[str] = None
    prompt: str
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
def _gen(prompt: str, max_new: int, temp: float) -> str:
    encoded = tok(prompt, return_tensors="pt").to(device)
    with torch.inference_mode():
        out = model.generate(
            **encoded,
            do_sample=(temp > 0),
            temperature=max(1e-5, temp),
            max_new_tokens=max_new,
            pad_token_id=tok.pad_token_id,
        )
    return tok.decode(out[0], skip_special_tokens=True)
Comment on lines +107 to +117
Copilot AI Nov 13, 2025
The inline server code has a potential issue with the generation logic for decoder-only models. For causal language models, the decode operation will include the input prompt in the output, but the code doesn't strip it. This means the response will contain both the prompt and the completion, which may not match the expected OpenAI API behavior. Consider updating the decoder logic:

def _gen(prompt: str, max_new: int, temp: float) -> str:
    encoded = tok(prompt, return_tensors="pt").to(device)
    input_length = encoded.input_ids.shape[1]
    with torch.inference_mode():
        out = model.generate(
            **encoded,
            do_sample=(temp > 0),
            temperature=max(1e-5, temp),
            max_new_tokens=max_new,
            pad_token_id=tok.pad_token_id,
        )
    # For causal models, skip the input tokens
    decoded = tok.decode(out[0][input_length:], skip_special_tokens=True)
    return decoded

@app.post("/v1/chat/completions")
def chat(req: ChatRequest) -> Dict[str, Any]:
prompt = "\n".join(m.content for m in req.messages if m.content)
Copilot AI Nov 13, 2025
The inline server code concatenates all message contents with newlines without considering message roles. This loses important context that could affect model behavior. For a proper chat interface, different roles (system, user, assistant) should be formatted according to the model's expected chat template. Consider using the tokenizer's apply_chat_template method if available:

@app.post("/v1/chat/completions")
def chat(req: ChatRequest) -> Dict[str, Any]:
    messages = [{"role": m.role, "content": m.content} for m in req.messages]
    if hasattr(tok, 'apply_chat_template'):
        prompt = tok.apply_chat_template(messages, tokenize=False)
    else:
        # Fallback for models without chat template
        prompt = "\n".join(m.content for m in req.messages if m.content)
    ans = _gen(prompt, req.max_tokens or MAX_TOK, req.temperature or TEMPERATURE)
    # ...
Suggested change
prompt = "\n".join(m.content for m in req.messages if m.content)
messages = [{"role": m.role, "content": m.content} for m in req.messages]
if hasattr(tok, 'apply_chat_template'):
prompt = tok.apply_chat_template(messages, tokenize=False)
else:
prompt = "\n".join(m["content"] for m in messages if m["content"])

    ans = _gen(prompt, req.max_tokens or MAX_TOK, req.temperature or TEMPERATURE)
    return {
        "id": "chatcmpl",
        "object": "chat.completion",
        "choices": [{"index": 0, "message": {"role": "assistant", "content": ans}}],
        "model": req.model or MODEL_ID,
    }
@app.post("/v1/completions")
def completions(req: CompRequest) -> Dict[str, Any]:
    ans = _gen(req.prompt, req.max_tokens or MAX_TOK, req.temperature or TEMPERATURE)
    return {
        "id": "cmpl",
        "object": "text_completion",
        "choices": [{"index": 0, "text": ans}],
        "model": req.model or MODEL_ID,
    }
def main():
    uvicorn.run(app, host="0.0.0.0", port=8000)
if __name__ == "__main__":
    main()
'''


@dataclass
class ModelServer:
"""Represents a running Lium pod hosting a model HTTP server."""
model: str
pod: PodInfo
local_port: Optional[int] = None
public_url: Optional[str] = None

@property
def base_url(self) -> str:
"""Prefer public URL (reachable from containers) else local tunnel."""
if self.public_url:
return self.public_url.rstrip("/") + "/v1"
if self.local_port:
return f"http://127.0.0.1:{self.local_port}/v1"
raise RuntimeError("Model server has no reachable URL")


def _try_public_url(pod: PodInfo) -> Optional[str]:
"""Attempt to derive a public HTTP URL from pod port mappings."""
try:
# Refresh pod info to get latest ports mapping
for p in _LIUM.ps():
if p.id == pod.id:
pod = p
break
# Common shapes: {"8000": 30123} or {"http":30123}
ports = pod.ports or {}
port = None
if "8000" in ports:
port = ports.get("8000")
elif "http" in ports:
port = ports.get("http")
elif isinstance(ports, dict):
# Fallback to any int value
for v in ports.values():
if isinstance(v, int):
port = v
break
host = pod.host
if host and port:
return f"http://{host}:{port}"
except Exception:
return None
return None


def _start_server_on_pod(model: str) -> ModelServer:
"""Provision a pod, start server, and open local SSH tunnel."""
Copilot AI Nov 13, 2025
Missing input validation for the model parameter. If model is empty or None, it could lead to issues downstream (e.g., in pod naming on line 198, or in the server environment variable on line 205). Consider adding validation at the start of the function:

if not model:
    raise ValueError("model parameter cannot be empty")
Suggested change
"""Provision a pod, start server, and open local SSH tunnel."""
"""Provision a pod, start server, and open local SSH tunnel."""
if not model:
raise ValueError("model parameter cannot be empty")

    ex = _choose_executor()
    pod_dict = _LIUM.up(executor_id=ex.id, pod_name=f"affine-{int(time.time())}", initial_port_count=1)
    pod = _LIUM.wait_ready(pod_dict, timeout=900)
    if not pod:
        raise RuntimeError("Lium pod failed to become ready")

Comment on lines +199 to +202
Copilot AI Nov 13, 2025
[nitpick] The 900-second (15-minute) timeout for wait_ready is quite long and could cause the application to hang for extended periods if pod provisioning fails. Consider either:

  1. Reducing the timeout to a more reasonable value (e.g., 300 seconds / 5 minutes)
  2. Adding logging to inform users that pod provisioning is in progress
  3. Making the timeout configurable via an environment variable

Additionally, the error message "Lium pod failed to become ready" doesn't provide details about why it failed, making debugging difficult.

Suggested change
    pod = _LIUM.wait_ready(pod_dict, timeout=900)
    if not pod:
        raise RuntimeError("Lium pod failed to become ready")
    timeout = int(os.getenv("AFFINE_LIUM_POD_TIMEOUT", "300"))
    print(f"Provisioning Lium pod for model '{model}' (timeout: {timeout}s)...")
    pod = _LIUM.wait_ready(pod_dict, timeout=timeout)
    if not pod:
        pod_info = pod_dict if pod_dict else {}
        raise RuntimeError(
            f"Lium pod failed to become ready after {timeout}s. "
            f"Pod info: {pod_info}"
        )

    # Write server file and start it in background
    env = {
        "AFFINE_MODEL_ID": model,
        "AFFINE_MAX_NEW_TOKENS": os.getenv("AFFINE_MAX_NEW_TOKENS", "64"),
        "AFFINE_TEMPERATURE": os.getenv("AFFINE_TEMPERATURE", "0.7"),
    }
    py_code = _build_server_py()
    cmd = f"""bash -lc 'python3 -m venv /tmp/venv && . /tmp/venv/bin/activate && pip -q install --upgrade pip && pip -q install torch --index-url https://download.pytorch.org/whl/cu121 && pip -q install transformers sentencepiece tiktoken fastapi uvicorn && cat > /tmp/affine_serv.py <<PY
Copilot AI Nov 13, 2025
[nitpick] The inline bash command installs PyTorch with CUDA 12.1 support (cu121), but there's no verification that the selected executor actually has CUDA 12.1 compatible GPUs. Some GPUs might require different CUDA versions. Consider either:

  1. Detecting the CUDA version available on the pod and installing the matching PyTorch version
  2. Using a more universal PyTorch installation method
  3. Documenting the CUDA 12.1 requirement in the module docstring

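A possible sketch for option 1, purely illustrative: detect the CUDA version reported by nvidia-smi and map it to a matching PyTorch wheel index before composing the pip command. The helper below is an assumption, not part of the PR; it would have to run inside the pod (for example as part of the bootstrap script), and the index URLs are the standard PyTorch wheel indexes.

import re, subprocess

def _torch_index_url() -> str:
    """Best-effort choice of a PyTorch wheel index matching the pod's CUDA version."""
    try:
        # nvidia-smi prints a "CUDA Version: X.Y" field in its header.
        out = subprocess.run(["nvidia-smi"], capture_output=True, text=True, timeout=10).stdout
        m = re.search(r"CUDA Version:\s*(\d+)\.(\d+)", out)
        if m:
            major, minor = int(m.group(1)), int(m.group(2))
            if (major, minor) >= (12, 1):
                return "https://download.pytorch.org/whl/cu121"
            if major == 11:
                return "https://download.pytorch.org/whl/cu118"
    except Exception:
        pass  # fall back to CPU wheels if detection fails
    return "https://download.pytorch.org/whl/cpu"
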
{py_code}
PY
nohup bash -lc "(. /tmp/venv/bin/activate; python /tmp/affine_serv.py)" >/tmp/affine_serv.log 2>&1 & disown
sleep 3
curl -sSf localhost:8000/docs >/dev/null || (tail -n+1 /tmp/affine_serv.log; exit 1)
Comment on lines +210 to +215
Copilot AI Nov 13, 2025
[nitpick] The command uses pip -q install which suppresses output, making it difficult to debug installation failures. While this keeps logs clean during normal operation, it could hide important error messages. Consider using normal verbosity and redirecting output to a file that can be inspected if the installation fails:

pip install --upgrade pip > /tmp/pip_install.log 2>&1 && \
pip install torch --index-url https://download.pytorch.org/whl/cu121 >> /tmp/pip_install.log 2>&1 && \
pip install transformers sentencepiece tiktoken fastapi uvicorn >> /tmp/pip_install.log 2>&1

Then update the error handling to include this log file in the error message.

Suggested change
cmd = f"""bash -lc 'python3 -m venv /tmp/venv && . /tmp/venv/bin/activate && pip -q install --upgrade pip && pip -q install torch --index-url https://download.pytorch.org/whl/cu121 && pip -q install transformers sentencepiece tiktoken fastapi uvicorn && cat > /tmp/affine_serv.py <<PY
{py_code}
PY
nohup bash -lc "(. /tmp/venv/bin/activate; python /tmp/affine_serv.py)" >/tmp/affine_serv.log 2>&1 & disown
sleep 3
curl -sSf localhost:8000/docs >/dev/null || (tail -n+1 /tmp/affine_serv.log; exit 1)
cmd = f"""bash -lc 'python3 -m venv /tmp/venv && . /tmp/venv/bin/activate && \
pip install --upgrade pip > /tmp/pip_install.log 2>&1 && \
pip install torch --index-url https://download.pytorch.org/whl/cu121 >> /tmp/pip_install.log 2>&1 && \
pip install transformers sentencepiece tiktoken fastapi uvicorn >> /tmp/pip_install.log 2>&1 && \
cat > /tmp/affine_serv.py <<PY
{py_code}
PY
nohup bash -lc "(. /tmp/venv/bin/activate; python /tmp/affine_serv.py)" >/tmp/affine_serv.log 2>&1 & disown
sleep 3
curl -sSf localhost:8000/docs >/dev/null || (tail -n+1 /tmp/pip_install.log; tail -n+1 /tmp/affine_serv.log; exit 1)

'"""
    res = _LIUM.exec(pod, command=cmd, env=env)
    if not res.get("success"):
        raise RuntimeError(f"Failed to start server: {res.get('stderr')}")

Comment on lines +210 to +220
Copilot AI Nov 13, 2025
The complex bash command with heredoc and background process execution is fragile and difficult to debug if it fails. The error message on line 219 may not provide sufficient context. Consider:

  1. Breaking the command into separate steps with better error handling
  2. Logging intermediate steps for debugging
  3. Providing more specific error messages (e.g., "Failed to install dependencies", "Server failed to start", etc.)

Additionally, the curl check on line 215 may succeed even if the server isn't fully ready to serve model requests, as it only checks if the /docs endpoint responds, not if the model is loaded.

Suggested change
cmd = f"""bash -lc 'python3 -m venv /tmp/venv && . /tmp/venv/bin/activate && pip -q install --upgrade pip && pip -q install torch --index-url https://download.pytorch.org/whl/cu121 && pip -q install transformers sentencepiece tiktoken fastapi uvicorn && cat > /tmp/affine_serv.py <<PY
{py_code}
PY
nohup bash -lc "(. /tmp/venv/bin/activate; python /tmp/affine_serv.py)" >/tmp/affine_serv.log 2>&1 & disown
sleep 3
curl -sSf localhost:8000/docs >/dev/null || (tail -n+1 /tmp/affine_serv.log; exit 1)
'"""
res = _LIUM.exec(pod, command=cmd, env=env)
if not res.get("success"):
raise RuntimeError(f"Failed to start server: {res.get('stderr')}")
# Step 1: Create virtualenv
cmd_venv = "python3 -m venv /tmp/venv"
res = _LIUM.exec(pod, command=cmd_venv, env=env)
if not res.get("success"):
raise RuntimeError(f"Failed to create virtualenv: {res.get('stderr')}")
# Step 2: Activate venv and upgrade pip
cmd_pip = "bash -lc '. /tmp/venv/bin/activate && pip install --upgrade pip'"
res = _LIUM.exec(pod, command=cmd_pip, env=env)
if not res.get("success"):
raise RuntimeError(f"Failed to upgrade pip: {res.get('stderr')}")
# Step 3: Install dependencies
cmd_deps = (
"bash -lc '. /tmp/venv/bin/activate && "
"pip install torch --index-url https://download.pytorch.org/whl/cu121 && "
"pip install transformers sentencepiece tiktoken fastapi uvicorn'"
)
res = _LIUM.exec(pod, command=cmd_deps, env=env)
if not res.get("success"):
raise RuntimeError(f"Failed to install dependencies: {res.get('stderr')}")
# Step 4: Write server file
cmd_write = f"cat > /tmp/affine_serv.py <<PY\n{py_code}\nPY\n"
res = _LIUM.exec(pod, command=cmd_write, env=env)
if not res.get("success"):
raise RuntimeError(f"Failed to write server file: {res.get('stderr')}")
# Step 5: Start server in background
cmd_start = "nohup bash -lc '(. /tmp/venv/bin/activate; python /tmp/affine_serv.py)' >/tmp/affine_serv.log 2>&1 & disown"
res = _LIUM.exec(pod, command=cmd_start, env=env)
if not res.get("success"):
raise RuntimeError(f"Failed to start server: {res.get('stderr')}")
# Step 6: Wait and check readiness
time.sleep(3)
# Improved readiness check: look for "model loaded" in log
cmd_check = "grep -i 'model loaded' /tmp/affine_serv.log"
res = _LIUM.exec(pod, command=cmd_check, env=env)
if not res.get("success"):
# If not found, show full log for debugging
log_res = _LIUM.exec(pod, command="cat /tmp/affine_serv.log", env=env)
log_output = log_res.get("stdout", "") if log_res else ""
raise RuntimeError(f"Server failed to become ready. Log output:\n{log_output}")

    # Prefer public URL if available; otherwise, open SSH tunnel -> local_port
    public = _try_public_url(pod)
    if public:
        return ModelServer(model=model, pod=pod, public_url=public)
    local_port = _pick_free_port()
    user = pod.username or "root"
    host = pod.host or "localhost"
    port = pod.ssh_port
    key = str(_LIUM.config.ssh_key_path)
Copilot AI Nov 13, 2025
Missing error handling: If the SSH key file doesn't exist at the path _LIUM.config.ssh_key_path, the SSH command will fail silently (output redirected to DEVNULL), and the ModelServer will be returned with a non-functional tunnel. Consider validating that the SSH key exists before attempting to create the tunnel:

key = str(_LIUM.config.ssh_key_path)
if not os.path.exists(key):
    raise RuntimeError(f"SSH key not found at {key}")
Suggested change
    key = str(_LIUM.config.ssh_key_path)
    key = str(_LIUM.config.ssh_key_path)
    if not os.path.exists(key):
        raise RuntimeError(f"SSH key not found at {key}")

    ssh_cmd = [
        "ssh",
        "-i", key,
        "-N",
        "-o", "StrictHostKeyChecking=no",
Comment on lines +230 to +234
Copilot AI Nov 13, 2025
[nitpick] Security concern: The SSH command uses -o StrictHostKeyChecking=no, which disables host key verification. This makes the connection vulnerable to man-in-the-middle attacks. While this may be acceptable for ephemeral pods with dynamic IPs, consider documenting this security trade-off or providing an option to use a known_hosts file if the Lium infrastructure uses stable host keys.

Suggested change
    ssh_cmd = [
        "ssh",
        "-i", key,
        "-N",
        "-o", "StrictHostKeyChecking=no",
    # SECURITY NOTE: By default, disables SSH host key verification for ephemeral pods.
    # If your infrastructure uses stable host keys, set AFFINE_KNOWN_HOSTS_PATH to a known_hosts file
    # to enable verification and mitigate MITM risks.
    known_hosts_path = os.environ.get("AFFINE_KNOWN_HOSTS_PATH")
    ssh_cmd = [
        "ssh",
        "-i", key,
        "-N",
    ]
    if known_hosts_path:
        ssh_cmd += [
            "-o", f"UserKnownHostsFile={known_hosts_path}",
            "-o", "StrictHostKeyChecking=yes",
        ]
    else:
        ssh_cmd += [
            "-o", "StrictHostKeyChecking=no",
        ]
    ssh_cmd += [

"-p", str(port),
"-L", f"{local_port}:127.0.0.1:8000",
f"{user}@{host}",
]
proc = subprocess.Popen(ssh_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
_SSH_TUNNELS[local_port] = proc
Copilot AI Nov 13, 2025
The SSH tunnel process is started but there's no verification that it successfully established the connection. The code should wait briefly and verify the tunnel is working before returning. Consider adding a connection check:

proc = subprocess.Popen(ssh_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
_SSH_TUNNELS[local_port] = proc
time.sleep(2)  # Give SSH tunnel time to establish
# Verify tunnel is working
try:
    with socket.create_connection(("127.0.0.1", local_port), timeout=5):
        pass
except (socket.error, socket.timeout) as e:
    proc.terminate()
    raise RuntimeError(f"SSH tunnel failed to establish: {e}")
Suggested change
    _SSH_TUNNELS[local_port] = proc
    _SSH_TUNNELS[local_port] = proc
    time.sleep(2)  # Give SSH tunnel time to establish
    # Verify tunnel is working
    try:
        with socket.create_connection(("127.0.0.1", local_port), timeout=5):
            pass
    except (socket.error, socket.timeout) as e:
        proc.terminate()
        raise RuntimeError(f"SSH tunnel failed to establish: {e}")

    return ModelServer(model=model, pod=pod, local_port=local_port)


def ensure_model_server(model: str) -> ModelServer:
"""Return a live server for model, provisioning if necessary."""
Copilot AI Nov 13, 2025
Missing input validation for the model parameter. If model is empty or None, it could lead to accessing the wrong cached server or creating an invalid cache entry. Consider adding validation at the start of the function:

if not model:
    raise ValueError("model parameter cannot be empty")
Suggested change
"""Return a live server for model, provisioning if necessary."""
"""Return a live server for model, provisioning if necessary."""
if not model:
raise ValueError("model parameter cannot be empty")

    with _LOCK:
        srv = _MODEL_TO_SERVER.get(model)
        if srv:
            return srv
        srv = _start_server_on_pod(model)
        _MODEL_TO_SERVER[model] = srv
        return srv
Comment on lines +244 to +252
Copilot AI Nov 13, 2025
Potential resource leak: If an exception occurs after the pod is created (line 198-199) but before the ModelServer is added to _MODEL_TO_SERVER (line 251), the pod will not be tracked and cannot be cleaned up by stop_all(). This could leave orphaned pods running. Consider wrapping the server creation in a try-except block to ensure cleanup:

srv = _MODEL_TO_SERVER.get(model)
if srv:
    return srv
pod = None
try:
    srv = _start_server_on_pod(model)
    _MODEL_TO_SERVER[model] = srv
    return srv
except Exception:
    # Clean up pod if server creation fails
    if pod:
        try:
            _LIUM._request("DELETE", f"/pods/{pod.id}")
        except Exception:
            pass
    raise



def stop_all() -> None:
"""Terminate SSH tunnels and pods."""
with _LOCK:
for port, proc in list(_SSH_TUNNELS.items()):
try:
proc.terminate()
except Exception:
Copilot AI Nov 13, 2025
'except' clause does nothing but pass and there is no explanatory comment.

                pass
            _SSH_TUNNELS.pop(port, None)
        for model, srv in list(_MODEL_TO_SERVER.items()):
            try:
                _LIUM._request("DELETE", f"/pods/{srv.pod.id}")
Copilot AI Nov 13, 2025
The _LIUM._request() method call accesses a private method (prefixed with _) from the SDK. This is fragile as private methods are not part of the public API and may change without notice. Consider using the public API method if available, or check the Lium SDK documentation for the recommended way to delete pods.

Suggested change
_LIUM._request("DELETE", f"/pods/{srv.pod.id}")
_LIUM.delete_pod(srv.pod.id)

            except Exception:
Copilot AI Nov 13, 2025
'except' clause does nothing but pass and there is no explanatory comment.

Suggested change
            except Exception:
            except Exception:
                # Ignore errors during pod deletion; best-effort cleanup.

                pass
            _MODEL_TO_SERVER.pop(model, None)
Comment on lines +261 to +269
Copilot AI Nov 13, 2025
The stop_all() function modifies dictionaries while iterating over them using list() to create a snapshot. However, after popping items, the function continues to the next model/port without checking if the operation succeeded. If proc.terminate() or the DELETE request fails, the item is still removed from the tracking dictionary, leading to resource leaks. Consider only removing items after successful cleanup:

def stop_all() -> None:
    """Terminate SSH tunnels and pods."""
    with _LOCK:
        for port, proc in list(_SSH_TUNNELS.items()):
            try:
                proc.terminate()
                proc.wait(timeout=5)
                _SSH_TUNNELS.pop(port, None)
            except Exception:
                pass  # Keep in dict for retry
        for model, srv in list(_MODEL_TO_SERVER.items()):
            try:
                _LIUM._request("DELETE", f"/pods/{srv.pod.id}")
                _MODEL_TO_SERVER.pop(model, None)
            except Exception:
                pass  # Keep in dict for retry
Suggested change
            except Exception:
                pass
            _SSH_TUNNELS.pop(port, None)
        for model, srv in list(_MODEL_TO_SERVER.items()):
            try:
                _LIUM._request("DELETE", f"/pods/{srv.pod.id}")
            except Exception:
                pass
            _MODEL_TO_SERVER.pop(model, None)
                proc.wait(timeout=5)
                _SSH_TUNNELS.pop(port, None)
            except Exception:
                pass  # Keep in dict for retry
        for model, srv in list(_MODEL_TO_SERVER.items()):
            try:
                _LIUM._request("DELETE", f"/pods/{srv.pod.id}")
                _MODEL_TO_SERVER.pop(model, None)
            except Exception:
                pass  # Keep in dict for retry



@atexit.register
def _cleanup() -> None:
    stop_all()
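
Stepping back from the line comments: the module docstring promises that existing evaluation code can keep using an OpenAI-style base_url. A minimal, hypothetical usage sketch of that flow (not part of the diff; it assumes the requests package and an illustrative model id):

from affine.lium_backend import ensure_model_server
import requests

srv = ensure_model_server("google/flan-t5-small")  # blocks while the pod is provisioned
resp = requests.post(
    f"{srv.base_url}/chat/completions",  # base_url already ends in /v1
    json={"messages": [{"role": "user", "content": "Say hello."}]},
    timeout=120,
)
print(resp.json()["choices"][0]["message"]["content"])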


20 changes: 13 additions & 7 deletions affine/miners.py
@@ -305,21 +305,27 @@ async def _fetch_miner(uid: int) -> Optional["Miner"]:
)

async with meta_sem:
chute = await get_chute(chute_id)
if os.getenv("AFFINE_USE_LIUM") == "1":
chute = {"name": model, "slug": "lium-local", "revision": miner_revision}
else:
chute = await get_chute(chute_id)

if not chute:
return None

chute_name = chute.get("name")
if model != chute_name:
return None
if os.getenv("AFFINE_USE_LIUM") != "1":
if model != chute_name:
return None

if uid != 0 and chute_name.split("/")[1].lower()[:6] != "affine":
return None
if os.getenv("AFFINE_USE_LIUM") != "1":
if uid != 0 and chute_name.split("/")[1].lower()[:6] != "affine":
return None

chute_revision = chute.get("revision")
if chute_revision is not None and miner_revision != chute_revision:
return None
if os.getenv("AFFINE_USE_LIUM") != "1":
if chute_revision is not None and miner_revision != chute_revision:
return None
Comment on lines +308 to +328
Copilot AI Nov 13, 2025
The environment variable AFFINE_USE_LIUM is checked multiple times in this function. Consider caching the result in a local variable at the start of the function to improve readability and avoid redundant os.getenv() calls.

use_lium = os.getenv("AFFINE_USE_LIUM") == "1"
if use_lium:
    chute = {"name": model, "slug": "lium-local", "revision": miner_revision}
else:
    chute = await get_chute(chute_id)
# ... use use_lium in subsequent conditions


return Miner(
uid=uid,
9 changes: 8 additions & 1 deletion affine/tasks.py
@@ -61,7 +61,12 @@ def to_payload(self, miner: Optional["Miner"] = None, **kwargs) -> Dict[str, Any
# Add miner-based defaults if miner is provided
if miner is not None:
payload["model"] = miner.model
payload["base_url"] = f"https://{miner.slug}.chutes.ai/v1"
if os.getenv("AFFINE_USE_LIUM") == "1":
from affine.lium_backend import ensure_model_server
srv = ensure_model_server(miner.model)
payload["base_url"] = srv.base_url
Comment on lines +65 to +67
Copilot AI Nov 13, 2025
[nitpick] The import statement from affine.lium_backend import ensure_model_server is inside the conditional block, which is good for avoiding import errors when Lium is not used. However, if ensure_model_server raises an exception during execution, the error handling context may be unclear. Consider adding a try-except block with a descriptive error message:

if os.getenv("AFFINE_USE_LIUM") == "1":
    try:
        from affine.lium_backend import ensure_model_server
        srv = ensure_model_server(miner.model)
        payload["base_url"] = srv.base_url
    except Exception as e:
        raise RuntimeError(f"Failed to initialize Lium backend: {e}") from e
else:
    payload["base_url"] = f"https://{miner.slug}.chutes.ai/v1"
Suggested change
    from affine.lium_backend import ensure_model_server
    srv = ensure_model_server(miner.model)
    payload["base_url"] = srv.base_url
    try:
        from affine.lium_backend import ensure_model_server
        srv = ensure_model_server(miner.model)
        payload["base_url"] = srv.base_url
    except Exception as e:
        raise RuntimeError(f"Failed to initialize Lium backend: {e}") from e

else:
payload["base_url"] = f"https://{miner.slug}.chutes.ai/v1"

# Allow kwargs to override any default values
payload.update(kwargs)
@@ -126,6 +131,8 @@ def docker_image(self) -> str:
@property
def env_vars(self) -> Dict[str, str]:
"""Return environment variables for this environment"""
if os.getenv("AFFINE_USE_LIUM") == "1":
return {}
Comment on lines +134 to +135
Copilot AI Nov 13, 2025
[nitpick] When AFFINE_USE_LIUM=1, this method returns an empty dictionary and skips the CHUTES_API_KEY validation entirely. However, if the Lium backend is being used alongside Chutes (e.g., in a mixed environment), or if the env_vars are needed for other purposes beyond authentication, this could cause issues. Consider documenting this behavior or providing a more targeted exemption that only skips the specific validation that's not needed for Lium mode.

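One possible shape for a more targeted exemption, sketched here as an assumption about the surrounding property (only the CHUTES_API_KEY requirement is relaxed in Lium mode; everything else stays as written):

api_key = os.getenv("CHUTES_API_KEY")
if not api_key:
    if os.getenv("AFFINE_USE_LIUM") == "1":
        return {}  # the local Lium server needs no Chutes credentials
    raise ValueError("CHUTES_API_KEY environment variable is required")
# ... remainder of the method unchanged
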
api_key = os.getenv("CHUTES_API_KEY")
if not api_key:
raise ValueError("CHUTES_API_KEY environment variable is required")