diff --git a/operator/informer_controller.go b/operator/informer_controller.go index 72979c74d..c58181210 100644 --- a/operator/informer_controller.go +++ b/operator/informer_controller.go @@ -124,6 +124,8 @@ type InformerController struct { watcherLatency *prometheus.HistogramVec inflightActions *prometheus.GaugeVec inflightEvents *prometheus.GaugeVec + reconcilerErrors *prometheus.CounterVec + watcherErrors *prometheus.CounterVec } type retryInfo struct { @@ -220,6 +222,18 @@ func NewInformerController(cfg InformerControllerConfig) *InformerController { Namespace: cfg.MetricsConfig.Namespace, Help: "Current number of events which have active reconcile processes", }, []string{"event_type", "kind"}), + reconcilerErrors: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: cfg.MetricsConfig.Namespace, + Subsystem: "reconciler", + Name: "errors_total", + Help: "Total number of reconciler errors, before any retry.", + }, []string{"event_type", "kind"}), + watcherErrors: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: cfg.MetricsConfig.Namespace, + Subsystem: "watcher", + Name: "errors_total", + Help: "Total number of watcher errors, before any retry.", + }, []string{"event_type", "kind"}), } if cfg.ErrorHandler != nil { inf.ErrorHandler = cfg.ErrorHandler @@ -232,7 +246,10 @@ func NewInformerController(cfg InformerControllerConfig) *InformerController { } if cfg.RetryProcessorConfig != nil { retryPolicyFn := func() RetryPolicy { return inf.RetryPolicy } - inf.retryProcessor = NewRetryProcessor(*cfg.RetryProcessorConfig, retryPolicyFn) + processorCfg := *cfg.RetryProcessorConfig + // Override any MetricsConfig set on RetryProcessorConfig with the controller's MetricsConfig, so all metrics share a consistent namespace.
+ processorCfg.MetricsConfig = cfg.MetricsConfig + inf.retryProcessor = NewRetryProcessor(processorCfg, retryPolicyFn) } return inf } @@ -352,6 +369,10 @@ func (c *InformerController) Run(ctx context.Context) error { func (c *InformerController) PrometheusCollectors() []prometheus.Collector { collectors := []prometheus.Collector{ c.totalEvents, c.reconcileLatency, c.inflightEvents, c.inflightActions, c.reconcilerLatency, c.watcherLatency, + c.reconcilerErrors, c.watcherErrors, + } + if cast, ok := c.retryProcessor.(metrics.Provider); ok { + collectors = append(collectors, cast.PrometheusCollectors()...) } c.informers.RangeAll(func(_ string, _ int, value Informer) { if cast, ok := value.(metrics.Provider); ok { @@ -424,6 +445,9 @@ func (c *InformerController) informerAddFunc(resourceKind string) func(context.C // Do the watcher's Add, check for error c.wrapWatcherCall(string(ResourceActionCreate), obj.GetStaticMetadata().Kind, func() { err := watcher.Add(ctx, obj) + if err != nil { + c.watcherErrors.WithLabelValues(string(ResourceActionCreate), obj.GetStaticMetadata().Kind).Inc() + } if err != nil && c.ErrorHandler != nil { c.ErrorHandler(ctx, err) // TODO: improve ErrorHandler } @@ -479,6 +503,9 @@ func (c *InformerController) informerUpdateFunc(resourceKind string) func(contex // Do the watcher's Update, check for error c.wrapWatcherCall(string(ResourceActionUpdate), newObj.GetStaticMetadata().Kind, func() { err := watcher.Update(ctx, oldObj, newObj) + if err != nil { + c.watcherErrors.WithLabelValues(string(ResourceActionUpdate), newObj.GetStaticMetadata().Kind).Inc() + } if err != nil && c.ErrorHandler != nil { c.ErrorHandler(ctx, err) } @@ -543,6 +570,9 @@ func (c *InformerController) informerDeleteFunc(resourceKind string) func(contex // Do the watcher's Delete, check for error c.wrapWatcherCall(string(ResourceActionDelete), obj.GetStaticMetadata().Kind, func() { err := watcher.Delete(ctx, obj) + if err != nil { + 
c.watcherErrors.WithLabelValues(string(ResourceActionDelete), obj.GetStaticMetadata().Kind).Inc() + } if err != nil && c.ErrorHandler != nil { c.ErrorHandler(ctx, err) // TODO: improve ErrorHandler } @@ -647,6 +677,7 @@ func (c *InformerController) doReconcile(ctx context.Context, reconciler Reconci } } else if err != nil { // Otherwise, if err is non-nil, queue a retry according to the RetryPolicy + c.reconcilerErrors.WithLabelValues(string(action), req.Object.GetStaticMetadata().Kind).Inc() if c.ErrorHandler != nil { // Call the ErrorHandler function as well if it's set c.ErrorHandler(ctx, err) diff --git a/operator/informer_controller_test.go b/operator/informer_controller_test.go index a24103b88..91583354f 100644 --- a/operator/informer_controller_test.go +++ b/operator/informer_controller_test.go @@ -10,10 +10,13 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/grafana/grafana-app-sdk/metrics" "github.com/grafana/grafana-app-sdk/resource" ) @@ -1394,3 +1397,86 @@ func TestInformerController_Run_WithRetryProcessorDequeue(t *testing.T) { assert.Equal(t, int64(1), addCalls.Load(), "add retry should have been dequeued by update") }) } + +func TestInformerController_PrometheusCollectors(t *testing.T) { + c := NewInformerController(InformerControllerConfig{ + MetricsConfig: metrics.DefaultConfig("test"), + }) + collectors := c.PrometheusCollectors() + assert.ElementsMatch(t, []prometheus.Collector{ + c.totalEvents, + c.reconcileLatency, + c.inflightEvents, + c.inflightActions, + c.reconcilerLatency, + c.watcherLatency, + c.reconcilerErrors, + c.watcherErrors, + }, collectors) +} + +func TestInformerController_Metrics_ReconcilerErrors(t *testing.T) { + kind := "foo" + inf := &testInformer{} + c := NewInformerController(InformerControllerConfig{ + 
MetricsConfig: metrics.DefaultConfig("test"), + // RetryPolicy never retries, so errors are dropped after the first attempt — keeps the test simple. + RetryPolicy: func(_ error, _ int) (bool, time.Duration) { return false, 0 }, + }) + c.AddReconciler(&SimpleReconciler{ + ReconcileFunc: func(_ context.Context, _ ReconcileRequest) (ReconcileResult, error) { + return ReconcileResult{}, errors.New("reconcile error") + }, + }, kind) + c.AddInformer(inf, kind) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go c.Run(ctx) + + inf.FireAdd(context.Background(), emptyObject) + + assert.Equal(t, float64(1), counterValueFromVec(t, c.reconcilerErrors, string(ResourceActionCreate), "")) +} + +func TestInformerController_Metrics_WatcherErrors(t *testing.T) { + kind := "foo" + inf := &testInformer{} + c := NewInformerController(InformerControllerConfig{ + MetricsConfig: metrics.DefaultConfig("test"), + RetryPolicy: func(_ error, _ int) (bool, time.Duration) { return false, 0 }, + }) + c.AddWatcher(&SimpleWatcher{ + AddFunc: func(_ context.Context, _ resource.Object) error { + return errors.New("watcher add error") + }, + UpdateFunc: func(_ context.Context, _, _ resource.Object) error { + return errors.New("watcher update error") + }, + DeleteFunc: func(_ context.Context, _ resource.Object) error { + return errors.New("watcher delete error") + }, + }, kind) + c.AddInformer(inf, kind) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go c.Run(ctx) + + inf.FireAdd(context.Background(), emptyObject) + inf.FireUpdate(context.Background(), emptyObject, emptyObject) + inf.FireDelete(context.Background(), emptyObject) + + assert.Equal(t, float64(1), counterValueFromVec(t, c.watcherErrors, string(ResourceActionCreate), "")) + assert.Equal(t, float64(1), counterValueFromVec(t, c.watcherErrors, string(ResourceActionUpdate), "")) + assert.Equal(t, float64(1), counterValueFromVec(t, c.watcherErrors, string(ResourceActionDelete), "")) +} + +func 
counterValueFromVec(t *testing.T, cv *prometheus.CounterVec, labelValues ...string) float64 { + t.Helper() + m, err := cv.GetMetricWithLabelValues(labelValues...) + require.NoError(t, err) + var pb dto.Metric + require.NoError(t, m.Write(&pb)) + return pb.GetCounter().GetValue() +} diff --git a/operator/retry_processor.go b/operator/retry_processor.go index f3c78047e..54605367e 100644 --- a/operator/retry_processor.go +++ b/operator/retry_processor.go @@ -7,7 +7,9 @@ import ( "time" "github.com/cespare/xxhash/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/grafana-app-sdk/metrics" "github.com/grafana/grafana-app-sdk/resource" ) @@ -50,6 +52,9 @@ type RetryProcessorConfig struct { WorkerPoolSize int // CheckInterval is how often workers check for due retries. Default: 1s. CheckInterval time.Duration + // MetricsConfig controls the namespace and histogram settings for Prometheus metrics. + // Metrics are always created; an empty Namespace produces unprefixed metric names. + MetricsConfig metrics.Config } // NewRetryProcessor creates a new defaultRetryProcessor. @@ -74,22 +79,73 @@ func NewRetryProcessor(cfg RetryProcessorConfig, retryPolicyFn func() RetryPolic } } - return &defaultRetryProcessor{ + p := &defaultRetryProcessor{ workers: workers, workerCount: uint64(cfg.WorkerPoolSize), retryPolicyFn: retryPolicyFn, } + + ns := cfg.MetricsConfig.Namespace + m := &retryProcessorMetrics{ + enqueuedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, + Subsystem: "retry_processor", + Name: "enqueued_total", + Help: "Total number of retry requests enqueued, by triggering action.", + }, []string{"action"}), + executionsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, + Subsystem: "retry_processor", + Name: "executions_total", + Help: "Total number of retry executions by action and result. 
result: success=no error and no requeue, requeue=explicit RequeueAfter returned, retry=error with policy allowing another attempt, failed=error with policy exhausted.", + }, []string{"action", "result"}), + pendingTotal: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: ns, + Subsystem: "retry_processor", + Name: "pending", + Help: "Current number of retry requests waiting across all workers.", + }, func() float64 { return float64(p.Len()) }), + waitDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: ns, + Subsystem: "retry_processor", + Name: "queue_wait_duration_seconds", + Help: "Time between a retry's scheduled RetryAfter time and when it actually executes.", + Buckets: metrics.LatencyBuckets, + NativeHistogramBucketFactor: cfg.MetricsConfig.NativeHistogramBucketFactor, + NativeHistogramMaxBucketNumber: cfg.MetricsConfig.NativeHistogramMaxBucketNumber, + NativeHistogramMinResetDuration: time.Hour, + }, []string{"action"}), + } + p.processorMetrics = m + for _, w := range p.workers { + w.processorMetrics = m + } + + return p } // defaultRetryProcessor implements RetryProcessor using a sharded worker pool. type defaultRetryProcessor struct { - workers []*retryWorker - workerCount uint64 - retryPolicyFn func() RetryPolicy + workers []*retryWorker + workerCount uint64 + retryPolicyFn func() RetryPolicy + processorMetrics *retryProcessorMetrics +} + +// PrometheusCollectors implements metrics.Provider, returning the processor's enqueued, executions, pending, and wait-duration collectors. +func (p *defaultRetryProcessor) PrometheusCollectors() []prometheus.Collector { + return []prometheus.Collector{ + p.processorMetrics.enqueuedTotal, + p.processorMetrics.executionsTotal, + p.processorMetrics.pendingTotal, + p.processorMetrics.waitDuration, + } } // Enqueue adds a retry request, routing it to a worker based on hash(key). 
func (p *defaultRetryProcessor) Enqueue(req RetryRequest) { + p.processorMetrics.incEnqueuedTotal(req.Action) + w := p.workers[xxhash.Sum64([]byte(req.Key))%p.workerCount] w.mu.Lock() heap.Push(&w.queue, req) @@ -170,10 +226,11 @@ func (p *defaultRetryProcessor) Len() int { // retryWorker processes retries for its shard. type retryWorker struct { - mu sync.Mutex - queue retryPriorityQueue - wake chan struct{} - checkInterval time.Duration + mu sync.Mutex + queue retryPriorityQueue + wake chan struct{} + checkInterval time.Duration + processorMetrics *retryProcessorMetrics } // run executes the worker loop, processing due retries on wake signals or periodic ticks. @@ -221,6 +278,7 @@ func (w *retryWorker) processReady(retryPolicyFn func() RetryPolicy) { } results := make([]result, len(ready)) for i, req := range ready { + w.processorMetrics.observeWaitDuration(req.Action, req.RetryAfter) requeue, err := req.RetryFunc() results[i] = result{req: req, requeue: requeue, err: err} } @@ -230,6 +288,7 @@ func (w *retryWorker) processReady(retryPolicyFn func() RetryPolicy) { policy := retryPolicyFn() for _, res := range results { if res.requeue != nil { + w.processorMetrics.incExecutionsTotal(res.req.Action, retryResultRequeue) heap.Push(&w.queue, RetryRequest{ Key: res.req.Key, RetryAfter: time.Now().Add(*res.requeue), @@ -239,9 +298,13 @@ func (w *retryWorker) processReady(retryPolicyFn func() RetryPolicy) { Object: res.req.Object, LastError: res.err, }) - } else if res.err != nil && policy != nil { - ok, after := policy(res.err, res.req.Attempt+1) + } else if res.err != nil { + ok, after := false, time.Duration(0) + if policy != nil { + ok, after = policy(res.err, res.req.Attempt+1) + } if ok { + w.processorMetrics.incExecutionsTotal(res.req.Action, retryResultRetry) heap.Push(&w.queue, RetryRequest{ Key: res.req.Key, RetryAfter: time.Now().Add(after), @@ -251,7 +314,11 @@ func (w *retryWorker) processReady(retryPolicyFn func() RetryPolicy) { Object: res.req.Object, 
LastError: res.err, }) + } else { + w.processorMetrics.incExecutionsTotal(res.req.Action, retryResultFailed) } + } else { + w.processorMetrics.incExecutionsTotal(res.req.Action, retryResultSuccess) } } w.mu.Unlock() @@ -271,3 +338,31 @@ func (pq *retryPriorityQueue) Pop() any { *pq = old[:n-1] return item } + +// retryResultLabel values for the "result" label on retry_processor_executions_total. +const ( + retryResultSuccess = "success" + retryResultRequeue = "requeue" + retryResultRetry = "retry" + retryResultFailed = "failed" +) + +// retryProcessorMetrics holds the Prometheus metric instruments shared across a processor and its workers. +type retryProcessorMetrics struct { + enqueuedTotal *prometheus.CounterVec + executionsTotal *prometheus.CounterVec + pendingTotal prometheus.GaugeFunc + waitDuration *prometheus.HistogramVec +} + +func (m *retryProcessorMetrics) incEnqueuedTotal(action ResourceAction) { + m.enqueuedTotal.WithLabelValues(string(action)).Inc() +} + +func (m *retryProcessorMetrics) incExecutionsTotal(action ResourceAction, result string) { + m.executionsTotal.WithLabelValues(string(action), result).Inc() +} + +func (m *retryProcessorMetrics) observeWaitDuration(action ResourceAction, scheduledAt time.Time) { + m.waitDuration.WithLabelValues(string(action)).Observe(time.Since(scheduledAt).Seconds()) +} diff --git a/operator/retry_processor_test.go b/operator/retry_processor_test.go index 1ba6ee78b..1dda6be73 100644 --- a/operator/retry_processor_test.go +++ b/operator/retry_processor_test.go @@ -8,8 +8,12 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/grafana/grafana-app-sdk/metrics" ) func TestRetryProcessor_HashDistribution(t *testing.T) { @@ -17,9 +21,9 @@ func TestRetryProcessor_HashDistribution(t *testing.T) { // and different keys can coexist. 
We verify this indirectly: enqueue several items // with the same key and different keys, then check Len(). tests := []struct { - name string - enqueue []RetryRequest - expectedLen int + name string + enqueue []RetryRequest + expectedLen int }{ { name: "single key, multiple items", @@ -436,6 +440,190 @@ func TestRetryProcessor_DequeueDoesNotBlockOnExecution(t *testing.T) { assert.Less(t, elapsed, 50*time.Millisecond, "Dequeue should return quickly, not block on executing retry") } +func TestRetryProcessor_PrometheusCollectors(t *testing.T) { + p := NewRetryProcessor(RetryProcessorConfig{ + MetricsConfig: metrics.DefaultConfig("test"), + }, nil).(*defaultRetryProcessor) + + assert.ElementsMatch(t, []prometheus.Collector{ + p.processorMetrics.enqueuedTotal, + p.processorMetrics.executionsTotal, + p.processorMetrics.pendingTotal, + p.processorMetrics.waitDuration, + }, p.PrometheusCollectors()) +} + +func TestRetryProcessor_Metrics_EnqueuedTotal(t *testing.T) { + p := NewRetryProcessor(RetryProcessorConfig{ + WorkerPoolSize: 1, + CheckInterval: time.Hour, // prevent firing + MetricsConfig: metrics.DefaultConfig("test"), + }, nil).(*defaultRetryProcessor) + + noop := func() (*time.Duration, error) { return nil, nil } + p.Enqueue(RetryRequest{Key: "a", RetryAfter: time.Now().Add(time.Hour), Action: ResourceActionCreate, RetryFunc: noop}) + p.Enqueue(RetryRequest{Key: "b", RetryAfter: time.Now().Add(time.Hour), Action: ResourceActionCreate, RetryFunc: noop}) + p.Enqueue(RetryRequest{Key: "c", RetryAfter: time.Now().Add(time.Hour), Action: ResourceActionUpdate, RetryFunc: noop}) + + assert.Equal(t, 2.0, counterValue(t, p.processorMetrics.enqueuedTotal, string(ResourceActionCreate))) + assert.Equal(t, 1.0, counterValue(t, p.processorMetrics.enqueuedTotal, string(ResourceActionUpdate))) +} + +func TestRetryProcessor_Metrics_ExecutionResults(t *testing.T) { + tests := []struct { + name string + retryFunc func(done chan struct{}) func() (*time.Duration, error) + retryPolicy 
RetryPolicy + wantResult string + }{ + { + name: "success", + retryFunc: func(done chan struct{}) func() (*time.Duration, error) { + return func() (*time.Duration, error) { + close(done) + return nil, nil + } + }, + retryPolicy: func(_ error, _ int) (bool, time.Duration) { return false, 0 }, + wantResult: retryResultSuccess, + }, + { + name: "failed", + retryFunc: func(done chan struct{}) func() (*time.Duration, error) { + return func() (*time.Duration, error) { + close(done) + return nil, fmt.Errorf("boom") + } + }, + retryPolicy: func(_ error, _ int) (bool, time.Duration) { return false, 0 }, + wantResult: retryResultFailed, + }, + { + name: "retry", + retryFunc: func(done chan struct{}) func() (*time.Duration, error) { + var calls atomic.Int64 + return func() (*time.Duration, error) { + if calls.Add(1) == 1 { + return nil, fmt.Errorf("transient") + } + close(done) + return nil, nil + } + }, + retryPolicy: func(_ error, attempt int) (bool, time.Duration) { return attempt <= 1, time.Millisecond }, + wantResult: retryResultRetry, + }, + { + name: "requeue", + retryFunc: func(done chan struct{}) func() (*time.Duration, error) { + var calls atomic.Int64 + return func() (*time.Duration, error) { + if calls.Add(1) == 1 { + d := time.Millisecond + return &d, nil + } + close(done) + return nil, nil + } + }, + retryPolicy: func(_ error, _ int) (bool, time.Duration) { return false, 0 }, + wantResult: retryResultRequeue, + }, + { + name: "failed with nil policy", + retryFunc: func(done chan struct{}) func() (*time.Duration, error) { + return func() (*time.Duration, error) { + close(done) + return nil, fmt.Errorf("boom") + } + }, + retryPolicy: nil, + wantResult: retryResultFailed, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + done := make(chan struct{}) + + p := NewRetryProcessor(RetryProcessorConfig{ + WorkerPoolSize: 1, + CheckInterval: 5 * time.Millisecond, + MetricsConfig: metrics.DefaultConfig("test"), + }, func() RetryPolicy { return 
tt.retryPolicy }).(*defaultRetryProcessor) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go p.Run(ctx) + + p.Enqueue(RetryRequest{ + Key: "obj", + Action: ResourceActionCreate, + RetryAfter: time.Now(), + RetryFunc: tt.retryFunc(done), + }) + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for execution to complete") + } + + // incExecutionsTotal is recorded in Phase 3, after RetryFunc returns. Poll until it lands. + assert.Eventually(t, func() bool { + return counterValue(t, p.processorMetrics.executionsTotal, string(ResourceActionCreate), tt.wantResult) == 1.0 + }, time.Second, time.Millisecond, "executionsTotal[%s]", tt.wantResult) + }) + } +} + +func TestRetryProcessor_Metrics_WaitDurationObserved(t *testing.T) { + done := make(chan struct{}) + + p := NewRetryProcessor(RetryProcessorConfig{ + WorkerPoolSize: 1, + CheckInterval: 5 * time.Millisecond, + MetricsConfig: metrics.DefaultConfig("test"), + }, nil).(*defaultRetryProcessor) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go p.Run(ctx) + + p.Enqueue(RetryRequest{ + Key: "obj", + Action: ResourceActionCreate, + RetryAfter: time.Now(), + RetryFunc: func() (*time.Duration, error) { + close(done) + return nil, nil + }, + }) + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for execution") + } + + // observeWaitDuration is called before RetryFunc in Phase 2, so it's already recorded. + m, err := p.processorMetrics.waitDuration.GetMetricWithLabelValues(string(ResourceActionCreate)) + require.NoError(t, err) + var pb dto.Metric + require.NoError(t, m.(prometheus.Metric).Write(&pb)) + assert.EqualValues(t, 1, pb.GetHistogram().GetSampleCount(), "waitDuration should have one observation") +} + +// counterValue reads the current float64 value of a CounterVec for the given label values. 
+func counterValue(t *testing.T, cv *prometheus.CounterVec, labelValues ...string) float64 { + t.Helper() + m, err := cv.GetMetricWithLabelValues(labelValues...) + require.NoError(t, err) + var pb dto.Metric + require.NoError(t, m.Write(&pb)) + return pb.GetCounter().GetValue() +} + // --- Benchmarks --- func BenchmarkRetryProcessor_Throughput(b *testing.B) {