Skip to content

Commit e0348b1

Browse files
evanfosterEvan Foster
andauthored
Add Metrics to the sync manager (#244)
* Add metrics to sync manager This PR adds metrics to the sync manager. * Generate licenses, PR comments --------- Co-authored-by: Evan Foster <[email protected]>
1 parent 10d8858 commit e0348b1

File tree

5 files changed

+292
-8
lines changed

5 files changed

+292
-8
lines changed

cmd/sync/manager/manager.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
registryv1 "github.com/adobe/cluster-registry/pkg/api/registry/v1"
2020
registryv1alpha1 "github.com/adobe/cluster-registry/pkg/api/registry/v1alpha1"
2121
"github.com/adobe/cluster-registry/pkg/config"
22-
monitoring "github.com/adobe/cluster-registry/pkg/monitoring/client"
22+
monitoring "github.com/adobe/cluster-registry/pkg/monitoring/manager"
2323
"github.com/adobe/cluster-registry/pkg/sqs"
2424
"github.com/adobe/cluster-registry/pkg/sync/manager"
2525
"github.com/adobe/cluster-registry/pkg/sync/parser"
@@ -161,6 +161,7 @@ func main() {
161161
WatchedGVKs: loadWatchedGVKs(syncConfig),
162162
Queue: q,
163163
ResourceParser: rp,
164+
Metrics: m,
164165
}).SetupWithManager(ctx, mgr); err != nil {
165166
setupLog.Error(err, "unable to create controller", "controller", "SyncController")
166167
os.Exit(1)

pkg/monitoring/manager/metrics.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
Copyright 2024 Adobe. All rights reserved.
3+
This file is licensed to you under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License. You may obtain a copy
5+
of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
Unless required by applicable law or agreed to in writing, software distributed under
8+
the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
9+
OF ANY KIND, either express or implied. See the License for the specific language
10+
governing permissions and limitations under the License.
11+
*/
12+
13+
package monitoring
14+
15+
import "github.com/prometheus/client_golang/prometheus"
16+
import "github.com/prometheus/client_golang/prometheus/promauto"
17+
18+
type MetricsI interface {
19+
RecordRequeueCnt(target string)
20+
RecordReconciliationCnt(target string)
21+
RecordReconciliationDur(target string, elapsed float64)
22+
RecordEnqueueCnt(target string)
23+
RecordEnqueueDur(target string, elapsed float64)
24+
RecordErrorCnt(target string)
25+
}
26+
27+
type Metrics struct {
28+
RequeueCnt *prometheus.CounterVec
29+
ReconciliationCnt *prometheus.CounterVec
30+
ReconciliationDur *prometheus.HistogramVec
31+
EnqueueCnt *prometheus.CounterVec
32+
EnqueueDur *prometheus.HistogramVec
33+
ErrCnt *prometheus.CounterVec
34+
metrics []prometheus.Collector
35+
}
36+
37+
func NewMetrics() *Metrics {
38+
return &Metrics{}
39+
}
40+
41+
func (m *Metrics) Init(isUnitTest bool) {
42+
reg := prometheus.DefaultRegisterer
43+
if isUnitTest {
44+
reg = prometheus.NewRegistry()
45+
}
46+
var requeueCnt prometheus.Collector = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
47+
Name: "cluster_registry_sync_manager_requeues_total",
48+
Help: "The total number of controller-manager requeues partitioned by target.",
49+
}, []string{"target"})
50+
m.RequeueCnt = requeueCnt.(*prometheus.CounterVec)
51+
m.metrics = append(m.metrics, m.RequeueCnt)
52+
53+
var reconciliationCnt prometheus.Collector = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
54+
Name: "cluster_registry_sync_manager_reconciliation_total",
55+
Help: "How many reconciliations occurred, partitioned by target.",
56+
},
57+
[]string{"target"},
58+
)
59+
m.ReconciliationCnt = reconciliationCnt.(*prometheus.CounterVec)
60+
m.metrics = append(m.metrics, m.ReconciliationCnt)
61+
62+
var reconciliationDur prometheus.Collector = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
63+
Name: "cluster_registry_sync_manager_reconciliation_duration_seconds",
64+
Help: "The time taken to reconcile resources in seconds partitioned by target.",
65+
},
66+
[]string{"target"},
67+
)
68+
m.ReconciliationDur = reconciliationDur.(*prometheus.HistogramVec)
69+
m.metrics = append(m.metrics, m.ReconciliationDur)
70+
71+
var enqueueCnt prometheus.Collector = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
72+
Name: "cluster_registry_sync_manager_enqueue_total",
73+
Help: "How many reconciliations were enqueued, partitioned by target.",
74+
},
75+
[]string{"target"},
76+
)
77+
m.EnqueueCnt = enqueueCnt.(*prometheus.CounterVec)
78+
m.metrics = append(m.metrics, m.EnqueueCnt)
79+
80+
var enqueueDur prometheus.Collector = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
81+
Name: "cluster_registry_sync_manager_enqueue_duration_seconds",
82+
Help: "The time taken to enqueue a reconciliation in seconds partitioned by target.",
83+
},
84+
[]string{"target"},
85+
)
86+
m.EnqueueDur = enqueueDur.(*prometheus.HistogramVec)
87+
m.metrics = append(m.metrics, m.EnqueueDur)
88+
89+
var errorCnt prometheus.Collector = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
90+
Name: "cluster_registry_sync_manager_error_total",
91+
Help: "The total number controller-manager errors partitioned by target.",
92+
}, []string{"target"})
93+
m.ErrCnt = errorCnt.(*prometheus.CounterVec)
94+
m.metrics = append(m.metrics, m.ErrCnt)
95+
}
96+
97+
func (m *Metrics) RecordRequeueCnt(target string) {
98+
m.RequeueCnt.WithLabelValues(target).Inc()
99+
}
100+
101+
func (m *Metrics) RecordReconciliationCnt(target string) {
102+
m.ReconciliationCnt.WithLabelValues(target).Inc()
103+
}
104+
105+
func (m *Metrics) RecordReconciliationDur(target string, elapsed float64) {
106+
m.ReconciliationDur.WithLabelValues(target).Observe(elapsed)
107+
}
108+
109+
func (m *Metrics) RecordEnqueueCnt(target string) {
110+
m.EnqueueCnt.WithLabelValues(target).Inc()
111+
}
112+
113+
func (m *Metrics) RecordEnqueueDur(target string, elapsed float64) {
114+
m.EnqueueDur.WithLabelValues(target).Observe(elapsed)
115+
}
116+
117+
func (m *Metrics) RecordErrorCnt(target string) {
118+
m.ErrCnt.WithLabelValues(target).Inc()
119+
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
Copyright 2024 Adobe. All rights reserved.
3+
This file is licensed to you under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License. You may obtain a copy
5+
of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
Unless required by applicable law or agreed to in writing, software distributed under
8+
the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
9+
OF ANY KIND, either express or implied. See the License for the specific language
10+
governing permissions and limitations under the License.
11+
*/
12+
13+
package monitoring
14+
15+
import (
16+
"fmt"
17+
"github.com/prometheus/client_golang/prometheus/testutil"
18+
"github.com/stretchr/testify/assert"
19+
"math/rand"
20+
"strings"
21+
"testing"
22+
)
23+
24+
const (
25+
clusterSyncTarget = "orgnumber-env-region-cluster-sync"
26+
subsystem = "cluster_registry_sync_manager"
27+
minRand = 1
28+
maxRand = 2.5
29+
)
30+
31+
// Generate a random float number between min and max
32+
func generateFloatRand(min, max float64) float64 {
33+
return min + rand.Float64()*(max-min)
34+
}
35+
36+
// Generate what we expect a histogram of some random number to look like. metricTopic is what the metric is about, e.g.
37+
// reconciliation or enqueue. helpString is the literal help string from metrics.go. I'd grab this myself, but it's not
38+
// exposed in the HistogramVec object AFAICT :(
39+
func generateExpectedHistogram(randomFloat float64, metricTopic string, helpString string) string {
40+
expected := fmt.Sprintf(`
41+
# HELP %[1]s_%[5]s_duration_seconds %[4]s
42+
# TYPE %[1]s_%[5]s_duration_seconds histogram
43+
%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="0.005"} 0
44+
%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="0.01"} 0
45+
%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="0.025"} 0
46+
%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="0.05"} 0
47+
%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="0.1"} 0
48+
%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="0.25"} 0
49+
%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="0.5"} 0
50+
%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="1"} 0
51+
%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="2.5"} 1
52+
%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="5"} 1
53+
%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="10"} 1
54+
%[1]s_%[5]s_duration_seconds_bucket{target="%[2]s",le="+Inf"} 1
55+
%[1]s_%[5]s_duration_seconds_sum{target="%[2]s"} %[3]s
56+
%[1]s_%[5]s_duration_seconds_count{target="%[2]s"} 1
57+
`, subsystem, clusterSyncTarget, fmt.Sprintf("%.16f", randomFloat), helpString, metricTopic)
58+
return expected
59+
}
60+
61+
func TestNewMetrics(t *testing.T) {
62+
test := assert.New(t)
63+
m := NewMetrics()
64+
test.NotNil(m)
65+
}
66+
67+
func TestInit(t *testing.T) {
68+
test := assert.New(t)
69+
m := NewMetrics()
70+
m.Init(true)
71+
test.NotNil(m.RequeueCnt)
72+
test.NotNil(m.ReconciliationCnt)
73+
test.NotNil(m.ReconciliationDur)
74+
test.NotNil(m.EnqueueCnt)
75+
test.NotNil(m.EnqueueDur)
76+
test.NotNil(m.ErrCnt)
77+
}
78+
79+
func TestRecordRequeueCnt(t *testing.T) {
80+
test := assert.New(t)
81+
m := NewMetrics()
82+
m.Init(true)
83+
m.RecordRequeueCnt(clusterSyncTarget)
84+
test.Equal(1, testutil.CollectAndCount(*m.RequeueCnt))
85+
test.Equal(float64(1), testutil.ToFloat64((*m.RequeueCnt).WithLabelValues(clusterSyncTarget)))
86+
}
87+
88+
func TestRecordReconciliationCnt(t *testing.T) {
89+
test := assert.New(t)
90+
m := NewMetrics()
91+
m.Init(true)
92+
m.RecordReconciliationCnt(clusterSyncTarget)
93+
test.Equal(1, testutil.CollectAndCount(*m.ReconciliationCnt))
94+
test.Equal(float64(1), testutil.ToFloat64((*m.ReconciliationCnt).WithLabelValues(clusterSyncTarget)))
95+
}
96+
97+
func TestRecordReconciliationDur(t *testing.T) {
98+
m := NewMetrics()
99+
m.Init(true)
100+
randomFloat := generateFloatRand(minRand, maxRand)
101+
m.RecordReconciliationDur(clusterSyncTarget, randomFloat)
102+
expected := generateExpectedHistogram(randomFloat, "reconciliation", "The time taken to reconcile resources in seconds partitioned by target.")
103+
if err := testutil.CollectAndCompare(
104+
*m.ReconciliationDur,
105+
strings.NewReader(expected),
106+
fmt.Sprintf("%s_%s_duration_seconds", subsystem, "reconciliation")); err != nil {
107+
t.Errorf("unexpected collecting result:\n%s", err)
108+
}
109+
110+
}
111+
112+
func TestRecordEnqueueCnt(t *testing.T) {
113+
test := assert.New(t)
114+
m := NewMetrics()
115+
m.Init(true)
116+
m.RecordEnqueueCnt(clusterSyncTarget)
117+
test.Equal(1, testutil.CollectAndCount(*m.EnqueueCnt))
118+
test.Equal(float64(1), testutil.ToFloat64((*m.EnqueueCnt).WithLabelValues(clusterSyncTarget)))
119+
120+
}
121+
122+
func TestRecordEnqueueDur(t *testing.T) {
123+
m := NewMetrics()
124+
m.Init(true)
125+
randomFloat := generateFloatRand(minRand, maxRand)
126+
m.RecordEnqueueDur(clusterSyncTarget, randomFloat)
127+
expected := generateExpectedHistogram(randomFloat, "enqueue", "The time taken to enqueue a reconciliation in seconds partitioned by target.")
128+
if err := testutil.CollectAndCompare(
129+
*m.EnqueueDur,
130+
strings.NewReader(expected),
131+
fmt.Sprintf("%s_%s_duration_seconds", subsystem, "enqueue")); err != nil {
132+
t.Errorf("unexpected collecting result:\n%s", err)
133+
}
134+
135+
}
136+
137+
func TestRecordErrCnt(t *testing.T) {
138+
test := assert.New(t)
139+
m := NewMetrics()
140+
m.Init(true)
141+
m.RecordErrorCnt(clusterSyncTarget)
142+
test.Equal(1, testutil.CollectAndCount(*m.ErrCnt))
143+
test.Equal(float64(1), testutil.ToFloat64((*m.ErrCnt).WithLabelValues(clusterSyncTarget)))
144+
145+
}

pkg/sync/manager/controller.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"context"
1717
v1 "github.com/adobe/cluster-registry/pkg/api/registry/v1"
1818
registryv1alpha1 "github.com/adobe/cluster-registry/pkg/api/registry/v1alpha1"
19+
monitoring "github.com/adobe/cluster-registry/pkg/monitoring/manager"
1920
"github.com/adobe/cluster-registry/pkg/sqs"
2021
"github.com/adobe/cluster-registry/pkg/sync/parser"
2122
"github.com/aws/aws-sdk-go/aws"
@@ -54,19 +55,26 @@ type SyncController struct {
5455
WatchedGVKs []schema.GroupVersionKind
5556
Queue *sqs.Config
5657
ResourceParser *parser.ResourceParser
58+
Metrics monitoring.MetricsI
5759
}
5860

5961
func (c *SyncController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
6062
var start = time.Now()
6163

6264
log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
6365
log.Info("start")
64-
defer log.Info("end", "duration", time.Since(start))
66+
defer func() {
67+
elapsed := time.Since(start)
68+
log.Info("end", "duration", elapsed)
69+
c.Metrics.RecordReconciliationDur(req.Name, float64(elapsed)/float64(time.Second))
70+
c.Metrics.RecordReconciliationCnt(req.Name)
71+
}()
6572

6673
instance := new(registryv1alpha1.ClusterSync)
6774
if err := c.Get(ctx, req.NamespacedName, instance); err != nil {
75+
c.Metrics.RecordErrorCnt(req.Name)
6876
log.Error(err, "unable to fetch object")
69-
return requeueIfError(client.IgnoreNotFound(err))
77+
return requeueIfError(c, req, client.IgnoreNotFound(err))
7078
}
7179

7280
if instance.ObjectMeta.DeletionTimestamp != nil {
@@ -100,15 +108,17 @@ func (c *SyncController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
100108
instance.Status.LastSyncError = ptr.To(errList[0].Error())
101109
instance.Status.LastSyncTime = &metav1.Time{Time: time.Now()}
102110
log.Error(errList[0], "failed to sync resources")
111+
c.Metrics.RecordErrorCnt(req.Name)
103112

104113
if err := c.updateStatus(ctx, instance); err != nil {
105-
return requeueAfter(10*time.Second, err)
114+
return requeueAfter(c, req, 10*time.Second, err)
106115
}
107116
return noRequeue()
108117
}
109118

110119
syncedData, err := c.ResourceParser.Diff()
111120
if err != nil {
121+
c.Metrics.RecordErrorCnt(req.Name)
112122
return noRequeue()
113123
}
114124

@@ -119,13 +129,15 @@ func (c *SyncController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
119129
instance.Status.LastSyncTime = &metav1.Time{Time: time.Now()}
120130
if err := c.enqueueData(instance); err != nil {
121131
log.Error(err, "failed to enqueue message")
132+
c.Metrics.RecordErrorCnt(req.Name)
122133
if err := c.updateStatus(ctx, instance); err != nil {
123-
return requeueAfter(10*time.Second, err)
134+
return requeueAfter(c, req, 10*time.Second, err)
124135
}
125136
return noRequeue()
126137
}
127138
if err := c.updateStatus(ctx, instance); err != nil {
128-
return requeueAfter(10*time.Second, err)
139+
c.Metrics.RecordErrorCnt(req.Name)
140+
return requeueAfter(c, req, 10*time.Second, err)
129141
}
130142
return noRequeue()
131143
}
@@ -259,6 +271,7 @@ func (c *SyncController) enqueueRequestsFromMapFunc(gvk schema.GroupVersionKind)
259271
break
260272
}
261273
}
274+
// TODO consider adding error handling/metrics if we don't find our object
262275

263276
return requests
264277
}
@@ -294,6 +307,8 @@ func (c *SyncController) enqueueData(instance *registryv1alpha1.ClusterSync) err
294307
})
295308
elapsed := float64(time.Since(start)) / float64(time.Second)
296309
c.Log.Info("Enqueue time", "time", elapsed)
310+
c.Metrics.RecordEnqueueDur(instance.Name, elapsed)
311+
c.Metrics.RecordEnqueueCnt(instance.Name)
297312

298313
if err != nil {
299314
return err

pkg/sync/manager/result.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,18 @@ import (
1717
"time"
1818
)
1919

20-
func requeueIfError(err error) (ctrl.Result, error) {
20+
func requeueIfError(c *SyncController, req ctrl.Request, err error) (ctrl.Result, error) {
21+
if err != nil {
22+
c.Metrics.RecordRequeueCnt(req.Name)
23+
}
2124
return ctrl.Result{}, err
2225
}
2326

2427
func noRequeue() (ctrl.Result, error) {
2528
return ctrl.Result{}, nil
2629
}
2730

28-
func requeueAfter(interval time.Duration, err error) (ctrl.Result, error) {
31+
func requeueAfter(c *SyncController, req ctrl.Request, interval time.Duration, err error) (ctrl.Result, error) {
32+
c.Metrics.RecordRequeueCnt(req.Name)
2933
return ctrl.Result{RequeueAfter: interval}, err
3034
}

0 commit comments

Comments
 (0)