@@ -22,6 +22,9 @@ import (
2222 "sync/atomic"
2323 "time"
2424
25+ "github.com/scalyr/dataset-go/pkg/meter_config"
26+ "go.opentelemetry.io/otel/attribute"
27+
2528 "go.uber.org/zap"
2629
2730 "github.com/scalyr/dataset-go/pkg/buffer_config"
@@ -44,8 +47,10 @@ type Statistics struct {
4447 bytesAPISent atomic.Uint64
4548 bytesAPIAccepted atomic.Uint64
4649
47- meter * metric.Meter
48- logger * zap.Logger
50+ config * meter_config.MeterConfig
51+ meter * metric.Meter
52+ logger * zap.Logger
53+ attributes []attribute.KeyValue
4954
5055 cBuffersEnqueued metric.Int64UpDownCounter
5156 cBuffersProcessed metric.Int64UpDownCounter
@@ -67,7 +72,7 @@ type Statistics struct {
6772// NewStatistics creates structure to keep track of data processing.
6873// If meter is not nil, then Open Telemetry is used for collecting metrics
6974// as well.
70- func NewStatistics (meter * metric. Meter , logger * zap.Logger ) (* Statistics , error ) {
75+ func NewStatistics (config * meter_config. MeterConfig , logger * zap.Logger ) (* Statistics , error ) {
7176 logger .Info ("Initialising statistics" )
7277 statistics := & Statistics {
7378 buffersEnqueued : atomic.Uint64 {},
@@ -83,8 +88,10 @@ func NewStatistics(meter *metric.Meter, logger *zap.Logger) (*Statistics, error)
8388 bytesAPIAccepted : atomic.Uint64 {},
8489 bytesAPISent : atomic.Uint64 {},
8590
86- meter : meter ,
87- logger : logger ,
91+ config : config ,
92+ meter : nil ,
93+ logger : logger ,
94+ attributes : []attribute.KeyValue {},
8895 }
8996
9097 err := statistics .initMetrics ()
@@ -97,54 +104,70 @@ func key(key string) string {
97104}
98105
99106func (stats * Statistics ) initMetrics () error {
107+ // if there is no config, there is no need to initialise counters
108+ if stats .config == nil {
109+ stats .logger .Info ("OTel metrics WILL NOT be collected (MeterConfig is nil)" )
110+ return nil
111+ }
112+
113+ // update meter with config meter
114+ stats .meter = stats .config .Meter ()
100115 meter := stats .meter
116+
101117 // if there is no meter, there is no need to initialise counters
102118 if meter == nil {
103- stats .logger .Info ("OTel metrics WILL NOT be collected" )
119+ stats .logger .Info ("OTel metrics WILL NOT be collected (Meter is nil) " )
104120 return nil
105121 }
106122 stats .logger .Info ("OTel metrics WILL be collected" )
107123
124+ // set attributes so that we can track multiple instances
125+ stats .attributes = []attribute.KeyValue {
126+ {Key : "entity" , Value : attribute .StringValue (stats .config .Entity ())},
127+ {Key : "name" , Value : attribute .StringValue (stats .config .Name ())},
128+ }
129+ metric .WithAttributes (stats .attributes ... )
130+
108131 err := error (nil )
109- stats .cBuffersEnqueued , err = (* meter ).Int64UpDownCounter (key ("buffersEnqueued " ))
132+ stats .cBuffersEnqueued , err = (* meter ).Int64UpDownCounter (key ("buffers_enqueued " ))
110133 if err != nil {
111134 return err
112135 }
113- stats .cBuffersProcessed , err = (* meter ).Int64UpDownCounter (key ("buffersProcessed " ))
136+ stats .cBuffersProcessed , err = (* meter ).Int64UpDownCounter (key ("buffers_processed " ))
114137 if err != nil {
115138 return err
116139 }
117- stats .cBuffersDropped , err = (* meter ).Int64UpDownCounter (key ("buffersDropped " ))
140+ stats .cBuffersDropped , err = (* meter ).Int64UpDownCounter (key ("buffers_dropped " ))
118141 if err != nil {
119142 return err
120143 }
121- stats .cBuffersBroken , err = (* meter ).Int64UpDownCounter (key ("buffersBroken " ))
144+ stats .cBuffersBroken , err = (* meter ).Int64UpDownCounter (key ("buffers_broken " ))
122145 if err != nil {
123146 return err
124147 }
125148
126- stats .cEventsEnqueued , err = (* meter ).Int64UpDownCounter (key ("eventsEnqueued " ))
149+ stats .cEventsEnqueued , err = (* meter ).Int64UpDownCounter (key ("events_enqueued " ))
127150 if err != nil {
128151 return err
129152 }
130- stats .cEventsProcessed , err = (* meter ).Int64UpDownCounter (key ("eventsProcessed " ))
153+ stats .cEventsProcessed , err = (* meter ).Int64UpDownCounter (key ("events_processed " ))
131154 if err != nil {
132155 return err
133156 }
134- stats .cEventsDropped , err = (* meter ).Int64UpDownCounter (key ("eventsDropped " ))
157+ stats .cEventsDropped , err = (* meter ).Int64UpDownCounter (key ("events_dropped " ))
135158 if err != nil {
136159 return err
137160 }
138- stats .cEventsBroken , err = (* meter ).Int64UpDownCounter (key ("eventsBroken " ))
161+ stats .cEventsBroken , err = (* meter ).Int64UpDownCounter (key ("events_broken " ))
139162 if err != nil {
140163 return err
141164 }
142165
143- stats .cBytesAPISent , err = (* meter ).Int64UpDownCounter (key ("bytesAPISent " ))
166+ stats .cBytesAPISent , err = (* meter ).Int64UpDownCounter (key ("bytes_api_sent " ))
144167 if err != nil {
145168 return err
146169 }
147- stats .cBytesAPIAccepted , err = (* meter ).Int64UpDownCounter (key ("bytesAPIAccepted " ))
170+ stats .cBytesAPIAccepted , err = (* meter ).Int64UpDownCounter (key ("bytes_api_accepted " ))
148171 if err != nil {
149172 return err
150173 }
@@ -158,7 +181,7 @@ func (stats *Statistics) initMetrics() error {
158181 zap .Float64s ("buckets" , payloadBuckets ),
159182 )
160183 stats .hPayloadSize , err = (* meter ).Int64Histogram (key (
161- "payloadSize " ),
184+ "payload_size " ),
162185 metric .WithExplicitBucketBoundaries (payloadBuckets ... ),
163186 metric .WithUnit ("b" ),
164187 )
@@ -175,7 +198,7 @@ func (stats *Statistics) initMetrics() error {
175198 zap .Float64s ("buckets" , responseBuckets ),
176199 )
177200 stats .hResponseTime , err = (* meter ).Int64Histogram (key (
178- "responseTime " ),
201+ "response_time " ),
179202 metric .WithExplicitBucketBoundaries (responseBuckets ... ),
180203 metric .WithUnit ("ms" ),
181204 )
@@ -278,19 +301,31 @@ func (stats *Statistics) BytesAPIAcceptedAdd(i uint64) {
278301
279302func (stats * Statistics ) PayloadSizeRecord (payloadSizeInBytes int64 ) {
280303 if stats .hPayloadSize != nil {
281- stats .hPayloadSize .Record (context .Background (), payloadSizeInBytes )
304+ stats .hPayloadSize .Record (
305+ context .Background (),
306+ payloadSizeInBytes ,
307+ metric .WithAttributes (stats .attributes ... ),
308+ )
282309 }
283310}
284311
285312func (stats * Statistics ) ResponseTimeRecord (duration time.Duration ) {
286313 if stats .hResponseTime != nil {
287- stats .hResponseTime .Record (context .Background (), duration .Milliseconds ())
314+ stats .hResponseTime .Record (
315+ context .Background (),
316+ duration .Milliseconds (),
317+ metric .WithAttributes (stats .attributes ... ),
318+ )
288319 }
289320}
290321
291322func (stats * Statistics ) add (counter metric.Int64UpDownCounter , i uint64 ) {
292323 if counter != nil {
293- counter .Add (context .Background (), int64 (i ))
324+ counter .Add (
325+ context .Background (),
326+ int64 (i ),
327+ metric .WithAttributes (stats .attributes ... ),
328+ )
294329 }
295330}
296331
0 commit comments