Skip to content

Commit bfd64f6

Browse files
committed
Rename metrics
To match .NET client metric names. This commit also removes read/written bytes metrics because they were no-op. In this client, the AMQP 1.0 protocol is handled by a third-party library. This library does not expose "hooks" to count reliably the number of bytes read/written.
1 parent 281b18a commit bfd64f6

5 files changed

Lines changed: 47 additions & 99 deletions

File tree

docs/development/METRICS_IMPLEMENTATION_GUIDE.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,10 @@ consumeDisposition(disposition: ConsumeDisposition)
186186
- **DISCARDED**: Consumer discarded/rejected the message (dead letter)
187187
- **REQUEUED**: Consumer requeued the message for redelivery
188188

189-
#### Network I/O
189+
#### Network I/O (Optional)
190+
191+
**Note**: These metrics are **optional** and may not be supported by all client implementations.
192+
190193
```
191194
writtenBytes(byteCount: int)
192195
- Called when: Data is written to the network socket
@@ -203,6 +206,8 @@ readBytes(byteCount: int)
203206
- Note: Cumulative - caller must aggregate
204207
```
205208

209+
> **Go Client Implementation Note:** The Go client does **not** implement `writtenBytes` and `readBytes` metrics due to underlying library limitations. These methods are not present in the Go `MetricsCollector` interface.
210+
206211
---
207212

208213
## 4. Metric Types and Categories
@@ -237,13 +242,14 @@ These metrics represent **cumulative totals** that only increase:
237242
| consumed_accepted | Counter | Total messages accepted by consumers | messages |
238243
| consumed_discarded | Counter | Total messages discarded by consumers | messages |
239244
| consumed_requeued | Counter | Total messages requeued by consumers | messages |
240-
| written_bytes | Counter | Total bytes written to network | bytes |
241-
| read_bytes | Counter | Total bytes read from network | bytes |
245+
| written_bytes **(optional)** | Counter | Total bytes written to network | bytes |
246+
| read_bytes **(optional)** | Counter | Total bytes read from network | bytes |
242247

243248
**Implementation Notes**:
244249
- These should monotonically increase
245250
- Never reset during the lifetime of the application
246251
- Use thread-safe increment operations
252+
- **Note**: The `written_bytes` and `read_bytes` metrics are **optional**. Some client implementations (including the Go client) may not support these metrics due to underlying library limitations.
247253
- For byte metrics, expect very high frequency updates
248254

249255
### 4.3 Metric Naming Convention

pkg/rabbitmqamqp/metrics.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,6 @@ type MetricsCollector interface {
8383
// ConsumeDisposition is called when the consumer settles (acknowledges) a message.
8484
// The context contains OTEL semantic convention attributes for the disposition.
8585
ConsumeDisposition(disposition ConsumeDisposition, ctx ConsumeContext)
86-
87-
// WrittenBytes is called when data is written to the network socket.
88-
// The count parameter is the number of bytes written in this operation.
89-
WrittenBytes(count int64)
90-
91-
// ReadBytes is called when data is read from the network socket.
92-
// The count parameter is the number of bytes read in this operation.
93-
ReadBytes(count int64)
9486
}
9587

9688
// NoOpMetricsCollector is a MetricsCollector implementation that does nothing.
@@ -111,8 +103,6 @@ func (n *NoOpMetricsCollector) Publish(_ PublishContext)
111103
func (n *NoOpMetricsCollector) PublishDisposition(_ PublishDisposition, _ PublishContext) {}
112104
func (n *NoOpMetricsCollector) Consume(_ ConsumeContext) {}
113105
func (n *NoOpMetricsCollector) ConsumeDisposition(_ ConsumeDisposition, _ ConsumeContext) {}
114-
func (n *NoOpMetricsCollector) WrittenBytes(_ int64) {}
115-
func (n *NoOpMetricsCollector) ReadBytes(_ int64) {}
116106

117107
// defaultMetricsCollector is the singleton NoOpMetricsCollector instance used as default.
118108
var defaultMetricsCollector MetricsCollector = &NoOpMetricsCollector{}

pkg/rabbitmqamqp/metrics_otel.go

Lines changed: 6 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ type OTELMetricsCollector struct {
3030
consumedAccepted metric.Int64Counter
3131
consumedDiscarded metric.Int64Counter
3232
consumedRequeued metric.Int64Counter
33-
writtenBytes metric.Int64Counter
34-
readBytes metric.Int64Counter
3533
}
3634

3735
// Ensure OTELMetricsCollector implements MetricsCollector
@@ -89,7 +87,7 @@ func NewOTELMetricsCollector(meterProvider metric.MeterProvider, prefix string)
8987
}
9088

9189
collector.publishedAccepted, err = meter.Int64Counter(
92-
prefix+".published.accepted",
90+
prefix+".published_accepted",
9391
metric.WithDescription("Total number of messages accepted by broker"),
9492
metric.WithUnit("{message}"),
9593
)
@@ -98,7 +96,7 @@ func NewOTELMetricsCollector(meterProvider metric.MeterProvider, prefix string)
9896
}
9997

10098
collector.publishedRejected, err = meter.Int64Counter(
101-
prefix+".published.rejected",
99+
prefix+".published_rejected",
102100
metric.WithDescription("Total number of messages rejected by broker"),
103101
metric.WithUnit("{message}"),
104102
)
@@ -107,7 +105,7 @@ func NewOTELMetricsCollector(meterProvider metric.MeterProvider, prefix string)
107105
}
108106

109107
collector.publishedReleased, err = meter.Int64Counter(
110-
prefix+".published.released",
108+
prefix+".published_released",
111109
metric.WithDescription("Total number of messages released by broker"),
112110
metric.WithUnit("{message}"),
113111
)
@@ -126,7 +124,7 @@ func NewOTELMetricsCollector(meterProvider metric.MeterProvider, prefix string)
126124
}
127125

128126
collector.consumedAccepted, err = meter.Int64Counter(
129-
prefix+".consumed.accepted",
127+
prefix+".consumed_accepted",
130128
metric.WithDescription("Total number of messages accepted by consumer"),
131129
metric.WithUnit("{message}"),
132130
)
@@ -135,7 +133,7 @@ func NewOTELMetricsCollector(meterProvider metric.MeterProvider, prefix string)
135133
}
136134

137135
collector.consumedDiscarded, err = meter.Int64Counter(
138-
prefix+".consumed.discarded",
136+
prefix+".consumed_discarded",
139137
metric.WithDescription("Total number of messages discarded by consumer"),
140138
metric.WithUnit("{message}"),
141139
)
@@ -144,33 +142,14 @@ func NewOTELMetricsCollector(meterProvider metric.MeterProvider, prefix string)
144142
}
145143

146144
collector.consumedRequeued, err = meter.Int64Counter(
147-
prefix+".consumed.requeued",
145+
prefix+".consumed_requeued",
148146
metric.WithDescription("Total number of messages requeued by consumer"),
149147
metric.WithUnit("{message}"),
150148
)
151149
if err != nil {
152150
return nil, err
153151
}
154152

155-
// Create counter metrics for network I/O
156-
collector.writtenBytes, err = meter.Int64Counter(
157-
prefix+".written_bytes",
158-
metric.WithDescription("Total bytes written to network"),
159-
metric.WithUnit("By"),
160-
)
161-
if err != nil {
162-
return nil, err
163-
}
164-
165-
collector.readBytes, err = meter.Int64Counter(
166-
prefix+".read_bytes",
167-
metric.WithDescription("Total bytes read from network"),
168-
metric.WithUnit("By"),
169-
)
170-
if err != nil {
171-
return nil, err
172-
}
173-
174153
return collector, nil
175154
}
176155

@@ -204,16 +183,6 @@ func (o *OTELMetricsCollector) CloseConsumer() {
204183
o.consumers.Add(context.Background(), -1)
205184
}
206185

207-
// WrittenBytes increments the written bytes counter.
208-
func (o *OTELMetricsCollector) WrittenBytes(count int64) {
209-
o.writtenBytes.Add(context.Background(), count)
210-
}
211-
212-
// ReadBytes increments the read bytes counter.
213-
func (o *OTELMetricsCollector) ReadBytes(count int64) {
214-
o.readBytes.Add(context.Background(), count)
215-
}
216-
217186
// buildBaseAttributes creates the common OTEL semantic convention attributes
218187
// for RabbitMQ messaging operations.
219188
func buildBaseAttributes(serverAddress string, serverPort int, destinationName string) []attribute.KeyValue {

tests/otelmetrics/metrics_e2e_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -148,16 +148,16 @@ func TestMetricsEndToEndBasicFlow(t *testing.T) {
148148
assertMetric(t, metrics, "test.e2e.published", int64(messageCount), "should have published %d messages", messageCount)
149149
})
150150

151-
t.Run("published.accepted counter", func(t *testing.T) {
152-
assertMetric(t, metrics, "test.e2e.published.accepted", int64(messageCount), "all messages should be accepted by broker")
151+
t.Run("published_accepted counter", func(t *testing.T) {
152+
assertMetric(t, metrics, "test.e2e.published_accepted", int64(messageCount), "all messages should be accepted by broker")
153153
})
154154

155155
t.Run("consumed counter", func(t *testing.T) {
156156
assertMetric(t, metrics, "test.e2e.consumed", int64(messageCount), "should have consumed %d messages", messageCount)
157157
})
158158

159-
t.Run("consumed.accepted counter", func(t *testing.T) {
160-
assertMetric(t, metrics, "test.e2e.consumed.accepted", int64(messageCount), "all messages should be accepted by consumer")
159+
t.Run("consumed_accepted counter", func(t *testing.T) {
160+
assertMetric(t, metrics, "test.e2e.consumed_accepted", int64(messageCount), "all messages should be accepted by consumer")
161161
})
162162
}
163163

@@ -487,11 +487,11 @@ func TestMetricsEndToEndWithSemanticConventions(t *testing.T) {
487487
}
488488
})
489489

490-
// Verify consumed.accepted has settle operation type
491-
t.Run("consumed.accepted has settle operation attributes", func(t *testing.T) {
492-
dps, ok := metricsWithAttrs["test.semconv.consumed.accepted"]
490+
// Verify consumed_accepted has settle operation type
491+
t.Run("consumed_accepted has settle operation attributes", func(t *testing.T) {
492+
dps, ok := metricsWithAttrs["test.semconv.consumed_accepted"]
493493
if !ok {
494-
t.Fatal("test.semconv.consumed.accepted metric not found")
494+
t.Fatal("test.semconv.consumed_accepted metric not found")
495495
}
496496

497497
found := false
@@ -503,7 +503,7 @@ func TestMetricsEndToEndWithSemanticConventions(t *testing.T) {
503503
}
504504
}
505505
if !found {
506-
t.Error("consumed.accepted metric with settle operation attributes not found")
506+
t.Error("consumed_accepted metric with settle operation attributes not found")
507507
}
508508
})
509509
}

tests/otelmetrics/metrics_test.go

Lines changed: 23 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@ func TestNoOpMetricsCollector(t *testing.T) {
6464
collector.ConsumeDisposition(rmq.ConsumeAccepted, consumeCtx)
6565
collector.ConsumeDisposition(rmq.ConsumeDiscarded, consumeCtx)
6666
collector.ConsumeDisposition(rmq.ConsumeRequeued, consumeCtx)
67-
collector.WrittenBytes(1024)
68-
collector.ReadBytes(2048)
6967
}
7068

7169
// TestOTELMetricsCollectorCreation verifies that OTEL collector can be created with various options.
@@ -139,11 +137,6 @@ func TestOTELMetricsCollectorRecordsMetrics(t *testing.T) {
139137
collector.ConsumeDisposition(rmq.ConsumeRequeued, consumeCtx)
140138
collector.CloseConsumer()
141139

142-
// Simulate byte tracking
143-
collector.WrittenBytes(1000)
144-
collector.WrittenBytes(500)
145-
collector.ReadBytes(2000)
146-
147140
// Collect metrics
148141
var rm metricdata.ResourceMetrics
149142
err = reader.Collect(context.Background(), &rm)
@@ -189,32 +182,24 @@ func TestOTELMetricsCollectorRecordsMetrics(t *testing.T) {
189182
t.Errorf("expected published counter to be 3, got %d (found: %v)", val, ok)
190183
}
191184

192-
if val, ok := metrics["test.rabbitmq.published.accepted"]; !ok || val != 2 {
193-
t.Errorf("expected published.accepted counter to be 2, got %d (found: %v)", val, ok)
185+
if val, ok := metrics["test.rabbitmq.published_accepted"]; !ok || val != 2 {
186+
t.Errorf("expected published_accepted counter to be 2, got %d (found: %v)", val, ok)
194187
}
195188

196-
if val, ok := metrics["test.rabbitmq.published.rejected"]; !ok || val != 1 {
197-
t.Errorf("expected published.rejected counter to be 1, got %d (found: %v)", val, ok)
189+
if val, ok := metrics["test.rabbitmq.published_rejected"]; !ok || val != 1 {
190+
t.Errorf("expected published_rejected counter to be 1, got %d (found: %v)", val, ok)
198191
}
199192

200193
if val, ok := metrics["test.rabbitmq.consumed"]; !ok || val != 2 {
201194
t.Errorf("expected consumed counter to be 2, got %d (found: %v)", val, ok)
202195
}
203196

204-
if val, ok := metrics["test.rabbitmq.consumed.accepted"]; !ok || val != 1 {
205-
t.Errorf("expected consumed.accepted counter to be 1, got %d (found: %v)", val, ok)
206-
}
207-
208-
if val, ok := metrics["test.rabbitmq.consumed.requeued"]; !ok || val != 1 {
209-
t.Errorf("expected consumed.requeued counter to be 1, got %d (found: %v)", val, ok)
210-
}
211-
212-
if val, ok := metrics["test.rabbitmq.written_bytes"]; !ok || val != 1500 {
213-
t.Errorf("expected written_bytes counter to be 1500, got %d (found: %v)", val, ok)
197+
if val, ok := metrics["test.rabbitmq.consumed_accepted"]; !ok || val != 1 {
198+
t.Errorf("expected consumed_accepted counter to be 1, got %d (found: %v)", val, ok)
214199
}
215200

216-
if val, ok := metrics["test.rabbitmq.read_bytes"]; !ok || val != 2000 {
217-
t.Errorf("expected read_bytes counter to be 2000, got %d (found: %v)", val, ok)
201+
if val, ok := metrics["test.rabbitmq.consumed_requeued"]; !ok || val != 1 {
202+
t.Errorf("expected consumed_requeued counter to be 1, got %d (found: %v)", val, ok)
218203
}
219204
}
220205

@@ -331,8 +316,6 @@ func TestMetricsCollectorThreadSafety(t *testing.T) {
331316
collector.PublishDisposition(rmq.PublishAccepted, publishCtx)
332317
collector.Consume(consumeCtx)
333318
collector.ConsumeDisposition(rmq.ConsumeAccepted, consumeCtx)
334-
collector.WrittenBytes(100)
335-
collector.ReadBytes(100)
336319
collector.CloseConsumer()
337320
collector.ClosePublisher()
338321
collector.CloseConnection()
@@ -469,10 +452,10 @@ func TestOTELMetricsCollectorWithContextMethods(t *testing.T) {
469452

470453
// Verify consume disposition metrics have attributes
471454
t.Run("consume dispositions with context have attributes", func(t *testing.T) {
472-
// Check consumed.accepted
473-
dps, ok := metricsWithAttrs["test.ctx.consumed.accepted"]
455+
// Check consumed_accepted
456+
dps, ok := metricsWithAttrs["test.ctx.consumed_accepted"]
474457
if !ok {
475-
t.Error("test.ctx.consumed.accepted metric not found")
458+
t.Error("test.ctx.consumed_accepted metric not found")
476459
} else {
477460
found := false
478461
for _, dp := range dps {
@@ -482,14 +465,14 @@ func TestOTELMetricsCollectorWithContextMethods(t *testing.T) {
482465
}
483466
}
484467
if !found {
485-
t.Error("consumed.accepted with ack operation name not found")
468+
t.Error("consumed_accepted with ack operation name not found")
486469
}
487470
}
488471

489-
// Check consumed.discarded
490-
dps, ok = metricsWithAttrs["test.ctx.consumed.discarded"]
472+
// Check consumed_discarded
473+
dps, ok = metricsWithAttrs["test.ctx.consumed_discarded"]
491474
if !ok {
492-
t.Error("test.ctx.consumed.discarded metric not found")
475+
t.Error("test.ctx.consumed_discarded metric not found")
493476
} else {
494477
found := false
495478
for _, dp := range dps {
@@ -499,14 +482,14 @@ func TestOTELMetricsCollectorWithContextMethods(t *testing.T) {
499482
}
500483
}
501484
if !found {
502-
t.Error("consumed.discarded with nack operation name not found")
485+
t.Error("consumed_discarded with nack operation name not found")
503486
}
504487
}
505488

506-
// Check consumed.requeued
507-
dps, ok = metricsWithAttrs["test.ctx.consumed.requeued"]
489+
// Check consumed_requeued
490+
dps, ok = metricsWithAttrs["test.ctx.consumed_requeued"]
508491
if !ok {
509-
t.Error("test.ctx.consumed.requeued metric not found")
492+
t.Error("test.ctx.consumed_requeued metric not found")
510493
} else {
511494
found := false
512495
for _, dp := range dps {
@@ -516,7 +499,7 @@ func TestOTELMetricsCollectorWithContextMethods(t *testing.T) {
516499
}
517500
}
518501
if !found {
519-
t.Error("consumed.requeued with requeue operation name not found")
502+
t.Error("consumed_requeued with requeue operation name not found")
520503
}
521504
}
522505
})
@@ -603,9 +586,9 @@ func TestDoubleCountingBug(t *testing.T) {
603586
}
604587
})
605588

606-
t.Run("consumed.accepted should count once per disposition", func(t *testing.T) {
607-
if total := metricTotals["test.double.consumed.accepted"]; total != 1 {
608-
t.Errorf("BUG DETECTED: consumed.accepted metric should be 1 for one disposition, got %d (double-counting!)", total)
589+
t.Run("consumed_accepted should count once per disposition", func(t *testing.T) {
590+
if total := metricTotals["test.double.consumed_accepted"]; total != 1 {
591+
t.Errorf("BUG DETECTED: consumed_accepted metric should be 1 for one disposition, got %d (double-counting!)", total)
609592
}
610593
})
611594
}

0 commit comments

Comments
 (0)