|
5 | 5 |
|
6 | 6 | from __future__ import annotations |
7 | 7 |
|
| 8 | +import json |
| 9 | +import logging |
8 | 10 | from collections import defaultdict |
| 11 | +from collections.abc import Iterable |
9 | 12 | from dataclasses import dataclass |
10 | 13 | from typing import Generic, TypeVar |
11 | 14 |
|
12 | | -import json |
| 15 | +from rigging.timing import Timestamp |
13 | 16 |
|
14 | | -from iris.cluster.controller.db import ACTIVE_TASK_STATES, QuerySnapshot |
| 17 | +from iris.cluster.controller.db import ACTIVE_TASK_STATES, ControllerDB, QuerySnapshot |
15 | 18 | from iris.cluster.types import JobName |
16 | | -from iris.rpc import job_pb2 |
| 19 | +from iris.rpc import config_pb2, job_pb2 |
| 20 | + |
| 21 | +logger = logging.getLogger(__name__) |
17 | 22 |
|
18 | 23 | T = TypeVar("T") |
19 | 24 |
|
@@ -139,3 +144,59 @@ def interleave_by_user( |
139 | 144 | break |
140 | 145 | round_idx += 1 |
141 | 146 | return result |
| 147 | + |
| 148 | + |
| 149 | +# Bands accepted in user_budgets config entries. UNSPECIFIED is kept out of the |
| 150 | +# set so a missing/zeroed max_band field surfaces as a config error rather than |
| 151 | +# silently granting BATCH; callers must pick a real band. |
| 152 | +_VALID_TIER_BANDS = frozenset( |
| 153 | + ( |
| 154 | + job_pb2.PRIORITY_BAND_PRODUCTION, |
| 155 | + job_pb2.PRIORITY_BAND_INTERACTIVE, |
| 156 | + job_pb2.PRIORITY_BAND_BATCH, |
| 157 | + ) |
| 158 | +) |
| 159 | + |
| 160 | + |
| 161 | +def reconcile_user_budget_tiers( |
| 162 | + db: ControllerDB, |
| 163 | + tiers: Iterable[config_pb2.UserBudgetTier], |
| 164 | + now: Timestamp, |
| 165 | +) -> int: |
| 166 | + """Upsert per-user budgets from cluster config into the user_budgets table. |
| 167 | +
|
| 168 | + Runs at controller startup after auth is resolved. Each tier entry lists |
| 169 | + a set of user_ids that all receive the same budget_limit and max_band. |
| 170 | + Tiers are applied in order, so later tiers override earlier ones for |
| 171 | + users listed in both — lets ops promote a user by appending a later tier |
| 172 | + without editing earlier ones. |
| 173 | +
|
| 174 | + This complements migration 0037 (which fixes prod DBs that already have |
| 175 | + rows) by handling fresh DBs and listed users who haven't submitted yet: |
| 176 | + those users would otherwise land on the :class:`UserBudgetDefaults` row |
| 177 | + created via INSERT OR IGNORE at first submission time. |
| 178 | +
|
| 179 | + Returns the number of (user_id, tier) pairs applied; duplicate user_ids |
| 180 | + across tiers are counted per-apply since the later tier overwrites. |
| 181 | + """ |
| 182 | + count = 0 |
| 183 | + for tier in tiers: |
| 184 | + if tier.max_band not in _VALID_TIER_BANDS: |
| 185 | + raise ValueError( |
| 186 | + f"UserBudgetTier.max_band must be one of PRODUCTION/INTERACTIVE/BATCH; " |
| 187 | + f"got {tier.max_band} for users {list(tier.user_ids)}" |
| 188 | + ) |
| 189 | + for user_id in tier.user_ids: |
| 190 | + if not user_id: |
| 191 | + raise ValueError("UserBudgetTier.user_ids contains an empty entry") |
| 192 | + db.ensure_user(user_id, now) |
| 193 | + db.set_user_budget( |
| 194 | + user_id=user_id, |
| 195 | + budget_limit=tier.budget_limit, |
| 196 | + max_band=tier.max_band, |
| 197 | + now=now, |
| 198 | + ) |
| 199 | + count += 1 |
| 200 | + if count: |
| 201 | + logger.info("Reconciled %d user budget assignment(s) from cluster config", count) |
| 202 | + return count |
0 commit comments