Skip to content

Commit 509fb9c

Browse files
committed
feat(meter): add ecoflow-stream-mqtt with public-API MQTT and cascade discovery
Adds a dedicated device template for the EcoFlow Stream family that uses EcoFlow's public-API MQTT broker for live data. The existing `ecoflow-stream` template (REST-only via the generic `ecoflow` meter) is kept untouched so no existing configuration changes behaviour. Why --- The REST `quota/all` endpoint returns a stale, largely aggregated snapshot for Stream devices. Notably: - individual PV string power (`powGetPv`..`powGetPv4`) is never populated, only `powGetPvSum`, which includes output from unrelated PV sources on the same cloud meter; - battery power on the CMS level (`powGetBpCms`) is 0 for Stream AC Hybrid installations; - `cmsBattSoc` is always 0; the real per-module SoC lives in `f32ShowSoc`; - in cascade/multi-unit setups only the master's view is reachable via REST. Home Assistant's `hassio-ecoflow-cloud` integration gets these values by subscribing to EcoFlow's public MQTT broker. evcc now does the same. What ---- - `meter/ecoflow_mqtt.go`: shared public-API MQTT client, one per (access key, region). HMAC-SHA256 signed certification call to `/iot-open/sign/certification` via `request.NewClient` (inherits `request.Timeout` and the instrumented transport), connect over TLS, subscribe to `/open/<certificateAccount>/<sn>/quota`. The client is reused across every ecoflow-stream-mqtt meter so only one of EcoFlow's 10 unique-client-ID/day slots is consumed. Client ID is hashed from access key + hostname so prod and dev instances on the same account can coexist without disconnecting each other. Handles both flat Stream payloads and the {"params":{...}} envelopes used by other device families. - `meter/ecoflow_stream_mqtt.go`: new meter type `ecoflow-stream-mqtt`. Responsibilities: * map evcc usage (grid/pv/battery) to the right EcoFlow keys (powGetSysGrid / powGetPv..4 / inputWatts-outputWatts + f32ShowSoc); * optionally discover sibling devices on the same account via `/iot-open/sign/device/list`, matching by the 2-char serial prefix so cascade systems (e.g. two Stream AC Hybrids linked as master+slave) appear as one aggregated meter; * subscribe each discovered serial to MQTT and cache the latest values; * prefer the MQTT cache for reads, fall back to per-serial REST with `util.Cached`; * sum CurrentPower across devices and return a plain-average SoC, matching the cascade view shown in the EcoFlow app; * tolerate per-device REST errors: a transient failure on one cascade unit is logged and skipped so healthy devices keep reporting; only when every device fails is the last error surfaced. - `templates/definition/meter/ecoflow-stream-mqtt.yaml`: new device entry listed as "EcoFlow Stream (MQTT)". Exposes the same access key / secret key / serial / usage inputs as `ecoflow-stream` plus an advanced `discover` toggle (default true). Verification ------------ Live-tested with a cascade of two EcoFlow Stream AC Hybrid units. With a single configured master serial and discover=true, evcc now reports: [ecoflow-stream-mqtt] INFO discover: tracking 2 device(s): BK11...,BK31... [site] pv 1 power: 593W [site] battery 1 power: -1067W [site] battery 1 soc: 37% matching the EcoFlow mobile-app cascade view within the usual smoothing lag. `go test ./...` and `golangci-lint run` are clean.
1 parent 4de7791 commit 509fb9c

3 files changed

Lines changed: 844 additions & 0 deletions

File tree

meter/ecoflow_mqtt.go

Lines changed: 348 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
package meter
2+
3+
import (
4+
"context"
5+
"crypto/hmac"
6+
"crypto/sha256"
7+
"crypto/tls"
8+
"encoding/hex"
9+
"encoding/json"
10+
"fmt"
11+
"io"
12+
"maps"
13+
"net/http"
14+
"os"
15+
"strconv"
16+
"sync"
17+
"time"
18+
19+
paho "github.com/eclipse/paho.mqtt.golang"
20+
"github.com/evcc-io/evcc/util"
21+
"github.com/evcc-io/evcc/util/request"
22+
)
23+
24+
// ecoflowMqttClient maintains a shared MQTT subscription to EcoFlow's public
25+
// IoT broker. The REST `quota/all` endpoint returns only a static subset of
26+
// parameters for Stream devices; live per-string PV power, per-second state
27+
// etc. are delivered exclusively via MQTT. Each configured EcoFlow meter
28+
// registers its device serial here and reads from the in-memory cache.
29+
type ecoflowMqttClient struct {
30+
log *util.Logger
31+
32+
accessKey string
33+
secretKey string
34+
uri string // REST base, e.g. https://api-e.ecoflow.com
35+
http *http.Client
36+
37+
startMu sync.Mutex // serializes the initial certification/connect
38+
mu sync.Mutex
39+
state map[string]map[string]any // sn -> key -> latest value
40+
devices map[string]struct{} // subscribed serials
41+
client paho.Client
42+
username string // certificateAccount, used in quota topic
43+
started bool
44+
}
45+
46+
// ecoflowMqttClients caches one client per (accessKey, uri) pair so we don't
47+
// exceed EcoFlow's 10 unique client-ID/day limit and to share the MQTT
48+
// connection across multiple ecoflow meters for the same account.
49+
var (
50+
ecoflowMqttClientsMu sync.Mutex
51+
ecoflowMqttClients = map[string]*ecoflowMqttClient{}
52+
)
53+
54+
func ecoflowMqttClientFor(accessKey, secretKey, uri string) *ecoflowMqttClient {
55+
ecoflowMqttClientsMu.Lock()
56+
defer ecoflowMqttClientsMu.Unlock()
57+
58+
key := accessKey + "|" + uri
59+
if c, ok := ecoflowMqttClients[key]; ok {
60+
return c
61+
}
62+
log := util.NewLogger("ecoflow-mqtt")
63+
c := &ecoflowMqttClient{
64+
log: log,
65+
accessKey: accessKey,
66+
secretKey: secretKey,
67+
uri: uri,
68+
http: request.NewClient(log),
69+
state: make(map[string]map[string]any),
70+
devices: make(map[string]struct{}),
71+
}
72+
ecoflowMqttClients[key] = c
73+
return c
74+
}
75+
76+
// ecoflowCertResponse is the payload from GET /iot-open/sign/certification.
77+
type ecoflowCertResponse struct {
78+
Code string `json:"code"`
79+
Message string `json:"message"`
80+
Data struct {
81+
CertificateAccount string `json:"certificateAccount"`
82+
CertificatePassword string `json:"certificatePassword"`
83+
URL string `json:"url"`
84+
Port string `json:"port"`
85+
Protocol string `json:"protocol"`
86+
} `json:"data"`
87+
}
88+
89+
// sign builds the HMAC-SHA256 signature for a signed EcoFlow public API
90+
// request as documented in EcoFlow's developer portal.
91+
func ecoflowSign(accessKey, secretKey, nonce, timestamp, query string) string {
92+
target := "accessKey=" + accessKey + "&nonce=" + nonce + "&timestamp=" + timestamp
93+
if query != "" {
94+
target = query + "&" + target
95+
}
96+
h := hmac.New(sha256.New, []byte(secretKey))
97+
h.Write([]byte(target))
98+
return hex.EncodeToString(h.Sum(nil))
99+
}
100+
101+
// certify fetches MQTT credentials from the public API.
102+
func (c *ecoflowMqttClient) certify(ctx context.Context) (*ecoflowCertResponse, error) {
103+
nonce := strconv.Itoa(int(time.Now().UnixNano() % 1_000_000))
104+
timestamp := strconv.FormatInt(time.Now().UnixMilli(), 10)
105+
sign := ecoflowSign(c.accessKey, c.secretKey, nonce, timestamp, "")
106+
107+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.uri+"/iot-open/sign/certification", nil)
108+
if err != nil {
109+
return nil, err
110+
}
111+
req.Header.Set("accessKey", c.accessKey)
112+
req.Header.Set("nonce", nonce)
113+
req.Header.Set("timestamp", timestamp)
114+
req.Header.Set("sign", sign)
115+
116+
resp, err := c.http.Do(req)
117+
if err != nil {
118+
return nil, err
119+
}
120+
defer resp.Body.Close()
121+
122+
body, err := io.ReadAll(resp.Body)
123+
if err != nil {
124+
return nil, err
125+
}
126+
if resp.StatusCode != http.StatusOK {
127+
return nil, fmt.Errorf("certification: http %d: %s", resp.StatusCode, string(body))
128+
}
129+
130+
var cert ecoflowCertResponse
131+
if err := json.Unmarshal(body, &cert); err != nil {
132+
return nil, fmt.Errorf("certification: %w (body=%s)", err, string(body))
133+
}
134+
if cert.Code != "0" {
135+
return nil, fmt.Errorf("certification: code=%s message=%s", cert.Code, cert.Message)
136+
}
137+
return &cert, nil
138+
}
139+
140+
// ensureStarted lazily establishes the MQTT connection on the first device
141+
// registration. Subsequent calls are no-ops.
142+
func (c *ecoflowMqttClient) ensureStarted(ctx context.Context) error {
143+
c.startMu.Lock()
144+
defer c.startMu.Unlock()
145+
146+
c.mu.Lock()
147+
started := c.started
148+
c.mu.Unlock()
149+
if started {
150+
return nil
151+
}
152+
153+
cert, err := c.certify(ctx)
154+
if err != nil {
155+
return err
156+
}
157+
158+
port, err := strconv.Atoi(cert.Data.Port)
159+
if err != nil {
160+
return fmt.Errorf("invalid mqtt port %q: %w", cert.Data.Port, err)
161+
}
162+
163+
// Stable client id that is also unique per host. EcoFlow's broker
164+
// permits only a single concurrent connection per client id and imposes
165+
// a 10 unique-ID/day limit per account. Hashing (accessKey + hostname)
166+
// keeps the id stable across restarts while allowing multiple evcc
167+
// instances (e.g. prod + dev) to coexist on the same access key.
168+
hostname, _ := os.Hostname()
169+
h := sha256.Sum256([]byte(c.accessKey + "|" + hostname))
170+
clientID := "evcc-" + hex.EncodeToString(h[:6])
171+
172+
scheme := cert.Data.Protocol
173+
if scheme == "" {
174+
scheme = "ssl"
175+
}
176+
broker := fmt.Sprintf("%s://%s:%d", scheme, cert.Data.URL, port)
177+
178+
opts := paho.NewClientOptions()
179+
opts.AddBroker(broker)
180+
opts.SetUsername(cert.Data.CertificateAccount)
181+
opts.SetPassword(cert.Data.CertificatePassword)
182+
opts.SetClientID(clientID)
183+
opts.SetCleanSession(true)
184+
opts.SetAutoReconnect(true)
185+
opts.SetMaxReconnectInterval(time.Minute)
186+
opts.SetConnectTimeout(request.Timeout)
187+
opts.SetKeepAlive(30 * time.Second)
188+
opts.SetTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12})
189+
opts.SetOnConnectHandler(c.onConnect)
190+
opts.SetConnectionLostHandler(func(_ paho.Client, err error) {
191+
c.log.WARN.Printf("%s connection lost: %v", broker, err)
192+
})
193+
194+
client := paho.NewClient(opts)
195+
196+
// Publish username before Connect() so the OnConnect handler (which may
197+
// fire on a separate goroutine as soon as CONNACK arrives) can build the
198+
// correct topic. The client handle is published after connect succeeds.
199+
c.mu.Lock()
200+
c.username = cert.Data.CertificateAccount
201+
c.mu.Unlock()
202+
203+
c.log.INFO.Printf("connecting %s at %s as %s", clientID, broker, cert.Data.CertificateAccount)
204+
tok := client.Connect()
205+
if !tok.WaitTimeout(request.Timeout) {
206+
return fmt.Errorf("mqtt connect: timeout")
207+
}
208+
if err := tok.Error(); err != nil {
209+
return fmt.Errorf("mqtt connect: %w", err)
210+
}
211+
212+
c.mu.Lock()
213+
c.client = client
214+
c.started = true
215+
c.mu.Unlock()
216+
217+
return nil
218+
}
219+
220+
// Register subscribes the client to live updates for the given device serial.
221+
// Safe to call multiple times; extra calls are no-ops.
222+
//
223+
// The serial is added to the device set *before* the connection is
224+
// established so that the OnConnect callback picks it up and subscribes.
225+
// This avoids a race where a duplicate manual Subscribe() would overlap with
226+
// the one fired from OnConnect on initial connect.
227+
func (c *ecoflowMqttClient) Register(ctx context.Context, sn string) error {
228+
c.mu.Lock()
229+
_, existed := c.devices[sn]
230+
c.devices[sn] = struct{}{}
231+
started := c.started
232+
client := c.client
233+
username := c.username
234+
c.mu.Unlock()
235+
236+
if existed {
237+
return nil
238+
}
239+
240+
if !started {
241+
// First caller bootstraps the connection; OnConnect will subscribe
242+
// to every registered device once the link is up.
243+
return c.ensureStarted(ctx)
244+
}
245+
246+
// Connection already established by a previous Register call;
247+
// subscribe just for this new device.
248+
return c.subscribe(client, username, sn)
249+
}
250+
251+
func (c *ecoflowMqttClient) subscribe(client paho.Client, username, sn string) error {
252+
topic := fmt.Sprintf("/open/%s/%s/quota", username, sn)
253+
c.log.DEBUG.Printf("subscribe %s", topic)
254+
255+
token := client.Subscribe(topic, 1, func(_ paho.Client, msg paho.Message) {
256+
c.handleMessage(sn, msg.Payload())
257+
})
258+
if !token.WaitTimeout(request.Timeout) {
259+
return fmt.Errorf("subscribe %s: timeout", topic)
260+
}
261+
return token.Error()
262+
}
263+
264+
// onConnect is invoked after a (re)connection and re-subscribes to every
265+
// registered device.
266+
func (c *ecoflowMqttClient) onConnect(client paho.Client) {
267+
c.mu.Lock()
268+
username := c.username
269+
sns := make([]string, 0, len(c.devices))
270+
for sn := range c.devices {
271+
sns = append(sns, sn)
272+
}
273+
c.mu.Unlock()
274+
275+
c.log.DEBUG.Printf("connected, resubscribing %d device(s)", len(sns))
276+
for _, sn := range sns {
277+
if err := c.subscribe(client, username, sn); err != nil {
278+
c.log.ERROR.Printf("resubscribe %s: %v", sn, err)
279+
}
280+
}
281+
}
282+
283+
// handleMessage parses a quota payload and updates the cache. Payload shapes
284+
// observed on EcoFlow public MQTT:
285+
//
286+
// 1. Stream devices emit a flat JSON object with parameters at the top
287+
// level, e.g. {"powGetPv2":214.14,"gridConnectionPower":283.99}.
288+
// 2. Other devices wrap updates in a {"params": {...}} (or "param") envelope
289+
// and may include metadata like timestamp/typeCode at the top level.
290+
//
291+
// We accept both shapes: if "params"/"param" is present we take it, otherwise
292+
// we merge the top-level object as-is (ignoring a few well-known metadata
293+
// fields that are not quota parameters).
294+
func (c *ecoflowMqttClient) handleMessage(sn string, payload []byte) {
295+
c.log.TRACE.Printf("recv %s: %s", sn, string(payload))
296+
297+
var raw map[string]any
298+
if err := json.Unmarshal(payload, &raw); err != nil {
299+
c.log.TRACE.Printf("ignoring non-JSON payload on %s: %v", sn, err)
300+
return
301+
}
302+
303+
c.mu.Lock()
304+
defer c.mu.Unlock()
305+
306+
dev := c.state[sn]
307+
if dev == nil {
308+
dev = make(map[string]any)
309+
c.state[sn] = dev
310+
}
311+
312+
var merged bool
313+
if p, ok := raw["params"].(map[string]any); ok {
314+
maps.Copy(dev, p)
315+
merged = true
316+
}
317+
if p, ok := raw["param"].(map[string]any); ok {
318+
maps.Copy(dev, p)
319+
merged = true
320+
}
321+
322+
if !merged {
323+
// flat payload: treat top-level keys as parameters
324+
for k, v := range raw {
325+
switch k {
326+
case "id", "version", "timestamp", "typeCode", "cmdFunc", "cmdId":
327+
// skip message envelope fields
328+
continue
329+
}
330+
dev[k] = v
331+
}
332+
}
333+
}
334+
335+
// Lookup returns the most recently cached MQTT value for the given serial and
336+
// parameter name. The boolean is false if either the device has not been seen
337+
// yet or the key has not been reported.
338+
func (c *ecoflowMqttClient) Lookup(sn, key string) (any, bool) {
339+
c.mu.Lock()
340+
defer c.mu.Unlock()
341+
342+
dev, ok := c.state[sn]
343+
if !ok {
344+
return nil, false
345+
}
346+
v, ok := dev[key]
347+
return v, ok
348+
}

0 commit comments

Comments
 (0)