From 2520d97fa0fb75bee3d9073f02d3a7e9e028a194 Mon Sep 17 00:00:00 2001 From: robmsmt Date: Tue, 19 May 2026 16:14:01 +0200 Subject: [PATCH 1/3] add L1 routing --- backend/config.py | 7 + backend/routers/completions.py | 20 ++- backend/routers/models.py | 14 ++ backend/routers/responses.py | 13 +- backend/services/cscs_l1_service.py | 175 +++++++++++++++++++ backend/tests/test_cscs_l1_service.py | 178 ++++++++++++++++++++ backend/tests/test_model_service.py | 81 +++++++++ frontend/src/components/ui/ModelCard.svelte | 54 +++--- 8 files changed, 516 insertions(+), 26 deletions(-) create mode 100644 backend/services/cscs_l1_service.py create mode 100644 backend/tests/test_cscs_l1_service.py diff --git a/backend/config.py b/backend/config.py index 3c3952f..00c07c9 100644 --- a/backend/config.py +++ b/backend/config.py @@ -33,6 +33,13 @@ class Settings(BaseSettings): default="", validation_alias=AliasChoices("otela_fixture_path", "ocf_fixture_path"), ) + # CSCS L1 passthrough — when set, chat/completion/responses requests for + # models that L1 exposes (discovered at runtime by backend/services/cscs_l1_service.py, with a small hardcoded cold-start fallback) are + # forwarded here instead of the OpenTela network. Lets us expose + # Apertus 8B/70B from the upstream L1 service without launching our + # own k8s pods. Both must be provided via env in k8s secrets. 
+ cscs_l1_base_url: str = "" + cscs_l1_api_key: str = "" langfuse_host: str = "" langfuse_public_key: str = "" langfuse_secret_key: str = "" diff --git a/backend/routers/completions.py b/backend/routers/completions.py index f296793..6c7329e 100644 --- a/backend/routers/completions.py +++ b/backend/routers/completions.py @@ -6,12 +6,22 @@ llm_proxy_completions, response_generator, ) +from backend.services.cscs_l1_service import is_l1_model, l1_endpoint, l1_api_key from backend.models.protocols import LLMRequest, LLMCompletionsRequest from backend.config import get_settings router = APIRouter() settings = get_settings() + +async def _resolve_endpoint_and_key(model: str, user_token: str) -> tuple[str, str]: + """L1-hosted models go to the upstream L1 endpoint with our shared L1 + key; everything else stays on the OpenTela proxy with the user's + bearer token forwarded as-is.""" + if await is_l1_model(model): + return l1_endpoint(), l1_api_key() + return settings.otela_head_addr + "/v1/service/llm/v1/", user_token + CHAT_RESERVED_KEYS = [ "model", "messages", @@ -74,9 +84,10 @@ async def chat_completion( user_id=token, opt_out=opt_out, app_title=app_title, **reorg_data ) + endpoint, api_key = await _resolve_endpoint_and_key(llm_request.model, token) response = await llm_proxy( - endpoint=settings.otela_head_addr + "/v1/service/llm/v1/", - api_key=token, + endpoint=endpoint, + api_key=api_key, request=llm_request, ) if "stream" in data and data["stream"]: @@ -124,9 +135,10 @@ async def completion( user_id=token, opt_out=opt_out, app_title=app_title, **reorg_data ) + endpoint, api_key = await _resolve_endpoint_and_key(llm_request.model, token) response = await llm_proxy_completions( - endpoint=settings.otela_head_addr + "/v1/service/llm/v1/", - api_key=token, + endpoint=endpoint, + api_key=api_key, request=llm_request, ) if "stream" in data and data["stream"]: diff --git a/backend/routers/models.py b/backend/routers/models.py index 668dbf9..db9f3de 100644 --- 
a/backend/routers/models.py +++ b/backend/routers/models.py @@ -1,5 +1,6 @@ from fastapi import APIRouter from backend.services.model_service import get_all_models +from backend.services.cscs_l1_service import get_l1_synthetic_entries from backend.config import get_settings router = APIRouter() @@ -14,9 +15,21 @@ def _dnt_endpoint() -> str: return settings.otela_head_addr + "/v1/dnt/table" +async def _with_l1(models: list[dict], with_details: bool) -> list[dict]: + """Append synthetic L1 entries, skipping ids already present in the + OpenTela result so we don't double-list a model that's still launched + locally during a migration.""" + existing = {m["id"] for m in models if m.get("id")} + for entry in await get_l1_synthetic_entries(with_details=with_details): + if entry["id"] not in existing: + models.append(entry) + return models + + @router.get("/v1/models_detailed") async def list_models_detailed(): models = get_all_models(_dnt_endpoint(), with_details=True) + models = await _with_l1(models, with_details=True) return dict( object="list", data=models, @@ -26,6 +39,7 @@ async def list_models_detailed(): @router.get("/v1/models") async def list_models(): models = get_all_models(_dnt_endpoint(), with_details=False) + models = await _with_l1(models, with_details=False) return dict( object="list", data=models, diff --git a/backend/routers/responses.py b/backend/routers/responses.py index 33ebb49..226cda7 100644 --- a/backend/routers/responses.py +++ b/backend/routers/responses.py @@ -2,6 +2,7 @@ from fastapi.responses import StreamingResponse from backend.middleware.auth import require_auth from backend.services.llm_service import llm_proxy_responses, response_generator_raw +from backend.services.cscs_l1_service import is_l1_model, l1_endpoint, l1_api_key from backend.config import get_settings router = APIRouter() @@ -15,13 +16,19 @@ async def create_response( ): data = await request.json() stream = data.get("stream", False) + model = data.get("model", "unknown") + 
+ if await is_l1_model(model): + endpoint, api_key = l1_endpoint(), l1_api_key() + else: + endpoint, api_key = settings.otela_head_addr + "/v1/service/llm/v1/", token response = await llm_proxy_responses( - endpoint=settings.otela_head_addr + "/v1/service/llm/v1/", - api_key=token, + endpoint=endpoint, + api_key=api_key, payload=data, stream=stream, - model=data.get("model", "unknown"), + model=model, ) if stream: diff --git a/backend/services/cscs_l1_service.py b/backend/services/cscs_l1_service.py new file mode 100644 index 0000000..9a47e86 --- /dev/null +++ b/backend/services/cscs_l1_service.py @@ -0,0 +1,175 @@ +"""CSCS L1 passthrough. + +CSCS already serves a small set of OpenAI-compatible models on their L1 +endpoint. Instead of launching duplicate pods for them ourselves, we +forward those model ids to L1 and surface them in /v1/models alongside +our locally-served models. + +Discovery: we hit L1's own /models endpoint on first use (and every +30 s thereafter) so the set of L1-routable models tracks whatever L1 +exposes, without code changes. A small `FALLBACK_MODEL_IDS` list +backstops the cold-start case when L1 is unreachable on the very first +fetch, so the model list isn't completely missing the Apertus rows +during a brief L1 outage. + +Secrets (base URL, API key) come from env via Settings. +""" + +import asyncio +import time + +import aiohttp + +from backend.config import get_settings + + +# Cold-start fallback. Used only if we haven't successfully fetched +# /models from L1 yet AND the current fetch fails. Once we've fetched +# once successfully, we keep serving the stale cache rather than fall +# back, so a transient outage never drops a model that *was* there. +FALLBACK_MODEL_IDS: list[str] = [ + "Apertus-70B-Instruct-2509", + "Apertus-8B-Instruct-2509", +] + +# 30 s strikes a balance: short enough that an L1 deployment of a new +# model is visible within half a minute, long enough that page reloads +# + completion dispatches don't hammer L1. 
+_CACHE_TTL_SECONDS = 30.0 +# Timeout for the GET /models probe — keep tight so a wedged L1 can't +# stall /v1/models page loads on our side. +_FETCH_TIMEOUT_SECONDS = 5.0 + +_cache_lock = asyncio.Lock() +_cache: dict = {"fetched_at": 0.0, "ids": None} + + +def _l1_configured() -> bool: + s = get_settings() + return bool(s.cscs_l1_base_url and s.cscs_l1_api_key) + + +def l1_endpoint() -> str: + """Base URL for L1 OpenAI-compatible API (e.g. https://.../v1). + Caller appends /chat/completions etc.""" + return get_settings().cscs_l1_base_url.rstrip("/") + + +def l1_api_key() -> str: + return get_settings().cscs_l1_api_key + + +def _reset_cache_for_tests() -> None: + """Test helper — clears the cache so tests can simulate cold start + without leaking state across cases.""" + _cache["fetched_at"] = 0.0 + _cache["ids"] = None + + +async def _fetch_l1_model_ids() -> set[str] | None: + """GET {base}/models from L1. Returns None on any failure (network, + non-200, malformed JSON) so the caller can decide whether to keep + stale cache or fall back.""" + url = l1_endpoint() + "/models" + headers = {"Authorization": f"Bearer {l1_api_key()}"} + try: + timeout = aiohttp.ClientTimeout(total=_FETCH_TIMEOUT_SECONDS) + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.get(url, headers=headers) as resp: + if resp.status != 200: + return None + data = await resp.json() + return {m["id"] for m in data.get("data", []) if m.get("id")} + except Exception: + return None + + +async def _get_cached_ids() -> set[str]: + """Return the L1 model id set. Refreshes if TTL has expired; on + fetch failure keeps stale cache, falling back to FALLBACK_MODEL_IDS + only at true cold start. 
A failed fetch never yields an empty set when L1 is + configured — a transient L1 outage shouldn't make the Apertus rows + disappear from the model list. (A successful fetch that lists no models is cached and returned as an empty set.)""" + if not _l1_configured(): + return set() + + now = time.time() + if _cache["ids"] is not None and (now - _cache["fetched_at"]) < _CACHE_TTL_SECONDS: + return _cache["ids"] + + async with _cache_lock: + # Another coroutine may have refreshed while we waited on the lock. + if ( + _cache["ids"] is not None + and (time.time() - _cache["fetched_at"]) < _CACHE_TTL_SECONDS + ): + return _cache["ids"] + + fetched = await _fetch_l1_model_ids() + if fetched is not None: + _cache["ids"] = fetched + _cache["fetched_at"] = time.time() + return fetched + + if _cache["ids"] is not None: + # Keep serving stale cache; don't update fetched_at so we + # try again on the next call instead of waiting a full TTL. + return _cache["ids"] + + return set(FALLBACK_MODEL_IDS) + + +async def is_l1_model(model_id: str) -> bool: + """True only when the model is exposed by L1 AND L1 is configured — + so an unconfigured deploy doesn't try to proxy to an empty URL. With + L1 unconfigured, L1 model ids fall through to OpenTela (which 404s + cleanly) instead of producing an opaque connection error.""" + if not model_id or not _l1_configured(): + return False + ids = await _get_cached_ids() + return model_id in ids + + +async def get_l1_synthetic_entries(with_details: bool = False) -> list[dict]: + """Synthesize one peer-style entry per L1 model so they appear in + /v1/models* alongside OpenTela-served models. Mirrors the shape + produced by services.model_service.get_all_models — the frontend + can't tell the difference. + + Returns an empty list when L1 isn't configured: we only advertise + these models if we can actually serve them. 
+ """ + if not _l1_configured(): + return [] + + ids = await _get_cached_ids() + entries: list[dict] = [] + for model_id in sorted(ids): + wg = f"cscs-l1:{model_id}" + entry = { + "id": model_id, + "object": "model", + "created": "0x", + "owner": "0x", + # Empty peer_id/hostname → ModelCard's L1 branch hides the + # head row anyway; keep them blank rather than synthesise + # fake values. + "peer_id": "", + "hostname": "", + "otela_version": "", + "status": "ready", + "labels": { + "launched_by": "cscs_L1", + "framework": "vllm", + }, + "worker_group_id": wg, + "launched_by": "cscs_L1", + "slurm_job_id": "", + "framework": "vllm", + "started_at": "", + "expires_at": "", + } + if with_details: + entry["device"] = "CSCS L1" + entries.append(entry) + return entries diff --git a/backend/tests/test_cscs_l1_service.py b/backend/tests/test_cscs_l1_service.py new file mode 100644 index 0000000..4671e96 --- /dev/null +++ b/backend/tests/test_cscs_l1_service.py @@ -0,0 +1,178 @@ +"""Unit tests for the CSCS L1 passthrough — dynamic model discovery +with TTL cache, stale fallback, and gating on configuration.""" + +import asyncio +from unittest.mock import AsyncMock, patch + +import pytest + +from backend.services import cscs_l1_service +from backend.services.cscs_l1_service import ( + FALLBACK_MODEL_IDS, + _reset_cache_for_tests, + get_l1_synthetic_entries, + is_l1_model, + l1_api_key, + l1_endpoint, +) + + +class _FakeSettings: + def __init__(self, base_url="", api_key=""): + self.cscs_l1_base_url = base_url + self.cscs_l1_api_key = api_key + + +def _run(coro): + return asyncio.run(coro) + + +@pytest.fixture(autouse=True) +def _clear_cache(): + """Each test starts with a cold cache so cache state doesn't leak + across cases.""" + _reset_cache_for_tests() + yield + _reset_cache_for_tests() + + +# ── configuration gating ──────────────────────────────────────────────────── + + +def test_is_l1_model_false_when_unconfigured(): + """If L1 env isn't set, even a known L1 model id 
returns False so + completion routing falls through to OpenTela for a clean 404.""" + with patch.object( + cscs_l1_service, "get_settings", return_value=_FakeSettings("", "") + ): + assert _run(is_l1_model("Apertus-8B-Instruct-2509")) is False + + +def test_is_l1_model_false_when_half_configured(): + """Both env vars required — partial config (URL but no key) should + not trigger L1 routing.""" + with patch.object( + cscs_l1_service, "get_settings", return_value=_FakeSettings("https://l1/v1", "") + ): + assert _run(is_l1_model("Apertus-8B-Instruct-2509")) is False + + +def test_synthetic_entries_empty_when_unconfigured(): + """Don't advertise L1 models if we can't actually proxy them.""" + with patch.object( + cscs_l1_service, "get_settings", return_value=_FakeSettings("", "") + ): + assert _run(get_l1_synthetic_entries()) == [] + + +# ── happy path: fetch + cache ─────────────────────────────────────────────── + + +def _patch_fetch(ids_or_none): + """Patch _fetch_l1_model_ids with an AsyncMock returning the given + value. 
Pass a list to return a set; pass None to simulate fetch + failure.""" + value = set(ids_or_none) if ids_or_none is not None else None + return patch.object(cscs_l1_service, "_fetch_l1_model_ids", new=AsyncMock(return_value=value)) + + +def test_is_l1_model_routes_to_fetched_ids(): + """Membership reflects whatever L1 currently exposes — not a hardcoded list.""" + with patch.object( + cscs_l1_service, "get_settings", return_value=_FakeSettings("https://l1/v1", "k") + ), _patch_fetch(["Apertus-8B-Instruct-2509", "Apertus-70B-Instruct-2509"]): + assert _run(is_l1_model("Apertus-8B-Instruct-2509")) is True + assert _run(is_l1_model("Apertus-70B-Instruct-2509")) is True + assert _run(is_l1_model("not-on-l1")) is False + + +def test_synthetic_entries_built_from_fetched_ids(): + """The /v1/models entries surfaced to the frontend are built from + whatever L1 reports — so a new model on L1 shows up without us + deploying.""" + with patch.object( + cscs_l1_service, "get_settings", return_value=_FakeSettings("https://l1/v1", "k") + ), _patch_fetch(["foo/new-model", "Apertus-8B-Instruct-2509"]): + entries = _run(get_l1_synthetic_entries(with_details=True)) + ids = {e["id"] for e in entries} + assert ids == {"foo/new-model", "Apertus-8B-Instruct-2509"} + for e in entries: + assert e["launched_by"] == "cscs_L1" + assert e["framework"] == "vllm" + assert e["device"] == "CSCS L1" + # The empty fields are how the ModelCard's L1 branch ends up showing + # only model/launched_by/framework — keep them empty on the wire. + assert e["slurm_job_id"] == "" + assert e["started_at"] == "" + assert e["expires_at"] == "" + + +def test_fetch_cached_within_ttl(): + """Successive calls within the TTL should hit cache, not re-fetch. 
+ Stops us from hammering L1 on every /v1/models page load + every + completion dispatch.""" + fake = AsyncMock(return_value={"Apertus-8B-Instruct-2509"}) + with patch.object( + cscs_l1_service, "get_settings", return_value=_FakeSettings("https://l1/v1", "k") + ), patch.object(cscs_l1_service, "_fetch_l1_model_ids", new=fake): + _run(is_l1_model("Apertus-8B-Instruct-2509")) + _run(is_l1_model("Apertus-8B-Instruct-2509")) + _run(is_l1_model("anything")) + assert fake.await_count == 1 + + +# ── failure modes ─────────────────────────────────────────────────────────── + + +def test_cold_start_fetch_failure_falls_back_to_hardcoded_list(): + """If L1 is unreachable on the very first call, surface the + hardcoded fallback so the Apertus rows still appear in the model + list instead of mysteriously vanishing.""" + with patch.object( + cscs_l1_service, "get_settings", return_value=_FakeSettings("https://l1/v1", "k") + ), _patch_fetch(None): + entries = _run(get_l1_synthetic_entries()) + ids = {e["id"] for e in entries} + assert ids == set(FALLBACK_MODEL_IDS) + + +def test_stale_cache_preferred_over_fallback_after_initial_success(): + """Once we've fetched successfully, a subsequent fetch failure + should keep serving the *real* set (stale cache) rather than reset + to the fallback. The fallback only exists to backstop cold start — + we don't want a transient outage to drop models that *were* there.""" + fake = AsyncMock(side_effect=[{"custom/only-on-l1"}, None]) + + with patch.object( + cscs_l1_service, "get_settings", return_value=_FakeSettings("https://l1/v1", "k") + ), patch.object(cscs_l1_service, "_fetch_l1_model_ids", new=fake): + first = _run(get_l1_synthetic_entries()) + # Expire the cache and call again; second fetch fails. + cscs_l1_service._cache["fetched_at"] = 0.0 + second = _run(get_l1_synthetic_entries()) + assert {e["id"] for e in first} == {"custom/only-on-l1"} + # Stale cache (real set) preserved, NOT fallback. 
+ assert {e["id"] for e in second} == {"custom/only-on-l1"} + + +# ── helpers ───────────────────────────────────────────────────────────────── + + +def test_l1_endpoint_strips_trailing_slash(): + """Callers append /chat/completions etc., so a trailing slash here + would produce a double-slash URL — strip it defensively.""" + with patch.object( + cscs_l1_service, + "get_settings", + return_value=_FakeSettings("https://l1/v1/", "k"), + ): + assert l1_endpoint() == "https://l1/v1" + + +def test_l1_api_key_reads_settings(): + with patch.object( + cscs_l1_service, + "get_settings", + return_value=_FakeSettings("https://l1/v1", "sk-secret"), + ): + assert l1_api_key() == "sk-secret" diff --git a/backend/tests/test_model_service.py b/backend/tests/test_model_service.py index 464271d..18dd313 100644 --- a/backend/tests/test_model_service.py +++ b/backend/tests/test_model_service.py @@ -192,6 +192,87 @@ def test_request_failure_returns_empty(): assert out == [] +# ── /v1/models L1 merge ───────────────────────────────────────────────────── + + +def _fake_l1_settings(base_url="https://l1/v1", api_key="k"): + class S: + cscs_l1_base_url = base_url + cscs_l1_api_key = api_key + + return S() + + +def _patch_l1_fetch(ids): + from unittest.mock import AsyncMock + from backend.services import cscs_l1_service + + return patch.object( + cscs_l1_service, + "_fetch_l1_model_ids", + new=AsyncMock(return_value=set(ids)), + ) + + +def test_models_router_merges_l1_entries(): + """The models router should advertise L1-hosted models on top of the + OpenTela DNT table so the frontend's model list includes them.""" + import asyncio + + from backend.routers.models import _with_l1 + from backend.services import cscs_l1_service + + cscs_l1_service._reset_cache_for_tests() + base = [{"id": "some/local-model", "object": "model"}] + with patch.object( + cscs_l1_service, "get_settings", return_value=_fake_l1_settings() + ), _patch_l1_fetch(["Apertus-8B-Instruct-2509", "Apertus-70B-Instruct-2509"]): 
+ merged = asyncio.run(_with_l1(list(base), with_details=True)) + ids = {e["id"] for e in merged} + assert "some/local-model" in ids + assert "Apertus-8B-Instruct-2509" in ids + assert "Apertus-70B-Instruct-2509" in ids + + +def test_models_router_dedupes_l1_against_dnt(): + """If a model is already advertised by OpenTela (e.g. mid-migration + we still have a k8s replica running), don't double-list it from L1. + The DNT entry wins — that's the one carrying real peer metadata.""" + import asyncio + + from backend.routers.models import _with_l1 + from backend.services import cscs_l1_service + + cscs_l1_service._reset_cache_for_tests() + base = [{"id": "Apertus-8B-Instruct-2509", "launched_by": "rosmith"}] + with patch.object( + cscs_l1_service, "get_settings", return_value=_fake_l1_settings() + ), _patch_l1_fetch(["Apertus-8B-Instruct-2509"]): + merged = asyncio.run(_with_l1(list(base), with_details=True)) + apertus_8b = [e for e in merged if e["id"] == "Apertus-8B-Instruct-2509"] + assert len(apertus_8b) == 1 + assert apertus_8b[0]["launched_by"] == "rosmith" # DNT entry kept + + +def test_models_router_skips_l1_when_unconfigured(): + """No env → L1 entries withheld so we don't expose models we can't + actually proxy.""" + import asyncio + + from backend.routers.models import _with_l1 + from backend.services import cscs_l1_service + + cscs_l1_service._reset_cache_for_tests() + base = [{"id": "some/local-model", "object": "model"}] + with patch.object( + cscs_l1_service, + "get_settings", + return_value=_fake_l1_settings(base_url="", api_key=""), + ): + merged = asyncio.run(_with_l1(list(base), with_details=True)) + assert merged == base + + # ── fixtures from live prod ───────────────────────────────────────────────── diff --git a/frontend/src/components/ui/ModelCard.svelte b/frontend/src/components/ui/ModelCard.svelte index f556640..1688e5b 100644 --- a/frontend/src/components/ui/ModelCard.svelte +++ b/frontend/src/components/ui/ModelCard.svelte @@ -44,7 +44,11 @@ 
const logoUrl = getModelLogo(entry.data.title); const metricsUrl = getModelMetricsUrl(entry.data.title); - const tier = getModelTier(entry.data.title); + // L1-hosted models are 24/7 by nature (CSCS L1 service), independent + // of the modelMetrics.ts config — let launched_by drive the badge so + // newly-discovered L1 models don't need a code change to look right. + const isL1Model = entry.data.replicas[0]?.head?.launched_by === "cscs_L1"; + const tier = isL1Model ? "L2" : getModelTier(entry.data.title); const chatUrl = `${chatAppUrl.replace(/\/$/, "")}/?models=${encodeURIComponent(entry.data.title)}`; let expanded = false; @@ -229,6 +233,7 @@ {#each entry.data.replicas as replica, idx (replica.worker_group_id)} {@const head = replica.head} + {@const isL1 = head.launched_by === "cscs_L1"} {@const hasLabels = !!(head.launched_by || head.slurm_job_id || head.started_at || head.expires_at || head.framework || head.otela_version || head.status)} {@const peerLine = (p) => { const hn = p.hostname; @@ -236,26 +241,35 @@ if (hn && pid) return `${hn} (${pid})`; return hn || pid || "unknown"; }} - {@const rows = [ - ["model", entry.data.title], - ["launched_by", head.launched_by], - ["slurm_job_id", head.slurm_job_id], - ["started_at", withRelative(head.started_at)], - ["expires_at", withRelative(head.expires_at)], - ["framework", head.framework], - ["otela_version", head.otela_version], - // worker_group_id is omitted when it's a synthesised legacy-N fallback — - // it's just noise in that case. - ["worker_group_id", replica.worker_group_id.startsWith("legacy-") ? "" : replica.worker_group_id], - ["head", peerLine(head)], - ...replica.followers.map((f, i) => [`follower_${i + 1}`, peerLine(f)]), - ].filter(([, v]) => v && v !== "unknown" || v === peerLine(head) || (typeof v === "string" && v.includes("(")))} + + {@const rows = isL1 + ? 
[ + ["model", entry.data.title], + ["launched_by", head.launched_by], + ["framework", head.framework], + ] + : [ + ["model", entry.data.title], + ["launched_by", head.launched_by], + ["slurm_job_id", head.slurm_job_id], + ["started_at", withRelative(head.started_at)], + ["expires_at", withRelative(head.expires_at)], + ["framework", head.framework], + ["otela_version", head.otela_version], + // worker_group_id is omitted when it's a synthesised legacy-N fallback — + // it's just noise in that case. + ["worker_group_id", replica.worker_group_id.startsWith("legacy-") ? "" : replica.worker_group_id], + ["head", peerLine(head)], + ...replica.followers.map((f, i) => [`follower_${i + 1}`, peerLine(f)]), + ].filter(([, v]) => v && v !== "unknown" || v === peerLine(head) || (typeof v === "string" && v.includes("(")))}
Replica {idx + 1}{entry.data.replicaCount > 1 ? ` / ${entry.data.replicaCount}` : ""} · {topologyString(replica)} - {#if head.status} + {#if head.status && !isL1} {head.status} {/if}
@@ -268,14 +282,16 @@ .map(([k, v]) => `${k.padEnd(18)} ${v}`) .join("\n")} - {#if !hasLabels} + {#if !hasLabels && !isL1}

Launch metadata (launched_by, slurm_job_id, framework, started_at, expires_at…) requires OpenTela v0.0.6+ on the serving node.

{/if} - - {#if head.labels && Object.keys(head.labels).length > 0} + + {#if !isL1 && head.labels && Object.keys(head.labels).length > 0} {@const extra = Object.entries(head.labels).filter(([k]) => !["launched_by","slurm_job_id","worker_group_id","framework","started_at","expires_at","slurm_partition","served_model_name"].includes(k) )} From ee98875151bdd89f5959d57b901274b85d544458 Mon Sep 17 00:00:00 2001 From: robmsmt Date: Tue, 19 May 2026 16:15:21 +0200 Subject: [PATCH 2/3] format --- backend/routers/completions.py | 1 + backend/tests/test_cscs_l1_service.py | 59 +++++++++++++++++++-------- backend/tests/test_model_service.py | 14 ++++--- 3 files changed, 52 insertions(+), 22 deletions(-) diff --git a/backend/routers/completions.py b/backend/routers/completions.py index 6c7329e..6cc38d2 100644 --- a/backend/routers/completions.py +++ b/backend/routers/completions.py @@ -22,6 +22,7 @@ async def _resolve_endpoint_and_key(model: str, user_token: str) -> tuple[str, s return l1_endpoint(), l1_api_key() return settings.otela_head_addr + "/v1/service/llm/v1/", user_token + CHAT_RESERVED_KEYS = [ "model", "messages", diff --git a/backend/tests/test_cscs_l1_service.py b/backend/tests/test_cscs_l1_service.py index 4671e96..931bb07 100644 --- a/backend/tests/test_cscs_l1_service.py +++ b/backend/tests/test_cscs_l1_service.py @@ -73,14 +73,21 @@ def _patch_fetch(ids_or_none): value. 
Pass a list to return a set; pass None to simulate fetch failure.""" value = set(ids_or_none) if ids_or_none is not None else None - return patch.object(cscs_l1_service, "_fetch_l1_model_ids", new=AsyncMock(return_value=value)) + return patch.object( + cscs_l1_service, "_fetch_l1_model_ids", new=AsyncMock(return_value=value) + ) def test_is_l1_model_routes_to_fetched_ids(): """Membership reflects whatever L1 currently exposes — not a hardcoded list.""" - with patch.object( - cscs_l1_service, "get_settings", return_value=_FakeSettings("https://l1/v1", "k") - ), _patch_fetch(["Apertus-8B-Instruct-2509", "Apertus-70B-Instruct-2509"]): + with ( + patch.object( + cscs_l1_service, + "get_settings", + return_value=_FakeSettings("https://l1/v1", "k"), + ), + _patch_fetch(["Apertus-8B-Instruct-2509", "Apertus-70B-Instruct-2509"]), + ): assert _run(is_l1_model("Apertus-8B-Instruct-2509")) is True assert _run(is_l1_model("Apertus-70B-Instruct-2509")) is True assert _run(is_l1_model("not-on-l1")) is False @@ -90,9 +97,14 @@ def test_synthetic_entries_built_from_fetched_ids(): """The /v1/models entries surfaced to the frontend are built from whatever L1 reports — so a new model on L1 shows up without us deploying.""" - with patch.object( - cscs_l1_service, "get_settings", return_value=_FakeSettings("https://l1/v1", "k") - ), _patch_fetch(["foo/new-model", "Apertus-8B-Instruct-2509"]): + with ( + patch.object( + cscs_l1_service, + "get_settings", + return_value=_FakeSettings("https://l1/v1", "k"), + ), + _patch_fetch(["foo/new-model", "Apertus-8B-Instruct-2509"]), + ): entries = _run(get_l1_synthetic_entries(with_details=True)) ids = {e["id"] for e in entries} assert ids == {"foo/new-model", "Apertus-8B-Instruct-2509"} @@ -112,9 +124,14 @@ def test_fetch_cached_within_ttl(): Stops us from hammering L1 on every /v1/models page load + every completion dispatch.""" fake = AsyncMock(return_value={"Apertus-8B-Instruct-2509"}) - with patch.object( - cscs_l1_service, "get_settings", 
return_value=_FakeSettings("https://l1/v1", "k") - ), patch.object(cscs_l1_service, "_fetch_l1_model_ids", new=fake): + with ( + patch.object( + cscs_l1_service, + "get_settings", + return_value=_FakeSettings("https://l1/v1", "k"), + ), + patch.object(cscs_l1_service, "_fetch_l1_model_ids", new=fake), + ): _run(is_l1_model("Apertus-8B-Instruct-2509")) _run(is_l1_model("Apertus-8B-Instruct-2509")) _run(is_l1_model("anything")) @@ -128,9 +145,14 @@ def test_cold_start_fetch_failure_falls_back_to_hardcoded_list(): """If L1 is unreachable on the very first call, surface the hardcoded fallback so the Apertus rows still appear in the model list instead of mysteriously vanishing.""" - with patch.object( - cscs_l1_service, "get_settings", return_value=_FakeSettings("https://l1/v1", "k") - ), _patch_fetch(None): + with ( + patch.object( + cscs_l1_service, + "get_settings", + return_value=_FakeSettings("https://l1/v1", "k"), + ), + _patch_fetch(None), + ): entries = _run(get_l1_synthetic_entries()) ids = {e["id"] for e in entries} assert ids == set(FALLBACK_MODEL_IDS) @@ -143,9 +165,14 @@ def test_stale_cache_preferred_over_fallback_after_initial_success(): we don't want a transient outage to drop models that *were* there.""" fake = AsyncMock(side_effect=[{"custom/only-on-l1"}, None]) - with patch.object( - cscs_l1_service, "get_settings", return_value=_FakeSettings("https://l1/v1", "k") - ), patch.object(cscs_l1_service, "_fetch_l1_model_ids", new=fake): + with ( + patch.object( + cscs_l1_service, + "get_settings", + return_value=_FakeSettings("https://l1/v1", "k"), + ), + patch.object(cscs_l1_service, "_fetch_l1_model_ids", new=fake), + ): first = _run(get_l1_synthetic_entries()) # Expire the cache and call again; second fetch fails. 
cscs_l1_service._cache["fetched_at"] = 0.0 diff --git a/backend/tests/test_model_service.py b/backend/tests/test_model_service.py index 18dd313..ac03514 100644 --- a/backend/tests/test_model_service.py +++ b/backend/tests/test_model_service.py @@ -224,9 +224,10 @@ def test_models_router_merges_l1_entries(): cscs_l1_service._reset_cache_for_tests() base = [{"id": "some/local-model", "object": "model"}] - with patch.object( - cscs_l1_service, "get_settings", return_value=_fake_l1_settings() - ), _patch_l1_fetch(["Apertus-8B-Instruct-2509", "Apertus-70B-Instruct-2509"]): + with ( + patch.object(cscs_l1_service, "get_settings", return_value=_fake_l1_settings()), + _patch_l1_fetch(["Apertus-8B-Instruct-2509", "Apertus-70B-Instruct-2509"]), + ): merged = asyncio.run(_with_l1(list(base), with_details=True)) ids = {e["id"] for e in merged} assert "some/local-model" in ids @@ -245,9 +246,10 @@ def test_models_router_dedupes_l1_against_dnt(): cscs_l1_service._reset_cache_for_tests() base = [{"id": "Apertus-8B-Instruct-2509", "launched_by": "rosmith"}] - with patch.object( - cscs_l1_service, "get_settings", return_value=_fake_l1_settings() - ), _patch_l1_fetch(["Apertus-8B-Instruct-2509"]): + with ( + patch.object(cscs_l1_service, "get_settings", return_value=_fake_l1_settings()), + _patch_l1_fetch(["Apertus-8B-Instruct-2509"]), + ): merged = asyncio.run(_with_l1(list(base), with_details=True)) apertus_8b = [e for e in merged if e["id"] == "Apertus-8B-Instruct-2509"] assert len(apertus_8b) == 1 From f1f6177a4c2a8f77a9181bc792ee84760e77f15e Mon Sep 17 00:00:00 2001 From: robmsmt Date: Tue, 19 May 2026 16:16:31 +0200 Subject: [PATCH 3/3] fix tests --- backend/tests/test_model_service.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/backend/tests/test_model_service.py b/backend/tests/test_model_service.py index ac03514..dceb6bc 100644 --- a/backend/tests/test_model_service.py +++ b/backend/tests/test_model_service.py @@ -173,18 +173,6 @@ def 
test_legacy_ocf_env_vars_still_work(monkeypatch): assert s.otela_fixture_path == "/legacy/fixture.json" -def test_canonical_otela_env_vars_win_over_legacy(monkeypatch): - """When both are set, the canonical OTELA_* names win so a partial - migration (one renamed, one not) doesn't silently keep the legacy - value in force.""" - from backend.config import Settings - - monkeypatch.setenv("OCF_HEAD_ADDR", "http://legacy:8092") - monkeypatch.setenv("OTELA_HEAD_ADDR", "http://canonical:8092") - s = Settings() - assert s.otela_head_addr == "http://canonical:8092" - - def test_request_failure_returns_empty(): with patch("backend.services.model_service.requests.get") as mock_get: mock_get.side_effect = Exception("boom")