Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Introduce object limits #2626

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/developer/cli-arguments.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Flags:
--namespaces string Comma-separated list of namespaces to be enabled. Defaults to ""
--namespaces-denylist string Comma-separated list of namespaces not to be enabled. If namespaces and namespaces-denylist are both set, only namespaces that are excluded in namespaces-denylist will be used.
--node string Name of the node that contains the kube-state-metrics pod. Most likely it should be passed via the downward API. This is used for daemonset sharding. Only available for resources (pod metrics) that support spec.nodeName fieldSelector. This is experimental.
--object-limit int The total number of objects to list per resource from the API Server. (experimental)
--one_output If true, only write logs to their native severity level (vs also writing to each lower severity level; no effect when -logtostderr=true)
--pod string Name of the pod that contains the kube-state-metrics container. When set, it is expected that --pod and --pod-namespace are both set. Most likely this should be passed via the downward API. This is used for auto-detecting sharding. If set, this has preference over statically configured sharding. This is experimental, it may be removed without notice.
--pod-namespace string Name of the namespace of the pod specified by --pod. When set, it is expected that --pod and --pod-namespace are both set. Most likely this should be passed via the downward API. This is used for auto-detecting sharding. If set, this has preference over statically configured sharding. This is experimental, it may be removed without notice.
Expand Down
93 changes: 51 additions & 42 deletions internal/store/builder.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error {
))

storeBuilder.WithUsingAPIServerCache(opts.UseAPIServerCache)
storeBuilder.WithObjectLimit(opts.ObjectLimit)
storeBuilder.WithGenerateStoresFunc(storeBuilder.DefaultGenerateStoresFunc())
proc.StartReaper()

Expand Down
1 change: 1 addition & 0 deletions pkg/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func customStore(_ []generator.FamilyGenerator,
_ interface{},
_ func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher,
_ bool,
_ int64,
) []cache.Store {
stores := make([]cache.Store, 0, 2)
stores = append(stores, newFakeStore(fakeMetricLists[0]))
Expand Down
4 changes: 2 additions & 2 deletions pkg/builder/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ type BuilderInterface interface {
type BuildStoresFunc func(metricFamilies []generator.FamilyGenerator,
expectedType interface{},
listWatchFunc func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher,
useAPIServerCache bool,
useAPIServerCache bool, limit int64,
) []cache.Store

// BuildCustomResourceStoresFunc function signature that is used to return a list of custom resource cache.Store
type BuildCustomResourceStoresFunc func(resourceName string,
metricFamilies []generator.FamilyGenerator,
expectedType interface{},
listWatchFunc func(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher,
useAPIServerCache bool,
useAPIServerCache bool, limit int64,
) []cache.Store

// AllowDenyLister interface for AllowDeny lister that can allow or exclude metrics by their names
Expand Down
2 changes: 2 additions & 0 deletions pkg/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type Options struct {
Help bool `yaml:"help"`
TrackUnscheduledPods bool `yaml:"track_unscheduled_pods"`
UseAPIServerCache bool `yaml:"use_api_server_cache"`
ObjectLimit int64 `yaml:"object_limit"`
}

// GetConfigFile is the getter for --config value.
Expand Down Expand Up @@ -143,6 +144,7 @@ func (o *Options) AddFlags(cmd *cobra.Command) {
o.cmd.Flags().BoolVar(&o.TrackUnscheduledPods, "track-unscheduled-pods", false, "This configuration is used in conjunction with node configuration. When this configuration is true, node configuration is empty and the metric of unscheduled pods is fetched from the Kubernetes API Server. This is experimental.")
o.cmd.Flags().BoolVarP(&o.Help, "help", "h", false, "Print Help text")
o.cmd.Flags().BoolVarP(&o.UseAPIServerCache, "use-apiserver-cache", "", false, "Sets resourceVersion=0 for ListWatch requests, using cached resources from the apiserver instead of an etcd quorum read.")
o.cmd.Flags().Int64Var(&o.ObjectLimit, "object-limit", 0, "The total number of objects to list per resource from the API Server. (experimental)")
o.cmd.Flags().Int32Var(&o.Shard, "shard", int32(0), "The instances shard nominal (zero indexed) within the total number of shards. (default 0)")
o.cmd.Flags().IntVar(&o.Port, "port", 8080, `Port to expose metrics on.`)
o.cmd.Flags().IntVar(&o.TelemetryPort, "telemetry-port", 8081, `Port to expose kube-state-metrics self metrics on.`)
Expand Down
64 changes: 53 additions & 11 deletions pkg/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package watch
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
Expand All @@ -27,29 +28,45 @@ import (

// ListWatchMetrics stores the pointers of the kube_state_metrics_[list|watch]_total counters
// and the kube_state_metrics_list_objects[_limit] gauges.
type ListWatchMetrics struct {
WatchTotal *prometheus.CounterVec
ListTotal *prometheus.CounterVec
WatchRequestsTotal *prometheus.CounterVec
ListRequestsTotal *prometheus.CounterVec
ListObjectsLimit *prometheus.GaugeVec
ListObjectsCurrent *prometheus.GaugeVec
}

// NewListWatchMetrics takes in a prometheus registry and initializes
// and registers the kube_state_metrics_list_total,
// kube_state_metrics_watch_total, kube_state_metrics_list_objects and
// kube_state_metrics_list_objects_limit metrics. It returns those registered metrics.
func NewListWatchMetrics(r prometheus.Registerer) *ListWatchMetrics {
return &ListWatchMetrics{
WatchTotal: promauto.With(r).NewCounterVec(
WatchRequestsTotal: promauto.With(r).NewCounterVec(
prometheus.CounterOpts{
Name: "kube_state_metrics_watch_total",
Help: "Number of total resource watches in kube-state-metrics",
Help: "Number of total resource watch calls in kube-state-metrics",
},
[]string{"result", "resource"},
),
ListTotal: promauto.With(r).NewCounterVec(
ListRequestsTotal: promauto.With(r).NewCounterVec(
prometheus.CounterOpts{
Name: "kube_state_metrics_list_total",
Help: "Number of total resource list in kube-state-metrics",
Help: "Number of total resource list calls in kube-state-metrics",
},
[]string{"result", "resource"},
),
ListObjectsCurrent: promauto.With(r).NewGaugeVec(
prometheus.GaugeOpts{
Name: "kube_state_metrics_list_objects",
Help: "Number of resources listed in kube-state-metrics",
},
[]string{"resource"},
),
ListObjectsLimit: promauto.With(r).NewGaugeVec(
prometheus.GaugeOpts{
Name: "kube_state_metrics_list_objects_limit",
Help: "Number of resource list limit in kube-state-metrics",
},
[]string{"resource"},
),
}
}

Expand All @@ -60,45 +77,70 @@ type InstrumentedListerWatcher struct {
metrics *ListWatchMetrics
resource string
useAPIServerCache bool
limit int64
}

// NewInstrumentedListerWatcher returns a new InstrumentedListerWatcher.
func NewInstrumentedListerWatcher(lw cache.ListerWatcher, metrics *ListWatchMetrics, resource string, useAPIServerCache bool) cache.ListerWatcher {
func NewInstrumentedListerWatcher(lw cache.ListerWatcher, metrics *ListWatchMetrics, resource string, useAPIServerCache bool, limit int64) cache.ListerWatcher {
return &InstrumentedListerWatcher{
lw: lw,
metrics: metrics,
resource: resource,
useAPIServerCache: useAPIServerCache,
limit: limit,
}
}

// List is a wrapper func around the cache.ListerWatcher.List func. It increases the success/error
// counters based on the outcome of the List operation it instruments.
// It supports object limits: when a non-zero limit is set, the limit is passed to the
// underlying List call and the returned list is truncated to at most that many objects.
func (i *InstrumentedListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {

if i.useAPIServerCache {
options.ResourceVersion = "0"
}

if i.limit != 0 {
options.Limit = i.limit
i.metrics.ListObjectsLimit.WithLabelValues(i.resource).Set(float64(i.limit))
}

res, err := i.lw.List(options)

if err != nil {
i.metrics.ListTotal.WithLabelValues("error", i.resource).Inc()
i.metrics.ListRequestsTotal.WithLabelValues("error", i.resource).Inc()
return nil, err
}

i.metrics.ListTotal.WithLabelValues("success", i.resource).Inc()
list, err := meta.ExtractList(res)
if err != nil {
return nil, err
}
i.metrics.ListRequestsTotal.WithLabelValues("success", i.resource).Inc()

if i.limit != 0 {
if int64(len(list)) > i.limit {
meta.SetList(res, list[0:i.limit])
i.metrics.ListObjectsCurrent.WithLabelValues(i.resource).Set(float64(i.limit))
} else {
i.metrics.ListObjectsCurrent.WithLabelValues(i.resource).Set(float64(len(list)))
}
}

return res, nil

}

// Watch is a wrapper func around the cache.ListerWatcher.Watch func. It increases the success/error
// counters based on the outcome of the Watch operation it instruments.
func (i *InstrumentedListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
res, err := i.lw.Watch(options)
if err != nil {
i.metrics.WatchTotal.WithLabelValues("error", i.resource).Inc()
i.metrics.WatchRequestsTotal.WithLabelValues("error", i.resource).Inc()
return nil, err
}

i.metrics.WatchTotal.WithLabelValues("success", i.resource).Inc()
i.metrics.WatchRequestsTotal.WithLabelValues("success", i.resource).Inc()
return res, nil
}