-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathecoflow_mqtt.go
More file actions
292 lines (257 loc) · 8.65 KB
/
ecoflow_mqtt.go
File metadata and controls
292 lines (257 loc) · 8.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
package meter
import (
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"maps"
"net/http"
"os"
"strconv"
"sync"
"time"
"github.com/evcc-io/evcc/plugin/mqtt"
"github.com/evcc-io/evcc/util"
"github.com/evcc-io/evcc/util/request"
)
// ecoflowMqttClient maintains a shared MQTT subscription to EcoFlow's public
// IoT broker. The REST `quota/all` endpoint returns only a static subset of
// parameters for Stream devices; live per-string PV power, per-second state
// etc. are delivered exclusively via MQTT. Each configured EcoFlow meter
// registers its device serial here and reads from the in-memory cache.
//
// Transport is delegated to evcc's shared `plugin/mqtt.Client` (which handles
// the Paho options, TLS, reconnect, and automatic re-subscription of every
// `Listen`ed topic on reconnect). The EcoFlow-specific parts that remain
// here are the HMAC-signed HTTPS certification call, the stable
// per-host client ID (to respect EcoFlow's 10-IDs/account/day quota) and
// the flat-vs-envelope quota-payload parser.
//
// Locking: startMu is held for the whole of ensureStarted; mu guards state,
// devices, client and username and is only held for short map/field accesses.
type ecoflowMqttClient struct {
// log is handed to the HTTP and MQTT clients and used for trace output when
// a payload cannot be parsed.
log *util.Logger
// accessKey is sent as a request header, is part of the signed string and
// feeds the client-id hash.
accessKey string
// secretKey is the HMAC key used to sign the certification request.
secretKey string
uri string // REST base, e.g. https://api-e.ecoflow.com
// http performs the certification request.
http *http.Client
startMu sync.Mutex // serializes the initial certification/connect
// mu guards the four fields below.
mu sync.Mutex
state map[string]map[string]any // sn -> key -> latest value
devices map[string]struct{} // serials for which Listen has been called
// client stays nil until ensureStarted has connected successfully.
client *mqtt.Client
username string // certificateAccount, used in quota topic
}
// ecoflowMqttClients caches one client per (accessKey, uri) pair so the HTTPS
// certification call is made once per account, and so the resulting MQTT
// connection (and its 1-of-10 daily client-ID slot) is shared across every
// ecoflow meter for that account.
//
// The map key is built as accessKey + "|" + uri; the secret key is not part
// of it.
var (
// ecoflowMqttClientsMu guards ecoflowMqttClients.
ecoflowMqttClientsMu sync.Mutex
// ecoflowMqttClients holds the clients created so far; entries are added by
// ecoflowMqttClientFor.
ecoflowMqttClients = map[string]*ecoflowMqttClient{}
)
// ecoflowMqttClientFor returns the shared client for the given account,
// creating and caching it on first use. The cache is keyed by accessKey and
// uri only: when an entry already exists it is returned unchanged, and the
// secretKey argument of that call is not consulted.
func ecoflowMqttClientFor(accessKey, secretKey, uri string) *ecoflowMqttClient {
	cacheKey := accessKey + "|" + uri

	ecoflowMqttClientsMu.Lock()
	defer ecoflowMqttClientsMu.Unlock()

	shared, found := ecoflowMqttClients[cacheKey]
	if !found {
		logger := util.NewLogger("ecoflow-mqtt")
		shared = &ecoflowMqttClient{
			log:       logger,
			accessKey: accessKey,
			secretKey: secretKey,
			uri:       uri,
			http:      request.NewClient(logger),
			state:     make(map[string]map[string]any),
			devices:   make(map[string]struct{}),
		}
		ecoflowMqttClients[cacheKey] = shared
	}

	return shared
}
// ecoflowCertResponse is the payload from GET /iot-open/sign/certification.
type ecoflowCertResponse struct {
// Code is "0" on success; certify treats any other value as an error and
// reports it together with Message.
Code string `json:"code"`
Message string `json:"message"`
// Data carries the MQTT connection details.
Data struct {
// CertificateAccount and CertificatePassword are passed to mqtt.NewClient;
// the account also forms part of the quota topic.
CertificateAccount string `json:"certificateAccount"`
CertificatePassword string `json:"certificatePassword"`
// URL is used as the host part of the broker address.
URL string `json:"url"`
// Port arrives as a string and is parsed with strconv.Atoi.
Port string `json:"port"`
// Protocol is decoded but not read by the code in this file.
Protocol string `json:"protocol"`
} `json:"data"`
}
// ecoflowSign builds the HMAC-SHA256 signature for a signed EcoFlow public
// API request as documented in EcoFlow's developer portal.
//
// The signed string is
//
//	[query&]accessKey=<accessKey>&nonce=<nonce>&timestamp=<timestamp>
//
// where query, if non-empty, is the already-encoded request parameter string
// and is placed in front. The string is keyed with secretKey and the MAC is
// returned as lowercase hex.
func ecoflowSign(accessKey, secretKey, nonce, timestamp, query string) string {
	target := "accessKey=" + accessKey + "&nonce=" + nonce + "&timestamp=" + timestamp
	if query != "" {
		target = query + "&" + target
	}

	mac := hmac.New(sha256.New, []byte(secretKey))
	mac.Write([]byte(target)) // hash.Hash.Write never returns an error
	return hex.EncodeToString(mac.Sum(nil))
}
// certify fetches MQTT credentials from the public API.
//
// It issues a signed GET to <uri>/iot-open/sign/certification, carrying the
// access key, nonce, timestamp and signature as request headers. An error is
// returned when the request fails, the body cannot be read, the HTTP status
// is not 200, the body is not valid JSON, or the response code is not "0".
func (c *ecoflowMqttClient) certify(ctx context.Context) (*ecoflowCertResponse, error) {
	const endpoint = "/iot-open/sign/certification"

	// The nonce is the sub-millisecond part of the current time in
	// nanoseconds, i.e. a value in [0, 999999].
	nonce := strconv.Itoa(int(time.Now().UnixNano() % 1_000_000))
	timestamp := strconv.FormatInt(time.Now().UnixMilli(), 10)

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.uri+endpoint, nil)
	if err != nil {
		return nil, err
	}

	headers := [][2]string{
		{"accessKey", c.accessKey},
		{"nonce", nonce},
		{"timestamp", timestamp},
		{"sign", ecoflowSign(c.accessKey, c.secretKey, nonce, timestamp, "")},
	}
	for _, kv := range headers {
		req.Header.Set(kv[0], kv[1])
	}

	resp, err := c.http.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Read the body before looking at the status so it can be quoted in the
	// error message.
	raw, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("certification: http %d: %s", resp.StatusCode, string(raw))
	}

	result := new(ecoflowCertResponse)
	if err := json.Unmarshal(raw, result); err != nil {
		return nil, fmt.Errorf("certification: %w (body=%s)", err, string(raw))
	}
	if result.Code != "0" {
		return nil, fmt.Errorf("certification: code=%s message=%s", result.Code, result.Message)
	}

	return result, nil
}
// ensureStarted lazily performs the HTTPS certification and opens a shared
// `plugin/mqtt.Client` on the first device registration. Once a client has
// been stored, subsequent calls are no-ops. A failed attempt stores nothing,
// so the next call tries again.
func (c *ecoflowMqttClient) ensureStarted(ctx context.Context) error {
	// startMu is held for the whole call so only one caller at a time runs
	// the certification and connect.
	c.startMu.Lock()
	defer c.startMu.Unlock()

	c.mu.Lock()
	connected := c.client != nil
	c.mu.Unlock()
	if connected {
		return nil
	}

	cert, err := c.certify(ctx)
	if err != nil {
		return err
	}
	creds := cert.Data

	port, err := strconv.Atoi(creds.Port)
	if err != nil {
		return fmt.Errorf("invalid mqtt port %q: %w", creds.Port, err)
	}

	// Stable client id that is also unique per host. EcoFlow's broker
	// permits only a single concurrent connection per client id and imposes
	// a 10 unique-ID/day limit per account. Hashing (accessKey + hostname)
	// keeps the id stable across restarts while allowing multiple evcc
	// instances (e.g. prod + dev) to coexist on the same access key.
	// A hostname lookup error is ignored; the hash then uses whatever
	// os.Hostname returned alongside the error.
	host, _ := os.Hostname()
	digest := sha256.Sum256([]byte(c.accessKey + "|" + host))
	clientID := "evcc-" + hex.EncodeToString(digest[:6])

	// EcoFlow's certification response uses the `ssl` scheme which is
	// paho's alias for TLS. plugin/mqtt specifically recognises the
	// `tls://` prefix and preserves it through its DefaultPort helper;
	// normalise to that form here.
	broker := "tls://" + creds.URL + ":" + strconv.Itoa(port)

	conn, err := mqtt.NewClient(c.log, broker,
		creds.CertificateAccount, creds.CertificatePassword,
		clientID, 1, false, "", "", "")
	if err != nil {
		return fmt.Errorf("mqtt connect: %w", err)
	}

	c.mu.Lock()
	c.client = conn
	c.username = creds.CertificateAccount
	c.mu.Unlock()

	return nil
}
// Register subscribes the client to live updates for the given device serial.
// Safe to call multiple times; once a serial has been subscribed successfully,
// extra calls are no-ops.
//
// The first call starts the shared MQTT connection via ensureStarted and
// returns its error if that fails. If the subscription itself fails, the
// serial is removed from the registered set again so that a later call
// retries instead of silently reporting success.
//
// Re-subscription on reconnect is handled automatically by `plugin/mqtt`'s
// ConnectionHandler, which replays every `Listen`ed topic.
func (c *ecoflowMqttClient) Register(ctx context.Context, sn string) error {
	if err := c.ensureStarted(ctx); err != nil {
		return err
	}

	// Claim the serial under the lock so only one caller goes on to
	// subscribe; snapshot the connection details while the lock is held.
	c.mu.Lock()
	_, existed := c.devices[sn]
	if !existed {
		c.devices[sn] = struct{}{}
	}
	client := c.client
	username := c.username
	c.mu.Unlock()

	if existed {
		return nil
	}

	topic := fmt.Sprintf("/open/%s/%s/quota", username, sn)
	err := client.Listen(topic, func(payload string) {
		c.handleMessage(sn, []byte(payload))
	})
	if err != nil {
		// Undo the claim: leaving it in place would turn every retry into a
		// no-op and the device would never receive live data.
		c.mu.Lock()
		delete(c.devices, sn)
		c.mu.Unlock()
		return err
	}

	return nil
}
// handleMessage parses a quota payload and updates the cache. Payload shapes
// observed on EcoFlow public MQTT:
//
//  1. Stream devices emit a flat JSON object with parameters at the top
//     level, e.g. {"powGetPv2":214.14,"gridConnectionPower":283.99}.
//  2. Other devices wrap updates in a {"params": {...}} (or "param") envelope
//     and may include metadata like timestamp/typeCode at the top level.
//
// Both shapes are accepted. When "params" and/or "param" is present and is a
// JSON object, its entries are merged into the cache ("params" first, then
// "param") and the rest of the message is ignored. Otherwise the top-level
// keys are merged, minus a few well-known metadata fields. Payloads that are
// not valid JSON objects are logged at trace level and dropped.
func (c *ecoflowMqttClient) handleMessage(sn string, payload []byte) {
	var msg map[string]any
	if err := json.Unmarshal(payload, &msg); err != nil {
		c.log.TRACE.Printf("ignoring non-JSON payload on %s: %v", sn, err)
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	cache := c.state[sn]
	if cache == nil {
		cache = make(map[string]any)
		c.state[sn] = cache
	}

	// Envelope shape: take the wrapped parameter object(s).
	enveloped := false
	for _, field := range [...]string{"params", "param"} {
		if inner, isObject := msg[field].(map[string]any); isObject {
			maps.Copy(cache, inner)
			enveloped = true
		}
	}
	if enveloped {
		return
	}

	// Flat shape: every top-level key is a parameter, except the message
	// envelope fields.
	for name, value := range msg {
		switch name {
		case "id", "version", "timestamp", "typeCode", "cmdFunc", "cmdId":
			// not a quota parameter
		default:
			cache[name] = value
		}
	}
}
// Lookup returns the most recently cached MQTT value for the given serial and
// parameter name. The boolean is false if either the device has not been seen
// yet or the key has not been reported; the value is nil in that case.
func (c *ecoflowMqttClient) Lookup(sn, key string) (any, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// An unknown serial yields a nil inner map, and reading from a nil map
	// reports "not present", so both miss cases are covered by one lookup.
	value, found := c.state[sn][key]
	return value, found
}