
Commit e1c99cf
feat: Introduce object limits
This change allows user-controlled limits on how many objects KSM will list from the API. This is helpful to prevent resource exhaustion on KSM in case the cluster contains too many objects. The object limit is set globally and applied per watched resource.
Parent: 3d73ddb

7 files changed: +111 -55 lines

docs/developer/cli-arguments.md (+1)

@@ -64,6 +64,7 @@ Flags:
       --namespaces string           Comma-separated list of namespaces to be enabled. Defaults to ""
       --namespaces-denylist string  Comma-separated list of namespaces not to be enabled. If namespaces and namespaces-denylist are both set, only namespaces that are excluded in namespaces-denylist will be used.
       --node string                 Name of the node that contains the kube-state-metrics pod. Most likely it should be passed via the downward API. This is used for daemonset sharding. Only available for resources (pod metrics) that support spec.nodeName fieldSelector. This is experimental.
+      --object-limit int            The total number of objects to list per resource from the API Server. (experimental)
       --one_output                  If true, only write logs to their native severity level (vs also writing to each lower severity level; no effect when -logtostderr=true)
       --pod string                  Name of the pod that contains the kube-state-metrics container. When set, it is expected that --pod and --pod-namespace are both set. Most likely this should be passed via the downward API. This is used for auto-detecting sharding. If set, this has preference over statically configured sharding. This is experimental, it may be removed without notice.
       --pod-namespace string        Name of the namespace of the pod specified by --pod. When set, it is expected that --pod and --pod-namespace are both set. Most likely this should be passed via the downward API. This is used for auto-detecting sharding. If set, this has preference over statically configured sharding. This is experimental, it may be removed without notice.

internal/store/builder.go (+51 -42)

Large diffs are not rendered by default.
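Because this diff is not rendered, the following is only a rough sketch of what the builder change plausibly contains, inferred from the rest of the commit (server.go calls storeBuilder.WithObjectLimit(opts.ObjectLimit), and NewInstrumentedListerWatcher gained a trailing int64 limit). The field and variable names below are illustrative assumptions, not the commit's actual code.

// Sketch only: assumed field name objectLimit on the store Builder.
func (b *Builder) WithObjectLimit(limit int64) {
    b.objectLimit = limit
}

// Wherever the builder wraps a ListerWatcher for instrumentation, the stored
// limit would now be passed as the new final argument (names are assumptions):
//   lw := watch.NewInstrumentedListerWatcher(listWatcher, b.listWatchMetrics,
//       resourceName, b.useAPIServerCache, b.objectLimit)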

pkg/app/server.go (+1)

@@ -247,6 +247,7 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error {
     ))
 
     storeBuilder.WithUsingAPIServerCache(opts.UseAPIServerCache)
+    storeBuilder.WithObjectLimit(opts.ObjectLimit)
     storeBuilder.WithGenerateStoresFunc(storeBuilder.DefaultGenerateStoresFunc())
     proc.StartReaper()

pkg/builder/builder_test.go (+1)

@@ -67,6 +67,7 @@ func customStore(_ []generator.FamilyGenerator,
     _ interface{},
     _ func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher,
     _ bool,
+    _ int64,
 ) []cache.Store {
     stores := make([]cache.Store, 0, 2)
     stores = append(stores, newFakeStore(fakeMetricLists[0]))

pkg/builder/types/interfaces.go (+2 -2)

@@ -57,15 +57,15 @@ type BuilderInterface interface {
 type BuildStoresFunc func(metricFamilies []generator.FamilyGenerator,
     expectedType interface{},
     listWatchFunc func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher,
-    useAPIServerCache bool,
+    useAPIServerCache bool, limit int64,
 ) []cache.Store
 
 // BuildCustomResourceStoresFunc function signature that is used to return a list of custom resource cache.Store
 type BuildCustomResourceStoresFunc func(resourceName string,
     metricFamilies []generator.FamilyGenerator,
     expectedType interface{},
     listWatchFunc func(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher,
-    useAPIServerCache bool,
+    useAPIServerCache bool, limit int64,
 ) []cache.Store
 
 // AllowDenyLister interface for AllowDeny lister that can allow or exclude metrics by there names

pkg/options/options.go (+2)

@@ -79,6 +79,7 @@ type Options struct {
     Help                 bool  `yaml:"help"`
     TrackUnscheduledPods bool  `yaml:"track_unscheduled_pods"`
     UseAPIServerCache    bool  `yaml:"use_api_server_cache"`
+    ObjectLimit          int64 `yaml:"object_limit"`
 }
 
 // GetConfigFile is the getter for --config value.
@@ -143,6 +144,7 @@ func (o *Options) AddFlags(cmd *cobra.Command) {
     o.cmd.Flags().BoolVar(&o.TrackUnscheduledPods, "track-unscheduled-pods", false, "This configuration is used in conjunction with node configuration. When this configuration is true, node configuration is empty and the metric of unscheduled pods is fetched from the Kubernetes API Server. This is experimental.")
     o.cmd.Flags().BoolVarP(&o.Help, "help", "h", false, "Print Help text")
     o.cmd.Flags().BoolVarP(&o.UseAPIServerCache, "use-apiserver-cache", "", false, "Sets resourceVersion=0 for ListWatch requests, using cached resources from the apiserver instead of an etcd quorum read.")
+    o.cmd.Flags().Int64Var(&o.ObjectLimit, "object-limit", 0, "The total number of objects to list per resource from the API Server. (experimental)")
     o.cmd.Flags().Int32Var(&o.Shard, "shard", int32(0), "The instances shard nominal (zero indexed) within the total number of shards. (default 0)")
     o.cmd.Flags().IntVar(&o.Port, "port", 8080, `Port to expose metrics on.`)
     o.cmd.Flags().IntVar(&o.TelemetryPort, "telemetry-port", 8081, `Port to expose kube-state-metrics self metrics on.`)

pkg/watch/watch.go (+53 -11)

@@ -19,6 +19,7 @@ package watch
 import (
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promauto"
+    "k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/watch"
@@ -27,29 +28,45 @@ import (
 
 // ListWatchMetrics stores the pointers of kube_state_metrics_[list|watch]_total metrics.
 type ListWatchMetrics struct {
-    WatchTotal *prometheus.CounterVec
-    ListTotal  *prometheus.CounterVec
+    WatchRequestsTotal *prometheus.CounterVec
+    ListRequestsTotal  *prometheus.CounterVec
+    ListObjectsLimit   *prometheus.GaugeVec
+    ListObjectsCurrent *prometheus.GaugeVec
 }
 
 // NewListWatchMetrics takes in a prometheus registry and initializes
 // and registers the kube_state_metrics_list_total and
 // kube_state_metrics_watch_total metrics. It returns those registered metrics.
 func NewListWatchMetrics(r prometheus.Registerer) *ListWatchMetrics {
     return &ListWatchMetrics{
-        WatchTotal: promauto.With(r).NewCounterVec(
+        WatchRequestsTotal: promauto.With(r).NewCounterVec(
             prometheus.CounterOpts{
                 Name: "kube_state_metrics_watch_total",
-                Help: "Number of total resource watches in kube-state-metrics",
+                Help: "Number of total resource watch calls in kube-state-metrics",
             },
             []string{"result", "resource"},
         ),
-        ListTotal: promauto.With(r).NewCounterVec(
+        ListRequestsTotal: promauto.With(r).NewCounterVec(
             prometheus.CounterOpts{
                 Name: "kube_state_metrics_list_total",
-                Help: "Number of total resource list in kube-state-metrics",
+                Help: "Number of total resource list calls in kube-state-metrics",
             },
             []string{"result", "resource"},
         ),
+        ListObjectsCurrent: promauto.With(r).NewGaugeVec(
+            prometheus.GaugeOpts{
+                Name: "kube_state_metrics_list_objects",
+                Help: "Number of resources listed in kube-state-metrics",
+            },
+            []string{"resource"},
+        ),
+        ListObjectsLimit: promauto.With(r).NewGaugeVec(
+            prometheus.GaugeOpts{
+                Name: "kube_state_metrics_list_objects_limit",
+                Help: "Number of resource list limit in kube-state-metrics",
+            },
+            []string{"resource"},
+        ),
     }
 }
 
@@ -60,45 +77,70 @@ type InstrumentedListerWatcher struct {
     metrics           *ListWatchMetrics
     resource          string
     useAPIServerCache bool
+    limit             int64
 }
 
 // NewInstrumentedListerWatcher returns a new InstrumentedListerWatcher.
-func NewInstrumentedListerWatcher(lw cache.ListerWatcher, metrics *ListWatchMetrics, resource string, useAPIServerCache bool) cache.ListerWatcher {
+func NewInstrumentedListerWatcher(lw cache.ListerWatcher, metrics *ListWatchMetrics, resource string, useAPIServerCache bool, limit int64) cache.ListerWatcher {
     return &InstrumentedListerWatcher{
         lw:                lw,
         metrics:           metrics,
         resource:          resource,
         useAPIServerCache: useAPIServerCache,
+        limit:             limit,
     }
 }
 
 // List is a wrapper func around the cache.ListerWatcher.List func. It increases the success/error
 // counters based on the outcome of the List operation it instruments.
+// It supports setting object limits, this means if it is set it will only list and process
+// n objects of the same resource type.
 func (i *InstrumentedListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
 
     if i.useAPIServerCache {
         options.ResourceVersion = "0"
     }
 
+    if i.limit != 0 {
+        options.Limit = i.limit
+        i.metrics.ListObjectsLimit.WithLabelValues(i.resource).Set(float64(i.limit))
+    }
+
     res, err := i.lw.List(options)
+
     if err != nil {
-        i.metrics.ListTotal.WithLabelValues("error", i.resource).Inc()
+        i.metrics.ListRequestsTotal.WithLabelValues("error", i.resource).Inc()
         return nil, err
     }
 
-    i.metrics.ListTotal.WithLabelValues("success", i.resource).Inc()
+    list, err := meta.ExtractList(res)
+    if err != nil {
+        return nil, err
+    }
+    i.metrics.ListRequestsTotal.WithLabelValues("success", i.resource).Inc()
+
+    if i.limit != 0 {
+        if int64(len(list)) > i.limit {
+            meta.SetList(res, list[0:i.limit])
+            i.metrics.ListObjectsCurrent.WithLabelValues(i.resource).Set(float64(i.limit))
+        } else {
+            i.metrics.ListObjectsCurrent.WithLabelValues(i.resource).Set(float64(len(list)))
+        }
+    }
+
     return res, nil
+
 }
 
 // Watch is a wrapper func around the cache.ListerWatcher.Watch func. It increases the success/error
 // counters based on the outcome of the Watch operation it instruments.
 func (i *InstrumentedListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
     res, err := i.lw.Watch(options)
     if err != nil {
-        i.metrics.WatchTotal.WithLabelValues("error", i.resource).Inc()
+        i.metrics.WatchRequestsTotal.WithLabelValues("error", i.resource).Inc()
         return nil, err
     }
 
-    i.metrics.WatchTotal.WithLabelValues("success", i.resource).Inc()
+    i.metrics.WatchRequestsTotal.WithLabelValues("success", i.resource).Inc()
     return res, nil
 }
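A minimal usage sketch of the extended wrapper (not part of this commit): it wraps a pod ListerWatcher so each List call is capped at 500 objects. The helper function, registry wiring, and the ksmwatch import alias/module path are assumptions for illustration.

package example

import (
    "context"

    "github.com/prometheus/client_golang/prometheus"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/watch"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"

    // Assumed import path for the package shown in the diff above.
    ksmwatch "k8s.io/kube-state-metrics/v2/pkg/watch"
)

// newLimitedPodListerWatcher (hypothetical helper) wraps a plain pod
// ListerWatcher so that every List call is limited to 500 objects and the
// new kube_state_metrics_list_objects{,_limit} gauges are populated.
func newLimitedPodListerWatcher(kubeClient clientset.Interface, reg prometheus.Registerer) cache.ListerWatcher {
    lw := &cache.ListWatch{
        ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
            return kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), opts)
        },
        WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
            return kubeClient.CoreV1().Pods(metav1.NamespaceAll).Watch(context.TODO(), opts)
        },
    }
    metrics := ksmwatch.NewListWatchMetrics(reg)
    // The trailing int64 is the new object limit; 0 keeps the previous, unlimited behaviour.
    return ksmwatch.NewInstrumentedListerWatcher(lw, metrics, "pods", false /* useAPIServerCache */, 500)
}

Note from the diff above that the limit is enforced twice: options.Limit asks the API server for at most n objects, and meta.ExtractList/meta.SetList truncate the result client-side if more come back, with the gauges reporting both the configured limit and the number of objects actually kept.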
