Skip to content

Commit 91b3c6f

Browse files
committed
feat(BA-5837): add ValkeyCache source for AppConfigFragment merged-view reads
Re-introduces a Valkey-backed cache layer for the merged AppConfig view, replacing the cache source that was dropped together with the legacy `app_configs` table in #11265 (BA-5822). The hot path — `AppConfigFragmentRepository.app_config(user_id, name)` — is the WebUI bootstrap loop (BEP-1052 §6) and was previously a DB hit per request.

Mirrors the legacy `AppConfigCacheSource` shape on top of the `(user, name)`-keyed merged view:

- `cache_source/cache_source.py`:
  - `get_merged_config(user_id, name)`: cache-aside read keyed by `app_config:merged:{user_id}:{name}`
  - `set_merged_config(merged, domain_name=...)`: writes the deep-merged payload + indexes the cache key in `app_config:user_keys:{user_id}` and the user in `app_config:domain_users:{domain_name}` so invalidation runs without `SCAN`
  - `invalidate_for_scope(scope_type, scope_id)`:
    - `USER`: drop the user's per-name set
    - `DOMAIN` / `DOMAIN_USER_DEFAULTS`: cascade through every member of the domain user set
    - `PUBLIC`: rely on TTL (broad invalidation TBD)
  - `invalidate_for_user(user_id)`: convenience for self-service writes
- `db_source.user_domain_name(user_id)`: single-column lookup so the cache layer can tag merged-view entries with their owning domain
- `AppConfigFragmentRepository.app_config(...)` is now cache-aside (cache hit returns the cached payload + fresh DB fragments; cache miss writes through)
- `AppConfigFragmentAdminRepository.{create,update,purge}` invalidate the affected scope after the DB write
- `repositories.py` builds the cache source from `args.valkey_stat_client` and threads it into both repos
- All cache calls pass through `suppress_with_log` — cache failures log a warning and fall through to the DB
1 parent cf5ae13 commit 91b3c6f

7 files changed

Lines changed: 296 additions & 10 deletions

File tree

changes/11297.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Front the `AppConfigFragment` merged-view repository with a Valkey cache. Read path is cache-aside (`get_merged_config(user_id, name)` → DB fallback → cache write); admin / self-service bulk writes invalidate per-user or per-domain via membership indexes (no `SCAN`). Cache failures fall through to the DB (BEP-1052 §5).

src/ai/backend/manager/repositories/app_config_fragment/admin_repository.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
)
1717
from ai.backend.manager.models.app_config_fragment.row import AppConfigFragmentRow
1818
from ai.backend.manager.models.utils import ExtendedAsyncSAEngine
19+
from ai.backend.manager.repositories.app_config_fragment.cache_source import (
20+
AppConfigFragmentCacheSource,
21+
)
1922
from ai.backend.manager.repositories.app_config_fragment.creators import (
2023
AppConfigFragmentCreatorSpec,
2124
)
@@ -64,9 +67,15 @@ class AppConfigFragmentAdminRepository:
6467
"""
6568

6669
_db_source: AppConfigFragmentDBSource
70+
_cache_source: AppConfigFragmentCacheSource | None
6771

68-
def __init__(self, db: ExtendedAsyncSAEngine) -> None:
72+
def __init__(
    self,
    db: ExtendedAsyncSAEngine,
    cache_source: AppConfigFragmentCacheSource | None = None,
) -> None:
    """Wire the admin repository to its DB source and optional cache.

    Args:
        db: Engine handed to `AppConfigFragmentDBSource`.
        cache_source: Merged-view cache to invalidate after mutations.
            When `None`, cache invalidation is skipped.
    """
    self._db_source = AppConfigFragmentDBSource(db)
    self._cache_source = cache_source
7079

7180
# ── Mutations ─────────────────────────────────────────────────
7281

@@ -84,7 +93,9 @@ async def create(
8493
extra_config=extra_config,
8594
),
8695
)
87-
return await self._db_source.create(creator)
96+
result = await self._db_source.create(creator)
97+
await self._invalidate(key)
98+
return result
8899

89100
@app_config_fragment_admin_repository_resilience.apply()
90101
async def update(
@@ -95,11 +106,21 @@ async def update(
95106
"""Update a fragment by natural key. Raises
96107
``AppConfigFragmentNotFound`` when missing."""
97108
spec = AppConfigFragmentUpdaterSpec(extra_config=extra_config)
98-
return await self._db_source.update(key, spec)
109+
result = await self._db_source.update(key, spec)
110+
await self._invalidate(key)
111+
return result
99112

100113
@app_config_fragment_admin_repository_resilience.apply()
async def purge(self, key: AppConfigFragmentKey) -> bool:
    """Purge the fragment at `key`, then invalidate its cached merged views.

    Returns the DB source's result unchanged. The cache is only touched
    when that result is truthy, i.e. when the purge reported a change.
    """
    purged = await self._db_source.purge(key)
    if not purged:
        return purged
    await self._invalidate(key)
    return purged
119+
120+
async def _invalidate(self, key: AppConfigFragmentKey) -> None:
    """Invalidate cached merged views for the scope named by `key`.

    Does nothing when the repository was built without a cache source.
    """
    cache = self._cache_source
    if cache is not None:
        await cache.invalidate_for_scope(key.scope_type, key.scope_id)
103124

104125
# ── Cross-scope reads ────────────────────────────────────────
105126

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .cache_source import AppConfigFragmentCacheSource
2+
3+
__all__ = ("AppConfigFragmentCacheSource",)
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
"""Cache source for AppConfigFragment merged-view reads.
2+
3+
The hot path is `AppConfigFragmentRepository.app_config(user_id, name)`,
4+
the per-`(user, name)` deep-merge of every contributing fragment in
5+
the policy's `scope_sources` chain (BEP-1052 §5). Each merged value
6+
gets cached under `app_config:merged:{user_id}:{name}` with a TTL,
7+
and is invalidated whenever a contributing fragment changes.
8+
9+
Membership indexes let invalidation work without `SCAN`:
10+
11+
- ``app_config:user_keys:{user_id}`` — set of `(user_id, name)` cache
12+
keys this user currently has cached. Per-user invalidation pops the
13+
set in one `SMEMBERS` + `DEL`.
14+
- ``app_config:domain_users:{domain_name}`` — set of `user_id`s
15+
currently observed in this domain. Per-domain invalidation expands
16+
to a per-user invalidation for each member.
17+
18+
Cache failures never break a request — every public method is wrapped
19+
in `suppress_with_log` so any Valkey error falls through to the DB.
20+
"""
21+
22+
from __future__ import annotations
23+
24+
import logging
25+
import uuid
26+
from collections.abc import Mapping
27+
from typing import Any, cast
28+
29+
from glide import Batch, ExpirySet, ExpiryType
30+
31+
from ai.backend.common.json import dump_json, load_json
32+
from ai.backend.logging.utils import BraceStyleAdapter
33+
from ai.backend.manager.clients.valkey_client.valkey_cache import ValkeyCache
34+
from ai.backend.manager.data.app_config.types import AppConfigData
35+
from ai.backend.manager.data.app_config_fragment.types import AppConfigScopeType
36+
from ai.backend.manager.repositories.utils import suppress_with_log
37+
38+
log = BraceStyleAdapter(logging.getLogger(__spec__.name))
39+
40+
41+
class AppConfigFragmentCacheSource:
42+
"""Valkey-backed cache for the per-`(user, name)` merged AppConfig view."""
43+
44+
_valkey_cache: ValkeyCache
45+
_cache_ttl: int
46+
47+
def __init__(self, valkey_cache: ValkeyCache, cache_ttl: int = 600) -> None:
48+
"""Args:
49+
valkey_cache: Manager-side Valkey cache client.
50+
cache_ttl: TTL in seconds (default: 10 minutes).
51+
"""
52+
self._valkey_cache = valkey_cache
53+
self._cache_ttl = cache_ttl
54+
55+
# ── Key derivation ─────────────────────────────────────────────
56+
57+
@staticmethod
58+
def _merged_key(user_id: uuid.UUID | str, name: str) -> str:
59+
return f"app_config:merged:{user_id}:{name}"
60+
61+
@staticmethod
62+
def _user_keys_set(user_id: uuid.UUID | str) -> str:
63+
return f"app_config:user_keys:{user_id}"
64+
65+
@staticmethod
66+
def _domain_users_set(domain_name: str) -> str:
67+
return f"app_config:domain_users:{domain_name}"
68+
69+
# ── Read ───────────────────────────────────────────────────────
70+
71+
async def get_merged_config(
72+
self,
73+
user_id: uuid.UUID,
74+
name: str,
75+
) -> Mapping[str, Any] | None:
76+
"""Return the cached `config` payload, or `None` on miss / failure.
77+
78+
Only the merged `config` mapping is cached — `fragments` are
79+
cheap to recompute when needed and would bloat the cache.
80+
"""
81+
with suppress_with_log([Exception], "Failed to read merged config from cache"):
82+
cache_key = self._merged_key(user_id, name)
83+
async with self._valkey_cache.client() as conn:
84+
cached_value = await conn.get(cache_key)
85+
if cached_value:
86+
log.debug("Cache hit for merged config: {} {}", user_id, name)
87+
return cast(Mapping[str, Any] | None, load_json(cached_value))
88+
log.debug("Cache miss for merged config: {} {}", user_id, name)
89+
return None
90+
91+
# ── Write ──────────────────────────────────────────────────────
92+
93+
async def set_merged_config(
94+
self,
95+
merged: AppConfigData,
96+
domain_name: str | None = None,
97+
) -> None:
98+
"""Cache the merged `config` payload + index it for invalidation.
99+
100+
Indexes the cache key in the user's key set; if `domain_name`
101+
is supplied, also adds the user to the domain's user set so
102+
domain-level invalidation can cascade.
103+
"""
104+
if merged.config is None:
105+
# Nothing useful to cache — leave as miss-on-next-read.
106+
return
107+
with suppress_with_log([Exception], "Failed to write merged config to cache"):
108+
user_id = str(merged.user_id)
109+
cache_key = self._merged_key(user_id, merged.name)
110+
111+
batch = Batch(is_atomic=False)
112+
batch.set(
113+
cache_key,
114+
dump_json(merged.config),
115+
expiry=ExpirySet(ExpiryType.SEC, self._cache_ttl),
116+
)
117+
user_keys = self._user_keys_set(user_id)
118+
batch.sadd(user_keys, [cache_key])
119+
batch.expire(user_keys, self._cache_ttl)
120+
if domain_name is not None:
121+
domain_users = self._domain_users_set(domain_name)
122+
batch.sadd(domain_users, [user_id])
123+
batch.expire(domain_users, self._cache_ttl)
124+
125+
async with self._valkey_cache.client() as conn:
126+
await conn.exec(batch, raise_on_error=True)
127+
128+
log.trace(
129+
"Cached merged config for user {} name {} (domain={})",
130+
user_id,
131+
merged.name,
132+
domain_name,
133+
)
134+
135+
# ── Invalidate ────────────────────────────────────────────────
136+
137+
async def invalidate_for_scope(
138+
self,
139+
scope_type: AppConfigScopeType,
140+
scope_id: str,
141+
) -> None:
142+
"""Invalidate every cached merged view affected by a fragment write.
143+
144+
`scope_id` is the user UUID for `USER`, the domain name for
145+
`DOMAIN` / `DOMAIN_USER_DEFAULTS`, or the literal `"public"`
146+
for `PUBLIC`. `PUBLIC` invalidation is intentionally not
147+
wired here — its blast radius is the whole cache; rely on TTL
148+
for now (admin-only, low-frequency operation).
149+
"""
150+
with suppress_with_log([Exception], "Failed to invalidate merged-config cache"):
151+
match scope_type:
152+
case AppConfigScopeType.USER:
153+
await self._invalidate_user(scope_id)
154+
case AppConfigScopeType.DOMAIN | AppConfigScopeType.DOMAIN_USER_DEFAULTS:
155+
await self._invalidate_domain(scope_id)
156+
case AppConfigScopeType.PUBLIC:
157+
log.debug(
158+
"PUBLIC-scope invalidation not wired; relying on TTL ({}s)",
159+
self._cache_ttl,
160+
)
161+
162+
async def invalidate_for_user(self, user_id: uuid.UUID | str) -> None:
163+
"""Drop every cached merged view owned by `user_id`.
164+
165+
Convenience wrapper used by self-service bulk writes that
166+
always operate on the caller's `USER` row.
167+
"""
168+
with suppress_with_log([Exception], "Failed to invalidate user merged-config cache"):
169+
await self._invalidate_user(str(user_id))
170+
171+
async def _invalidate_user(self, user_id: str) -> None:
172+
user_keys = self._user_keys_set(user_id)
173+
async with self._valkey_cache.client() as conn:
174+
cached_keys = await conn.smembers(user_keys)
175+
if not cached_keys:
176+
log.debug("No cached keys for user {}, skipping invalidation", user_id)
177+
return
178+
keys_to_delete: list[str | bytes] = list(cached_keys)
179+
keys_to_delete.append(user_keys)
180+
async with self._valkey_cache.client() as conn:
181+
removed = await conn.delete(keys_to_delete)
182+
log.debug("Invalidated {} merged-config keys for user {}", removed, user_id)
183+
184+
async def _invalidate_domain(self, domain_name: str) -> None:
185+
domain_users = self._domain_users_set(domain_name)
186+
async with self._valkey_cache.client() as conn:
187+
user_ids = await conn.smembers(domain_users)
188+
if not user_ids:
189+
log.debug(
190+
"No tracked users for domain {}, skipping cache invalidation",
191+
domain_name,
192+
)
193+
return
194+
for raw in user_ids:
195+
user_id = raw.decode() if isinstance(raw, bytes) else str(raw)
196+
await self._invalidate_user(user_id)
197+
async with self._valkey_cache.client() as conn:
198+
await conn.delete([domain_users])
199+
log.debug(
200+
"Invalidated merged-config caches for {} users in domain {}",
201+
len(user_ids),
202+
domain_name,
203+
)

src/ai/backend/manager/repositories/app_config_fragment/db_source/db_source.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,19 @@ async def get_by_id(self, id: uuid.UUID) -> AppConfigFragmentData | None:
107107
)
108108
return row.to_data() if row is not None else None
109109

110+
@app_config_fragment_db_source_resilience.apply()
async def user_domain_name(self, user_id: uuid.UUID) -> str | None:
    """Fetch only the `domain_name` column of the user row matching `user_id`.

    The cache layer uses the result to tag merged-view entries with
    their owning domain, so a domain-scoped fragment write can
    invalidate a bounded set of users.

    Returns whatever the scalar query yields for the matching row.
    """
    query = sa.select(UserRow.domain_name).where(UserRow.uuid == user_id)
    async with self._db.begin_readonly_session() as db_sess:
        domain_name: str | None = await db_sess.scalar(query)
    return domain_name
122+
110123
@app_config_fragment_db_source_resilience.apply()
111124
async def create(self, creator: Creator[AppConfigFragmentRow]) -> AppConfigFragmentData:
112125
"""Insert a new fragment via the shared Creator helper.

src/ai/backend/manager/repositories/app_config_fragment/repositories.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
from dataclasses import dataclass
22
from typing import Self
33

4+
from ai.backend.manager.clients.valkey_client.valkey_cache import ValkeyCache
45
from ai.backend.manager.repositories.app_config_fragment.admin_repository import (
56
AppConfigFragmentAdminRepository,
67
)
8+
from ai.backend.manager.repositories.app_config_fragment.cache_source import (
9+
AppConfigFragmentCacheSource,
10+
)
711
from ai.backend.manager.repositories.app_config_fragment.repository import (
812
AppConfigFragmentRepository,
913
)
@@ -17,7 +21,12 @@ class AppConfigFragmentRepositories:
1721

1822
@classmethod
def create(cls, args: RepositoryArgs) -> Self:
    """Build both AppConfigFragment repositories around one shared cache source."""
    # The merged-view read path is fronted by a Valkey cache; the
    # repository falls through to the DB transparently if the
    # cache layer fails.
    # NOTE(review): this reaches into the private `_client` attribute of
    # `valkey_stat_client` — confirm there is no public accessor to use.
    cache_source = AppConfigFragmentCacheSource(ValkeyCache(args.valkey_stat_client._client))
    repository = AppConfigFragmentRepository(args.db, cache_source=cache_source)
    admin_repository = AppConfigFragmentAdminRepository(args.db, cache_source=cache_source)
    return cls(repository=repository, admin_repository=admin_repository)

src/ai/backend/manager/repositories/app_config_fragment/repository.py

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
AppConfigFragmentSearchResult,
1515
)
1616
from ai.backend.manager.models.utils import ExtendedAsyncSAEngine
17+
from ai.backend.manager.repositories.app_config_fragment.cache_source import (
18+
AppConfigFragmentCacheSource,
19+
)
1720
from ai.backend.manager.repositories.app_config_fragment.db_source import (
1821
AppConfigFragmentDBSource,
1922
)
@@ -47,14 +50,23 @@ class AppConfigFragmentRepository:
4750
"""Read-side repository for AppConfigFragment.
4851
4952
Scope-bound reads on raw fragments plus the per-user merged
50-
`AppConfig` view. Mutations and admin cross-scope reads live on
51-
`AppConfigFragmentAdminRepository`.
53+
`AppConfig` view (BEP-1052 §5). Mutations and admin cross-scope
54+
reads live on `AppConfigFragmentAdminRepository`. Retry + metric
55+
policies are applied at the DB-source layer; the merged-view read
56+
path is fronted by a Valkey cache so repeated WebUI bootstrap
57+
queries don't hammer the DB.
5258
"""
5359

5460
_db_source: AppConfigFragmentDBSource
61+
_cache_source: AppConfigFragmentCacheSource | None
5562

56-
def __init__(self, db: ExtendedAsyncSAEngine) -> None:
63+
def __init__(
    self,
    db: ExtendedAsyncSAEngine,
    cache_source: AppConfigFragmentCacheSource | None = None,
) -> None:
    """Wire the read-side repository to its DB source and optional cache.

    Args:
        db: Engine handed to `AppConfigFragmentDBSource`.
        cache_source: Merged-view cache consulted by `app_config`.
            When `None`, reads go straight to the DB source.
    """
    self._db_source = AppConfigFragmentDBSource(db)
    self._cache_source = cache_source
5870

5971
# ── Raw fragment reads ────────────────────────────────────────
6072

@@ -82,7 +94,31 @@ async def app_config(
8294
user_id: uuid.UUID,
8395
config_name: str,
8496
) -> AppConfigData:
85-
return await self._db_source.get_user_app_config(user_id, config_name)
97+
"""Cache-aside read for the per-`(user, name)` merged view.
98+
99+
Returns a fresh `AppConfigData` whose `config` payload may
100+
come from cache (fragments still resolve from the DB on
101+
demand). Cache failures fall through transparently.
102+
"""
103+
if self._cache_source is not None:
104+
cached_config = await self._cache_source.get_merged_config(user_id, config_name)
105+
if cached_config is not None:
106+
# Re-fetch the fragment list from the DB; only the
107+
# deep-merged `config` payload is cached. Keeps the
108+
# response-shape contract unchanged.
109+
merged = await self._db_source.get_user_app_config(user_id, config_name)
110+
return AppConfigData(
111+
user_id=merged.user_id,
112+
name=merged.name,
113+
fragments=merged.fragments,
114+
config=cached_config,
115+
)
116+
117+
merged = await self._db_source.get_user_app_config(user_id, config_name)
118+
if self._cache_source is not None:
119+
domain_name = await self._db_source.user_domain_name(user_id)
120+
await self._cache_source.set_merged_config(merged, domain_name=domain_name)
121+
return merged
86122

87123
@app_config_fragment_repository_resilience.apply()
88124
async def search_app_configs(

0 commit comments

Comments
 (0)