Skip to content

Commit b6e76d3

Browse files
committed
feat: add dispatcher metrics
1 parent cf2f2b4 commit b6e76d3

File tree

4 files changed

+128
-37
lines changed

4 files changed

+128
-37
lines changed

ocppj/dispatcher.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package ocppj
33
import (
44
"context"
55
"fmt"
6+
"go.opentelemetry.io/otel/metric"
67
"sync"
78
"time"
89

@@ -373,6 +374,7 @@ type DefaultServerDispatcher struct {
373374
onRequestCancel CanceledRequestHandler
374375
network ws.Server
375376
mutex sync.RWMutex
377+
metrics *dispatcherMetrics
376378
}
377379

378380
// Handler function to be invoked when a request gets canceled (either due to timeout or to other external factors).
@@ -389,14 +391,25 @@ func (c clientTimeoutContext) isActive() bool {
389391
}
390392

391393
// NewDefaultServerDispatcher creates a new DefaultServerDispatcher struct.
392-
func NewDefaultServerDispatcher(queueMap ServerQueueMap) *DefaultServerDispatcher {
394+
func NewDefaultServerDispatcher(queueMap ServerQueueMap, provider metric.MeterProvider) *DefaultServerDispatcher {
395+
dispatcherMetrics, err := newDispatcherMetrics(provider)
396+
if err != nil {
397+
log.Errorf("failed to create dispatcher metrics: %v", err)
398+
return nil
399+
}
400+
393401
d := &DefaultServerDispatcher{
394402
queueMap: queueMap,
395403
requestChannel: nil,
396404
readyForDispatch: make(chan string, 1),
397405
timeout: defaultMessageTimeout,
406+
metrics: dispatcherMetrics,
398407
}
399408
d.pendingRequestState = NewServerState(&d.mutex)
409+
410+
dispatcherMetrics.ObserveQueues(queueMap)
411+
dispatcherMetrics.ObserveInFlightRequests(d.pendingRequestState.(*serverState))
412+
400413
return d
401414
}
402415

ocppj/dispatcher_metrics.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package ocppj
2+
3+
import (
4+
"context"
5+
"github.com/pkg/errors"
6+
"go.opentelemetry.io/otel/attribute"
7+
"go.opentelemetry.io/otel/metric"
8+
)
9+
10+
const (
11+
dispatcherQueueSize = "dispatcher_queue_size"
12+
dispatcherPendingRequest = "dispatcher_pending_requests"
13+
)
14+
15+
type dispatcherMetrics struct {
16+
meter metric.Meter
17+
18+
requestQueue metric.Int64ObservableGauge
19+
pendingRequests metric.Int64ObservableUpDownCounter
20+
}
21+
22+
func newDispatcherMetrics(meterProvider metric.MeterProvider) (*dispatcherMetrics, error) {
23+
if meterProvider == nil {
24+
return nil, errors.New("meterProvider is nil")
25+
}
26+
27+
meter := meterProvider.Meter("server_dispatcher")
28+
29+
clientQueue, err := meter.Int64ObservableGauge(
30+
dispatcherQueueSize,
31+
metric.WithDescription("Number of messages in the dispatcher's queue"),
32+
)
33+
if err != nil {
34+
return nil, err
35+
}
36+
37+
clientPendingRequest, err := meter.Int64ObservableUpDownCounter(
38+
dispatcherPendingRequest,
39+
metric.WithDescription("Number of pending requests in the dispatcher"),
40+
)
41+
if err != nil {
42+
return nil, err
43+
}
44+
45+
dispatcher := &dispatcherMetrics{
46+
meter: meter,
47+
requestQueue: clientQueue,
48+
pendingRequests: clientPendingRequest,
49+
}
50+
51+
return dispatcher, nil
52+
}
53+
54+
func (d *dispatcherMetrics) ObserveInFlightRequests(state *serverState) {
55+
if d.meter == nil {
56+
return
57+
}
58+
59+
_, err := d.meter.RegisterCallback(
60+
func(ctx context.Context, obs metric.Observer) error {
61+
state.mutex.RLock()
62+
currentState := state.pendingRequestState
63+
state.mutex.RUnlock()
64+
65+
for clientID, clientState := range currentState {
66+
inFlightRequest := int64(0)
67+
if clientState.HasPendingRequest() {
68+
inFlightRequest = 1
69+
}
70+
obs.ObserveInt64(
71+
d.pendingRequests,
72+
inFlightRequest,
73+
metric.WithAttributes(attribute.String("client_id", clientID)),
74+
)
75+
}
76+
return nil
77+
},
78+
d.pendingRequests)
79+
if err != nil {
80+
log.Errorf("failed to register callback for inflight queue size: %v", err)
81+
}
82+
}
83+
84+
func (d *dispatcherMetrics) ObserveQueues(queue ServerQueueMap) {
85+
if d.meter == nil {
86+
return
87+
}
88+
89+
_, err := d.meter.RegisterCallback(
90+
func(ctx context.Context, obs metric.Observer) error {
91+
for clientID, queueSize := range queue.SizePerClient() {
92+
obs.ObserveInt64(
93+
d.requestQueue,
94+
int64(queueSize),
95+
metric.WithAttributes(attribute.String("client_id", clientID)),
96+
)
97+
}
98+
return nil
99+
},
100+
d.requestQueue)
101+
if err != nil {
102+
log.Errorf("failed to register callback for dispatcher queue size: %v", err)
103+
}
104+
}

ocppj/server.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,9 @@ func NewServer(wsServer ws.Server, dispatcher ServerDispatcher, stateHandler Ser
5454
}
5555

5656
if dispatcher == nil {
57-
perClientQueue := NewFIFOQueueMap(0)
58-
59-
// Observe queue lengths
60-
metrics.ObserveRequestQueue(context.Background(), perClientQueue)
61-
62-
dispatcher = NewDefaultServerDispatcher(perClientQueue)
57+
dispatcher = NewDefaultServerDispatcher(NewFIFOQueueMap(0), meterProvider)
6358
}
59+
6460
if stateHandler == nil {
6561
d, ok := dispatcher.(*DefaultServerDispatcher)
6662
if !ok {
@@ -69,9 +65,11 @@ func NewServer(wsServer ws.Server, dispatcher ServerDispatcher, stateHandler Ser
6965
stateHandler = d.pendingRequestState
7066
}
7167
}
68+
7269
if wsServer == nil {
7370
wsServer = ws.NewServer()
7471
}
72+
7573
dispatcher.SetNetworkServer(wsServer)
7674
dispatcher.SetPendingRequestState(stateHandler)
7775

ocppj/server_metrics.go

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
const (
1313
requestsInboundMetric = "ocpp_requests_inbound"
1414
requestsOutboundMetric = "ocpp_requests_outbound"
15-
requestQueueMetric = "ocpp_request_queue_size"
1615
)
1716

1817
const (
@@ -33,10 +32,9 @@ var (
3332
)
3433

3534
type ocppMetrics struct {
36-
requestsIn metric.Int64Histogram
37-
requestsOut metric.Int64Histogram
38-
requestQueueMetric metric.Int64ObservableGauge
39-
meter metric.Meter
35+
requestsIn metric.Int64Histogram
36+
requestsOut metric.Int64Histogram
37+
meter metric.Meter
4038
}
4139

4240
// newOcppServerMetrics Creates a new metrics instance
@@ -66,19 +64,10 @@ func newOcppServerMetrics(meterProvider metric.MeterProvider, ocppVersion string
6664
return nil, errors.Wrap(err, fmt.Sprintf("failed to create %s metric", requestsOutboundMetric))
6765
}
6866

69-
queueSize, err := meter.Int64ObservableGauge(
70-
requestQueueMetric,
71-
metric.WithDescription("Current queue length"),
72-
)
73-
if err != nil {
74-
return nil, errors.Wrap(err, fmt.Sprintf("failed to create %s metric", requestQueueMetric))
75-
}
76-
7767
metrics := &ocppMetrics{
78-
requestsIn: requestsIn,
79-
requestsOut: requestsOut,
80-
requestQueueMetric: queueSize,
81-
meter: meter,
68+
requestsIn: requestsIn,
69+
requestsOut: requestsOut,
70+
meter: meter,
8271
}
8372
return metrics, nil
8473
}
@@ -118,16 +107,3 @@ func (m *ocppMetrics) IncrementOutboundRequests(ctx context.Context, chargePoint
118107
metricAttrs := metric.WithAttributes(attrs...)
119108
m.requestsOut.Record(ctx, 1, metricAttrs)
120109
}
121-
122-
func (m *ocppMetrics) ObserveRequestQueue(ctx context.Context, queueMap ServerQueueMap) {
123-
_, err := m.meter.RegisterCallback(
124-
func(ctx context.Context, observer metric.Observer) error {
125-
observer.ObserveInt64(m.requestQueueMetric, int64(queueMap.Size()))
126-
return nil
127-
},
128-
m.requestQueueMetric,
129-
)
130-
if err != nil {
131-
log.Error(err)
132-
}
133-
}

0 commit comments

Comments
 (0)