Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,10 @@ mid-interval failures still trigger a swap.
healthy deprioritized battery instead of waiting for the next timed rotation.
During a probe, the probe timeout is the main "never ramps" control; this
threshold still matters after the probe succeeds and for already-active
batteries. Set to 0 to disable.
batteries. Set to 0 to disable. The saturation EMA is time-weighted, so
batteries with slower powermeters (>10 s update interval) accumulate saturation
faster per sample — if you see unnecessary swaps with a slow powermeter,
raise this value (e.g. to 0.8).
- **SATURATION_DETECTION** (default true) — Track how well each battery follows
its target. When a battery cannot deliver (full or empty), its share is
reduced and redistributed to others.
Expand Down
4 changes: 3 additions & 1 deletion config.ini.example
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ THROTTLE_INTERVAL = 0
## Force-rotate when a battery can't follow its target (e.g. externally limited
## to 0 W or fully discharged). After a probe succeeds, this still swaps out an
## active battery within a few seconds instead of waiting for the full rotation
## interval. 0 disables.
## interval. 0 disables. The saturation EMA is time-weighted, so slow
## powermeters (>10 s updates) accumulate saturation faster per sample; raise
## this value (e.g. 0.8) if you see unnecessary swaps with a slow meter.
#EFFICIENCY_SATURATION_THRESHOLD = 0.4
## How quickly a swapped-out battery becomes eligible again. Applied each cycle
## while the battery has no target. 1.0 = never recover, lower = faster recovery.
Expand Down
110 changes: 101 additions & 9 deletions src/astrameter/ct002/balancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@
# ramping up. In that case we bypass the remaining grace window and mark it
# saturated immediately so the balancer can rotate to a healthy unit.
SATURATION_STALL_TIMEOUT_SECONDS = 60.0
# Reference poll interval (seconds) at which the configured ``SATURATION_ALPHA``
# and ``SATURATION_DECAY_FACTOR`` apply one full step. The EMA is time-
# weighted against this reference so that batteries polling at different
# cadences (e.g. V3 at ~0.45 s vs V2 at ~3.1 s) converge to the same
# saturation score under the same physical conditions. Chosen to match
# the ~1 Hz cadence the previous per-sample defaults were implicitly tuned
# against.
SATURATION_REFERENCE_DT = 1.0
# If more than this many seconds pass between saturation updates (e.g. a
# battery drops off the network), treat the next sample as a fresh start
# rather than dosing the EMA with a huge rise or decay step.
SATURATION_LONG_GAP_SECONDS = 30.0


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -97,6 +109,11 @@ class BalancerConsumerState:
saturation_score: float = 0.0
saturation_grace_until: float = 0.0
saturation_grace_started_at: float = 0.0
# Wall-clock timestamp of the most recent saturation EMA step for this
# consumer. 0.0 is a sentinel meaning "no prior update"; it also flags
# the first post-grace sample, so the next update re-seeds instead of
# applying stale dt.
last_saturation_update: float = 0.0


@dataclasses.dataclass
Expand All @@ -119,11 +136,19 @@ class ProbeState:


class SaturationTracker:
"""EMA-based actuator saturation detector with grace periods.
"""Time-weighted EMA saturation detector with grace periods.

A saturation score of 1.0 means the actuator cannot follow its target
(e.g. battery full/empty); 0.0 means it is tracking well.

The EMA is weighted against :data:`SATURATION_REFERENCE_DT` so that
batteries polling at different cadences converge to the same score
under the same physical conditions. Concretely, for a real
inter-sample interval ``dt`` the effective per-update weight is
``1 - (1 - alpha) ** (dt / dt_ref)`` and the decay is
``decay_factor ** (dt / dt_ref)``. At ``dt == dt_ref`` both reduce
to the previous per-sample formulas.

State is stored externally in :class:`BalancerConsumerState` objects;
this class holds only configuration and algorithm logic.
"""
Expand Down Expand Up @@ -159,6 +184,9 @@ def update(
if abs(actual) >= self._min_target:
state.saturation_grace_until = 0.0
state.saturation_grace_started_at = 0.0
# Re-seed so the first post-grace update applies one
# reference-period step rather than a stale dt dose.
state.last_saturation_update = 0.0
elif (
target_abs >= self._min_target
and state.saturation_grace_started_at > 0
Expand All @@ -168,12 +196,14 @@ def update(
state.saturation_score = 1.0
state.saturation_grace_until = 0.0
state.saturation_grace_started_at = 0.0
state.last_saturation_update = 0.0
return
else:
return
else:
state.saturation_grace_until = 0.0
state.saturation_grace_started_at = 0.0
state.last_saturation_update = 0.0
# Detect sign reversal: target says one direction, actual is still
# in the opposite direction. The battery is healthy but ramping to
# the new direction — not saturated. Treat like low-target (decay).
Expand All @@ -182,32 +212,52 @@ def update(
sign_reversing = (
target_sign != 0 and actual_sign != 0 and target_sign != actual_sign
)
# Compute elapsed time since the previous EMA step with guards.
# First sample (prev_t == 0) is treated as a full reference-period
# step so a cold start still responds to the very first poll; this
# is the "option (b)" seeding described in the class docstring.
# A backwards clock (NTP correction) is clamped to zero; a long
# gap (battery offline) is dropped and re-seeded so we never dose
# the EMA with hundreds of seconds of rise or decay.
prev_t = state.last_saturation_update
if prev_t <= 0.0:
prev_t = now - SATURATION_REFERENCE_DT
dt = max(0.0, now - prev_t)
state.last_saturation_update = now
if dt == 0.0:
return
if dt > SATURATION_LONG_GAP_SECONDS:
return
ratio = dt / SATURATION_REFERENCE_DT
if target_abs < self._min_target or sign_reversing:
prev = state.saturation_score
if prev > 0:
decayed = prev * self._decay_factor
decayed = prev * (self._decay_factor**ratio)
if decayed < 0.001:
state.saturation_score = 0.0
else:
state.saturation_score = decayed
return
inst_saturation = 1.0 if abs(actual) < self._min_target else 0.0
alpha_eff = 1.0 - (1.0 - self._alpha) ** ratio
prev = state.saturation_score
state.saturation_score = (
self._alpha * inst_saturation + (1 - self._alpha) * prev
)
state.saturation_score = alpha_eff * inst_saturation + (1 - alpha_eff) * prev

def get(self, state: BalancerConsumerState) -> float:
return state.saturation_score

def set_grace(self, state: BalancerConsumerState, deadline: float) -> None:
state.saturation_grace_until = deadline
state.saturation_grace_started_at = self._clock()
# Pause tracking until grace ends; the next real update will
# re-seed via the prev_t <= 0 path.
state.last_saturation_update = 0.0

def clear(self, state: BalancerConsumerState) -> None:
state.saturation_score = 0.0
state.saturation_grace_until = 0.0
state.saturation_grace_started_at = 0.0
state.last_saturation_update = 0.0


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -586,7 +636,22 @@ def compute_target(
cid: r for cid, r in all_reports.items() if cid not in inactive
}

# Update saturation (skip manual consumers)
# Update saturation (skip manual, probe, and deprioritized consumers).
#
# Deprioritized consumers are steered toward zero, but while their
# ``_fade_efficiency_weights`` EMA is still winding down from 1.0
# their ``last_target`` carries a transient, non-zero value from
# the fade path (see ``_compute_auto_target``). Feeding that
# transient into the saturation EMA causes a false-positive
# "cannot follow target" spike for a battery that's really just
# in the process of being phased out — and with the time-weighted
# EMA that spike is large enough to stay above the swap threshold
# for many ticks, locking ``_maybe_force_swap_saturated`` out of
# ever promoting the consumer back. Simply skipping the update
# while the consumer is deprioritized leaves the score pinned to
# whatever the symmetric clear in ``_compute_efficiency_deprioritized``
# set it to (zero), which is exactly what the swap path expects
# for a "healthy" candidate.
state = self._get_consumer(consumer_id) if consumer_id else None
last_target = state.last_target if state else None
if (
Expand All @@ -595,6 +660,7 @@ def compute_target(
and consumer_id in active_reports
and consumer_mode.mode != "manual"
and consumer_id not in self._probe_participants()
and consumer_id not in self._deprioritized
):
actual = parse_int(active_reports.get(consumer_id, {}).get("power", 0))
self._saturation.update(state, last_target, actual)
Expand Down Expand Up @@ -994,8 +1060,20 @@ def _compute_efficiency_deprioritized(
for cid in deprioritized - self._deprioritized:
state = self._consumers.get(cid)
if state:
state.saturation_grace_until = 0.0
state.saturation_grace_started_at = 0.0
# Clearing saturation here is symmetric with the
# `deprioritized -> active` branch above (line 1018):
# the score is a memory of the *previous* role, and once
# the consumer is moved into the deprioritized set it
# will be steered toward zero, so any residual score is
# no longer an accurate estimate of whether it could
# follow an active-slot target. Without this clear,
# a consumer that accumulated saturation during an
# active-to-deprioritized transition (common when the
# time-weighted EMA integrates over the fading window)
# cannot be promoted back via `_maybe_force_swap_saturated`
# because that path requires a healthy deprioritized
# candidate.
self._saturation.clear(state)
logger.info(
"Efficiency: deprioritizing consumer %s (demand %.0fW, %d active)",
cid[:16],
Expand All @@ -1018,7 +1096,21 @@ def _compute_efficiency_deprioritized(
def _maybe_force_swap_saturated(
self, priority: list[str], slots: int, now: float
) -> bool:
"""Swap a saturated active battery with a healthy deprioritized one."""
"""Swap a saturated active battery with a healthy deprioritized one.

A healthy candidate is one whose saturation score is *strictly
below* ``efficiency_saturation_threshold``. Note that this works
in concert with the symmetric-clear logic in
:meth:`_compute_efficiency_deprioritized`: when a consumer
transitions from active → deprioritized the saturation score is
cleared to zero (the score is a memory of the previous role and
no longer reflects the can-it-follow question relevant to the
new role). That clear guarantees a healthy candidate is
available the first time the balancer decides to swap a
newly-saturated active unit post-probe, which previously
dead-locked because both consumers were still above the threshold
during the fade window.
"""
cfg = self._cfg
if cfg.efficiency_saturation_threshold <= 0 or slots >= len(priority):
return False
Expand Down
2 changes: 1 addition & 1 deletion src/astrameter/ct002/ct002.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def __init__(
ct_mac="",
ct_type="HME-4",
wifi_rssi=-50,
dedupe_time_window=0,
dedupe_time_window=0.0,
consumer_ttl=120,
debug_status=False,
active_control=True,
Expand Down
4 changes: 3 additions & 1 deletion src/astrameter/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ async def run_device(
ct_mac = cfg.get(ct_section, "CT_MAC", fallback="")
ct_udp_port = cfg.getint(ct_section, "UDP_PORT", fallback=UDP_PORT)
wifi_rssi = cfg.getint(ct_section, "WIFI_RSSI", fallback=-50)
dedupe_time_window = cfg.getint(ct_section, "DEDUPE_TIME_WINDOW", fallback=0)
dedupe_time_window = cfg.getfloat(
ct_section, "DEDUPE_TIME_WINDOW", fallback=0.0
)
consumer_ttl = cfg.getint(ct_section, "CONSUMER_TTL", fallback=120)
debug_status = cfg.getboolean(ct_section, "DEBUG_STATUS", fallback=False)
if os.environ.get("DEBUG_STATUS", "").lower() in ("1", "true", "yes"):
Expand Down
2 changes: 1 addition & 1 deletion src/astrameter/web_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ def _pm(**extras: dict[str, object]) -> dict[str, dict[str, object]]:
"CT002": {
"UDP_PORT": {"type": "integer"},
"WIFI_RSSI": {"type": "integer"},
"DEDUPE_TIME_WINDOW": {"type": "integer"},
"DEDUPE_TIME_WINDOW": {"type": "float", "min": 0},
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

min: 0 is schema-only unless runtime also enforces it

Good change on Line 277, but this constraint currently appears enforced only by the web editor metadata. CT002 logic still accepts direct negative values from config files, which can bypass dedupe behavior unexpectedly. Consider adding runtime validation/clamping where dedupe_time_window is consumed (e.g., in CT002.__init__).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/astrameter/web_config.py` at line 277, The web schema added
"DEDUPE_TIME_WINDOW" with min: 0 but runtime still accepts negative values;
update the runtime where dedupe_time_window is consumed (e.g., in
CT002.__init__) to validate and clamp/raise on invalid values: read the incoming
config value for dedupe_time_window (or use the DEDUPE_TIME_WINDOW key), check
if value is None or < 0, then either set it to 0 (clamp) or raise a clear
ValueError/ConfigError, and ensure downstream code uses this validated value;
add a small unit test for CT002 to assert negative inputs are rejected or
clamped accordingly.

"CONSUMER_TTL": {"type": "integer"},
"DEBUG_STATUS": {"type": "boolean"},
"ACTIVE_CONTROL": {"type": "boolean"},
Expand Down
Loading
Loading