Skip to content

Commit 67b610c

Browse files
committed
Include new callback and context logic in v1.6 and v2.0.1 charging stations
Signed-off-by: Lorenzo <[email protected]>
1 parent a78d542 commit 67b610c

File tree

6 files changed

+145
-111
lines changed

6 files changed

+145
-111
lines changed

ocpp1.6/charge_point.go

Lines changed: 40 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package ocpp16
22

33
import (
4+
"context"
45
"fmt"
56
"reflect"
67

7-
"github.com/lorenzodonini/ocpp-go/internal/callbackqueue"
88
"github.com/lorenzodonini/ocpp-go/ocpp"
99
"github.com/lorenzodonini/ocpp-go/ocpp1.6/certificates"
1010
"github.com/lorenzodonini/ocpp-go/ocpp1.6/core"
@@ -47,10 +47,15 @@ func (cp *chargePoint) error(err error) {
4747
}
4848
}
4949

50-
// Callback invoked whenever a queued request is canceled, due to timeout.
51-
// By default, the callback returns a GenericError to the caller, who sent the original request.
52-
func (cp *chargePoint) onRequestTimeout(_ string, _ ocpp.Request, err *ocpp.Error) {
53-
cp.errorHandler <- err
50+
// Function invoked whenever a Response or an error is received for a request.
51+
// The callback includes the user-defined callback function, which may be used to handle the response accordingly.
52+
// The identifier of the original request is attached to the function as reference.
53+
func (cp *chargePoint) onResponse(requestID string, response ocpp.Response, err error, callback ocpp.Callback) {
54+
if callback == nil {
55+
cp.error(fmt.Errorf("no callback defined for confirmation to request %v, dropping confirmation", requestID))
56+
return
57+
}
58+
callback(response, err)
5459
}
5560

5661
// Errors returns a channel for error messages. If it doesn't exist it es created.
@@ -284,39 +289,54 @@ func (cp *chargePoint) SetCertificateHandler(handler certificates.ChargePointHan
284289
}
285290

286291
func (cp *chargePoint) SendRequest(request ocpp.Request) (ocpp.Response, error) {
292+
return cp.SendRequestWithContext(request, context.TODO())
293+
}
294+
295+
func (cp *chargePoint) SendRequestWithContext(request ocpp.Request, ctx context.Context) (ocpp.Response, error) {
287296
featureName := request.GetFeatureName()
288297
if _, found := cp.client.GetProfileForFeature(featureName); !found {
289298
return nil, fmt.Errorf("feature %v is unsupported on charge point (missing profile), cannot send request", featureName)
290299
}
291-
300+
if ctx == nil {
301+
ctx = context.TODO()
302+
}
292303
// Wraps an asynchronous response
293304
type asyncResponse struct {
294305
r ocpp.Response
295306
e error
296307
}
297-
// Create channel and pass it to a callback function, for retrieving asynchronous response
308+
// Create channel and use it within a callback function, for retrieving asynchronous response
298309
asyncResponseC := make(chan asyncResponse, 1)
299-
send := func() error {
300-
return cp.client.SendRequest(request)
310+
callback := func(response ocpp.Response, err error) {
311+
if ctx.Err() != nil {
312+
// Request was canceled already, ignore callback.
313+
// Confirmation will be handled by select switch below.
314+
return
315+
}
316+
asyncResponseC <- asyncResponse{response, err}
301317
}
302-
err := cp.callbacks.TryQueue("main", send, func(confirmation ocpp.Response, err error) {
303-
asyncResponseC <- asyncResponse{r: confirmation, e: err}
304-
})
318+
// Send request, then start blocking wait for asynchronous response
319+
_, err := cp.client.SendRequest(request, callback, ctx)
305320
if err != nil {
306321
return nil, err
307322
}
308323
select {
309324
case asyncResult, ok := <-asyncResponseC:
310325
if !ok {
311-
return nil, fmt.Errorf("internal error while receiving result for %v request", request.GetFeatureName())
326+
return nil, fmt.Errorf("internal error while receiving result for %s request", request.GetFeatureName())
312327
}
313328
return asyncResult.r, asyncResult.e
314-
case <-cp.stopC:
315-
return nil, fmt.Errorf("client stopped while waiting for response to %v", request.GetFeatureName())
329+
case <-ctx.Done():
330+
// Request was canceled by user. Return an error and don't handle callbacks any longer
331+
return nil, fmt.Errorf("request timed out/canceled by user. Any incoming response will be ignored")
316332
}
317333
}
318334

319-
func (cp *chargePoint) SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, err error)) error {
335+
func (cp *chargePoint) SendRequestAsync(request ocpp.Request, callback ocpp.Callback) error {
336+
return cp.SendRequestAsyncWithContext(request, callback, context.TODO())
337+
}
338+
339+
func (cp *chargePoint) SendRequestAsyncWithContext(request ocpp.Request, callback ocpp.Callback, ctx context.Context) error {
320340
featureName := request.GetFeatureName()
321341
if _, found := cp.client.GetProfileForFeature(featureName); !found {
322342
return fmt.Errorf("feature %v is unsupported on charge point (missing profile), cannot send request", featureName)
@@ -331,51 +351,14 @@ func (cp *chargePoint) SendRequestAsync(request ocpp.Request, callback func(conf
331351
default:
332352
return fmt.Errorf("unsupported action %v on charge point, cannot send request", featureName)
333353
}
334-
// Response will be retrieved asynchronously via asyncHandler
335-
send := func() error {
336-
return cp.client.SendRequest(request)
354+
if ctx == nil {
355+
ctx = context.TODO()
337356
}
338-
err := cp.callbacks.TryQueue("main", send, callback)
357+
// Response will be handled asynchronously and passed to the callback function
358+
_, err := cp.client.SendRequest(request, callback, ctx)
339359
return err
340360
}
341361

342-
func (cp *chargePoint) asyncCallbackHandler() {
343-
for {
344-
select {
345-
case confirmation := <-cp.confirmationHandler:
346-
// Get and invoke callback
347-
if callback, ok := cp.callbacks.Dequeue("main"); ok {
348-
callback(confirmation, nil)
349-
} else {
350-
err := fmt.Errorf("no handler available for incoming response %v", confirmation.GetFeatureName())
351-
cp.error(err)
352-
}
353-
case protoError := <-cp.errorHandler:
354-
// Get and invoke callback
355-
if callback, ok := cp.callbacks.Dequeue("main"); ok {
356-
callback(nil, protoError)
357-
} else {
358-
err := fmt.Errorf("no handler available for error %v", protoError.Error())
359-
cp.error(err)
360-
}
361-
case <-cp.stopC:
362-
// Handler stopped, cleanup callbacks.
363-
// No callback invocation, since the user manually stopped the client.
364-
cp.clearCallbacks(false)
365-
return
366-
}
367-
}
368-
}
369-
370-
func (cp *chargePoint) clearCallbacks(invokeCallback bool) {
371-
for cb, ok := cp.callbacks.Dequeue("main"); ok; cb, ok = cp.callbacks.Dequeue("main") {
372-
if invokeCallback {
373-
err := ocpp.NewError(ocppj.GenericError, "client stopped, no response received from server", "")
374-
cb(nil, err)
375-
}
376-
}
377-
}
378-
379362
func (cp *chargePoint) sendResponse(confirmation ocpp.Response, err error, requestId string) {
380363
if err != nil {
381364
// Send error response

ocpp1.6/v16.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
package ocpp16
33

44
import (
5+
"context"
56
"crypto/tls"
67
"net"
78

8-
"github.com/lorenzodonini/ocpp-go/internal/callbackqueue"
99
"github.com/lorenzodonini/ocpp-go/ocpp"
1010
"github.com/lorenzodonini/ocpp-go/ocpp1.6/certificates"
1111
"github.com/lorenzodonini/ocpp-go/ocpp1.6/core"
@@ -110,11 +110,30 @@ type ChargePoint interface {
110110
//
111111
// The request is synchronous blocking.
112112
SendRequest(request ocpp.Request) (ocpp.Response, error)
113+
// Sends a request to the central system.
114+
// The central system will respond with a confirmation, or with an error if the request was invalid or could not be processed.
115+
// In case of network issues (i.e. the remote host couldn't be reached), the function also returns an error.
116+
//
117+
// An optional context can be passed to the function. If the context is canceled before the request is completed,
118+
// the function returns an error.
119+
//
120+
// The request is synchronous blocking.
121+
SendRequestWithContext(request ocpp.Request, ctx context.Context) (ocpp.Response, error)
113122
// Sends an asynchronous request to the central system.
114-
// The central system will respond with a confirmation messages, or with an error if the request was invalid or could not be processed.
123+
// The central system will reply with a confirmation message, or with an error if the request was invalid or could not be processed.
115124
// This result is propagated via a callback, called asynchronously.
125+
//
126+
// In case of network issues (i.e. the remote host couldn't be reached), the function returns an error directly. In this case, the callback is never called.
127+
SendRequestAsync(request ocpp.Request, callback ocpp.Callback) error
128+
// Sends an asynchronous request to the central system.
129+
// The central system will reply with a confirmation message, or with an error if the request was invalid or could not be processed.
130+
// This result is propagated via a callback, called asynchronously.
131+
//
116132
// In case of network issues (i.e. the remote host couldn't be reached), the function returns an error directly. In this case, the callback is never called.
117-
SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error
133+
//
134+
// An optional context can be passed to the function. If the context is canceled before the request is completed,
135+
// the callback is invoked with an error.
136+
SendRequestAsyncWithContext(request ocpp.Request, callback ocpp.Callback, ctx context.Context) error
118137
// Connects to the central system and starts the charge point routine.
119138
// The function doesn't block and returns right away, after having attempted to open a connection to the central system.
120139
// If the connection couldn't be opened, an error is returned.
@@ -204,6 +223,12 @@ func NewChargePoint(id string, endpoint *ocppj.Client, client ws.Client) ChargeP
204223
cp.errorHandler <- err
205224
})
206225
cp.client.SetRequestHandler(cp.handleIncomingRequest)
226+
// Callback invoked by ocppj layer, whenever:
227+
// - a response/error is received
228+
// - an internal error occurs
229+
// - the request is cancelled
230+
// - the request times out
231+
cp.client.SetResponseHandler(cp.onResponse)
207232
return &cp
208233
}
209234

ocpp2.0.1/charging_station.go

Lines changed: 44 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package ocpp2
22

33
import (
4+
"context"
45
"fmt"
56
"reflect"
67

7-
"github.com/lorenzodonini/ocpp-go/internal/callbackqueue"
88
"github.com/lorenzodonini/ocpp-go/ocpp"
99
"github.com/lorenzodonini/ocpp-go/ocpp2.0.1/authorization"
1010
"github.com/lorenzodonini/ocpp-go/ocpp2.0.1/availability"
@@ -44,9 +44,6 @@ type chargingStation struct {
4444
diagnosticsHandler diagnostics.ChargingStationHandler
4545
displayHandler display.ChargingStationHandler
4646
dataHandler data.ChargingStationHandler
47-
responseHandler chan ocpp.Response
48-
errorHandler chan error
49-
callbacks callbackqueue.CallbackQueue
5047
stopC chan struct{}
5148
errC chan error // external error channel
5249
}
@@ -65,10 +62,15 @@ func (cs *chargingStation) Errors() <-chan error {
6562
return cs.errC
6663
}
6764

68-
// Callback invoked whenever a queued request is canceled, due to timeout.
69-
// By default, the callback returns a GenericError to the caller, who sent the original request.
70-
func (cs *chargingStation) onRequestTimeout(_ string, _ ocpp.Request, err *ocpp.Error) {
71-
cs.errorHandler <- err
65+
// Function invoked whenever a Response or an error is received for a request.
66+
// The callback includes the user-defined callback function, which may be used to handle the response accordingly.
67+
// A reference to the original request is attached to the function, so that the user may identify the request.
68+
func (cs *chargingStation) onResponse(requestID string, response ocpp.Response, err error, callback ocpp.Callback) {
69+
if callback == nil {
70+
cs.error(fmt.Errorf("no callback defined for response to request %v, dropping response", requestID))
71+
return
72+
}
73+
callback(response, err)
7274
}
7375

7476
func (cs *chargingStation) BootNotification(reason provisioning.BootReason, model string, vendor string, props ...func(request *provisioning.BootNotificationRequest)) (*provisioning.BootNotificationResponse, error) {
@@ -461,10 +463,17 @@ func (cs *chargingStation) SetDataHandler(handler data.ChargingStationHandler) {
461463
}
462464

463465
func (cs *chargingStation) SendRequest(request ocpp.Request) (ocpp.Response, error) {
466+
return cs.SendRequestWithContext(request, context.TODO())
467+
}
468+
469+
func (cs *chargingStation) SendRequestWithContext(request ocpp.Request, ctx context.Context) (ocpp.Response, error) {
464470
featureName := request.GetFeatureName()
465471
if _, found := cs.client.GetProfileForFeature(featureName); !found {
466472
return nil, fmt.Errorf("feature %v is unsupported on charging station (missing profile), cannot send request", featureName)
467473
}
474+
if ctx == nil {
475+
ctx = context.TODO()
476+
}
468477

469478
// Wraps an asynchronous response
470479
type asyncResponse struct {
@@ -473,23 +482,36 @@ func (cs *chargingStation) SendRequest(request ocpp.Request) (ocpp.Response, err
473482
}
474483
// Create channel and pass it to a callback function, for retrieving asynchronous response
475484
asyncResponseC := make(chan asyncResponse, 1)
476-
send := func() error {
477-
return cs.client.SendRequest(request)
485+
callback := func(response ocpp.Response, err error) {
486+
if ctx.Err() != nil {
487+
// Request was canceled already, ignore callback.
488+
// Response will be handled by select statement below.
489+
return
490+
}
491+
asyncResponseC <- asyncResponse{response, err}
478492
}
479-
err := cs.callbacks.TryQueue("main", send, func(confirmation ocpp.Response, err error) {
480-
asyncResponseC <- asyncResponse{r: confirmation, e: err}
481-
})
493+
// Send request, then start blocking wait for asynchronous response
494+
_, err := cs.client.SendRequest(request, callback, ctx)
482495
if err != nil {
483496
return nil, err
484497
}
485-
asyncResult, ok := <-asyncResponseC
486-
if !ok {
487-
return nil, fmt.Errorf("internal error while receiving result for %v request", request.GetFeatureName())
498+
select {
499+
case asyncResult, ok := <-asyncResponseC:
500+
if !ok {
501+
return nil, fmt.Errorf("internal error while receiving result for %s request", request.GetFeatureName())
502+
}
503+
return asyncResult.r, asyncResult.e
504+
case <-ctx.Done():
505+
// Request was canceled by user. Return an error and don't handle callbacks any longer
506+
return nil, fmt.Errorf("request timed out/canceled by user. Any incoming response will be ignored")
488507
}
489-
return asyncResult.r, asyncResult.e
490508
}
491509

492-
func (cs *chargingStation) SendRequestAsync(request ocpp.Request, callback func(response ocpp.Response, err error)) error {
510+
func (cs *chargingStation) SendRequestAsync(request ocpp.Request, callback ocpp.Callback) error {
511+
return cs.SendRequestAsyncWithContext(request, callback, context.TODO())
512+
}
513+
514+
func (cs *chargingStation) SendRequestAsyncWithContext(request ocpp.Request, callback ocpp.Callback, ctx context.Context) error {
493515
featureName := request.GetFeatureName()
494516
if _, found := cs.client.GetProfileForFeature(featureName); !found {
495517
return fmt.Errorf("feature %v is unsupported on charging station (missing profile), cannot send request", featureName)
@@ -524,37 +546,14 @@ func (cs *chargingStation) SendRequestAsync(request ocpp.Request, callback func(
524546
default:
525547
return fmt.Errorf("unsupported action %v on charging station, cannot send request", featureName)
526548
}
527-
// Response will be retrieved asynchronously via asyncHandler
528-
send := func() error {
529-
return cs.client.SendRequest(request)
549+
if ctx == nil {
550+
ctx = context.TODO()
530551
}
531-
err := cs.callbacks.TryQueue("main", send, callback)
552+
// Response will be handled asynchronously and passed to the callback function
553+
_, err := cs.client.SendRequest(request, callback, ctx)
532554
return err
533555
}
534556

535-
func (cs *chargingStation) asyncCallbackHandler() {
536-
for {
537-
select {
538-
case confirmation := <-cs.responseHandler:
539-
// Get and invoke callback
540-
if callback, ok := cs.callbacks.Dequeue("main"); ok {
541-
callback(confirmation, nil)
542-
} else {
543-
cs.error(fmt.Errorf("no callback available for incoming response %v", confirmation.GetFeatureName()))
544-
}
545-
case protoError := <-cs.errorHandler:
546-
// Get and invoke callback
547-
if callback, ok := cs.callbacks.Dequeue("main"); ok {
548-
callback(nil, protoError)
549-
} else {
550-
cs.error(fmt.Errorf("no callback available for incoming error %w", protoError))
551-
}
552-
case <-cs.stopC:
553-
return
554-
}
555-
}
556-
}
557-
558557
func (cs *chargingStation) sendResponse(response ocpp.Response, err error, requestId string) {
559558
if err != nil {
560559
// Send error response

0 commit comments

Comments
 (0)