
Commit ffe83c6

Merge pull request #4 from xiaozongyang/dev/refactor_to_generate_metrics
Always generate new metric samples to delete outdated node resources
2 parents b983ab3 + 658e41e commit ffe83c6
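
The idea behind this refactor, in a minimal form: instead of mutating long-lived GaugeVec series and deleting stale ones through a node watch, the exporter implements prometheus.Collector and emits a fresh set of samples from its current snapshot on every scrape, so nodes that disappear simply stop being exported. The following is a rough standalone sketch of that pattern only; the type, metric name, and map used here are illustrative and are not the commit's code (the actual implementation is in the diff below).

package main

import (
	"net/http"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// toyCollector rebuilds every sample from its current snapshot on each
// scrape; entries that vanish from the map vanish from /metrics without
// any explicit DeleteLabelValues call.
type toyCollector struct {
	mu        sync.RWMutex
	cpuByNode map[string]float64 // node name -> CPU requests (cores)
}

// Illustrative metric name, not necessarily the one exported by this project.
var cpuDesc = prometheus.NewDesc(
	"kube_node_metrics_cpu_requests",
	"Total CPU requests of all pods running on the node",
	[]string{"node"}, nil,
)

// Describe sends no descriptors, which makes this an "unchecked" collector.
func (c *toyCollector) Describe(chan<- *prometheus.Desc) {}

// Collect generates brand-new const metrics from the latest snapshot.
func (c *toyCollector) Collect(ch chan<- prometheus.Metric) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	for node, cpu := range c.cpuByNode {
		ch <- prometheus.MustNewConstMetric(cpuDesc, prometheus.GaugeValue, cpu, node)
	}
}

// update atomically swaps in the result of a full resync.
func (c *toyCollector) update(snapshot map[string]float64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cpuByNode = snapshot
}

func main() {
	c := &toyCollector{cpuByNode: map[string]float64{"node-a": 1.5}}
	prometheus.MustRegister(c)
	c.update(map[string]float64{"node-b": 2.0}) // node-a's series disappears on the next scrape
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":19191", nil)
}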

1 file changed: main.go (+92 -112 lines)
@@ -5,57 +5,24 @@ import (
 	"log"
 	"net/http"
 	"os"
+	"sync"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
-	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 )
 
 var (
-	namespace    = "kube_node_metrics"
-	commonLabels = []string{"node", "ip"}
-
-	cpuRequests = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Namespace: namespace,
-			Name:      "cpu_reqeust",
-			Help:      "Total CPU requests of all pods running on the node",
-		},
-		commonLabels,
-	)
-
-	cpuLimits = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Namespace: namespace,
-			Name:      "cpu_limit",
-			Help:      "Total CPU limits of all pods running on the node",
-		},
-		commonLabels,
-	)
-
-	memRequests = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Namespace: namespace,
-			Name:      "memory_request_bytes",
-			Help:      "Total memory requests of all pods running on the node",
-		},
-		commonLabels,
-	)
-
-	memLimits = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Namespace: namespace,
-			Name:      "memory_limit_bytes",
-			Help:      "Total memory limits of all pods running on the node",
-		},
-		commonLabels,
-	)
+	namespace             = "kube_node_metrics"
+	metricNameCpuRequests = "cpu_requests"
+	metricNameCpuLimits   = "cpu_limits"
+	metricNameMemRequests = "mem_requests"
+	metricNameMemLimits   = "mem_limits"
 
 	lastFullSyncOkTimeSeconds = prometheus.NewGauge(
 		prometheus.GaugeOpts{
@@ -93,13 +60,8 @@ var (
 type nodeMeta struct {
 	ip   string
 	name string
-}
 
-type nodeMetrics struct {
-	cpuRequests prometheus.Gauge
-	cpuLimits   prometheus.Gauge
-	memRequests prometheus.Gauge
-	memLimits   prometheus.Gauge
+	labels map[string]string
 }
 
 type nodeResources struct {
@@ -108,10 +70,77 @@ type nodeResources struct {
 	memRequests float64
 	memLimits   float64
 
-	meta    *nodeMeta
-	metrics *nodeMetrics
+	meta *nodeMeta
+}
+
+type nodeResourceCollector struct {
+	name2nodeResources map[string]*nodeResources
+
+	rwMut sync.RWMutex
+}
+
+func (n *nodeResourceCollector) lockedUpdate(name2nodeResources map[string]*nodeResources) {
+	n.rwMut.Lock()
+	defer n.rwMut.Unlock()
+
+	n.name2nodeResources = name2nodeResources
+}
+
+// Collect implements prometheus.Collector.
+func (n *nodeResourceCollector) Collect(ch chan<- prometheus.Metric) {
+	n.rwMut.RLock()
+	defer n.rwMut.RUnlock()
+
+	for _, node := range n.name2nodeResources {
+		node.collect(ch)
+	}
+}
+
+func (nr *nodeResources) collect(ch chan<- prometheus.Metric) {
+	labels := nr.meta.labels
+
+	cpuRequest := prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace:   namespace,
+		Name:        metricNameCpuRequests,
+		Help:        "Total CPU requests of all pods running on the node",
+		ConstLabels: labels,
+	})
+	cpuLimit := prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace:   namespace,
+		Name:        metricNameCpuLimits,
+		Help:        "Total CPU limits of all pods running on the node",
+		ConstLabels: labels,
+	})
+	memRequest := prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace:   namespace,
+		Name:        metricNameMemRequests,
+		Help:        "Total memory requests of all pods running on the node",
+		ConstLabels: labels,
+	})
+	memLimit := prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace:   namespace,
+		Name:        metricNameMemLimits,
+		Help:        "Total memory limits of all pods running on the node",
+		ConstLabels: labels,
+	})
+
+	cpuRequest.Set(nr.cpuRequests)
+	cpuLimit.Set(nr.cpuLimits)
+	memRequest.Set(nr.memRequests)
+	memLimit.Set(nr.memLimits)
+
+	ch <- cpuRequest
+	ch <- cpuLimit
+	ch <- memRequest
+	ch <- memLimit
+}
+
+// Describe implements prometheus.Collector.
+func (n *nodeResourceCollector) Describe(chan<- *prometheus.Desc) {
 }
 
+var _ prometheus.Collector = &nodeResourceCollector{}
+
 var (
 	fullSyncInterval = 2 * time.Minute
 
@@ -120,10 +149,6 @@ var (
 )
 
 func init() {
-	prometheus.MustRegister(cpuRequests)
-	prometheus.MustRegister(cpuLimits)
-	prometheus.MustRegister(memRequests)
-	prometheus.MustRegister(memLimits)
 	prometheus.MustRegister(lastFullSyncOkTimeSeconds)
 	prometheus.MustRegister(fullSyncDurationSeconds)
 	prometheus.MustRegister(k8sApiLatencySeconds)
@@ -152,14 +177,16 @@ func main() {
 
 	log.Printf("Full sync interval: %v", fullSyncInterval)
 
+	collector := newNodeResourceCollector()
+
 	go func() {
 		// TODO: set timeout
 		ctx := context.Background()
 
 		lastFullSyncOk := time.Now()
 
 		for {
-			err := syncFullMetrics(ctx, clientset)
+			err := collector.syncFullMetrics(ctx, clientset)
			if err != nil {
 				log.Printf("Error collecting metrics: %v", err)
 			}
@@ -174,26 +201,24 @@ func main() {
 		}
 	}()
 
-	go watchNodes(clientset)
-
 	http.Handle("/metrics", promhttp.Handler())
 	log.Fatal(http.ListenAndServe(":19191", nil))
 }
 
+func newNodeResourceCollector() *nodeResourceCollector {
+	return &nodeResourceCollector{
+		name2nodeResources: make(map[string]*nodeResources),
+	}
+}
+
 func newNodeMeta(node *v1.Node) *nodeMeta {
 	return &nodeMeta{
 		ip:   getNodeIp(node),
 		name: node.Name,
-	}
-}
-
-func newNodeMetrics(meta *nodeMeta) *nodeMetrics {
-	labels := getNodeLabelValues(meta)
-	return &nodeMetrics{
-		cpuRequests: cpuRequests.WithLabelValues(labels...),
-		cpuLimits:   cpuLimits.WithLabelValues(labels...),
-		memRequests: memRequests.WithLabelValues(labels...),
-		memLimits:   memLimits.WithLabelValues(labels...),
+		labels: map[string]string{
+			"node": node.Name,
+			"ip":   getNodeIp(node),
+		},
 	}
 }
 
@@ -221,8 +246,7 @@ func getName2NodeResources(nodes []v1.Node) map[string]*nodeResources {
 func newNodeResource(node *v1.Node) *nodeResources {
 	meta := newNodeMeta(node)
 	return &nodeResources{
-		meta:    meta,
-		metrics: newNodeMetrics(meta),
+		meta: meta,
 	}
 }
 
@@ -290,7 +314,7 @@ func listPods(ctx context.Context, clientset *kubernetes.Clientset) ([]v1.Pod, error) {
 	return allPods, nil
 }
 
-func syncFullMetrics(ctx context.Context, clientset *kubernetes.Clientset) error {
+func (collector *nodeResourceCollector) syncFullMetrics(ctx context.Context, clientset *kubernetes.Clientset) error {
 	beg := time.Now()
 	defer func(beg time.Time) {
 		end := time.Now()
@@ -312,8 +336,8 @@ func syncFullMetrics(ctx context.Context, clientset *kubernetes.Clientset) error
 	}
 
 	updateResourcesByNode(allPods, name2nodeResources)
-	updateNodeMetrics(name2nodeResources)
 
+	collector.lockedUpdate(name2nodeResources)
 	return nil
 }
 
@@ -352,47 +376,3 @@ func updateResourcesByNode(pods []v1.Pod, n2r map[string]*nodeResources) {
 		}
 	}
 }
-
-func updateNodeMetrics(name2node map[string]*nodeResources) {
-	for _, node := range name2node {
-		node.metrics.cpuRequests.Set(node.cpuRequests)
-		node.metrics.cpuLimits.Set(node.cpuLimits)
-		node.metrics.memRequests.Set(node.memRequests)
-		node.metrics.memLimits.Set(node.memLimits)
-	}
-}
-
-func watchNodes(clientset *kubernetes.Clientset) {
-	watcher, err := clientset.CoreV1().Nodes().Watch(context.TODO(), metav1.ListOptions{})
-	if err != nil {
-		log.Fatalf("Error watching nodes: %v", err)
-	}
-
-	for event := range watcher.ResultChan() {
-		node, ok := event.Object.(*v1.Node)
-		if !ok {
-			continue
-		}
-
-		switch event.Type {
-		case watch.Deleted:
-			deleteNodeMetrics(node)
-		}
-	}
-}
-
-func deleteNodeMetrics(node *v1.Node) {
-	labelValues := getNodeLabelValues(newNodeMeta(node))
-
-	cpuRequests.DeleteLabelValues(labelValues...)
-	cpuLimits.DeleteLabelValues(labelValues...)
-	memRequests.DeleteLabelValues(labelValues...)
-	memLimits.DeleteLabelValues(labelValues...)
-}
-
-func getNodeLabelValues(meta *nodeMeta) []string {
-	return []string{
-		meta.name,
-		meta.ip,
-	}
-}
