Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion operator/informer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ type InformerController struct {
watcherLatency *prometheus.HistogramVec
inflightActions *prometheus.GaugeVec
inflightEvents *prometheus.GaugeVec
reconcilerErrors *prometheus.CounterVec
watcherErrors *prometheus.CounterVec
}

type retryInfo struct {
Expand Down Expand Up @@ -220,6 +222,18 @@ func NewInformerController(cfg InformerControllerConfig) *InformerController {
Namespace: cfg.MetricsConfig.Namespace,
Help: "Current number of events which have active reconcile processes",
}, []string{"event_type", "kind"}),
reconcilerErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: cfg.MetricsConfig.Namespace,
Subsystem: "reconciler",
Name: "errors_total",
Help: "Total number of reconciler errors, before any retry.",
}, []string{"event_type", "kind"}),
watcherErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: cfg.MetricsConfig.Namespace,
Subsystem: "watcher",
Name: "errors_total",
Help: "Total number of watcher errors, before any retry.",
}, []string{"event_type", "kind"}),
}
if cfg.ErrorHandler != nil {
inf.ErrorHandler = cfg.ErrorHandler
Expand All @@ -232,7 +246,10 @@ func NewInformerController(cfg InformerControllerConfig) *InformerController {
}
if cfg.RetryProcessorConfig != nil {
retryPolicyFn := func() RetryPolicy { return inf.RetryPolicy }
inf.retryProcessor = NewRetryProcessor(*cfg.RetryProcessorConfig, retryPolicyFn)
processorCfg := *cfg.RetryProcessorConfig
// Inherit the controller's MetricsConfig so all metrics share a consistent namespace.
processorCfg.MetricsConfig = cfg.MetricsConfig
inf.retryProcessor = NewRetryProcessor(processorCfg, retryPolicyFn)
}
return inf
}
Expand Down Expand Up @@ -352,6 +369,10 @@ func (c *InformerController) Run(ctx context.Context) error {
func (c *InformerController) PrometheusCollectors() []prometheus.Collector {
collectors := []prometheus.Collector{
c.totalEvents, c.reconcileLatency, c.inflightEvents, c.inflightActions, c.reconcilerLatency, c.watcherLatency,
c.reconcilerErrors, c.watcherErrors,
}
if cast, ok := c.retryProcessor.(metrics.Provider); ok {
collectors = append(collectors, cast.PrometheusCollectors()...)
}
c.informers.RangeAll(func(_ string, _ int, value Informer) {
if cast, ok := value.(metrics.Provider); ok {
Expand Down Expand Up @@ -424,6 +445,9 @@ func (c *InformerController) informerAddFunc(resourceKind string) func(context.C
// Do the watcher's Add, check for error
c.wrapWatcherCall(string(ResourceActionCreate), obj.GetStaticMetadata().Kind, func() {
err := watcher.Add(ctx, obj)
if err != nil {
c.watcherErrors.WithLabelValues(string(ResourceActionCreate), obj.GetStaticMetadata().Kind).Inc()
}
if err != nil && c.ErrorHandler != nil {
c.ErrorHandler(ctx, err) // TODO: improve ErrorHandler
}
Expand Down Expand Up @@ -479,6 +503,9 @@ func (c *InformerController) informerUpdateFunc(resourceKind string) func(contex
// Do the watcher's Update, check for error
c.wrapWatcherCall(string(ResourceActionUpdate), newObj.GetStaticMetadata().Kind, func() {
err := watcher.Update(ctx, oldObj, newObj)
if err != nil {
c.watcherErrors.WithLabelValues(string(ResourceActionUpdate), newObj.GetStaticMetadata().Kind).Inc()
}
if err != nil && c.ErrorHandler != nil {
c.ErrorHandler(ctx, err)
}
Expand Down Expand Up @@ -543,6 +570,9 @@ func (c *InformerController) informerDeleteFunc(resourceKind string) func(contex
// Do the watcher's Delete, check for error
c.wrapWatcherCall(string(ResourceActionDelete), obj.GetStaticMetadata().Kind, func() {
err := watcher.Delete(ctx, obj)
if err != nil {
c.watcherErrors.WithLabelValues(string(ResourceActionDelete), obj.GetStaticMetadata().Kind).Inc()
}
if err != nil && c.ErrorHandler != nil {
c.ErrorHandler(ctx, err) // TODO: improve ErrorHandler
}
Expand Down Expand Up @@ -647,6 +677,7 @@ func (c *InformerController) doReconcile(ctx context.Context, reconciler Reconci
}
} else if err != nil {
// Otherwise, if err is non-nil, queue a retry according to the RetryPolicy
c.reconcilerErrors.WithLabelValues(string(action), req.Object.GetStaticMetadata().Kind).Inc()
if c.ErrorHandler != nil {
// Call the ErrorHandler function as well if it's set
c.ErrorHandler(ctx, err)
Expand Down
86 changes: 86 additions & 0 deletions operator/informer_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/grafana/grafana-app-sdk/metrics"
"github.com/grafana/grafana-app-sdk/resource"
)

Expand Down Expand Up @@ -1394,3 +1397,86 @@ func TestInformerController_Run_WithRetryProcessorDequeue(t *testing.T) {
assert.Equal(t, int64(1), addCalls.Load(), "add retry should have been dequeued by update")
})
}

func TestInformerController_PrometheusCollectors(t *testing.T) {
c := NewInformerController(InformerControllerConfig{
MetricsConfig: metrics.DefaultConfig("test"),
})
collectors := c.PrometheusCollectors()
assert.ElementsMatch(t, []prometheus.Collector{
c.totalEvents,
c.reconcileLatency,
c.inflightEvents,
c.inflightActions,
c.reconcilerLatency,
c.watcherLatency,
c.reconcilerErrors,
c.watcherErrors,
}, collectors)
}

func TestInformerController_Metrics_ReconcilerErrors(t *testing.T) {
kind := "foo"
inf := &testInformer{}
c := NewInformerController(InformerControllerConfig{
MetricsConfig: metrics.DefaultConfig("test"),
// No RetryPolicy so errors are dropped after first attempt — keeps the test simple.
RetryPolicy: func(_ error, _ int) (bool, time.Duration) { return false, 0 },
})
c.AddReconciler(&SimpleReconciler{
ReconcileFunc: func(_ context.Context, _ ReconcileRequest) (ReconcileResult, error) {
return ReconcileResult{}, errors.New("reconcile error")
},
}, kind)
c.AddInformer(inf, kind)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go c.Run(ctx)

inf.FireAdd(context.Background(), emptyObject)

assert.Equal(t, float64(1), counterValueFromVec(t, c.reconcilerErrors, string(ResourceActionCreate), ""))
}

func TestInformerController_Metrics_WatcherErrors(t *testing.T) {
kind := "foo"
inf := &testInformer{}
c := NewInformerController(InformerControllerConfig{
MetricsConfig: metrics.DefaultConfig("test"),
RetryPolicy: func(_ error, _ int) (bool, time.Duration) { return false, 0 },
})
c.AddWatcher(&SimpleWatcher{
AddFunc: func(_ context.Context, _ resource.Object) error {
return errors.New("watcher add error")
},
UpdateFunc: func(_ context.Context, _, _ resource.Object) error {
return errors.New("watcher update error")
},
DeleteFunc: func(_ context.Context, _ resource.Object) error {
return errors.New("watcher delete error")
},
}, kind)
c.AddInformer(inf, kind)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go c.Run(ctx)

inf.FireAdd(context.Background(), emptyObject)
inf.FireUpdate(context.Background(), emptyObject, emptyObject)
inf.FireDelete(context.Background(), emptyObject)

assert.Equal(t, float64(1), counterValueFromVec(t, c.watcherErrors, string(ResourceActionCreate), ""))
assert.Equal(t, float64(1), counterValueFromVec(t, c.watcherErrors, string(ResourceActionUpdate), ""))
assert.Equal(t, float64(1), counterValueFromVec(t, c.watcherErrors, string(ResourceActionDelete), ""))
}

func counterValueFromVec(t *testing.T, cv *prometheus.CounterVec, labelValues ...string) float64 {
t.Helper()
m, err := cv.GetMetricWithLabelValues(labelValues...)
require.NoError(t, err)
var pb dto.Metric
require.NoError(t, m.Write(&pb))
return pb.GetCounter().GetValue()
}
115 changes: 105 additions & 10 deletions operator/retry_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"time"

"github.com/cespare/xxhash/v2"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/grafana-app-sdk/metrics"
"github.com/grafana/grafana-app-sdk/resource"
)

Expand Down Expand Up @@ -50,6 +52,9 @@ type RetryProcessorConfig struct {
WorkerPoolSize int
// CheckInterval is how often workers check for due retries. Default: 1s.
CheckInterval time.Duration
// MetricsConfig controls the namespace and histogram settings for Prometheus metrics.
// Metrics are always created; an empty Namespace produces unprefixed metric names.
MetricsConfig metrics.Config
}

// NewRetryProcessor creates a new defaultRetryProcessor.
Expand All @@ -74,22 +79,73 @@ func NewRetryProcessor(cfg RetryProcessorConfig, retryPolicyFn func() RetryPolic
}
}

return &defaultRetryProcessor{
p := &defaultRetryProcessor{
workers: workers,
workerCount: uint64(cfg.WorkerPoolSize),
retryPolicyFn: retryPolicyFn,
}

ns := cfg.MetricsConfig.Namespace
m := &retryProcessorMetrics{
enqueuedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: "retry_processor",
Name: "enqueued_total",
Help: "Total number of retry requests enqueued, by triggering action.",
}, []string{"action"}),
executionsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: "retry_processor",
Name: "executions_total",
Help: "Total number of retry executions by action and result. result: success=no error and no requeue, requeue=explicit RequeueAfter returned, retry=error with policy allowing another attempt, failed=error with policy exhausted.",
}, []string{"action", "result"}),
pendingTotal: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: "retry_processor",
Name: "pending_total",
Help: "Current number of retry requests waiting across all workers.",
}, func() float64 { return float64(p.Len()) }),
waitDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: ns,
Subsystem: "retry_processor",
Name: "queue_wait_duration_seconds",
Help: "Time between a retry's scheduled RetryAfter time and when it actually executes.",
Buckets: metrics.LatencyBuckets,
NativeHistogramBucketFactor: cfg.MetricsConfig.NativeHistogramBucketFactor,
NativeHistogramMaxBucketNumber: cfg.MetricsConfig.NativeHistogramMaxBucketNumber,
NativeHistogramMinResetDuration: time.Hour,
}, []string{"action"}),
}
p.processorMetrics = m
for _, w := range p.workers {
w.processorMetrics = m
}

return p
}

// defaultRetryProcessor implements RetryProcessor using a sharded worker pool.
type defaultRetryProcessor struct {
workers []*retryWorker
workerCount uint64
retryPolicyFn func() RetryPolicy
workers []*retryWorker
workerCount uint64
retryPolicyFn func() RetryPolicy
processorMetrics *retryProcessorMetrics
}

// PrometheusCollectors implements metrics.Provider.
func (p *defaultRetryProcessor) PrometheusCollectors() []prometheus.Collector {
return []prometheus.Collector{
p.processorMetrics.enqueuedTotal,
p.processorMetrics.executionsTotal,
p.processorMetrics.pendingTotal,
p.processorMetrics.waitDuration,
}
}

// Enqueue adds a retry request, routing it to a worker based on hash(key).
func (p *defaultRetryProcessor) Enqueue(req RetryRequest) {
p.processorMetrics.incEnqueuedTotal(req.Action)

w := p.workers[xxhash.Sum64([]byte(req.Key))%p.workerCount]
w.mu.Lock()
heap.Push(&w.queue, req)
Expand Down Expand Up @@ -170,10 +226,11 @@ func (p *defaultRetryProcessor) Len() int {

// retryWorker processes retries for its shard.
type retryWorker struct {
mu sync.Mutex
queue retryPriorityQueue
wake chan struct{}
checkInterval time.Duration
mu sync.Mutex
queue retryPriorityQueue
wake chan struct{}
checkInterval time.Duration
processorMetrics *retryProcessorMetrics
}

// run executes the worker loop, processing due retries on wake signals or periodic ticks.
Expand Down Expand Up @@ -221,6 +278,7 @@ func (w *retryWorker) processReady(retryPolicyFn func() RetryPolicy) {
}
results := make([]result, len(ready))
for i, req := range ready {
w.processorMetrics.observeWaitDuration(req.Action, req.RetryAfter)
requeue, err := req.RetryFunc()
results[i] = result{req: req, requeue: requeue, err: err}
}
Expand All @@ -230,6 +288,7 @@ func (w *retryWorker) processReady(retryPolicyFn func() RetryPolicy) {
policy := retryPolicyFn()
for _, res := range results {
if res.requeue != nil {
w.processorMetrics.incExecutionsTotal(res.req.Action, retryResultRequeue)
heap.Push(&w.queue, RetryRequest{
Key: res.req.Key,
RetryAfter: time.Now().Add(*res.requeue),
Expand All @@ -239,9 +298,13 @@ func (w *retryWorker) processReady(retryPolicyFn func() RetryPolicy) {
Object: res.req.Object,
LastError: res.err,
})
} else if res.err != nil && policy != nil {
ok, after := policy(res.err, res.req.Attempt+1)
} else if res.err != nil {
ok, after := false, time.Duration(0)
if policy != nil {
ok, after = policy(res.err, res.req.Attempt+1)
}
if ok {
w.processorMetrics.incExecutionsTotal(res.req.Action, retryResultRetry)
heap.Push(&w.queue, RetryRequest{
Key: res.req.Key,
RetryAfter: time.Now().Add(after),
Expand All @@ -251,7 +314,11 @@ func (w *retryWorker) processReady(retryPolicyFn func() RetryPolicy) {
Object: res.req.Object,
LastError: res.err,
})
} else {
w.processorMetrics.incExecutionsTotal(res.req.Action, retryResultFailed)
}
} else {
w.processorMetrics.incExecutionsTotal(res.req.Action, retryResultSuccess)
}
}
w.mu.Unlock()
Expand All @@ -271,3 +338,31 @@ func (pq *retryPriorityQueue) Pop() any {
*pq = old[:n-1]
return item
}

// retryResultLabel values for the "result" label on retry_processor_executions_total.
const (
retryResultSuccess = "success"
retryResultRequeue = "requeue"
retryResultRetry = "retry"
retryResultFailed = "failed"
)

// retryProcessorMetrics holds the Prometheus metric instruments shared across a processor and its workers.
type retryProcessorMetrics struct {
enqueuedTotal *prometheus.CounterVec
executionsTotal *prometheus.CounterVec
pendingTotal prometheus.GaugeFunc
waitDuration *prometheus.HistogramVec
}

func (m *retryProcessorMetrics) incEnqueuedTotal(action ResourceAction) {
m.enqueuedTotal.WithLabelValues(string(action)).Inc()
}

func (m *retryProcessorMetrics) incExecutionsTotal(action ResourceAction, result string) {
m.executionsTotal.WithLabelValues(string(action), result).Inc()
}

func (m *retryProcessorMetrics) observeWaitDuration(action ResourceAction, scheduledAt time.Time) {
m.waitDuration.WithLabelValues(string(action)).Observe(time.Since(scheduledAt).Seconds())
}
Loading
Loading