Skip to content

Commit 984358c

Browse files
authored
feat(operational-gateway): Add observability instrumentation (#1329)
* feat: add observability instrumentation for operational gateway Adds Prometheus metrics, OpenTelemetry tracing, and structured logging support for the operational gateway service. Metrics: - meridian_gateway_instructions_total (counter, tenant/type/status) - meridian_gateway_dispatch_duration_seconds (histogram, tenant/provider) - meridian_gateway_dispatch_attempts_total (counter, tenant/provider/outcome) - meridian_gateway_circuit_breaker_state (gauge, tenant/connection_id/state) - meridian_gateway_active_instructions (gauge, tenant/status) Tracing: - OpenTelemetry tracer using meridian/operational-gateway instrumentation scope - StartSpan helper for creating internal spans - RecordError helper for marking spans as failed - Attribute keys for gateway-specific span data * fix: address CodeRabbit review feedback on observability --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 4bd4e20 commit 984358c

3 files changed

Lines changed: 357 additions & 0 deletions

File tree

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// Package observability provides Prometheus metrics and OpenTelemetry tracing helpers for the operational gateway service.
2+
package observability
3+
4+
import (
5+
"time"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
"github.com/prometheus/client_golang/prometheus/promauto"
9+
)
10+
11+
var (
12+
// instructionsTotal counts all instructions by tenant, type, and terminal status.
13+
instructionsTotal = promauto.NewCounterVec(
14+
prometheus.CounterOpts{
15+
Name: "meridian_gateway_instructions_total",
16+
Help: "Total number of instructions processed, labeled by tenant, instruction type, and terminal status.",
17+
},
18+
[]string{"tenant", "instruction_type", "status"},
19+
)
20+
21+
// dispatchDuration records how long a single dispatch attempt takes.
22+
dispatchDuration = promauto.NewHistogramVec(
23+
prometheus.HistogramOpts{
24+
Name: "meridian_gateway_dispatch_duration_seconds",
25+
Help: "Duration of a dispatch attempt to an external provider in seconds.",
26+
Buckets: []float64{.01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 30},
27+
},
28+
[]string{"tenant", "provider"},
29+
)
30+
31+
// dispatchAttemptsTotal counts all dispatch attempts by tenant, provider, and outcome.
32+
dispatchAttemptsTotal = promauto.NewCounterVec(
33+
prometheus.CounterOpts{
34+
Name: "meridian_gateway_dispatch_attempts_total",
35+
Help: "Total number of individual dispatch attempts to external providers.",
36+
},
37+
[]string{"tenant", "provider", "outcome"},
38+
)
39+
40+
// circuitBreakerState tracks the circuit breaker state per connection.
41+
// One time series per state label: active state is 1, inactive states are 0.
42+
circuitBreakerState = promauto.NewGaugeVec(
43+
prometheus.GaugeOpts{
44+
Name: "meridian_gateway_circuit_breaker_state",
45+
Help: "One-hot circuit breaker state per provider connection and state label (1=active, 0=inactive).",
46+
},
47+
[]string{"tenant", "connection_id", "state"},
48+
)
49+
50+
// activeInstructions tracks the number of instructions currently in a given non-terminal status.
51+
activeInstructions = promauto.NewGaugeVec(
52+
prometheus.GaugeOpts{
53+
Name: "meridian_gateway_active_instructions",
54+
Help: "Current number of instructions in non-terminal states, labeled by tenant and status.",
55+
},
56+
[]string{"tenant", "status"},
57+
)
58+
)
59+
60+
// Values for the "outcome" label passed to RecordDispatchAttempt. Each one
// describes how a single dispatch attempt ended.
const (
	// DispatchOutcomeSuccess marks an attempt that the provider accepted.
	DispatchOutcomeSuccess = "success"

	// DispatchOutcomeRetry marks a failed attempt that is going to be retried.
	DispatchOutcomeRetry = "retry"

	// DispatchOutcomeFailure marks an attempt that failed permanently.
	DispatchOutcomeFailure = "failure"

	// DispatchOutcomeCircuitOpen marks an attempt that was blocked by the circuit breaker.
	DispatchOutcomeCircuitOpen = "circuit_open"
)
71+
72+
// CircuitBreakerStateValue identifies a circuit breaker state when calling
// RecordCircuitBreakerState.
//
// The numeric value is only an identifier. It is not exported as the gauge
// value: RecordCircuitBreakerState translates it into a "state" label
// ("closed", "half_open" or "open") and publishes 1 on the matching series and
// 0 on the others.
type CircuitBreakerStateValue float64

const (
	// CircuitBreakerClosed represents a closed (healthy) circuit breaker.
	CircuitBreakerClosed CircuitBreakerStateValue = 0
	// CircuitBreakerHalfOpen represents a half-open (probing) circuit breaker.
	CircuitBreakerHalfOpen CircuitBreakerStateValue = 1
	// CircuitBreakerOpen represents an open (blocking) circuit breaker.
	CircuitBreakerOpen CircuitBreakerStateValue = 2
)
83+
84+
// RecordInstruction increments the instructions counter for the given terminal status.
85+
// Call this when an instruction reaches a terminal state (DELIVERED, FAILED, EXPIRED, CANCELLED, ACKNOWLEDGED).
86+
func RecordInstruction(tenant, instructionType, status string) {
87+
instructionsTotal.WithLabelValues(tenant, instructionType, status).Inc()
88+
}
89+
90+
// RecordDispatchDuration records how long a dispatch attempt took.
91+
func RecordDispatchDuration(tenant, provider string, duration time.Duration) {
92+
dispatchDuration.WithLabelValues(tenant, provider).Observe(duration.Seconds())
93+
}
94+
95+
// RecordDispatchAttempt increments the dispatch attempts counter.
96+
// outcome should be one of the DispatchOutcome* constants.
97+
func RecordDispatchAttempt(tenant, provider, outcome string) {
98+
dispatchAttemptsTotal.WithLabelValues(tenant, provider, outcome).Inc()
99+
}
100+
101+
// RecordCircuitBreakerState updates the gauge for a specific connection's circuit breaker state.
102+
// state should be one of the CircuitBreaker* constants.
103+
func RecordCircuitBreakerState(tenant, connectionID string, state CircuitBreakerStateValue) {
104+
// Reset all state label values for this connection before setting the current one
105+
// to avoid stale multi-value time series. Using three separate gauge vectors would
106+
// require separate Reset calls; instead we use a single gauge and set the value.
107+
circuitBreakerState.WithLabelValues(tenant, connectionID, "closed").Set(0)
108+
circuitBreakerState.WithLabelValues(tenant, connectionID, "half_open").Set(0)
109+
circuitBreakerState.WithLabelValues(tenant, connectionID, "open").Set(0)
110+
111+
switch state {
112+
case CircuitBreakerClosed:
113+
circuitBreakerState.WithLabelValues(tenant, connectionID, "closed").Set(1)
114+
case CircuitBreakerHalfOpen:
115+
circuitBreakerState.WithLabelValues(tenant, connectionID, "half_open").Set(1)
116+
case CircuitBreakerOpen:
117+
circuitBreakerState.WithLabelValues(tenant, connectionID, "open").Set(1)
118+
}
119+
}
120+
121+
// SetActiveInstructions sets the gauge for instructions currently in a given status.
122+
// Call this after each polling cycle with the current count per status.
123+
func SetActiveInstructions(tenant, status string, count float64) {
124+
activeInstructions.WithLabelValues(tenant, status).Set(count)
125+
}
126+
127+
// IncrActiveInstructions increments the active instructions gauge.
128+
// Call this when a new instruction enters a non-terminal status.
129+
func IncrActiveInstructions(tenant, status string) {
130+
activeInstructions.WithLabelValues(tenant, status).Inc()
131+
}
132+
133+
// DecrActiveInstructions decrements the active instructions gauge.
134+
// Call this when an instruction leaves a status (transitions to another or reaches terminal).
135+
func DecrActiveInstructions(tenant, status string) {
136+
activeInstructions.WithLabelValues(tenant, status).Dec()
137+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package observability
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/prometheus/client_golang/prometheus/testutil"
8+
)
9+
10+
func TestRecordInstruction(t *testing.T) {
11+
instructionsTotal.Reset()
12+
13+
RecordInstruction("tenant-a", "PAYMENT_INITIATION", "ACKNOWLEDGED")
14+
RecordInstruction("tenant-a", "PAYMENT_INITIATION", "FAILED")
15+
RecordInstruction("tenant-b", "REFUND", "ACKNOWLEDGED")
16+
17+
count := testutil.CollectAndCount(instructionsTotal)
18+
if count != 3 {
19+
t.Errorf("expected 3 series, got %d", count)
20+
}
21+
}
22+
23+
func TestRecordDispatchDuration(t *testing.T) {
24+
dispatchDuration.Reset()
25+
26+
RecordDispatchDuration("tenant-a", "acme-bank", 150*time.Millisecond)
27+
RecordDispatchDuration("tenant-a", "acme-bank", 300*time.Millisecond)
28+
RecordDispatchDuration("tenant-b", "energy-co", 50*time.Millisecond)
29+
30+
count := testutil.CollectAndCount(dispatchDuration)
31+
if count == 0 {
32+
t.Error("expected dispatch duration metrics to be recorded")
33+
}
34+
}
35+
36+
func TestRecordDispatchAttempt(t *testing.T) {
37+
dispatchAttemptsTotal.Reset()
38+
39+
RecordDispatchAttempt("tenant-a", "acme-bank", DispatchOutcomeSuccess)
40+
RecordDispatchAttempt("tenant-a", "acme-bank", DispatchOutcomeRetry)
41+
RecordDispatchAttempt("tenant-a", "acme-bank", DispatchOutcomeFailure)
42+
RecordDispatchAttempt("tenant-b", "energy-co", DispatchOutcomeCircuitOpen)
43+
44+
count := testutil.CollectAndCount(dispatchAttemptsTotal)
45+
if count != 4 {
46+
t.Errorf("expected 4 series, got %d", count)
47+
}
48+
}
49+
50+
func TestRecordCircuitBreakerState(t *testing.T) {
51+
circuitBreakerState.Reset()
52+
53+
tests := []struct {
54+
name string
55+
state CircuitBreakerStateValue
56+
activeLabel string
57+
inactiveA string
58+
inactiveB string
59+
}{
60+
{
61+
name: "closed state",
62+
state: CircuitBreakerClosed,
63+
activeLabel: "closed",
64+
inactiveA: "half_open",
65+
inactiveB: "open",
66+
},
67+
{
68+
name: "half-open state",
69+
state: CircuitBreakerHalfOpen,
70+
activeLabel: "half_open",
71+
inactiveA: "closed",
72+
inactiveB: "open",
73+
},
74+
{
75+
name: "open state",
76+
state: CircuitBreakerOpen,
77+
activeLabel: "open",
78+
inactiveA: "closed",
79+
inactiveB: "half_open",
80+
},
81+
}
82+
83+
for _, tt := range tests {
84+
t.Run(tt.name, func(t *testing.T) {
85+
circuitBreakerState.Reset()
86+
RecordCircuitBreakerState("tenant-a", "conn-1", tt.state)
87+
88+
// Verify the active label has value 1
89+
active := testutil.ToFloat64(circuitBreakerState.WithLabelValues("tenant-a", "conn-1", tt.activeLabel))
90+
if active != 1 {
91+
t.Errorf("expected %s gauge to be 1, got %f", tt.activeLabel, active)
92+
}
93+
94+
// Verify inactive labels are set to 0
95+
inactiveAVal := testutil.ToFloat64(circuitBreakerState.WithLabelValues("tenant-a", "conn-1", tt.inactiveA))
96+
if inactiveAVal != 0 {
97+
t.Errorf("expected %s gauge to be 0, got %f", tt.inactiveA, inactiveAVal)
98+
}
99+
100+
inactiveBVal := testutil.ToFloat64(circuitBreakerState.WithLabelValues("tenant-a", "conn-1", tt.inactiveB))
101+
if inactiveBVal != 0 {
102+
t.Errorf("expected %s gauge to be 0, got %f", tt.inactiveB, inactiveBVal)
103+
}
104+
})
105+
}
106+
}
107+
108+
func TestSetActiveInstructions(t *testing.T) {
109+
activeInstructions.Reset()
110+
111+
SetActiveInstructions("tenant-a", "PENDING", 10)
112+
SetActiveInstructions("tenant-a", "DISPATCHING", 5)
113+
SetActiveInstructions("tenant-b", "RETRYING", 2)
114+
115+
pendingVal := testutil.ToFloat64(activeInstructions.WithLabelValues("tenant-a", "PENDING"))
116+
if pendingVal != 10 {
117+
t.Errorf("expected PENDING gauge to be 10, got %f", pendingVal)
118+
}
119+
120+
dispatchingVal := testutil.ToFloat64(activeInstructions.WithLabelValues("tenant-a", "DISPATCHING"))
121+
if dispatchingVal != 5 {
122+
t.Errorf("expected DISPATCHING gauge to be 5, got %f", dispatchingVal)
123+
}
124+
}
125+
126+
func TestIncrDecrActiveInstructions(t *testing.T) {
127+
activeInstructions.Reset()
128+
129+
IncrActiveInstructions("tenant-a", "PENDING")
130+
IncrActiveInstructions("tenant-a", "PENDING")
131+
IncrActiveInstructions("tenant-a", "PENDING")
132+
133+
val := testutil.ToFloat64(activeInstructions.WithLabelValues("tenant-a", "PENDING"))
134+
if val != 3 {
135+
t.Errorf("expected gauge to be 3 after 3 increments, got %f", val)
136+
}
137+
138+
DecrActiveInstructions("tenant-a", "PENDING")
139+
140+
val = testutil.ToFloat64(activeInstructions.WithLabelValues("tenant-a", "PENDING"))
141+
if val != 2 {
142+
t.Errorf("expected gauge to be 2 after decrement, got %f", val)
143+
}
144+
}
145+
146+
func TestDispatchOutcomeConstants(t *testing.T) {
147+
if DispatchOutcomeSuccess != "success" {
148+
t.Errorf("DispatchOutcomeSuccess should be 'success', got %q", DispatchOutcomeSuccess)
149+
}
150+
if DispatchOutcomeRetry != "retry" {
151+
t.Errorf("DispatchOutcomeRetry should be 'retry', got %q", DispatchOutcomeRetry)
152+
}
153+
if DispatchOutcomeFailure != "failure" {
154+
t.Errorf("DispatchOutcomeFailure should be 'failure', got %q", DispatchOutcomeFailure)
155+
}
156+
if DispatchOutcomeCircuitOpen != "circuit_open" {
157+
t.Errorf("DispatchOutcomeCircuitOpen should be 'circuit_open', got %q", DispatchOutcomeCircuitOpen)
158+
}
159+
}
160+
161+
func TestCircuitBreakerStateValueConstants(t *testing.T) {
162+
if CircuitBreakerClosed != 0 {
163+
t.Errorf("CircuitBreakerClosed should be 0, got %v", CircuitBreakerClosed)
164+
}
165+
if CircuitBreakerHalfOpen != 1 {
166+
t.Errorf("CircuitBreakerHalfOpen should be 1, got %v", CircuitBreakerHalfOpen)
167+
}
168+
if CircuitBreakerOpen != 2 {
169+
t.Errorf("CircuitBreakerOpen should be 2, got %v", CircuitBreakerOpen)
170+
}
171+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package observability
2+
3+
import (
4+
"context"
5+
6+
"go.opentelemetry.io/otel"
7+
"go.opentelemetry.io/otel/attribute"
8+
"go.opentelemetry.io/otel/codes"
9+
"go.opentelemetry.io/otel/trace"
10+
)
11+
12+
const tracerName = "meridian/operational-gateway"
13+
14+
// Tracer returns the OpenTelemetry tracer for the operational gateway service.
15+
func Tracer() trace.Tracer {
16+
return otel.Tracer(tracerName)
17+
}
18+
19+
// StartSpan creates a new internal span as a child of the span in the context.
20+
// The returned context contains the new span; callers must call span.End() when done.
21+
func StartSpan(ctx context.Context, name string, attrs ...attribute.KeyValue) (context.Context, trace.Span) {
22+
return Tracer().Start(ctx, name,
23+
trace.WithAttributes(attrs...),
24+
trace.WithSpanKind(trace.SpanKindInternal),
25+
)
26+
}
27+
28+
// RecordError marks a span as having errored and records the error message.
29+
func RecordError(span trace.Span, err error) {
30+
if err == nil || span == nil {
31+
return
32+
}
33+
span.RecordError(err)
34+
span.SetStatus(codes.Error, "error")
35+
}
36+
37+
// Common attribute keys for operational gateway spans.
38+
var (
39+
AttrTenantID = attribute.Key("gateway.tenant_id")
40+
AttrInstructionID = attribute.Key("gateway.instruction_id")
41+
AttrInstructionType = attribute.Key("gateway.instruction_type")
42+
AttrInstructionStatus = attribute.Key("gateway.instruction_status")
43+
AttrProviderConnectionID = attribute.Key("gateway.provider_connection_id")
44+
AttrProviderName = attribute.Key("gateway.provider_name")
45+
AttrAttemptCount = attribute.Key("gateway.attempt_count")
46+
AttrMaxAttempts = attribute.Key("gateway.max_attempts")
47+
AttrErrorCode = attribute.Key("gateway.error_code")
48+
AttrBatchSize = attribute.Key("gateway.batch_size")
49+
)

0 commit comments

Comments
 (0)