feat: Add OCPP forwarder #29154
Conversation
Hey - I've found 3 issues, and left some high-level feedback:
- In `forwarder.go`, `var wg sync.WaitGroup` is used with `wg.Go(...)`, but `sync.WaitGroup` has no `Go` method; you likely want to `wg.Add(1)` and `go func(){ ...; wg.Done() }()` for each goroutine and then `wg.Wait()`.
- `ApplyForwarderRules` updates the package-level `forwarderRules`, but the running `ocppForwarder` handler keeps its own `rules` slice initialized only once in `StartForwarder`; this means rule changes at runtime won't affect routing until restart. Consider either reading from the shared `forwarderRules` inside `resolveRule` or making `ApplyForwarderRules` update the handler's rules.
- The forwarder lifecycle flags and globals (`forwarderStarted`, `forwarderRules`, `forwarderRegistry_`, `forwarderUpdatedCb`) are mutated from different call sites without synchronization; if configuration updates can happen concurrently, it would be safer to guard these with a mutex or otherwise ensure single-threaded access to avoid data races (see the sketch after this list).
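A minimal sketch of what guarding these globals behind a package-level mutex could look like (illustrative only; the variable names follow the identifiers mentioned above, `ForwarderRule` is the PR's existing type, and the helper shapes are assumptions rather than the actual implementation):

```go
package ocpp

import "sync"

// Illustrative only: one package-level mutex guarding the forwarder globals
// named in the review. The surrounding types and start logic are assumptions.
var (
	forwarderMu      sync.Mutex
	forwarderStarted bool
	forwarderRules   []ForwarderRule
)

// ApplyForwarderRules replaces the rules and marks the forwarder as started,
// all under the same lock, so concurrent config updates cannot race.
func ApplyForwarderRules(rules []ForwarderRule) {
	forwarderMu.Lock()
	defer forwarderMu.Unlock()

	forwarderRules = rules
	if !forwarderStarted {
		forwarderStarted = true
		// start the forwarder here; omitted in this sketch
	}
}

// currentForwarderRules returns a copy so callers can iterate without the lock.
func currentForwarderRules() []ForwarderRule {
	forwarderMu.Lock()
	defer forwarderMu.Unlock()

	out := make([]ForwarderRule, len(forwarderRules))
	copy(out, forwarderRules)
	return out
}
```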
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `forwarder.go`, `var wg sync.WaitGroup` is used with `wg.Go(...)`, but `sync.WaitGroup` has no `Go` method; you likely want to `wg.Add(1)` and `go func(){ ...; wg.Done() }()` for each goroutine and then `wg.Wait()`.
- `ApplyForwarderRules` updates the package-level `forwarderRules`, but the running `ocppForwarder` handler keeps its own `rules` slice initialized only once in `StartForwarder`; this means rule changes at runtime won't affect routing until restart—consider either reading from the shared `forwarderRules` inside `resolveRule` or making `ApplyForwarderRules` update the handler's rules.
- The forwarder lifecycle flags and globals (`forwarderStarted`, `forwarderRules`, `forwarderRegistry_`, `forwarderUpdatedCb`) are mutated from different call sites without synchronization; if configuration updates can happen concurrently, it would be safer to guard these with a mutex or otherwise ensure single-threaded access to avoid data races.
## Individual Comments
### Comment 1
<location path="charger/ocpp/forwarder.go" line_range="295-297" />
<code_context>
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ var wg sync.WaitGroup
+
+ wg.Go(func() { forwarderReadFromCharger(ctx, cancel, s) })
+ if upstreamConn != nil {
+ wg.Go(func() { forwarderReadFromServer(ctx, cancel, s, upstreamConn, "upstream") })
</code_context>
<issue_to_address>
**issue (bug_risk):** sync.WaitGroup is used incorrectly; it has no Go method, so this will not compile and goroutines won’t be waited for
You’ll need to switch to the standard pattern using Add/Done with explicit goroutines, e.g.:
```go
var wg sync.WaitGroup

wg.Add(1)
go func() {
	defer wg.Done()
	forwarderReadFromCharger(ctx, cancel, s)
}()

if upstreamConn != nil {
	wg.Add(1)
	go func() {
		defer wg.Done()
		forwarderReadFromServer(ctx, cancel, s, upstreamConn, "upstream")
	}()
}
```
This way the goroutines run and `wg.Wait()` can correctly block on their completion.
</issue_to_address>
### Comment 2
<location path="charger/ocpp/instance.go" line_range="97-99" />
<code_context>
+// ApplyForwarderRules updates the forwarding rules at runtime and starts the forwarder
+// if not already started. Safe to call from config update handlers.
+func ApplyForwarderRules(rules []ForwarderRule) {
+ forwarderRules = rules
+ StartForwarder()
+}
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Forwarder rules updates after startup don’t affect existing forwarder instance because the slice header is replaced
Because `StartForwarder` copies `forwarderRules` into `ocppForwarder.rules`, subsequent calls to `ApplyForwarderRules` only replace the global slice header; the running forwarder continues using the old rules and never sees updates.
You could either:
- have `ocppForwarder` read `forwarderRules` directly (with appropriate locking), or
- keep a reference to the singleton `ocppForwarder` and update its `rules` field under a lock instead of reassigning the global slice (a rough sketch of this option follows).
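For illustration, a sketch of the second option, keeping the rules on the running forwarder instance behind its own lock (the `SetRules`/`resolveRule` shapes and the `StationID` field are assumptions, not the PR's code):

```go
package ocpp

import "sync"

// Illustrative sketch: the running forwarder owns its rules behind a mutex,
// and rule updates mutate the live instance instead of reassigning the
// package-level slice. Field and method names are assumptions.
type ocppForwarder struct {
	mu    sync.RWMutex
	rules []ForwarderRule
	// ... connections, registry, etc.
}

// SetRules swaps the rule set on the running forwarder instance.
func (f *ocppForwarder) SetRules(rules []ForwarderRule) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.rules = rules
}

// resolveRule reads the current rules under a read lock on each lookup,
// so runtime updates take effect without a restart.
func (f *ocppForwarder) resolveRule(stationID string) (ForwarderRule, bool) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	for _, r := range f.rules {
		if r.StationID == stationID { // StationID is a hypothetical field
			return r, true
		}
	}
	return ForwarderRule{}, false
}
```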
</issue_to_address>
### Comment 3
<location path="charger/ocpp/forwarder.go" line_range="71" />
<code_context>
+ evcc *websocket.Conn // local evcc central system
+
+ mu sync.Mutex
+ msgOwner map[string]string // msgID → "upstream" | "evcc"
+ lastSeen time.Time
+}
</code_context>
<issue_to_address>
**issue (complexity):** Consider replacing string-based message ownership with a small typed enum and refactoring the watchdog to avoid nested locks for clearer, safer session handling.
One concrete simplification that keeps behavior but reduces complexity is to tighten the message ownership tracking and the watchdog logic.
### 1. Replace stringly-typed `msgOwner` with a small enum
Using `"upstream"` / `"evcc"` as magic strings invites typos and makes call sites harder to follow. You can keep the same logic with a typed enum and helpers:
```go
type owner int

const (
	ownerUnknown owner = iota
	ownerUpstream
	ownerEvcc
)

func (o owner) String() string {
	switch o {
	case ownerUpstream:
		return "upstream"
	case ownerEvcc:
		return "evcc"
	default:
		return "unknown"
	}
}
```
Change the session field and helpers:
```go
type forwarderSession struct {
	// ...
	mu       sync.Mutex
	msgOwner map[string]owner // msgID → owner
	lastSeen time.Time
}

func (s *forwarderSession) recordOwner(msgID string, owner owner) {
	s.mu.Lock()
	s.msgOwner[msgID] = owner
	s.mu.Unlock()
}

func (s *forwarderSession) ownerOf(msgID string) (owner, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.msgOwner[msgID]
	return v, ok
}
```
Then the read loops become more explicit and safer:
```go
// charger → upstream/evcc
case ocppMsgCallResult, ocppMsgCallError:
	if owner, ok := s.ownerOf(msgID); ok {
		s.deleteOwner(msgID)
		switch owner {
		case ownerUpstream:
			forwarderSend(s.upstream, msg, ownerUpstream.String(), s.id)
		case ownerEvcc:
			forwarderSend(s.evcc, msg, ownerEvcc.String(), s.id)
		}
	} else {
		if s.upstream != nil {
			forwarderSend(s.upstream, msg, ownerUpstream.String(), s.id)
		} else {
			forwarderSend(s.evcc, msg, ownerEvcc.String(), s.id)
		}
	}
```
```go
// server → charger
case ocppMsgCall:
	if name == "upstream" {
		s.recordOwner(msgID, ownerUpstream)
	} else {
		s.recordOwner(msgID, ownerEvcc)
	}
	forwarderSend(s.charger, msg, "charger", s.id)
```
This keeps behavior identical but removes string-based coupling.
### 2. Simplify watchdog lock usage
The watchdog currently holds `r.mu`, then locks `s.mu` and closes sockets inline. You can first collect stale sessions under `r.mu`, then close outside, avoiding nested locks and keeping the registry focused on tracking:
```go
func (r *forwarderRegistry) watchdog() {
	for range time.Tick(watchdogPeriod / 2) {
		deadline := time.Now().Add(-watchdogPeriod)

		r.mu.Lock()
		var stale []*forwarderSession
		for id, s := range r.sessions {
			s.mu.Lock()
			staleSess := s.lastSeen.Before(deadline)
			s.mu.Unlock()
			if staleSess {
				forwarderLog.WARN.Printf("closing stale session %s", id)
				delete(r.sessions, id)
				stale = append(stale, s)
			}
		}
		r.mu.Unlock()

		// Close sockets outside registry lock
		for _, s := range stale {
			_ = s.charger.WriteMessage(websocket.CloseMessage,
				websocket.FormatCloseMessage(websocket.CloseGoingAway, "watchdog"))
			s.charger.Close()
			if s.upstream != nil {
				_ = s.upstream.WriteMessage(websocket.CloseMessage,
					websocket.FormatCloseMessage(websocket.CloseGoingAway, "watchdog"))
				s.upstream.Close()
			}
			if s.evcc != nil {
				_ = s.evcc.WriteMessage(websocket.CloseMessage,
					websocket.FormatCloseMessage(websocket.CloseGoingAway, "watchdog"))
				s.evcc.Close()
			}
		}
	}
}
```
This keeps all functionality (same closing behavior and timing) but eliminates the lock layering and makes the watchdog’s responsibilities clearer.
</issue_to_address>
@webalexeu still trying to figure out what this actually does: forward any device message to the 2nd CS? How do you handle messages from those CS and how do you handle replies by that CS? Sorry if obvious, my question is really about "how should an OCPP proxy work".

Thanks for the link, that already helps a lot. I like the idea of priorities (if this works in reality). I assume evcc would be the "primary server" in this sense and the billing backend the "secondary server"? I think it should even suffice to restrict this logic to the core protocol only, but that would of course mean that charger responses (i.e. supported protocols) need to be modified. Maybe that's not necessary with this approach. Let me know if I can help.
Yes indeed, evcc is the primary and the backend the secondary.
@andig OCPP proxying is called Local Controller in the OCPP specs; page 27.
Just some food for thought - should EVCC abide by responses from the CSMS? Functionality-wise, this seems like it can forward to multiple backends - the question being, which one is the source of truth for request/response acceptance? I think there should be two modes of operation: proxy only, which would discard responses, meaning EVCC's state is the source of truth; the second being the CSMS as the source of truth. The problem with the second approach when forwarding to multiple backends is that you would need a distinct backend to be the source of truth for the charging state.
In the general evcc case there is no other OCPP backend, so evcc is the single source of truth. Afaiu this is not quite the "local controller" scenario, but nonetheless. That said, the only scenario I'd support or see demand for in the context of evcc is the proxy-only one where evcc remains the single source of truth?
Agreed, I think in most cases EVCC would not be installed in a commercial context. But it would be nice to have both options.
```go
	Database      DB
	Mqtt          Mqtt
	ModbusProxy   []ModbusProxy
	OcppForwarder []ocpp.ForwarderRule
```
Maybe it would be better if it was named OcppForwarders or OcppProxies
Updated as proposed
```go
}

// ForwarderRules returns the current forwarding rules.
func ForwarderRules() []ForwarderRule {
```
Honestly, having global mutexes in Go is an anti-pattern that leads to performance problems. I would suggest wrapping the forwarders in their own struct.
Oh, this is amazing and something I've been anticipating for a little more than a year now! Upon a quick read of the code & the ocpp-2w-proxy docs, this would serve my use-cases perfectly.
Forward OCPP calls from chargers to an external upstream server (billing platform, DSO, etc.) in addition to evcc.

Architecture: hybrid relay/sidecar model. Chargers connect directly to evcc on the normal OCPP port. For each charger with a matching rule a WebSocket sidecar connection is dialled to the upstream server. Bidirectional message routing based on call origin:

- Billing-critical messages (Authorize, StartTransaction, StopTransaction, DataTransfer): evcc's OCPP handler is bypassed; the call is forwarded to upstream and upstream's CallResult/CallError is relayed back to the charger. This ensures the pay backend controls authorization and issues its own transaction IDs, so RemoteStop and StopTransaction always use consistent IDs.
- Informational messages (BootNotification, StatusNotification, MeterValues, Heartbeat, …): forwarded to upstream and also processed by evcc (sidecar mode). evcc maintains its energy management state normally.
- Upstream-initiated commands (RemoteStartTransaction, GetConfiguration, ChangeConfiguration, …): injected into the charger; the charger's CallResult/CallError is routed back to upstream.
- Read-only mode: upstream can observe but cannot send commands; incoming calls are answered with a SecurityError.
- Pre-connection buffer ensures BootNotification and other early frames reach upstream even when the sidecar dial races the charger connect. Buffered relay calls get a CallError if the upstream dial fails.
- MeterValues throttling: upstream can set MeterValueSampleInterval via ChangeConfiguration; the forwarder intercepts it, applies the interval as a throttle for upstream forwarding, and does not forward the config change to the charger so evcc's own interval remains unchanged.
- UI: two-box layout (Charger → Upstream) matching the Modbus proxy modal, with SelectGroup for the read-only option under advanced settings.
- Status badge on the upstream URL reflects live sidecar session state.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
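To make the call-origin routing described above concrete, here is a minimal illustrative Go sketch of the classification step; the type names, constants, and message lists are assumptions drawn from the commit message, not the PR's actual implementation:

```go
package ocpp

// forwardMode describes how a charger-originated CALL is handled.
// Illustrative only: names and message sets are assumptions based on the
// commit message, not the PR's actual code.
type forwardMode int

const (
	// relayOnly: bypass evcc, forward to upstream, relay upstream's reply back.
	relayOnly forwardMode = iota
	// sidecar: forward to upstream and also let evcc process the call.
	sidecar
)

// billingCritical lists actions where the upstream backend must own the reply.
var billingCritical = map[string]bool{
	"Authorize":        true,
	"StartTransaction": true,
	"StopTransaction":  true,
	"DataTransfer":     true,
}

// classify decides how a charger-originated CALL is routed based on its action.
func classify(action string) forwardMode {
	if billingCritical[action] {
		return relayOnly
	}
	return sidecar
}
```

Under this sketch, an Authorize call would be answered by the upstream backend (evcc bypassed), while a BootNotification would be forwarded upstream and processed by evcc as well, matching the billing-critical vs. informational split described above.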


Forward OCPP calls from chargers to an external upstream server (billing platform, DSO, etc.) in addition to evcc.
fixes #28496