Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Next

- **Changed** the CT002/CT003 emulator (Python and ESPHome) to behave more like a real CT: with active control off (`ACTIVE_CONTROL = False`), batteries now see each other's reported power, and batteries still detecting their phase or running in combined mode no longer skew the phase-A share split; a battery that goes silent now drops out after missing ~2 of its own poll cycles instead of after a fixed 120 s — set `CONSUMER_TTL` / `consumer_ttl` to keep a fixed window ([#457](https://github.com/tomquist/astrameter/issues/457), [#460](https://github.com/tomquist/astrameter/issues/460), [#462](https://github.com/tomquist/astrameter/issues/462)).
- **Changed** the bundled **battery simulator** and CT002/CT003 emulator to match real Marstek behavior more closely: the simulator now steers with the gain-scheduled, accelerating self-consumption controller real Venus-class hardware runs (replacing the old `output + grid` integral rule), including the firmware's pre-ramp input-conditioning gate — a >50 W single-sample spike filter, a ±20 W deadband, and a small-residual-import hold — so it no longer jitters against sub-threshold or transient grid noise near the null point; the non-physical `idle_on_cross_phase_discharge` simulator option (which modelled cross-phase coordination real firmware doesn't implement) was removed; relay mode (`ACTIVE_CONTROL = False`) now forwards the real per-phase battery count so several batteries on one phase each take their 1/N share instead of all chasing the full residual; the optional request `participate` field (the 7th field newer batteries such as the B2500 send) is now honored — a battery sending `participate=0` is excluded from per-phase aggregation and active-control distribution (absent/non-zero preserves prior behavior); and the simulator can now model a **DC-coupled B2500** (set a battery's `meter_dev_type` to an `HMA*`/`HMJ*`/`HMK*` type), which steers its DC output into an external microinverter with the B2500's integer hysteresis law and never charges from AC, instead of the Venus ramp.
- **Added** the AVM **FRITZ!Smart Energy 250** smart-meter read head as a power source (`[FRITZ]`). AstraMeter logs into the FRITZ!Box over the AHA-HTTP-Interface (`login_sid.lua` + `getdevicelistinfos`) and reads the read head's signed grid power by AIN — positive = import, negative = feed-in. See [docs/powermeters.md](docs/powermeters.md#fritzsmart-energy-250).
- **Added** the **Min DC Output** option to the Home Assistant add-on's Configuration tab, so it no longer requires a custom config file to set. The web config generator's add-on target now emits it too ([#446](https://github.com/tomquist/astrameter/issues/446)).
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,9 @@ WIFI_RSSI = -50
# Also supported by the Shelly emulator (keyed by battery IP); set it under
# [GENERAL] to apply regardless of the emulated device type.
DEDUPE_TIME_WINDOW = 0
# Forget consumers after this many seconds without updates (multi-consumer support)
# Forget consumers after this many seconds without updates (multi-consumer support).
# Unset (default): adaptive — a battery is dropped after missing ~2 of its own
# poll cycles (min 5s), like the real CT. Set a number for a fixed window.
CONSUMER_TTL = 120
```

Expand Down
4 changes: 3 additions & 1 deletion config.ini.example
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ THROTTLE_INTERVAL = 0
## Ignore repeated requests from the same consumer within this window (seconds).
## Overrides the [GENERAL] DEDUPE_TIME_WINDOW for this section.
# DEDUPE_TIME_WINDOW = 0
## Forget consumers after this many seconds without updates (multi-consumer support)
## Forget consumers after this many seconds without updates (multi-consumer support).
## Unset (default): adaptive — a battery is dropped after missing ~2 of its own
## poll cycles (min 5s), like the real CT. Set a number for a fixed window.
# CONSUMER_TTL = 120
## Log concise status (phase consumption + consumer charge/discharge) on each request when enabled
# DEBUG_STATUS = False
Expand Down
12 changes: 9 additions & 3 deletions docs/ct002-ct003-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,12 @@ phase `D`: the meter is wired across a 3‑phase supply and the battery is set t
compensate the **total** grid exchange, not a single phase. The `x` bucket is
the transient state while a device is still detecting its phase (`phase_t = 0`).

> The AstraMeter emulator models phases `A`/`B`/`C` and folds `D` and any
> non‑`A/B/C` value into the unassigned/inspection path; it does not yet
> implement the combined (`ABC`) control mode.
> The AstraMeter emulator mirrors this bucketing: `'0'`/unassigned reporters
> aggregate into the `x_*` fields, phase‑`D` reporters into the `ABC_*` fields
> and the `ABC_chrg_nb` count, and `A`/`B`/`C` into their own buckets. It does
> not yet implement a combined (`ABC`) **control** mode, though: a phase‑`D`
> battery is served the relay path (raw grid reading + aggregates) even when
> active control is on, exactly as during inspection.

### CT003 energy fields (fields 25–28)

Expand All @@ -226,6 +229,9 @@ the reporter's IP, type, phase, signed power, and `participate` flag.
Per response cycle the CT:

1. **Evicts stale slots** — a slot not refreshed within ~1–2 cycles is cleared.
(AstraMeter mirrors this by default: a consumer that misses ~2 of its own
poll cycles drops out of the counts/aggregates; set `CONSUMER_TTL` /
`consumer_ttl` to use a fixed window instead.)
2. **Builds per‑bucket aggregates** over the live slots, but only for a slot that
is occupied **and** has `participate != 0`:
- bucket by phase (`A`/`B`/`C`, the combined `ABC` bucket for phase `D`, or
Expand Down
4 changes: 3 additions & 1 deletion esphome.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ ct002:
# # false → forward raw grid readings as-is
# max_sensor_age: 30s # treat the input as 0 if no sensor update
# # arrives within this window
# consumer_ttl: 120s # evict a battery this long after it goes silent
# consumer_ttl: 120s # fixed eviction window for a silent battery;
# # unset (default) adapts to each battery's
# # poll rate (~2 missed polls, like the real CT)
# dedupe_window: 0s # 0 = off; >0 drops duplicate polls from the
# # same battery within the window

Expand Down
18 changes: 10 additions & 8 deletions esphome/components/ct002/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,13 +333,12 @@ def _resolve_mqtt_insights_device_id(device_id_opt: str) -> str:
cv.Optional(
CONF_MAX_SENSOR_AGE, default="30s"
): cv.positive_time_period_milliseconds,
# TTL after which a silent consumer is evicted, matching
# Python's consumer_ttl default. Lower it for fleets with
# short-lived bench-test batteries; raise it if your network
# has long polling gaps.
cv.Optional(
CONF_CONSUMER_TTL, default="120s"
): cv.positive_time_period_seconds,
# Fixed TTL after which a silent consumer is evicted. Unset
# (default) = adaptive eviction (~2 missed poll cycles per
# consumer, like the real CT), matching Python's consumer_ttl
# default. Set a fixed value if your network has long polling
# gaps.
cv.Optional(CONF_CONSUMER_TTL): cv.positive_time_period_seconds,
# Drop repeat polls from the same battery within this window.
# 0 (default) disables dedup, matching Python's
# dedupe_time_window=0.0. Useful on noisy networks where a
Expand Down Expand Up @@ -380,7 +379,10 @@ async def to_code(config):
cg.add(var.set_udp_port(config[CONF_UDP_PORT]))
cg.add(var.set_active_control(config[CONF_ACTIVE_CONTROL]))
cg.add(var.set_max_sensor_age_ms(config[CONF_MAX_SENSOR_AGE].total_milliseconds))
cg.add(var.set_consumer_ttl_seconds(int(config[CONF_CONSUMER_TTL].total_seconds)))
if CONF_CONSUMER_TTL in config:
cg.add(
var.set_consumer_ttl_seconds(int(config[CONF_CONSUMER_TTL].total_seconds))
)
cg.add(var.set_dedupe_window_ms(int(config[CONF_DEDUPE_WINDOW].total_milliseconds)))

if CONF_TEST_CONTROL_PORT in config:
Expand Down
140 changes: 100 additions & 40 deletions esphome/components/ct002/ct002.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,24 @@ int parse_int_strict(const std::string &s, int default_value) {
return static_cast<int>(parsed);
}

// Mirror of Python's _bucket_for_phase: A/B/C → their buckets, "D" → the
// combined ABC bucket, anything else (the normalized "0") → x.
size_t bucket_index_for_phase(const std::string &phase) {
if (phase == "A") return BUCKET_A;
if (phase == "B") return BUCKET_B;
if (phase == "C") return BUCKET_C;
if (phase == "D") return BUCKET_ABC;
return BUCKET_X;
}

} // namespace

// Wall-clock seconds for balancer/saturation accounting. Uses ESPHome's
// monotonic millis() because absolute wall time isn't available on bare
// metal at boot; the balancer only cares about deltas, so a monotonic
// reference is fine. Under the test-hooks build the e2e harness can engage
// a mock clock for deterministic time-gated behaviour.
double CT002Component::now_seconds_() {
double CT002Component::now_seconds_() const {
#ifdef USE_CT002_TEST_HOOKS
if (this->mock_clock_enabled_) return this->mock_clock_seconds_;
#endif
Expand Down Expand Up @@ -150,11 +160,11 @@ void CT002Component::setup() {
this->start_control_server_();
#endif

// Consumer eviction — fires every 5 s and evicts anything older than
// consumer_ttl_seconds_ (default 120 s). Mirrors Python's
// _cleanup_consumers loop (ct002.py:330-341). Without this the
// consumers_ map grows unbounded across battery turnover and the
// mqtt_insights "offline" availability is never published.
// Consumer eviction — fires every 5 s and evicts anything older than its
// TTL (the configured consumer_ttl, or by default ~2 missed poll cycles —
// see consumer_ttl_for_). Mirrors Python's _cleanup_consumers loop.
// Without this the consumers_ map grows unbounded across battery turnover
// and the mqtt_insights "offline" availability is never published.
this->set_interval("ct002_evict", 5000, [this]() { this->evict_stale_consumers_(); });

ESP_LOGCONFIG(TAG, "CT002 setup: %u phase(s), ct_type=%s, udp_port=%u",
Expand Down Expand Up @@ -270,7 +280,11 @@ void CT002Component::handle_request_(const uint8_t *data, size_t len,
}

const std::string meter_dev_type = fields[0];
this->update_consumer_report_(consumer_id, in_inspection_mode ? "A" : reported_phase,
// Store the phase exactly as reported: "D" selects the combined ABC bucket
// and any inspection marker is normalized to "0" (the x bucket) inside
// update_consumer_report_ — forcing "A" here would mis-count inspection
// and combined reporters into phase A (issue #460).
this->update_consumer_report_(consumer_id, reported_phase,
static_cast<float>(reported_power), meter_dev_type, addr_ip,
participates);

Expand Down Expand Up @@ -340,8 +354,15 @@ void CT002Component::update_consumer_report_(const std::string &consumer_id,
const std::string &phase, float power,
const std::string &device_type,
const std::string &source_ip, bool participates) {
std::string normalized_phase = phase.empty() ? std::string("A") : phase;
std::string normalized_phase = phase;
for (auto &c : normalized_phase) c = static_cast<char>(std::toupper(c));
if (normalized_phase != "A" && normalized_phase != "B" && normalized_phase != "C" &&
normalized_phase != "D") {
// Anything else ("0", empty, future markers) is the unassigned/
// inspection state; store the wire's canonical "0" so aggregation
// routes it to the x bucket instead of inventing a phase (issue #460).
normalized_phase = "0";
}
auto &consumer = this->get_consumer_(consumer_id);
const double now = now_seconds_();
// Capture the prior phase BEFORE the update. Python keys "is there a
Expand Down Expand Up @@ -374,15 +395,14 @@ void CT002Component::update_consumer_report_(const std::string &consumer_id,
if (!source_ip.empty()) consumer.last_ip = source_ip;

// Phase detected (new battery) / phase changed (re-detected on a
// different leg) — mirrors ct002.py:315-328. Only fires for a real
// A/B/C phase that differs from the prior one; inspection-mode polls
// (phase "A" forced) don't spuriously trigger because their phase is
// normalized to "A" and an actual A reporter wouldn't change.
const bool valid_phase =
normalized_phase == "A" || normalized_phase == "B" || normalized_phase == "C";
if (valid_phase && previous_phase != normalized_phase) {
const bool prior_valid =
previous_phase == "A" || previous_phase == "B" || previous_phase == "C";
// different leg) — mirrors Python's _update_consumer_report. Only fires
// for a declared A/B/C/D phase that differs from the prior one;
// inspection-mode polls (normalized to "0") never trigger it.
const auto is_declared = [](const std::string &p) {
return p == "A" || p == "B" || p == "C" || p == "D";
};
if (is_declared(normalized_phase) && previous_phase != normalized_phase) {
const bool prior_valid = is_declared(previous_phase);
if (prior_valid) {
ESP_LOGI(TAG, "CT002 consumer %s phase changed: %s -> %s", consumer_id.c_str(),
previous_phase.c_str(), normalized_phase.c_str());
Expand Down Expand Up @@ -419,26 +439,50 @@ ReportMap CT002Component::collect_reports_for_balancer_() const {
return out;
}

double CT002Component::consumer_ttl_for_(const Consumer &c) const {
if (this->consumer_ttl_seconds_.has_value())
return static_cast<double>(*this->consumer_ttl_seconds_);
if (!c.poll_interval.has_value()) return ADAPTIVE_TTL_FALLBACK_SECONDS;
return std::max(ADAPTIVE_TTL_MIN_SECONDS,
ADAPTIVE_TTL_POLL_MULTIPLIER * static_cast<double>(*c.poll_interval));
}

bool CT002Component::consumer_expired_(const Consumer &c, double now) const {
return c.timestamp > 0.0 && now - c.timestamp > this->consumer_ttl_for_(c);
}

CT002Component::PhaseReports CT002Component::collect_reports_by_phase_() const {
PhaseReports out;
const double now = this->now_seconds_();
for (const auto &kv : this->consumers_) {
const auto &c = kv.second;
if (c.timestamp <= 0.0) continue;
// Respect the request's "participate" flag: a battery that opted out (7th
// field == 0) is not aggregated into the per-phase buckets or the count.
if (!c.participates) continue;
std::string phase = c.phase;
for (auto &ch : phase) ch = static_cast<char>(std::toupper(ch));
size_t idx = 0;
if (phase == "A") idx = 0;
else if (phase == "B") idx = 1;
else if (phase == "C") idx = 2;
else idx = 0;
// Count every battery reporting on the phase (regardless of power) so relay
// mode can forward the real per-phase battery count (each battery divides
// the forwarded aggregate by it to take its 1/N share).
// The real CT clears a slot that missed ~1-2 poll cycles before
// aggregating, so a battery that drops off the network stops being
// counted almost immediately. Mirror that per response here; the cleanup
// interval removes the entry shortly after (issue #462).
if (this->consumer_expired_(c, now)) continue;
const size_t idx = bucket_index_for_phase(c.phase);
// Count every battery reporting into the bucket (regardless of power) so
// relay mode can forward the real per-phase battery count (each battery
// divides the forwarded aggregate by it to take its 1/N share).
out.count[idx] += 1;
const float power = static_cast<float>(round_half_even(c.last_instructed_power));
float power;
if (this->active_control_ && idx >= BUCKET_A && idx <= BUCKET_C) {
// Active control: aggregate the net power we *instructed* this
// consumer to be at, so PV passthrough doesn't masquerade as
// discharge (issue #376).
power = static_cast<float>(round_half_even(c.last_instructed_power));
} else {
// Relay mode forwards each battery's *reported* power, exactly like
// the real CT (issue #457). x/ABC consumers are never actively
// instructed, so their reported power is the only truthful signal in
// either mode.
power = static_cast<float>(round_half_even(static_cast<double>(c.power)));
}
if (power == 0.0f) continue;
out.active[idx] = true;
if (power < 0.0f) out.chrg_power[idx] += power;
Expand Down Expand Up @@ -541,20 +585,31 @@ std::vector<std::string> CT002Component::build_response_fields_(
const auto phase_reports = this->collect_reports_by_phase_();
const float phase_power[3] = {phase_a, phase_b, phase_c};
for (size_t i = 0; i < 3; ++i) {
const size_t bucket = BUCKET_A + i;
if (this->active_control_) {
// Active control distributes a per-consumer target, so each battery
// applies it as-is: report a count of 1 when the phase is active.
if (phase_reports.active[i] || phase_power[i] != 0.0f) {
if (phase_reports.active[bucket] || phase_power[i] != 0.0f) {
fields[8 + i] = "1";
}
} else {
// Relay mode forwards the per-phase aggregate; report the real battery
// count so each battery takes its 1/N share.
fields[8 + i] = std::to_string(phase_reports.count[i]);
fields[8 + i] = std::to_string(phase_reports.count[bucket]);
}
fields[15 + i] = to_int_str(phase_reports.chrg_power[i]);
fields[20 + i] = to_int_str(phase_reports.dchrg_power[i]);
}
fields[15 + i] = to_int_str(phase_reports.chrg_power[bucket]);
fields[20 + i] = to_int_str(phase_reports.dchrg_power[bucket]);
}
// x (unassigned/inspection) bucket — chrg/dchrg only; the response carries
// no x count field.
fields[14] = to_int_str(phase_reports.chrg_power[BUCKET_X]);
fields[19] = to_int_str(phase_reports.dchrg_power[BUCKET_X]);
// ABC (combined, phase "D") bucket. Combined-mode consumers are never
// actively instructed (the emulator has no combined control mode), so they
// are effectively relayed in both modes: forward the real count.
fields[11] = std::to_string(phase_reports.count[BUCKET_ABC]);
fields[18] = to_int_str(phase_reports.chrg_power[BUCKET_ABC]);
fields[23] = to_int_str(phase_reports.dchrg_power[BUCKET_ABC]);

while (fields.size() < RESPONSE_LABEL_COUNT) fields.push_back("0");
this->info_idx_counter_ = (this->info_idx_counter_ + 1) % 256;
Expand Down Expand Up @@ -711,10 +766,9 @@ void CT002Component::set_consumer_auto_target(const std::string &consumer_id, bo

void CT002Component::evict_stale_consumers_() {
const double now = now_seconds_();
const double ttl = static_cast<double>(this->consumer_ttl_seconds_);
std::vector<std::string> stale;
for (const auto &kv : this->consumers_) {
if (kv.second.timestamp > 0.0 && now - kv.second.timestamp > ttl) {
if (this->consumer_expired_(kv.second, now)) {
stale.push_back(kv.first);
}
}
Expand All @@ -727,13 +781,19 @@ void CT002Component::evict_stale_consumers_() {
if (this->balancer_) this->balancer_->remove_consumer(id);
}
if (!stale.empty()) {
ESP_LOGD(TAG, "Evicted %u stale consumer(s) (ttl=%us)",
static_cast<unsigned>(stale.size()), this->consumer_ttl_seconds_);
ESP_LOGD(TAG, "Evicted %u stale consumer(s)", static_cast<unsigned>(stale.size()));
}
// Purge dedup timestamps older than the TTL (Python: ct002.py:341
// _dedup.purge_older_than(self.consumer_ttl)).
// Purge dedup timestamps — entries only matter within the dedupe window;
// with an adaptive TTL there is no single number, so purge on a horizon
// safely past any per-consumer TTL and the dedupe window itself (mirrors
// Python's _cleanup_consumers purge).
if (!this->dedup_last_.empty()) {
const double cutoff = now - ttl;
const double horizon =
this->consumer_ttl_seconds_.has_value()
? static_cast<double>(*this->consumer_ttl_seconds_)
: std::max(ADAPTIVE_TTL_FALLBACK_SECONDS,
static_cast<double>(this->dedupe_window_ms_) / 1000.0);
const double cutoff = now - horizon;
for (auto it = this->dedup_last_.begin(); it != this->dedup_last_.end();) {
if (it->second < cutoff) it = this->dedup_last_.erase(it);
else ++it;
Expand Down
Loading
Loading