-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Expand file tree
/
Copy pathotel.go
More file actions
204 lines (164 loc) · 8.57 KB
/
otel.go
File metadata and controls
204 lines (164 loc) · 8.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package redis
import (
"context"
"net"
"time"
"github.com/redis/go-redis/v9/internal/otel"
"github.com/redis/go-redis/v9/internal/pool"
)
// ConnInfo provides information about a Redis connection for metrics.
type ConnInfo interface {
RemoteAddr() net.Addr
PoolName() string
}
type Pooler interface {
PoolStats() *pool.Stats
}
type PubSubPooler interface {
Stats() *pool.PubSubStats
}
// OTelRecorder is the interface for recording OpenTelemetry metrics.
type OTelRecorder interface {
// RecordOperationDuration records the total operation duration (including all retries)
RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, err error, cn ConnInfo, dbIndex int)
// RecordPipelineOperationDuration records the total pipeline/transaction duration.
// operationName should be "PIPELINE" for regular pipelines or "MULTI" for transactions.
RecordPipelineOperationDuration(ctx context.Context, duration time.Duration, operationName string, cmdCount int, attempts int, err error, cn ConnInfo, dbIndex int)
// RecordConnectionCreateTime records the time it took to create a new connection
RecordConnectionCreateTime(ctx context.Context, duration time.Duration, cn ConnInfo)
// RecordConnectionRelaxedTimeout records when connection timeout is relaxed/unrelaxed
// delta: +1 for relaxed, -1 for unrelaxed
// poolName: name of the connection pool (e.g., "main", "pubsub")
// notificationType: the notification type that triggered the timeout relaxation (e.g., "MOVING", "HANDOFF")
RecordConnectionRelaxedTimeout(ctx context.Context, delta int, cn ConnInfo, poolName, notificationType string)
// RecordConnectionHandoff records when a connection is handed off to another node
// poolName: name of the connection pool (e.g., "main", "pubsub")
RecordConnectionHandoff(ctx context.Context, cn ConnInfo, poolName string)
// RecordError records client errors (ASK, MOVED, handshake failures, etc.)
// errorType: type of error (e.g., "ASK", "MOVED", "HANDSHAKE_FAILED")
// statusCode: Redis response status code if available (e.g., "MOVED", "ASK")
// isInternal: whether this is an internal error
// retryAttempts: number of retry attempts made
RecordError(ctx context.Context, errorType string, cn ConnInfo, statusCode string, isInternal bool, retryAttempts int)
// RecordMaintenanceNotification records when a maintenance notification is received
// notificationType: the type of notification (e.g., "MOVING", "MIGRATING", etc.)
RecordMaintenanceNotification(ctx context.Context, cn ConnInfo, notificationType string)
// RecordConnectionWaitTime records the time spent waiting for a connection from the pool
RecordConnectionWaitTime(ctx context.Context, duration time.Duration, cn ConnInfo)
// RecordConnectionClosed records when a connection is closed
// reason: reason for closing (e.g., "idle", "max_lifetime", "error", "pool_closed")
// err: the error that caused the close (nil for non-error closures)
RecordConnectionClosed(ctx context.Context, cn ConnInfo, reason string, err error)
// RecordPubSubMessage records a Pub/Sub message
// direction: "sent" or "received"
// channel: channel name (may be hidden for cardinality reduction)
// sharded: true for sharded pub/sub (SPUBLISH/SSUBSCRIBE)
RecordPubSubMessage(ctx context.Context, cn ConnInfo, direction, channel string, sharded bool)
// RecordStreamLag records the lag for stream consumer group processing
// lag: time difference between message creation and consumption
// streamName: name of the stream (may be hidden for cardinality reduction)
// consumerGroup: name of the consumer group
// consumerName: name of the consumer
RecordStreamLag(ctx context.Context, lag time.Duration, cn ConnInfo, streamName, consumerGroup, consumerName string)
}
// This is used for async gauge metrics that need to pull stats from pools periodically.
type OTelPoolRegistrar interface {
// RegisterPool is called when a new client is created with its main connection pool.
// poolName: unique identifier for the pool (e.g., "main_abc123")
RegisterPool(poolName string, pool Pooler)
// UnregisterPool is called when a client is closed to remove its pool from the registry.
UnregisterPool(pool Pooler)
// RegisterPubSubPool is called when a new client is created with a PubSub pool.
// poolName: unique identifier for the pool (e.g., "main_abc123_pubsub")
RegisterPubSubPool(poolName string, pool PubSubPooler)
// UnregisterPubSubPool is called when a PubSub client is closed to remove its pool.
UnregisterPubSubPool(pool PubSubPooler)
}
// SetOTelRecorder sets the global OpenTelemetry recorder.
func SetOTelRecorder(r OTelRecorder) {
if r == nil {
otel.SetGlobalRecorder(nil)
return
}
otel.SetGlobalRecorder(&otelRecorderAdapter{r})
}
type otelRecorderAdapter struct {
recorder OTelRecorder
}
// toConnInfo converts *pool.Conn to ConnInfo interface properly.
// This ensures that a nil *pool.Conn becomes a true nil interface,
// not a non-nil interface containing a nil pointer.
func toConnInfo(cn *pool.Conn) ConnInfo {
if cn == nil {
return nil
}
return cn
}
func (a *otelRecorderAdapter) RecordOperationDuration(ctx context.Context, duration time.Duration, cmd otel.Cmder, attempts int, err error, cn *pool.Conn, dbIndex int) {
// Convert internal Cmder to public Cmder
if publicCmd, ok := cmd.(Cmder); ok {
a.recorder.RecordOperationDuration(ctx, duration, publicCmd, attempts, err, toConnInfo(cn), dbIndex)
}
}
func (a *otelRecorderAdapter) RecordPipelineOperationDuration(ctx context.Context, duration time.Duration, operationName string, cmdCount int, attempts int, err error, cn *pool.Conn, dbIndex int) {
a.recorder.RecordPipelineOperationDuration(ctx, duration, operationName, cmdCount, attempts, err, toConnInfo(cn), dbIndex)
}
func (a *otelRecorderAdapter) RecordConnectionCreateTime(ctx context.Context, duration time.Duration, cn *pool.Conn) {
a.recorder.RecordConnectionCreateTime(ctx, duration, toConnInfo(cn))
}
func (a *otelRecorderAdapter) RecordConnectionRelaxedTimeout(ctx context.Context, delta int, cn *pool.Conn, poolName, notificationType string) {
a.recorder.RecordConnectionRelaxedTimeout(ctx, delta, toConnInfo(cn), poolName, notificationType)
}
func (a *otelRecorderAdapter) RecordConnectionHandoff(ctx context.Context, cn *pool.Conn, poolName string) {
a.recorder.RecordConnectionHandoff(ctx, toConnInfo(cn), poolName)
}
func (a *otelRecorderAdapter) RecordError(ctx context.Context, errorType string, cn *pool.Conn, statusCode string, isInternal bool, retryAttempts int) {
a.recorder.RecordError(ctx, errorType, toConnInfo(cn), statusCode, isInternal, retryAttempts)
}
func (a *otelRecorderAdapter) RecordMaintenanceNotification(ctx context.Context, cn *pool.Conn, notificationType string) {
a.recorder.RecordMaintenanceNotification(ctx, toConnInfo(cn), notificationType)
}
func (a *otelRecorderAdapter) RecordConnectionWaitTime(ctx context.Context, duration time.Duration, cn *pool.Conn) {
a.recorder.RecordConnectionWaitTime(ctx, duration, toConnInfo(cn))
}
func (a *otelRecorderAdapter) RecordConnectionClosed(ctx context.Context, cn *pool.Conn, reason string, err error) {
a.recorder.RecordConnectionClosed(ctx, toConnInfo(cn), reason, err)
}
func (a *otelRecorderAdapter) RecordPubSubMessage(ctx context.Context, cn *pool.Conn, direction, channel string, sharded bool) {
a.recorder.RecordPubSubMessage(ctx, toConnInfo(cn), direction, channel, sharded)
}
func (a *otelRecorderAdapter) RecordStreamLag(ctx context.Context, lag time.Duration, cn *pool.Conn, streamName, consumerGroup, consumerName string) {
a.recorder.RecordStreamLag(ctx, lag, toConnInfo(cn), streamName, consumerGroup, consumerName)
}
func (a *otelRecorderAdapter) RegisterPool(poolName string, p pool.Pooler) {
if registrar, ok := a.recorder.(OTelPoolRegistrar); ok {
registrar.RegisterPool(poolName, &poolerAdapter{p})
}
}
func (a *otelRecorderAdapter) UnregisterPool(p pool.Pooler) {
if registrar, ok := a.recorder.(OTelPoolRegistrar); ok {
registrar.UnregisterPool(&poolerAdapter{p})
}
}
func (a *otelRecorderAdapter) RegisterPubSubPool(poolName string, p otel.PubSubPooler) {
if registrar, ok := a.recorder.(OTelPoolRegistrar); ok {
registrar.RegisterPubSubPool(poolName, &pubSubPoolerAdapter{p})
}
}
func (a *otelRecorderAdapter) UnregisterPubSubPool(p otel.PubSubPooler) {
if registrar, ok := a.recorder.(OTelPoolRegistrar); ok {
registrar.UnregisterPubSubPool(&pubSubPoolerAdapter{p})
}
}
type poolerAdapter struct {
p pool.Pooler
}
func (a *poolerAdapter) PoolStats() *pool.Stats {
return a.p.Stats()
}
type pubSubPoolerAdapter struct {
p otel.PubSubPooler
}
func (a *pubSubPoolerAdapter) Stats() *pool.PubSubStats {
return a.p.Stats()
}