Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 212 additions & 6 deletions app/core/balancer/logic.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import logging
import random
import time
from dataclasses import dataclass
Expand Down Expand Up @@ -27,7 +28,12 @@

SECONDS_PER_DAY = 60 * 60 * 24
UNKNOWN_RESET_BUCKET_DAYS = 10_000
RoutingStrategy = Literal["usage_weighted", "round_robin", "capacity_weighted"]
UNKNOWN_RESET_FALLBACK_SECONDS = 7 * SECONDS_PER_DAY
RELATIVE_AVAILABILITY_MIN_DIVISOR_SECONDS = 5 * 60
RELATIVE_AVAILABILITY_MIN_WEIGHT_FRACTION = 0.1
DEFAULT_RELATIVE_AVAILABILITY_POWER = 2.0
DEFAULT_RELATIVE_AVAILABILITY_TOP_K = 5
RoutingStrategy = Literal["usage_weighted", "round_robin", "capacity_weighted", "relative_availability"]
UNKNOWN_PLAN_FALLBACK = "free"
CAPACITY_PLAN_ALIASES = {
"education": "edu",
Expand All @@ -50,6 +56,12 @@
PROBE_QUIET_SECONDS = 60.0
PROBE_SUCCESS_STREAK_REQUIRED = 3

logger = logging.getLogger(__name__)

_RELATIVE_AVAILABILITY_LOG_PREFIX_CANDIDATE = "Relative availability candidate "
_RELATIVE_AVAILABILITY_LOG_PREFIX_TOP_K = "Relative availability top-k "
_RELATIVE_AVAILABILITY_LOG_PREFIX_WINNER = "Relative availability winner "


@dataclass
class AccountState:
Expand All @@ -64,6 +76,7 @@ class AccountState:
last_error_at: float | None = None
last_selected_at: float | None = None
error_count: int = 0
email: str | None = None
deactivation_reason: str | None = None
plan_type: str | None = None
capacity_credits: float | None = None
Expand Down Expand Up @@ -118,6 +131,8 @@ def select_account(
routing_strategy: RoutingStrategy = "capacity_weighted",
allow_backoff_fallback: bool = True,
deterministic_probe: bool = False,
relative_availability_power: float = DEFAULT_RELATIVE_AVAILABILITY_POWER,
relative_availability_top_k: int = DEFAULT_RELATIVE_AVAILABILITY_TOP_K,
primary_first_usage_weighted: bool = False,
) -> SelectionResult:
"""Select an eligible account by applying availability checks and routing strategy.
Expand All @@ -134,13 +149,17 @@ def select_account(
prefer_earlier_reset: Whether to bias selection toward accounts whose
secondary quota window resets sooner.
routing_strategy: Balancing strategy used to pick from the effective
pool (``"capacity_weighted"``, ``"round_robin"``, or
``"usage_weighted"``).
pool (``"capacity_weighted"``, ``"round_robin"``,
``"relative_availability"``, or ``"usage_weighted"``).
allow_backoff_fallback: Whether to allow a fallback attempt with the
backoff account nearest to recovery when no fully available
account exists.
deterministic_probe: Whether capacity-weighted routing should use a
deterministic_probe: Whether weighted strategies should use a
deterministic probe order instead of random weighted choice.
relative_availability_power: Exponent applied to normalized relative
availability weights.
relative_availability_top_k: Maximum number of highest-weight
relative-availability candidates retained before weighted draw.
primary_first_usage_weighted: Whether usage-weighted routing should
rank by primary-window pressure before secondary-window pressure.

Expand Down Expand Up @@ -253,22 +272,36 @@ def _round_robin_sort_key(state: AccountState) -> tuple[float, str]:
probing = [s for s in available if s.health_tier == HEALTH_TIER_PROBING]
draining = [s for s in available if s.health_tier == HEALTH_TIER_DRAINING]
effective_pool = healthy or probing or draining or available
effective_prefer_earlier_reset = prefer_earlier_reset and routing_strategy != "relative_availability"

if routing_strategy == "round_robin":
selected = min(effective_pool, key=_round_robin_sort_key)
elif routing_strategy == "capacity_weighted":
candidate_pool = (
_prefer_earlier_reset_candidates(effective_pool, current) if prefer_earlier_reset else effective_pool
_prefer_earlier_reset_candidates(effective_pool, current)
if effective_prefer_earlier_reset
else effective_pool
)
if deterministic_probe:
selected = min(candidate_pool, key=_capacity_probe_sort_key)
else:
selected = _select_capacity_weighted(candidate_pool)
elif routing_strategy == "relative_availability":
selected = _select_relative_availability(
effective_pool,
current=current,
power=relative_availability_power,
top_k=relative_availability_top_k,
deterministic_probe=deterministic_probe,
)
else:
if primary_first_usage_weighted:
selected = min(effective_pool, key=_primary_usage_sort_key)
else:
selected = min(effective_pool, key=_reset_first_sort_key if prefer_earlier_reset else _usage_sort_key)
selected = min(
effective_pool,
key=_reset_first_sort_key if effective_prefer_earlier_reset else _usage_sort_key,
)
return SelectionResult(selected, None)


Expand All @@ -293,6 +326,179 @@ def _capacity_probe_sort_key(state: AccountState) -> tuple[float, float, float,
return (-_remaining_secondary_credits(state), secondary_used, primary_used, last_selected, account_id)


def _relative_availability_divisor_seconds(state: AccountState, current: float) -> float:
if state.secondary_reset_at is None:
remaining_seconds = float(UNKNOWN_RESET_FALLBACK_SECONDS)
else:
remaining_seconds = max(0.0, float(state.secondary_reset_at) - current)
return max(remaining_seconds, float(RELATIVE_AVAILABILITY_MIN_DIVISOR_SECONDS))


def _relative_availability_remaining_seconds(state: AccountState, current: float) -> float:
if state.secondary_reset_at is None:
return float(UNKNOWN_RESET_FALLBACK_SECONDS)
return max(0.0, float(state.secondary_reset_at) - current)


def _relative_availability_raw_score(state: AccountState, current: float) -> float:
remaining_credits = _remaining_secondary_credits(state)
if remaining_credits <= 0.0:
return 0.0
return remaining_credits / _relative_availability_divisor_seconds(state, current)


def _relative_availability_label(state: AccountState) -> str:
return state.email or state.account_id

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Stop emitting account emails in routing logs

Using state.email as the primary routing label causes INFO diagnostics to log raw email addresses for candidate scoring, top-k selection, and winner selection. In deployments that ship logs off-host, this introduces unnecessary PII exposure compared with using stable internal account IDs (or a redacted/hash form), and the change affects every relative-availability routing decision.

Useful? React with 👍 / 👎.



def _relative_availability_score_per_minute(raw_score: float) -> float:
return raw_score * 60.0


def _log_relative_availability_candidate_scores(
raw_scores: list[tuple[AccountState, float]],
*,
current: float,
) -> None:
for state, raw_score in raw_scores:
remaining_seconds = _relative_availability_remaining_seconds(state, current)
logger.debug(
(
f"{_RELATIVE_AVAILABILITY_LOG_PREFIX_CANDIDATE}account=%s "
"remaining_credits=%.2f remaining_minutes=%.2f score_per_minute=%.6f"
),
_relative_availability_label(state),
_remaining_secondary_credits(state),
remaining_seconds / 60.0,
_relative_availability_score_per_minute(raw_score),
)


def _log_relative_availability_top_k(
weighted_candidates: list[tuple[AccountState, float, float]],
*,
current: float,
) -> None:
formatted_candidates = ", ".join(
(
f"account={_relative_availability_label(state)} "
f"remaining_credits={_remaining_secondary_credits(state):.2f} "
f"remaining_minutes={_relative_availability_remaining_seconds(state, current) / 60.0:.2f} "
f"score_per_minute={_relative_availability_score_per_minute(raw_score):.6f} "
f"weight={weight:.8f}"
)
for state, weight, raw_score in weighted_candidates
)
logger.info("%s%s", _RELATIVE_AVAILABILITY_LOG_PREFIX_TOP_K, formatted_candidates)


def _relative_availability_weighted_candidates(
available: list[AccountState],
*,
current: float,
power: float,
top_k: int,
) -> list[tuple[AccountState, float, float]]:
raw_scores = [(state, _relative_availability_raw_score(state, current)) for state in available]
_log_relative_availability_candidate_scores(raw_scores, current=current)
best_raw_score = max((score for _, score in raw_scores), default=0.0)
if best_raw_score <= 0.0:
return []

weighted: list[tuple[AccountState, float, float]] = []
safe_power = power if power > 0.0 else DEFAULT_RELATIVE_AVAILABILITY_POWER
for state, raw_score in raw_scores:
normalized_score = raw_score / best_raw_score
weight = normalized_score**safe_power
if weight < RELATIVE_AVAILABILITY_MIN_WEIGHT_FRACTION:
continue
weighted.append((state, weight, raw_score))

if not weighted:
return []

weighted.sort(
key=lambda item: (
-item[1],
-item[2],
*_usage_sort_key(item[0]),
)
)
safe_top_k = max(1, top_k)
top_candidates = weighted[:safe_top_k]
_log_relative_availability_top_k(top_candidates, current=current)
return top_candidates



def _log_relative_availability_winner(
winner: AccountState,
*,
current: float,
weight: float | None,
raw_score: float,
) -> None:
remaining_seconds = _relative_availability_remaining_seconds(winner, current)
logger.info(
(
f"{_RELATIVE_AVAILABILITY_LOG_PREFIX_WINNER}account=%s "
"remaining_credits=%.2f remaining_minutes=%.2f score_per_minute=%.6f weight=%s"
),
_relative_availability_label(winner),
_remaining_secondary_credits(winner),
remaining_seconds / 60.0,
_relative_availability_score_per_minute(raw_score),
f"{weight:.8f}" if weight is not None else "fallback",
)


def _select_relative_availability(
available: list[AccountState],
*,
current: float,
power: float,
top_k: int,
deterministic_probe: bool,
) -> AccountState:
weighted_candidates = _relative_availability_weighted_candidates(
available,
current=current,
power=power,
top_k=top_k,
)
if not weighted_candidates:
winner = min(available, key=_usage_sort_key)
_log_relative_availability_winner(
winner,
current=current,
weight=None,
raw_score=_relative_availability_raw_score(winner, current),
)
return winner
if deterministic_probe:
winner, weight, raw_score = weighted_candidates[0]
_log_relative_availability_winner(winner, current=current, weight=weight, raw_score=raw_score)
return winner
states = [state for state, _, _ in weighted_candidates]
weights = [weight for _, weight, _ in weighted_candidates]
total = sum(weights)
if total <= 0.0:
winner = min(available, key=_usage_sort_key)
_log_relative_availability_winner(
winner,
current=current,
weight=None,
raw_score=_relative_availability_raw_score(winner, current),
)
return winner
winner = random.choices(states, weights=weights, k=1)[0]
for state, weight, raw_score in weighted_candidates:
if state.account_id == winner.account_id:
_log_relative_availability_winner(winner, current=current, weight=weight, raw_score=raw_score)
break
return winner


def _select_capacity_weighted(available: list[AccountState]) -> AccountState:
"""Select an account with probability proportional to remaining secondary credits."""
weights = [_remaining_secondary_credits(s) for s in available]
Expand Down
10 changes: 10 additions & 0 deletions app/core/runtime_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def build_log_config() -> LogConfig:

config = copy.deepcopy(LOGGING_CONFIG)
formatters = config.setdefault("formatters", {})
handlers = config.setdefault("handlers", {})
settings = get_settings()

if settings.log_format == "json":
Expand All @@ -139,6 +140,15 @@ def build_log_config() -> LogConfig:
"datefmt": "%Y-%m-%dT%H:%M:%SZ",
"use_colors": None,
}

# Uvicorn's stock config only wires uvicorn.* loggers. Attach the same
# default handler to the root logger so application loggers such as
# app.core.balancer.logic surface in docker logs at INFO.
handlers.setdefault("default", {"class": "logging.StreamHandler", "formatter": "default", "stream": "ext://sys.stderr"})
config["root"] = {
"handlers": ["default"],
"level": "INFO",
}
return cast(LogConfig, config)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""add relative availability routing settings to dashboard_settings

Revision ID: 20260426_000000_add_dashboard_relative_availability_settings
Revises: 20260423_120000_add_api_key_limit_reset_at_index
Create Date: 2026-04-26
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from sqlalchemy.engine import Connection

revision = "20260426_000000_add_dashboard_relative_availability_settings"
down_revision = "20260423_120000_add_api_key_limit_reset_at_index"
branch_labels = None
depends_on = None


def _columns(connection: Connection, table_name: str) -> set[str]:
inspector = sa.inspect(connection)
if not inspector.has_table(table_name):
return set()
return {str(column["name"]) for column in inspector.get_columns(table_name) if column.get("name") is not None}


def upgrade() -> None:
bind = op.get_bind()
columns = _columns(bind, "dashboard_settings")
if not columns:
return

if "relative_availability_power" not in columns:
with op.batch_alter_table("dashboard_settings") as batch_op:
batch_op.add_column(
sa.Column(
"relative_availability_power",
sa.Float(),
nullable=False,
server_default="2.0",
)
)

if "relative_availability_top_k" not in columns:
with op.batch_alter_table("dashboard_settings") as batch_op:
batch_op.add_column(
sa.Column(
"relative_availability_top_k",
sa.Integer(),
nullable=False,
server_default="5",
)
)


def downgrade() -> None:
bind = op.get_bind()
columns = _columns(bind, "dashboard_settings")
if not columns:
return

if "relative_availability_top_k" in columns:
with op.batch_alter_table("dashboard_settings") as batch_op:
batch_op.drop_column("relative_availability_top_k")

if "relative_availability_power" in columns:
with op.batch_alter_table("dashboard_settings") as batch_op:
batch_op.drop_column("relative_availability_power")
Loading