-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
379 lines (316 loc) · 11.5 KB
/
main.go
File metadata and controls
379 lines (316 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
package main
import (
"context"
"flag"
"fmt"
"net/http"
"os"
"path/filepath"
"strings"
"time"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
type (
	// Controller pairs a Kubernetes clientset with the namespace in which the
	// controller lists StatefulSets and reads/writes ConfigMaps.
	Controller struct {
		Clientset *kubernetes.Clientset
		Namespace string
	}

	// CmdConfig holds the values populated from the command-line flags.
	CmdConfig struct {
		// ConfigMapName is the source limits ConfigMap (-configmap-name).
		ConfigMapName string
		// ConfigMapLimitsPath is the data key holding the limits YAML (-configmap-limits-path).
		ConfigMapLimitsPath string
		// ConfigMapGeneratedName is the ConfigMap the controller writes (-configmap-generated-name).
		ConfigMapGeneratedName string
		// ReceiverLabel is the label selector used to find receive StatefulSets (-statefulset-label).
		ReceiverLabel string
		// ActiveSeriesMax is the per-replica maximum multiplied by the ready
		// replica count to form the global limit (-active-series-max).
		ActiveSeriesMax int
		// Interval is the reconciliation period; 0 means run once (-interval).
		Interval time.Duration
		// MetricsPort is the port of the Prometheus metrics endpoint (-metrics-port).
		MetricsPort string
		// MetricsPath is the HTTP path of the Prometheus metrics endpoint (-metrics-path).
		MetricsPath string
	}
)
// Prometheus metrics exported by the controller. They are registered with the
// default registry in init and served on the metrics endpoint started by main.
var (
	// configMapUpdateTotal is incremented after each successful create or
	// update of the generated ConfigMap.
	configMapUpdateTotal = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "thanos_receive_limits_configmap_updates_total",
		Help: "Total number of ConfigMap updates performed by the controller",
	})

	// configMapUpdateFailures is incremented when creating or updating the
	// generated ConfigMap returns an error.
	configMapUpdateFailures = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "thanos_receive_limits_configmap_update_failures_total",
		Help: "Total number of ConfigMap update failures",
	})

	// lastUpdateTime records when the generated ConfigMap was last written successfully.
	lastUpdateTime = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "thanos_receive_limits_last_update_timestamp_seconds",
		Help: "Timestamp of the last successful ConfigMap update",
	})

	// currentRunningReplicas is the ready replica count summed over matching StatefulSets.
	currentRunningReplicas = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "thanos_receive_limits_current_running_replicas",
		Help: "Total number of currently running replicas",
	})

	// currentMaxActiveHeadSeries is the most recently computed global head series limit.
	currentMaxActiveHeadSeries = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "thanos_receive_limits_current_global_active_head_series_limit",
		Help: "Current maximum global active head series limit",
	})
)
// WriteConfig models the Thanos Receive limits configuration file; see
// https://thanos.io/tip/components/receive.md/#understanding-the-configuration-file
//
// The controller reads an existing limits file into these types and overrides
// `write.default.head_series_limit` (for now the only field it changes) with
// the computed global limit before writing the result back out as YAML.
type WriteConfig struct {
	Write LimitsConfig `yaml:"write"`
}

// LimitsConfig is the `write` section: global settings, the default tenant
// limits, and optional per-tenant overrides under `tenants`.
type LimitsConfig struct {
	Global  GlobalConfig             `yaml:"global"`
	Default TenantConfig             `yaml:"default"`
	Tenant  map[string]*TenantConfig `yaml:"tenants,omitempty"`
}

// GlobalConfig is the `write.global` section.
type GlobalConfig struct {
	MaxConcurrency *int `yaml:"max_concurrency,omitempty"`
	// The two meta-monitoring fields have no omitempty, so they are always
	// emitted on marshal (as empty strings when unset).
	MetaMonitoringURL        string `yaml:"meta_monitoring_url"`
	MetaMonitoringLimitQuery string `yaml:"meta_monitoring_limit_query"`
}

// TenantConfig holds the limits for one tenant; it is used both for
// `write.default` and for each entry of `write.tenants`. Nil pointer fields
// are omitted from the marshaled YAML.
type TenantConfig struct {
	Request         *RequestConfig `yaml:"request,omitempty"`
	HeadSeriesLimit *int           `yaml:"head_series_limit,omitempty"`
}

// RequestConfig holds the per-request limits of a tenant. Nil fields are
// omitted from the marshaled YAML.
type RequestConfig struct {
	SizeBytesLimit *int `yaml:"size_bytes_limit,omitempty"`
	SeriesLimit    *int `yaml:"series_limit,omitempty"`
	SamplesLimit   *int `yaml:"samples_limit,omitempty"`
}
// parseFlags defines the controller's flags on the process-wide flag set,
// parses the command line, and returns the populated CmdConfig.
func parseFlags() CmdConfig {
	cfg := CmdConfig{}
	fs := flag.CommandLine

	// Source and generated ConfigMaps.
	fs.StringVar(&cfg.ConfigMapName, "configmap-name", "", "The previous limits configuration configmap containing the limits configuration.")
	fs.StringVar(&cfg.ConfigMapLimitsPath, "configmap-limits-path", "config.yaml", "The default location of the limits configuration within the ConfigMap.")
	fs.StringVar(&cfg.ConfigMapGeneratedName, "configmap-generated-name", "", "The name given to the configmap containing the limits configuration.")

	// Limit computation and scheduling.
	fs.StringVar(&cfg.ReceiverLabel, "statefulset-label", "controller.limits.thanos.io=thanos-limits-controller", "The statefulset's label to watch by the controller.")
	fs.IntVar(&cfg.ActiveSeriesMax, "active-series-max", 0, "The maximum the number that a single particular receive instance can handle.")
	fs.DurationVar(&cfg.Interval, "interval", 0, "Optional interval for periodic reconciliation (e.g. 30s, 1m). If 0, runs once and exits.")

	// Prometheus metrics endpoint.
	fs.StringVar(&cfg.MetricsPort, "metrics-port", "8080", "Port to expose Prometheus metrics on.")
	fs.StringVar(&cfg.MetricsPath, "metrics-path", "/metrics", "Path to expose Prometheus metrics on.")

	flag.Parse()
	return cfg
}
// validate returns an error describing the first missing or invalid required
// flag, or nil when the configuration can be used.
func (c CmdConfig) validate() error {
	switch {
	case c.ConfigMapName == "":
		return fmt.Errorf("missing required flag: -configmap-name")
	case c.ActiveSeriesMax <= 0:
		// The global limit is ready replicas * ActiveSeriesMax, so a negative
		// value would produce a negative head_series_limit. Reject it along
		// with the unset (zero) default.
		return fmt.Errorf("missing or invalid flag: -active-series-max")
	case c.ConfigMapGeneratedName == "":
		return fmt.Errorf("missing required flag: -configmap-generated-name")
	}
	return nil
}
// init registers the controller's Prometheus metrics with the default registry
// and sets the logrus level from the LOG_LEVEL environment variable, matched
// case-insensitively. An unset or unrecognized value selects the info level.
func init() {
	prometheus.MustRegister(configMapUpdateTotal, configMapUpdateFailures, lastUpdateTime, currentRunningReplicas, currentMaxActiveHeadSeries)

	levelByName := map[string]log.Level{
		"trace":   log.TraceLevel,
		"debug":   log.DebugLevel,
		"info":    log.InfoLevel,
		"warn":    log.WarnLevel,
		"warning": log.WarnLevel,
		"error":   log.ErrorLevel,
		"fatal":   log.FatalLevel,
		"panic":   log.PanicLevel,
	}

	level, known := levelByName[strings.ToLower(os.Getenv("LOG_LEVEL"))]
	if !known {
		level = log.InfoLevel
	}
	log.SetLevel(level)
}
// main parses and validates the flags, serves Prometheus metrics on
// -metrics-port/-metrics-path, and reconciles the generated limits ConfigMap.
//
// With -interval <= 0 it reconciles once and exits, treating any error as
// fatal. With a positive interval it reconciles on every tick. Errors returned
// by a reconciliation pass are logged and the next tick retries. Failures that
// getRunningStatefulSets or NewController handle with log.Fatalf still
// terminate the process.
func main() {
	cmdConfig := parseFlags()
	labelSelector := cmdConfig.ReceiverLabel
	if err := cmdConfig.validate(); err != nil {
		log.Fatal(err)
	}

	go func() {
		http.Handle(cmdConfig.MetricsPath, promhttp.Handler())
		addr := ":" + cmdConfig.MetricsPort
		log.Infof("Starting metrics server at %s%s", addr, cmdConfig.MetricsPath)
		if err := http.ListenAndServe(addr, nil); err != nil {
			log.Fatalf("Failed to start metrics server: %v", err)
		}
	}()

	// The clientset and namespace do not change between passes, so build them
	// once rather than on every tick.
	controller, err := NewController()
	if err != nil {
		log.Fatalf("Failed to initialize controller: %v", err)
	}

	// reconcile performs one pass. It sizes the global head series limit from
	// the ready receive replicas, then writes that limit into the generated
	// ConfigMap. It returns errors so the caller can decide whether they are fatal.
	reconcile := func() error {
		runningReplicas := controller.getRunningStatefulSets(labelSelector)
		currentRunningReplicas.Set(float64(runningReplicas))

		globalLimit := runningReplicas * cmdConfig.ActiveSeriesMax
		currentMaxActiveHeadSeries.Set(float64(globalLimit))
		log.Debugf("Calculated global head_series_limit: %d", globalLimit)

		limitsConfig, err := controller.getLimitsConfigMap(cmdConfig.ConfigMapName, cmdConfig.ConfigMapLimitsPath)
		if err != nil {
			return fmt.Errorf("Error fetching the configmap %s, %w", cmdConfig.ConfigMapName, err)
		}

		if err := controller.createGeneratedConfigMap(cmdConfig.ConfigMapGeneratedName, cmdConfig.ConfigMapLimitsPath, limitsConfig, globalLimit); err != nil {
			configMapUpdateFailures.Inc()
			return fmt.Errorf("Failed to create or update configmap: %w", err)
		}

		configMapUpdateTotal.Inc()
		lastUpdateTime.SetToCurrentTime()
		return nil
	}

	if cmdConfig.Interval <= 0 {
		if err := reconcile(); err != nil {
			log.Fatal(err)
		}
		return
	}

	ticker := time.NewTicker(cmdConfig.Interval)
	defer ticker.Stop()
	for {
		if err := reconcile(); err != nil {
			// A transient API error should not take down a long-running
			// controller; the failure is counted/logged and the next tick retries.
			log.Error(err)
		}
		<-ticker.C
	}
}
// getKubernetesClient creates a Kubernetes clientset. It first tries the
// in-cluster configuration. If that returns any error, it falls back to the
// kubeconfig file at $HOME/.kube/config.
//
// NOTE(review): $KUBECONFIG is not consulted; only the $HOME path is tried.
func getKubernetesClient() (*kubernetes.Clientset, error) {
	config, err := rest.InClusterConfig()
	if err != nil {
		log.Debug("Not running in cluster, using kubeconfig")
		kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			// %w keeps the underlying error inspectable with errors.Is/As.
			return nil, fmt.Errorf("error building kubeconfig: %w", err)
		}
	}
	return kubernetes.NewForConfig(config)
}
// getCurrentNamespace determines the namespace the controller operates in.
// Sources are tried in this order:
//  1. the service-account namespace file mounted into pods;
//  2. the NAMESPACE environment variable;
//  3. the current context of $HOME/.kube/config.
//
// It returns "default" when none of them yields a namespace. The error result
// is currently always nil.
func getCurrentNamespace() (string, error) {
	// Trim whitespace so a trailing newline in the mounted file cannot produce
	// an invalid namespace; an empty file falls through to the next source.
	if data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil {
		if ns := strings.TrimSpace(string(data)); ns != "" {
			return ns, nil
		}
	}

	if ns := os.Getenv("NAMESPACE"); ns != "" {
		return ns, nil
	}

	kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
	config, err := clientcmd.LoadFromFile(kubeconfig)
	if err != nil {
		return "default", nil // Default to "default" namespace as last resort
	}
	// Named kubeContext to avoid shadowing the imported context package.
	if kubeContext := config.Contexts[config.CurrentContext]; kubeContext != nil && kubeContext.Namespace != "" {
		return kubeContext.Namespace, nil
	}
	return "default", nil
}
// getRunningStatefulSets returns the total number of ready replicas across all
// StatefulSets in c.Namespace that match labelSelector. Each matching
// StatefulSet contributes its Status.ReadyReplicas, whether or not it is fully
// ready (no ReadyReplicas == Replicas filter is applied).
//
// A List error is fatal: the process exits via log.Fatalf.
func (c *Controller) getRunningStatefulSets(labelSelector string) int {
	statefulSets, err := c.Clientset.AppsV1().StatefulSets(c.Namespace).List(context.TODO(), metav1.ListOptions{
		LabelSelector: labelSelector,
	})
	if err != nil {
		log.Fatalf("error listing StatefulSets: %v", err)
	}

	var runningReplicas int32
	// Index into Items rather than ranging by value to avoid copying each
	// (large) StatefulSet object.
	for i := range statefulSets.Items {
		sts := &statefulSets.Items[i]
		runningReplicas += sts.Status.ReadyReplicas
		log.Debugf("StatefulSet %s is running with %d/%d ready replicas",
			sts.Name, sts.Status.ReadyReplicas, sts.Status.Replicas)
	}
	log.Debugf("Returned running replicas are: %d", runningReplicas)
	return int(runningReplicas)
}
// getLimitsConfigMap fetches the ConfigMap configMapName from c.Namespace and
// parses the YAML stored under data key configMapPath into a WriteConfig.
// It returns an error if the ConfigMap cannot be fetched, the key is missing,
// or the YAML does not parse.
//
// NOTE(review): yaml.Unmarshal (non-strict) ignores keys that WriteConfig does
// not model. Such keys are therefore absent from the generated ConfigMap;
// confirm that is acceptable.
func (c *Controller) getLimitsConfigMap(configMapName string, configMapPath string) (*WriteConfig, error) {
	configMap, err := c.Clientset.CoreV1().ConfigMaps(c.Namespace).Get(context.TODO(), configMapName, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("Error locating ConfigMap: %w", err)
	}

	rawLimits, ok := configMap.Data[configMapPath]
	if !ok {
		return nil, fmt.Errorf("key %s not found in ConfigMap %s", configMapPath, configMapName)
	}

	var parsedConfig WriteConfig
	if err := yaml.Unmarshal([]byte(rawLimits), &parsedConfig); err != nil {
		return nil, fmt.Errorf("failed to parse limits config: %w", err)
	}
	return &parsedConfig, nil
}
// createGeneratedConfigMap sets write.default.head_series_limit in config to
// headSeriesValue, marshals config to YAML, and stores it under data key
// configMapPath in the ConfigMap configMapGeneratedName in c.Namespace.
//
// It first tries to create the ConfigMap. If the create error says it
// "already exists", it fetches the existing object's resource version and
// updates it with the new contents. Every other create, get, or update error
// is returned to the caller.
//
// Note that config is modified in place.
func (c *Controller) createGeneratedConfigMap(configMapGeneratedName string, configMapPath string, config *WriteConfig, headSeriesValue int) error {
	config.Write.Default.HeadSeriesLimit = &headSeriesValue
	updatedYAML, err := yaml.Marshal(config)
	if err != nil {
		return fmt.Errorf("failed to marshal updated config: %w", err)
	}

	configMaps := c.Clientset.CoreV1().ConfigMaps(c.Namespace)
	newConfigMap := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name: configMapGeneratedName,
		},
		Data: map[string]string{
			configMapPath: string(updatedYAML),
		},
	}

	_, err = configMaps.Create(context.TODO(), newConfigMap, metav1.CreateOptions{})
	if err == nil {
		log.Infof("Successfully created ConfigMap: %s", configMapGeneratedName)
		return nil
	}
	// Only "already exists" is recoverable here. Any other create failure
	// (RBAC, validation, connectivity, ...) must be surfaced, not reported as success.
	if !strings.Contains(err.Error(), "already exists") {
		return fmt.Errorf("failed to create ConfigMap: %w", err)
	}

	log.Infof("ConfigMap %s already exists. Updating...", configMapGeneratedName)
	// The existing resource version is required for Update to be accepted.
	existing, getErr := configMaps.Get(context.TODO(), configMapGeneratedName, metav1.GetOptions{})
	if getErr != nil {
		return fmt.Errorf("failed to fetch existing ConfigMap for update: %w", getErr)
	}
	newConfigMap.ResourceVersion = existing.ResourceVersion
	if _, updateErr := configMaps.Update(context.TODO(), newConfigMap, metav1.UpdateOptions{}); updateErr != nil {
		return fmt.Errorf("failed to update existing ConfigMap: %w", updateErr)
	}
	log.Infof("Successfully updated ConfigMap: %s", configMapGeneratedName)
	return nil
}
// NewController builds a Controller from the Kubernetes client returned by
// getKubernetesClient and the namespace returned by getCurrentNamespace,
// returning the first error either of them reports.
func NewController() (*Controller, error) {
	ctrl := &Controller{}
	var err error
	if ctrl.Clientset, err = getKubernetesClient(); err != nil {
		return nil, err
	}
	if ctrl.Namespace, err = getCurrentNamespace(); err != nil {
		return nil, err
	}
	return ctrl, nil
}