diff --git a/cmd/katalyst-controller/app/controller/overcommit.go b/cmd/katalyst-controller/app/controller/overcommit.go index cd5f403eee..d52146bbe9 100644 --- a/cmd/katalyst-controller/app/controller/overcommit.go +++ b/cmd/katalyst-controller/app/controller/overcommit.go @@ -24,6 +24,7 @@ import ( katalyst "github.com/kubewharf/katalyst-core/cmd/base" "github.com/kubewharf/katalyst-core/pkg/config" "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/node" + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction" ) const ( @@ -47,7 +48,20 @@ func StartOvercommitController( klog.Errorf("failed to new nodeOvercommit controller") return false, err } - go noc.Run() + + if conf.Prediction.EnablePredict { + pc, err := prediction.NewPredictionController( + ctx, + controlCtx, + conf.ControllersConfiguration.OvercommitConfig, + ) + if err != nil { + klog.Errorf("failed to new overcommit prediction controller") + return false, err + } + + go pc.Run() + } return true, nil } diff --git a/cmd/katalyst-controller/app/options/overcommit.go b/cmd/katalyst-controller/app/options/overcommit.go index 4a37da8c70..f6054e7c84 100644 --- a/cmd/katalyst-controller/app/options/overcommit.go +++ b/cmd/katalyst-controller/app/options/overcommit.go @@ -22,6 +22,7 @@ import ( cliflag "k8s.io/component-base/cli/flag" "github.com/kubewharf/katalyst-core/pkg/config/controller" + "github.com/kubewharf/katalyst-core/pkg/util/datasource/prometheus" ) const ( @@ -32,6 +33,35 @@ const ( // OvercommitOptions holds the configurations for overcommit. 
type OvercommitOptions struct { NodeOvercommitOptions + PredictionOptions +} + +type PredictionOptions struct { + EnablePredict bool + Predictor string + PredictPeriod time.Duration + ReconcilePeriod time.Duration + + MaxTimeSeriesDuration time.Duration + MinTimeSeriesDuration time.Duration + + TargetReferenceNameKey string + TargetReferenceTypeKey string + CPUScaleFactor float64 + MemoryScaleFactor float64 + + NodeCPUTargetLoad float64 + NodeMemoryTargetLoad float64 + PodEstimatedCPULoad float64 + PodEstimatedMemoryLoad float64 + + prometheus.PromConfig + NSigmaOptions +} + +type NSigmaOptions struct { + Factor int + Buckets int } // NodeOvercommitOptions holds the configurations for nodeOvercommitConfig controller. @@ -45,7 +75,27 @@ type NodeOvercommitOptions struct { // NewOvercommitOptions creates a new Options with a default config. func NewOvercommitOptions() *OvercommitOptions { - return &OvercommitOptions{} + return &OvercommitOptions{ + PredictionOptions: PredictionOptions{ + EnablePredict: false, + Predictor: "", + PredictPeriod: 24 * time.Hour, + ReconcilePeriod: 1 * time.Hour, + MaxTimeSeriesDuration: 7 * 24 * time.Hour, + MinTimeSeriesDuration: 24 * time.Hour, + CPUScaleFactor: 1, + MemoryScaleFactor: 1, + NodeCPUTargetLoad: 0.6, + NodeMemoryTargetLoad: 0.8, + PodEstimatedCPULoad: 0.5, + PodEstimatedMemoryLoad: 0.8, + NSigmaOptions: NSigmaOptions{ + Factor: 3, + Buckets: 24, + }, + PromConfig: prometheus.PromConfig{}, + }, + } } // AddFlags adds flags to the specified FlagSet @@ -54,11 +104,72 @@ func (o *OvercommitOptions) AddFlags(fss *cliflag.NamedFlagSets) { fs.IntVar(&o.SyncWorkers, "nodeovercommit-sync-workers", defaultNodeOvercommitSyncWorkers, "num of goroutines to sync nodeovercommitconfig") fs.DurationVar(&o.ConfigReconcilePeriod, "nodeovercommit-reconcile-period", defaultNodeOvercommitReconcilePeriod, "Period for nodeovercommit controller to sync configs") + + fs.BoolVar(&o.EnablePredict, "nodeovercommit-enable-predict", o.EnablePredict, 
"enable node overcommit prediction") + fs.StringVar(&o.Predictor, "nodeovercommit-predictor", o.Predictor, "workload usage predictor in node overcommit controller") + fs.DurationVar(&o.PredictPeriod, "nodeovercommit-workload-predict-period", o.PredictPeriod, "reconcile period of workload usage predictor in overcommit controller") + fs.DurationVar(&o.ReconcilePeriod, "nodeovercommit-node-predict-period", o.ReconcilePeriod, "reconcile period of node overcommitmentRatio prediction in overcommit controller") + fs.DurationVar(&o.MaxTimeSeriesDuration, "nodeovercommit-max-timeseries-duration", o.MaxTimeSeriesDuration, + "max time duration of time series for workload usage prediction, default 7 days") + fs.DurationVar(&o.MinTimeSeriesDuration, "nodeovercommit-min-timeseries-duration", o.MinTimeSeriesDuration, + "min time duration of time series for workload usage prediction, default 24 hours") + fs.IntVar(&o.Factor, "nodeovercommit-nsigma-factor", o.Factor, "stddev factor of n-sigma predictor, default 3") + fs.IntVar(&o.Buckets, "nodeovercommit-nsigma-buckets", o.Buckets, + "bucket of n-sigma predictor result, 24 means predictor result will be divide into 24 buckets according to hours") + fs.StringVar(&o.TargetReferenceNameKey, "nodeovercommit-target-reference-name-key", o.TargetReferenceNameKey, + "overcommit controller get pod owner reference workload name from pod label by nodeovercommit-target-reference-name-key") + fs.StringVar(&o.TargetReferenceTypeKey, "nodeovercommit-target-reference-type-key", o.TargetReferenceTypeKey, + "overcommit controller get pod owner reference workload type from pod label by nodeovercommit-target-reference-type-key") + fs.Float64Var(&o.CPUScaleFactor, "nodeovercommit-cpu-scaleFactor", o.CPUScaleFactor, + "podUsage = podRequest * scaleFactor when pod resource portrait is missed") + fs.Float64Var(&o.MemoryScaleFactor, "nodeovercommit-memory-scaleFactor", o.MemoryScaleFactor, + "podUsage = podRequest * scaleFactor when pod resource portrait 
is missed") + + fs.Float64Var(&o.NodeCPUTargetLoad, "nodeovercommit-cpu-targetload", o.NodeCPUTargetLoad, + "max node CPU load when calculate node CPU overcommitment ratio, should be greater than 0 and less than 1") + fs.Float64Var(&o.NodeMemoryTargetLoad, "nodeovercommit-memory-targetload", o.NodeMemoryTargetLoad, + "max node memory load when calculate node CPU overcommitment ratio, should be greater than 0 and less than 1") + fs.Float64Var(&o.PodEstimatedCPULoad, "nodeovercommit-cpu-estimatedload", o.PodEstimatedCPULoad, + "estimated avg pod CPU load in the cluster, should be greater than 0 and less than 1") + fs.Float64Var(&o.PodEstimatedMemoryLoad, "nodeovercommit-memory-estimatedload", o.PodEstimatedMemoryLoad, + "estimated avg pod memory load in the cluster, should be greater than 0 and less than 1") + + fs.StringVar(&o.Address, "nodeovercommit-prometheus-address", "", "prometheus address") + fs.StringVar(&o.Auth.Type, "nodeovercommit-prometheus-auth-type", "", "prometheus auth type") + fs.StringVar(&o.Auth.Username, "nodeovercommit-prometheus-auth-username", "", "prometheus auth username") + fs.StringVar(&o.Auth.Password, "nodeovercommit-prometheus-auth-password", "", "prometheus auth password") + fs.StringVar(&o.Auth.BearerToken, "nodeovercommit-prometheus-auth-bearertoken", "", "prometheus auth bearertoken") + fs.DurationVar(&o.KeepAlive, "nodeovercommit-prometheus-keepalive", 60*time.Second, "prometheus keep alive") + fs.DurationVar(&o.Timeout, "nodeovercommit-prometheus-timeout", 3*time.Minute, "prometheus timeout") + fs.BoolVar(&o.BRateLimit, "nodeovercommit-prometheus-bratelimit", false, "prometheus bratelimit") + fs.IntVar(&o.MaxPointsLimitPerTimeSeries, "nodeovercommit-prometheus-maxpoints", 11000, "prometheus max points limit per time series") + fs.StringVar(&o.BaseFilter, "nodeovercommit-prometheus-promql-base-filter", "", ""+ + "Get basic filters in promql for historical usage data. This filter is added to all promql statements. 
"+ + "Supports filters format of promql, e.g: group=\\\"Katalyst\\\",cluster=\\\"cfeaf782fasdfe\\\"") + fs.BoolVar(&o.InsecureSkipVerify, "nodeovercommit-prometheus-insecureSkipVerify", true, "prometheus insecure skip verify") + fs.DurationVar(&o.TLSHandshakeTimeoutInSecond, "nodeovercommit-prometheus-TLSHandshakeTimeoutInSecond", 10*time.Second, "prometheus TLSHandshake timeout") } func (o *OvercommitOptions) ApplyTo(c *controller.OvercommitConfig) error { c.Node.SyncWorkers = o.SyncWorkers c.Node.ConfigReconcilePeriod = o.ConfigReconcilePeriod + c.Prediction.EnablePredict = o.EnablePredict + c.Prediction.Predictor = o.Predictor + c.Prediction.PredictPeriod = o.PredictPeriod + c.Prediction.ReconcilePeriod = o.ReconcilePeriod + c.Prediction.MaxTimeSeriesDuration = o.MaxTimeSeriesDuration + c.Prediction.MinTimeSeriesDuration = o.MinTimeSeriesDuration + c.Prediction.Buckets = o.Buckets + c.Prediction.Factor = o.Factor + c.Prediction.TargetReferenceNameKey = o.TargetReferenceNameKey + c.Prediction.TargetReferenceTypeKey = o.TargetReferenceTypeKey + c.Prediction.CPUScaleFactor = o.CPUScaleFactor + c.Prediction.MemoryScaleFactor = o.MemoryScaleFactor + c.Prediction.NodeCPUTargetLoad = o.NodeCPUTargetLoad + c.Prediction.NodeMemoryTargetLoad = o.NodeMemoryTargetLoad + c.Prediction.PodEstimatedCPULoad = o.PodEstimatedCPULoad + c.Prediction.PodEstimatedMemoryLoad = o.PodEstimatedMemoryLoad + c.Prediction.PromConfig = &o.PromConfig return nil } diff --git a/go.mod b/go.mod index 23710e9466..cd0c55bd2a 100644 --- a/go.mod +++ b/go.mod @@ -156,6 +156,7 @@ require ( ) replace ( + github.com/kubewharf/katalyst-api => github.com/WangZzzhe/katalyst-api v0.0.0-20240708111725-a9666d703f9f k8s.io/api => k8s.io/api v0.24.6 k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.24.6 k8s.io/apimachinery => k8s.io/apimachinery v0.24.6 diff --git a/go.sum b/go.sum index 1cfb8e2330..88a2d002fb 100644 --- a/go.sum +++ b/go.sum @@ -82,6 +82,8 @@ github.com/Shopify/sarama 
v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= +github.com/WangZzzhe/katalyst-api v0.0.0-20240708111725-a9666d703f9f h1:xNRsHeQYoquhyG43l6PMyaHOksTHxzy1667T2q2WnxY= +github.com/WangZzzhe/katalyst-api v0.0.0-20240708111725-a9666d703f9f/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -558,8 +560,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kubewharf/katalyst-api v0.5.1-0.20240702044746-be552fd7ea7d h1:6CuK3axf2B63zIkEu5XyxbaC+JArE/3Jo3QHvb+Hn0M= -github.com/kubewharf/katalyst-api v0.5.1-0.20240702044746-be552fd7ea7d/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k= github.com/kubewharf/kubelet v1.24.6-kubewharf.8 h1:2e89T/nZTgzaVhyRsZuwEdRk8V8kJXs4PRkgfeG4Ai4= github.com/kubewharf/kubelet v1.24.6-kubewharf.8/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c= github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8= diff --git a/pkg/config/controller/overcommit.go b/pkg/config/controller/overcommit.go index 34bc3ce3a0..b68383470b 100644 --- 
a/pkg/config/controller/overcommit.go +++ b/pkg/config/controller/overcommit.go @@ -16,14 +16,20 @@ limitations under the License. package controller -import "time" +import ( + "time" + + "github.com/kubewharf/katalyst-core/pkg/util/datasource/prometheus" +) type OvercommitConfig struct { Node NodeOvercommitConfig + + Prediction PredictionConfig } type NodeOvercommitConfig struct { - // numer of workers to sync overcommit config + // number of workers to sync overcommit config SyncWorkers int // time interval of reconcile overcommit config @@ -33,5 +39,36 @@ type NodeOvercommitConfig struct { func NewOvercommitConfig() *OvercommitConfig { return &OvercommitConfig{ Node: NodeOvercommitConfig{}, + Prediction: PredictionConfig{ + PromConfig: &prometheus.PromConfig{}, + }, } } + +type PredictionConfig struct { + EnablePredict bool + Predictor string + PredictPeriod time.Duration + ReconcilePeriod time.Duration + + MaxTimeSeriesDuration time.Duration + MinTimeSeriesDuration time.Duration + + TargetReferenceNameKey string + TargetReferenceTypeKey string + CPUScaleFactor float64 + MemoryScaleFactor float64 + + NodeCPUTargetLoad float64 + NodeMemoryTargetLoad float64 + PodEstimatedCPULoad float64 + PodEstimatedMemoryLoad float64 + + *prometheus.PromConfig + NSigmaPredictorConfig +} + +type NSigmaPredictorConfig struct { + Factor int + Buckets int +} diff --git a/pkg/controller/overcommit/node/node.go b/pkg/controller/overcommit/node/node.go index af1ba34fa4..923740d41b 100644 --- a/pkg/controller/overcommit/node/node.go +++ b/pkg/controller/overcommit/node/node.go @@ -467,7 +467,8 @@ func (nc *NodeOvercommitController) setNodeOvercommitAnnotations(nodeName string nc.nodeRealtimeOvercommitRatio(nodeAnnotations, node) - cpuAllocatable, cpuCapacity := nc.nodeOvercommitResource(node, validCPUOvercommitRatio(nodeAnnotations), corev1.ResourceCPU, consts.NodeAnnotationOriginalAllocatableCPUKey, consts.NodeAnnotationOriginalCapacityCPUKey) + enableDynamicOvercommit := 
nc.nodeEnableDynamicOvercommit(node.Name) + cpuAllocatable, cpuCapacity := nc.nodeOvercommitResource(node, validCPUOvercommitRatio(nodeAnnotations, enableDynamicOvercommit), corev1.ResourceCPU, consts.NodeAnnotationOriginalAllocatableCPUKey, consts.NodeAnnotationOriginalCapacityCPUKey) klog.V(5).Infof("node %s CPU allocatable: %v, CPU capacity: %v with bindcpu", node.Name, cpuAllocatable, cpuCapacity) if cpuAllocatable == "" { delete(nodeAnnotations, consts.NodeAnnotationOvercommitAllocatableCPUKey) @@ -477,7 +478,7 @@ func (nc *NodeOvercommitController) setNodeOvercommitAnnotations(nodeName string nodeAnnotations[consts.NodeAnnotationOvercommitCapacityCPUKey] = cpuCapacity } - memAllocatable, memCapacity := nc.nodeOvercommitResource(node, validMemoryOvercommitRatio(nodeAnnotations), corev1.ResourceMemory, consts.NodeAnnotationOriginalAllocatableMemoryKey, consts.NodeAnnotationOriginalCapacityMemoryKey) + memAllocatable, memCapacity := nc.nodeOvercommitResource(node, validMemoryOvercommitRatio(nodeAnnotations, enableDynamicOvercommit), corev1.ResourceMemory, consts.NodeAnnotationOriginalAllocatableMemoryKey, consts.NodeAnnotationOriginalCapacityMemoryKey) klog.V(5).Infof("node %s memory allocatable: %v, memory capacity: %v", node.Name, memAllocatable, memCapacity) if memAllocatable == "" { delete(nodeAnnotations, consts.NodeAnnotationOvercommitAllocatableMemoryKey) @@ -494,6 +495,18 @@ func (nc *NodeOvercommitController) setNodeOvercommitAnnotations(nodeName string return nil } +func (nc *NodeOvercommitController) nodeEnableDynamicOvercommit(nodeName string) bool { + // get node matched config + + noc := nc.matcher.GetConfig(nodeName) + if noc == nil { + // if node is not matched with any noc, overcommit is not allowed + return false + } + + return noc.Spec.EnableDynamicOvercommit +} + func emptyOvercommitConfig() *configv1alpha1.NodeOvercommitConfig { return &configv1alpha1.NodeOvercommitConfig{ Spec: configv1alpha1.NodeOvercommitConfigSpec{ @@ -623,16 +636,16 @@ 
func (nc *NodeOvercommitController) getGuaranteedCPU(nodeName string) (int, erro return guaranteedCPUs, nil } -func validCPUOvercommitRatio(annotation map[string]string) float64 { - res, err := overcommitutil.OvercommitRatioValidate(annotation, consts.NodeAnnotationCPUOvercommitRatioKey, consts.NodeAnnotationRealtimeCPUOvercommitRatioKey) +func validCPUOvercommitRatio(annotation map[string]string, enableDynamicOvercommit bool) float64 { + res, err := overcommitutil.OvercommitRatioValidate(annotation, consts.NodeAnnotationCPUOvercommitRatioKey, consts.NodeAnnotationPredictCPUOvercommitRatioKey, consts.NodeAnnotationRealtimeCPUOvercommitRatioKey, enableDynamicOvercommit) if err != nil { klog.Error(err) } return res } -func validMemoryOvercommitRatio(annotation map[string]string) float64 { - res, err := overcommitutil.OvercommitRatioValidate(annotation, consts.NodeAnnotationMemoryOvercommitRatioKey, consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey) +func validMemoryOvercommitRatio(annotation map[string]string, enableDynamicOvercommit bool) float64 { + res, err := overcommitutil.OvercommitRatioValidate(annotation, consts.NodeAnnotationMemoryOvercommitRatioKey, consts.NodeAnnotationPredictMemoryOvercommitRatioKey, consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey, enableDynamicOvercommit) if err != nil { klog.Error(err) } diff --git a/pkg/controller/overcommit/prediction/common/types.go b/pkg/controller/overcommit/prediction/common/types.go new file mode 100644 index 0000000000..54586ff926 --- /dev/null +++ b/pkg/controller/overcommit/prediction/common/types.go @@ -0,0 +1,111 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +import ( + "fmt" + "time" +) + +type TimeSeries struct { + Metadata []Metadata + + Samples []Sample +} + +type Sample struct { + Value float64 + Timestamp int64 +} + +type Samples []Sample + +func (s Samples) Len() int { + return len(s) +} + +func (s Samples) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (s Samples) Less(i, j int) bool { + if s[i].Timestamp == 0 { + return true + } + if s[j].Timestamp == 0 { + return false + } + + location, err := time.LoadLocation("Asia/Shanghai") + if err != nil { + location = time.Local + } + // sort sample timestamp hour + houri := time.Unix(s[i].Timestamp, 0).In(location).Hour() + hourj := time.Unix(s[j].Timestamp, 0).In(location).Hour() + + return houri < hourj +} + +type Metadata struct { + Key string + Value string +} + +func EmptyTimeSeries() *TimeSeries { + return &TimeSeries{ + Metadata: []Metadata{}, + Samples: []Sample{}, + } +} + +func (ts *TimeSeries) Add(timeSeries *TimeSeries) error { + if len(ts.Samples) != len(timeSeries.Samples) { + return fmt.Errorf("timeSeries points %v and %v not equal", len(ts.Samples), len(timeSeries.Samples)) + } + + for i := range ts.Samples { + ts.Samples[i].Value += timeSeries.Samples[i].Value + } + return nil +} + +func (ts *TimeSeries) Max() Sample { + if len(ts.Samples) == 0 { + return Sample{ + Value: 0, + } + } + + res := ts.Samples[0] + for _, sample := range ts.Samples { + if sample.Value > res.Value { + res = sample + } + } + return res +} + +type PredictArgs struct { + Namespace string + WorkloadType string + WorkloadName string + Interval int64 // 
seconds + StartTime int64 // unix timestamp + Duration int64 // seconds + ResourceName string +} diff --git a/pkg/controller/overcommit/prediction/common/types_test.go b/pkg/controller/overcommit/prediction/common/types_test.go new file mode 100644 index 0000000000..b5f162c395 --- /dev/null +++ b/pkg/controller/overcommit/prediction/common/types_test.go @@ -0,0 +1,57 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +import ( + "sort" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestSort(t *testing.T) { + t.Parallel() + a := time.Unix(1640995200, 0) // fixed instant (2022-01-01 08:00 Asia/Shanghai): Less sorts by local hour, so time.Now() would flake around midnight + b := a.Add(23 * time.Hour) + + samples := []Sample{ + { + Value: 0, + Timestamp: a.Unix(), + }, + { + Value: 1, + Timestamp: b.Unix(), + }, + } + sort.Sort(Samples(samples)) + + assert.Equal(t, 1.0, samples[0].Value) + + samples = []Sample{ + { + Value: 0, + Timestamp: 0, + }, + { + Value: 1, + Timestamp: b.Unix(), + }, + } + sort.Sort(Samples(samples)); assert.Equal(t, 0.0, samples[0].Value) // zero-timestamp samples must sort first; without the sort this asserted nothing } diff --git a/pkg/controller/overcommit/prediction/prediction.go b/pkg/controller/overcommit/prediction/prediction.go new file mode 100644 index 0000000000..83e4c0ed32 --- /dev/null +++ b/pkg/controller/overcommit/prediction/prediction.go @@ -0,0 +1,571 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prediction + +import ( + "context" + "fmt" + "sort" + "strings" + "sync" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + listerv1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + workloadlister "github.com/kubewharf/katalyst-api/pkg/client/listers/workload/v1alpha1" + "github.com/kubewharf/katalyst-api/pkg/consts" + katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" + "github.com/kubewharf/katalyst-core/pkg/client/control" + "github.com/kubewharf/katalyst-core/pkg/config/controller" + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/common" + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/predictor" + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/provider/prom" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/native" +) + +type Prediction struct { + sync.RWMutex + ctx context.Context + + conf *controller.OvercommitConfig + + provider prom.Interface + predictor predictor.Interface + + nodeLister listerv1.NodeLister + podIndexer cache.Indexer + workloadLister map[schema.GroupVersionResource]cache.GenericLister + spdLister workloadlister.ServiceProfileDescriptorLister + syncedFunc []cache.InformerSynced + + // cache workload usage calculated by predictor when portrait is unable + // 
workload -> metricName -> predict timeSeries + workloadUsageCache map[string]map[string]*common.TimeSeries + + nodeUpdater control.NodeUpdater + rpUpdater control.SPDControlImp + + metricsEmitter metrics.MetricEmitter + + firstReconcileWorkload bool +} + +func NewPredictionController( + ctx context.Context, + controlCtx *katalyst_base.GenericContext, + overcommitConf *controller.OvercommitConfig, +) (*Prediction, error) { + if overcommitConf == nil || controlCtx.Client == nil { + return nil, fmt.Errorf("client and overcommitConf can not be nil") + } + + klog.V(6).Infof("overcommitConf: %v", *overcommitConf) + + predictionController := &Prediction{ + ctx: ctx, + metricsEmitter: controlCtx.EmitterPool.GetDefaultMetricsEmitter(), + conf: overcommitConf, + workloadLister: map[schema.GroupVersionResource]cache.GenericLister{}, + firstReconcileWorkload: false, + } + + // init workload lister + workloadInformers := controlCtx.DynamicResourcesManager.GetDynamicInformers() + for _, wf := range workloadInformers { + klog.Infof("workload informer: %v", wf.GVR.String()) + predictionController.workloadLister[wf.GVR] = wf.Informer.Lister() + predictionController.syncedFunc = append(predictionController.syncedFunc, wf.Informer.Informer().HasSynced) + } + + err := predictionController.initProvider() + if err != nil { + klog.Errorf("init provider fail: %v", err) + return nil, err + } + + // init predictor + err = predictionController.initPredictor() + if err != nil { + klog.Errorf("init predictor fail: %v", err) + return nil, err + } + + if predictionController.predictor != nil && predictionController.provider != nil { + // if local predictor is enabled, init workloadUsageCache + predictionController.workloadUsageCache = map[string]map[string]*common.TimeSeries{} + } else { + // if spd portrait is enabled, init spdLister + predictionController.spdLister = controlCtx.InternalInformerFactory.Workload().V1alpha1().ServiceProfileDescriptors().Lister() + predictionController.syncedFunc 
= append(predictionController.syncedFunc, controlCtx.InternalInformerFactory.Workload().V1alpha1().ServiceProfileDescriptors().Informer().HasSynced) + } + + podInformer := controlCtx.KubeInformerFactory.Core().V1().Pods() + predictionController.podIndexer = podInformer.Informer().GetIndexer() + if err = predictionController.podIndexer.AddIndexers(cache.Indexers{ + nodePodIndex: nodePodIndexFunc, + }); err != nil { return nil, err } // AddIndexers fails once the informer has started; was silently ignored, which would break reconcileNodes + predictionController.syncedFunc = append(predictionController.syncedFunc, podInformer.Informer().HasSynced) + + predictionController.nodeLister = controlCtx.KubeInformerFactory.Core().V1().Nodes().Lister() + predictionController.syncedFunc = append(predictionController.syncedFunc, controlCtx.KubeInformerFactory.Core().V1().Nodes().Informer().HasSynced) + + predictionController.nodeUpdater = control.NewRealNodeUpdater(controlCtx.Client.KubeClient) + + predictionController.addDeleteHandler(workloadInformers) + + return predictionController, nil +} + +func (p *Prediction) Run() { + if !cache.WaitForCacheSync(p.ctx.Done(), p.syncedFunc...) 
{ + klog.Fatalf("unable to sync caches") + } + + if p.predictor != nil && p.provider != nil { + go wait.Until(p.reconcileWorkloads, p.conf.Prediction.PredictPeriod, p.ctx.Done()) + + // if predictor is used, wait for first reconcile before reconcile nodes + _ = wait.PollImmediateUntil(5*time.Second, func() (done bool, err error) { + return p.firstReconcileWorkload, nil + }, p.ctx.Done()) + + klog.V(6).Infof("first reconcile workloads finish: %v", p.firstReconcileWorkload) + + go wait.Until(p.reconcileNodes, p.conf.Prediction.ReconcilePeriod, p.ctx.Done()) + } else { + klog.Infof("nil predictor, skip reconcile workload") + go wait.Until(p.reconcileNodes, p.conf.Prediction.ReconcilePeriod, p.ctx.Done()) + } +} + +// calculate node usage and overcommitment ratio by workloads usage +func (p *Prediction) reconcileNodes() { + // list nodes + nodeList, err := p.nodeLister.List(labels.Everything()) + if err != nil { + klog.Errorf("overcommit prediction list node fail: %v", err) + return + } + + for _, node := range nodeList { + cpuVal, memoryVal, err := p.estimateNode(node) + if err != nil { + klog.Errorf("estimate node %v fail: %v", node.Name, err) + continue + } + klog.V(6).Infof("estimate node %v overcommit CPU: %v, memory: %v", node.Name, cpuVal, memoryVal) + + // update node annotation + err = p.updatePredictOvercommitRatio(cpuVal, memoryVal, node) + if err != nil { + klog.Errorf("update node %v overcommit ratio fail: %v", node.Name, err) + continue + } + } +} + +func (p *Prediction) estimateNode(node *v1.Node) (float64, float64, error) { + // get pods in node + objs, err := p.podIndexer.ByIndex(nodePodIndex, node.Name) + if err != nil { + return 0, 0, err + } + if len(objs) <= 0 { + return 0, 0, nil + } + + var ( + sumPodCpuTimeSeries, sumPodMemoryTimeSeries *common.TimeSeries + nodeResource = v1.ResourceList{} + ) + for _, obj := range objs { + pod, ok := obj.(*v1.Pod) + if !ok { + return 0, 0, fmt.Errorf("can not convert obj to pod: %v", obj) + } + 
klog.V(6).Infof("estimate node, namespace: %v, podname: %v, owner: %v", pod.Namespace, pod.Name, pod.OwnerReferences) + + // get pod resource portrait usage + cpuTimeSeries, memoryTimeSeries, podResource := p.podResourceTimeSeries(pod) + + // sum pod resources + if sumPodCpuTimeSeries == nil { + sumPodCpuTimeSeries = cpuTimeSeries + } else { + err = sumPodCpuTimeSeries.Add(cpuTimeSeries) + if err != nil { + klog.Errorf("estimate node %v fail: %v", node.Name, err) + return 0, 0, err + } + } + if sumPodMemoryTimeSeries == nil { + sumPodMemoryTimeSeries = memoryTimeSeries + } else { + err = sumPodMemoryTimeSeries.Add(memoryTimeSeries) + if err != nil { + klog.Errorf("estimate node %v fail: %v", node.Name, err) + return 0, 0, err + } + } + + nodeResource = native.AddResources(nodeResource, podResource) + } + + klog.V(6).Infof("node %v cpu resource: %v, memory resource: %v, pod CPU timeSeries: %v, pod memory timeSeries: %v", + node.Name, nodeResource.Cpu().String(), nodeResource.Memory().String(), sumPodCpuTimeSeries, sumPodMemoryTimeSeries) + + nodeAllocatable := getNodeAllocatable(node) + // calculate node overcommitment ratio + cpuOvercommitRatio := p.resourceToOvercommitRatio( + node.Name, + v1.ResourceCPU.String(), + float64(nodeResource.Cpu().MilliValue()), + sumPodCpuTimeSeries.Max().Value, + float64(nodeAllocatable.Cpu().MilliValue())) + + memoryOvercommitRatio := p.resourceToOvercommitRatio( + node.Name, + v1.ResourceMemory.String(), + float64(nodeResource.Memory().Value()), + sumPodMemoryTimeSeries.Max().Value, + float64(nodeAllocatable.Memory().Value())) + + return cpuOvercommitRatio, memoryOvercommitRatio, nil +} + +// return CPU timeSeries, memory timeSeries and requestResource +func (p *Prediction) podResourceTimeSeries(pod *v1.Pod) (*common.TimeSeries, *common.TimeSeries, v1.ResourceList) { + var cpuTs, memoryTs *common.TimeSeries + + // pod request + podResource := native.SumUpPodRequestResources(pod) + + // pod to workload + workloadName, workloadType, 
ok := p.podToWorkloadNameAndType(pod) + if !ok { + klog.Warningf("get pod %v-%v workload fail", pod.Namespace, pod.Name) + cpuTs, memoryTs = p.timeSeriesByRequest(podResource) + return cpuTs, memoryTs, podResource + } + + if p.predictor != nil && p.provider != nil { + // get time series from cache + cacheKey := workloadUsageCacheName(pod.GetNamespace(), workloadType, workloadName) + cpuTs = p.getWorkloadUsageCache(cacheKey, v1.ResourceCPU.String()) + memoryTs = p.getWorkloadUsageCache(cacheKey, v1.ResourceMemory.String()) + } else { + // get time series from spd + cpuTs, memoryTs = p.getSPDPortrait(pod.GetNamespace(), workloadName) + } + if cpuTs == nil || len(cpuTs.Samples) != workloadUsageDataLength { + cpuTs = p.cpuTimeSeriesByRequest(podResource) + } + if memoryTs == nil || len(memoryTs.Samples) != workloadUsageDataLength { + memoryTs = p.memoryTimeSeriesByRequest(podResource) + } + + klog.V(6).Infof("workload %v cpu timeseries: %v", workloadName, cpuTs.Samples) + klog.V(6).Infof("workload %v memory timeseries: %v", workloadName, memoryTs.Samples) + klog.V(6).Infof("pod %v podResource: %v", pod.Name, podResource.Cpu().MilliValue()) + + return cpuTs, memoryTs, podResource +} + +func (p *Prediction) getWorkloadUsageCache(key string, resourceName string) *common.TimeSeries { + p.RLock() + defer p.RUnlock() + + if p.workloadUsageCache == nil { + klog.Errorf("getWorkloadUsageCache fail, workloadUsageCache is nil") + return nil + } + cache, ok := p.workloadUsageCache[key] + if !ok { + klog.Warningf("%v workloadUsageCache miss", key) + return nil + } + return cache[resourceName] +} + +func (p *Prediction) getSPDPortrait(podNamespace string, workloadName string) (*common.TimeSeries, *common.TimeSeries) { + var ( + cpuTs = common.EmptyTimeSeries() + memoryTs = common.EmptyTimeSeries() + ) + + // get spd from lister + spd, err := p.spdLister.ServiceProfileDescriptors(podNamespace).Get(workloadName) + if err != nil { + klog.Errorf("get workload %v spd fail: %v", 
workloadName, err) + return nil, nil + } + + for i := range spd.Status.AggMetrics { + if spd.Status.AggMetrics[i].Scope != spdPortraitScope { + continue + } + + for _, podMetrics := range spd.Status.AggMetrics[i].Items { + timestamp := podMetrics.Timestamp.Time + for _, containerMetrics := range podMetrics.Containers { + if containerMetrics.Name != spdPortraitLoadAwareMetricName { + continue + } + if usage, ok := containerMetrics.Usage[resourceToPortraitMetrics[v1.ResourceCPU.String()]]; ok { + cpuTs.Samples = append(cpuTs.Samples, common.Sample{ + Timestamp: timestamp.Unix(), + Value: float64(usage.MilliValue()), + }) + } + + if usage, ok := containerMetrics.Usage[resourceToPortraitMetrics[v1.ResourceMemory.String()]]; ok { + memoryTs.Samples = append(memoryTs.Samples, common.Sample{ + Timestamp: timestamp.Unix(), + Value: float64(usage.Value()), + }) + } + } + } + } + + sort.Sort(common.Samples(cpuTs.Samples)) + sort.Sort(common.Samples(memoryTs.Samples)) + klog.V(6).Infof("getSPDPortrait, cpu: %v, memory: %v", cpuTs.Samples, memoryTs.Samples) + return cpuTs, memoryTs +} + +func (p *Prediction) resourceToOvercommitRatio(nodeName string, resourceName string, request float64, estimateUsage float64, nodeAllocatable float64) float64 { + if request == 0 { + klog.Warningf("node %v request is zero", nodeName) + return 0 + } + if estimateUsage == 0 { + klog.Warningf("node %v estimateUsage is zero", nodeName) + return 0 + } + if nodeAllocatable == 0 { + klog.Errorf("node %v allocatable is zero", nodeName) + return 0 + } + + nodeMaxLoad := estimateUsage / request + if nodeMaxLoad > 1 { + nodeMaxLoad = 1 + } + var podExpectedLoad, nodeTargetLoad float64 + switch resourceName { + case v1.ResourceCPU.String(): + podExpectedLoad = p.conf.Prediction.PodEstimatedCPULoad + nodeTargetLoad = p.conf.Prediction.NodeCPUTargetLoad + case v1.ResourceMemory.String(): + podExpectedLoad = p.conf.Prediction.PodEstimatedMemoryLoad + nodeTargetLoad = p.conf.Prediction.NodeMemoryTargetLoad + 
default: + klog.Warningf("unknown resourceName: %v", resourceName) + return 0 + } + if nodeMaxLoad < podExpectedLoad { + nodeMaxLoad = podExpectedLoad + } + + overcommitRatio := ((nodeAllocatable*nodeTargetLoad-estimateUsage)/nodeMaxLoad + request) / nodeAllocatable + + klog.V(6).Infof("resource %v request: %v, allocatable: %v, usage: %v, targetLoad: %v, nodeMaxLoad: %v, overcommitRatio: %v", + resourceName, request, nodeAllocatable, estimateUsage, nodeTargetLoad, nodeMaxLoad, overcommitRatio) + if overcommitRatio < 1.0 { + overcommitRatio = 1.0 + } + return overcommitRatio +} + +func (p *Prediction) podToWorkloadNameAndType(pod *v1.Pod) (string, string, bool) { + if p.conf.Prediction.TargetReferenceNameKey != "" && p.conf.Prediction.TargetReferenceTypeKey != "" { + return p.podToWorkloadByLabel(pod) + } + return p.podToWorkloadByOwner(pod) +} + +// get pod owner name and type by specified label key +func (p *Prediction) podToWorkloadByLabel(pod *v1.Pod) (string, string, bool) { + if pod.Labels == nil { + return "", "", false + } + + workloadName, ok := pod.Labels[p.conf.Prediction.TargetReferenceNameKey] + if !ok { + return "", "", false + } + + workloadType, ok := pod.Labels[p.conf.Prediction.TargetReferenceTypeKey] + if !ok { + return "", "", false + } + return workloadName, workloadType, ok +} + +func (p *Prediction) podToWorkloadByOwner(pod *v1.Pod) (string, string, bool) { + for _, owner := range pod.OwnerReferences { + kind := owner.Kind + switch kind { + // resource portrait time series predicted and stored by deployment, but pod owned by rs + case "ReplicaSet": + names := strings.Split(owner.Name, "-") + if len(names) <= 1 { + klog.Warningf("unexpected rs name: %v", owner.Name) + return "", "", false + } + names = names[0 : len(names)-1] + return strings.Join(names, "-"), "Deployment", true + default: + return owner.Name, kind, true + } + } + + return "", "", false +} + +func (p *Prediction) updatePredictOvercommitRatio(cpu, memory float64, node *v1.Node) 
error { + nodeCopy := node.DeepCopy() + nodeAnnotation := nodeCopy.Annotations + if nodeAnnotation == nil { + nodeAnnotation = make(map[string]string) + } + + if cpu < 1 { + delete(nodeAnnotation, consts.NodeAnnotationPredictCPUOvercommitRatioKey) + } else { + nodeAnnotation[consts.NodeAnnotationPredictCPUOvercommitRatioKey] = fmt.Sprintf("%.2f", cpu) + } + if memory < 1 { + delete(nodeAnnotation, consts.NodeAnnotationPredictMemoryOvercommitRatioKey) + } else { + nodeAnnotation[consts.NodeAnnotationPredictMemoryOvercommitRatioKey] = fmt.Sprintf("%.2f", memory) + } + + nodeCopy.Annotations = nodeAnnotation + return p.nodeUpdater.PatchNode(p.ctx, node, nodeCopy) +} + +// use request * config.scaleFactor as usage time series if pod resource portrait not exist +func (p *Prediction) timeSeriesByRequest(podResource v1.ResourceList) (*common.TimeSeries, *common.TimeSeries) { + cpuTs := p.cpuTimeSeriesByRequest(podResource) + memoryTs := p.memoryTimeSeriesByRequest(podResource) + return cpuTs, memoryTs +} + +func (p *Prediction) cpuTimeSeriesByRequest(podResource v1.ResourceList) *common.TimeSeries { + return p.genTimeSeries(v1.ResourceCPU, podResource, p.conf.Prediction.CPUScaleFactor) +} + +func (p *Prediction) memoryTimeSeriesByRequest(podResource v1.ResourceList) *common.TimeSeries { + return p.genTimeSeries(v1.ResourceMemory, podResource, p.conf.Prediction.MemoryScaleFactor) +} + +func (p *Prediction) genTimeSeries(resourceName v1.ResourceName, podResource v1.ResourceList, scaleFactor float64) *common.TimeSeries { + timeSeries := &common.TimeSeries{ + Samples: make([]common.Sample, 24, 24), + } + + if quantity, ok := podResource[resourceName]; ok { + usage := native.MultiplyResourceQuantity(resourceName, quantity, scaleFactor) + for i := range timeSeries.Samples { + value := usage.Value() + if resourceName == v1.ResourceCPU { + value = usage.MilliValue() + } + timeSeries.Samples[i] = common.Sample{ + Value: float64(value), + } + } + } + return timeSeries +} + +func 
nodePodIndexFunc(obj interface{}) ([]string, error) { + pod, ok := obj.(*v1.Pod) + if !ok || pod == nil { + return nil, fmt.Errorf("failed to reflect a obj to pod") + } + + if pod.Spec.NodeName == "" { + return []string{}, nil + } + + return []string{pod.Spec.NodeName}, nil +} + +func (p *Prediction) addDeleteHandler(informers map[string]native.DynamicInformer) { + if p.predictor != nil && p.provider != nil { + for _, informer := range informers { + informer.Informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: p.deleteWorkloadUsageCache, + }) + } + } +} + +func (p *Prediction) deleteWorkloadUsageCache(obj interface{}) { + if p.workloadUsageCache == nil { + return + } + + object, ok := obj.(*unstructured.Unstructured) + if !ok { + klog.Errorf("cannot convert obj to Unstructured: %v", obj) + return + } + + name := workloadUsageCacheName(object.GetNamespace(), object.GetKind(), object.GetName()) + + p.Lock() + defer p.Unlock() + delete(p.workloadUsageCache, name) +} + +// get node allocatable before overcommit +func getNodeAllocatable(node *v1.Node) v1.ResourceList { + res := v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("0"), + v1.ResourceMemory: resource.MustParse("0"), + } + + // get allocatable from node annotation first + cpu, ok := node.Annotations[consts.NodeAnnotationOriginalAllocatableCPUKey] + if ok { + res[v1.ResourceCPU] = resource.MustParse(cpu) + } else { + // if no allocatable in node annotation, get allocatable from node resource + res[v1.ResourceCPU] = *node.Status.Allocatable.Cpu() + } + + mem, ok := node.Annotations[consts.NodeAnnotationOriginalAllocatableMemoryKey] + if ok { + res[v1.ResourceMemory] = resource.MustParse(mem) + } else { + res[v1.ResourceMemory] = *node.Status.Allocatable.Memory() + } + return res +} diff --git a/pkg/controller/overcommit/prediction/prediction_test.go b/pkg/controller/overcommit/prediction/prediction_test.go new file mode 100644 index 0000000000..f933eaa586 --- /dev/null +++ 
b/pkg/controller/overcommit/prediction/prediction_test.go @@ -0,0 +1,974 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prediction + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + v13 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/client-go/tools/cache" + "k8s.io/metrics/pkg/apis/metrics/v1beta1" + + "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1" + "github.com/kubewharf/katalyst-api/pkg/consts" + katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" + "github.com/kubewharf/katalyst-core/pkg/config/controller" + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/common" + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/predictor" + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/provider/prom" + "github.com/kubewharf/katalyst-core/pkg/util/datasource/prometheus" +) + +func TestReconcileNodes(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + conf *controller.OvercommitConfig + node *v1.Node + pods []*v1.Pod + spds []*v1alpha1.ServiceProfileDescriptor + caches map[string]map[string]*common.TimeSeries + success bool + }{ + { + name: "test1", + success: true, + conf: &controller.OvercommitConfig{ + 
Prediction: controller.PredictionConfig{ + CPUScaleFactor: 0.5, + MemoryScaleFactor: 1, + NodeCPUTargetLoad: 0.8, + PodEstimatedCPULoad: 0.6, + NodeMemoryTargetLoad: 0.9, + PodEstimatedMemoryLoad: 0.8, + }, + }, + node: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + consts.NodeAnnotationOriginalAllocatableCPUKey: "31200m", + consts.NodeAnnotationOriginalAllocatableMemoryKey: "29258114498560m", + }, + }, + }, + spds: []*v1alpha1.ServiceProfileDescriptor{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "testDeployment2", + Namespace: "default", + }, + Status: v1alpha1.ServiceProfileDescriptorStatus{ + AggMetrics: []v1alpha1.AggPodMetrics{ + testAggMetrics(4000, 8*1024*1024*1024, 24), + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "testDeployment1", + Namespace: "default", + }, + Status: v1alpha1.ServiceProfileDescriptorStatus{ + AggMetrics: []v1alpha1.AggPodMetrics{ + testAggMetrics(2000, 4*1024*1024*1024, 24), + }, + }, + }, + }, + pods: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod2", + UID: "pod2", + Namespace: "default", + OwnerReferences: []v12.OwnerReference{ + { + APIVersion: "app/v1", + Kind: "Deployment", + Name: "testDeployment2", + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: "node1", + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod1", + UID: "pod1", + Namespace: "default", + OwnerReferences: []v12.OwnerReference{ + { + APIVersion: "app/v1", + Kind: "Deployment", + Name: "testDeployment1", + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: "node1", + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("4Gi"), + 
}, + }, + }, + }, + }, + }, + }, + }, + { + name: "node without pods", + success: false, + conf: &controller.OvercommitConfig{ + Prediction: controller.PredictionConfig{ + CPUScaleFactor: 0.5, + MemoryScaleFactor: 1, + NodeCPUTargetLoad: 0.8, + PodEstimatedCPULoad: 0.6, + NodeMemoryTargetLoad: 0.9, + PodEstimatedMemoryLoad: 0.8, + }, + }, + node: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + consts.NodeAnnotationOriginalAllocatableCPUKey: "31200m", + consts.NodeAnnotationOriginalAllocatableMemoryKey: "29258114498560m", + }, + }, + }, + pods: []*v1.Pod{}, + spds: nil, + }, + { + name: "pod time series error", + success: false, + conf: &controller.OvercommitConfig{ + Prediction: controller.PredictionConfig{ + CPUScaleFactor: 0.5, + MemoryScaleFactor: 1, + NodeCPUTargetLoad: 0.8, + PodEstimatedCPULoad: 0.6, + NodeMemoryTargetLoad: 0.9, + PodEstimatedMemoryLoad: 0.8, + }, + }, + node: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + consts.NodeAnnotationOriginalAllocatableCPUKey: "31200m", + consts.NodeAnnotationOriginalAllocatableMemoryKey: "29258114498560m", + }, + }, + }, + spds: []*v1alpha1.ServiceProfileDescriptor{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "testDeployment2", + Namespace: "default", + }, + Status: v1alpha1.ServiceProfileDescriptorStatus{ + AggMetrics: []v1alpha1.AggPodMetrics{ + testAggMetrics(4000, 8*1024*1024*1024, 20), + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "testDeployment1", + Namespace: "default", + }, + Status: v1alpha1.ServiceProfileDescriptorStatus{ + AggMetrics: []v1alpha1.AggPodMetrics{ + testAggMetrics(2000, 4*1024*1024*1024, 18), + }, + }, + }, + }, + pods: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod2", + UID: "pod2", + Namespace: "default", + OwnerReferences: []v12.OwnerReference{ + { + APIVersion: "app/v1", + Kind: "Deployment", + Name: "testDeployment2", + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: "node1", + 
Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, { + ObjectMeta: v12.ObjectMeta{ + Name: "pod1", + UID: "pod1", + Namespace: "default", + OwnerReferences: []v12.OwnerReference{ + { + APIVersion: "app/v1", + Kind: "Deployment", + Name: "testDeployment1", + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: "node1", + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("4Gi"), + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "pod without owner", + success: true, + conf: &controller.OvercommitConfig{ + Prediction: controller.PredictionConfig{ + CPUScaleFactor: 0.5, + MemoryScaleFactor: 1, + NodeCPUTargetLoad: 0.8, + PodEstimatedCPULoad: 0.6, + NodeMemoryTargetLoad: 0.9, + PodEstimatedMemoryLoad: 0.8, + }, + }, + node: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + consts.NodeAnnotationOriginalAllocatableCPUKey: "31200m", + consts.NodeAnnotationOriginalAllocatableMemoryKey: "29258114498560m", + }, + }, + }, + pods: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod2", + UID: "pod2", + Namespace: "default", + OwnerReferences: []v12.OwnerReference{}, + }, + Spec: v1.PodSpec{ + NodeName: "node1", + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, { + ObjectMeta: v12.ObjectMeta{ + Name: "pod1", + UID: "pod1", + Namespace: "default", + OwnerReferences: []v12.OwnerReference{ + { + APIVersion: "app/v1", + Kind: "Deployment", + Name: "testDeployment1", + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: 
"node1", + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("4Gi"), + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "spd without data", + success: true, + conf: &controller.OvercommitConfig{ + Prediction: controller.PredictionConfig{ + CPUScaleFactor: 0.5, + MemoryScaleFactor: 1, + NodeCPUTargetLoad: 0.8, + PodEstimatedCPULoad: 0.6, + NodeMemoryTargetLoad: 0.9, + PodEstimatedMemoryLoad: 0.8, + }, + }, + node: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + consts.NodeAnnotationOriginalAllocatableCPUKey: "31200m", + consts.NodeAnnotationOriginalAllocatableMemoryKey: "29258114498560m", + }, + }, + }, + spds: []*v1alpha1.ServiceProfileDescriptor{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "testDeployment2", + Namespace: "default", + }, + Status: v1alpha1.ServiceProfileDescriptorStatus{ + AggMetrics: []v1alpha1.AggPodMetrics{}, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "testDeployment1", + Namespace: "default", + }, + Status: v1alpha1.ServiceProfileDescriptorStatus{ + AggMetrics: []v1alpha1.AggPodMetrics{}, + }, + }, + }, + pods: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod2", + UID: "pod2", + Namespace: "default", + OwnerReferences: []v12.OwnerReference{ + { + APIVersion: "app/v1", + Kind: "Deployment", + Name: "testDeployment2", + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: "node1", + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, { + ObjectMeta: v12.ObjectMeta{ + Name: "pod1", + UID: "pod1", + Namespace: "default", + OwnerReferences: []v12.OwnerReference{ + { + APIVersion: "app/v1", + Kind: "Deployment", + Name: "testDeployment1", + }, + }, + }, + 
Spec: v1.PodSpec{ + NodeName: "node1", + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("4Gi"), + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "predict by cache", + success: true, + conf: &controller.OvercommitConfig{ + Prediction: controller.PredictionConfig{ + CPUScaleFactor: 0.5, + MemoryScaleFactor: 1, + NodeCPUTargetLoad: 0.8, + PodEstimatedCPULoad: 0.6, + NodeMemoryTargetLoad: 0.9, + PodEstimatedMemoryLoad: 0.8, + }, + }, + node: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + consts.NodeAnnotationOriginalAllocatableCPUKey: "31200m", + consts.NodeAnnotationOriginalAllocatableMemoryKey: "29258114498560m", + }, + }, + }, + caches: map[string]map[string]*common.TimeSeries{ + workloadUsageCacheName("default", "Deployment", "testDeployment2"): testWorkloadCaches(4000, 8*1024*1024*1024, 24), + workloadUsageCacheName("default", "Deployment", "testDeployment1"): testWorkloadCaches(2000, 4*1024*1024*1024, 24), + }, + pods: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod2", + UID: "pod2", + Namespace: "default", + OwnerReferences: []v12.OwnerReference{ + { + APIVersion: "app/v1", + Kind: "Deployment", + Name: "testDeployment2", + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: "node1", + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod1", + UID: "pod1", + Namespace: "default", + OwnerReferences: []v12.OwnerReference{ + { + APIVersion: "app/v1", + Kind: "Deployment", + Name: "testDeployment1", + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: "node1", + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + 
Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("4Gi"), + }, + }, + }, + }, + }, + }, + }, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + ctx := context.TODO() + controlCtx, err := katalyst_base.GenerateFakeGenericContext() + assert.NoError(t, err) + + p, err := newTestController(ctx, controlCtx, tc.conf) + assert.NoError(t, err) + + if tc.caches != nil { + p.predictor = &predictor.FakePredictor{} + p.provider = &prom.FakeProvider{} + } + + if tc.caches != nil { + p.workloadUsageCache = tc.caches + } + + _, err = controlCtx.Client.KubeClient.CoreV1().Nodes().Create(ctx, tc.node, v12.CreateOptions{}) + assert.NoError(t, err) + for _, pod := range tc.pods { + _, err = controlCtx.Client.KubeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, v12.CreateOptions{}) + assert.NoError(t, err) + } + for _, spd := range tc.spds { + _, err = controlCtx.Client.InternalClient.WorkloadV1alpha1().ServiceProfileDescriptors(spd.Namespace). + Create(ctx, spd, v12.CreateOptions{}) + assert.NoError(t, err) + } + + controlCtx.StartInformer(ctx) + + synced := cache.WaitForCacheSync(context.TODO().Done(), p.syncedFunc...) 
+ assert.True(t, synced) + + p.reconcileNodes() + + time.Sleep(time.Second) + if tc.success { + node, err := p.nodeLister.Get(tc.node.Name) + assert.NoError(t, err) + _, ok := node.Annotations[consts.NodeAnnotationPredictCPUOvercommitRatioKey] + assert.True(t, ok) + _, ok = node.Annotations[consts.NodeAnnotationPredictMemoryOvercommitRatioKey] + assert.True(t, ok) + } + }) + } +} + +func TestPodToWorkloadName(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + pod *v1.Pod + labelNameKey string + labelTypeKey string + expectedName string + expectedType string + }{ + { + name: "by label", + pod: &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Labels: map[string]string{ + "app": "testApp", + "appType": "Deployment", + }, + }, + }, + labelNameKey: "app", + labelTypeKey: "appType", + expectedName: "testApp", + expectedType: "Deployment", + }, + { + name: "by label miss", + pod: &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Labels: map[string]string{ + "app": "testApp", + }, + }, + }, + labelNameKey: "testapp", + labelTypeKey: "appType", + expectedName: "", + expectedType: "", + }, + { + name: "by label nil", + pod: &v1.Pod{ + ObjectMeta: v12.ObjectMeta{}, + }, + labelNameKey: "app", + labelTypeKey: "appType", + expectedName: "", + expectedType: "", + }, + { + name: "by owner statefulSet", + pod: &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + OwnerReferences: []v12.OwnerReference{ + { + Kind: "StatefulSet", + Name: "testApp", + }, + }, + }, + }, + labelNameKey: "", + labelTypeKey: "appType", + expectedName: "testApp", + expectedType: "StatefulSet", + }, + { + name: "by owner rs", + pod: &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + OwnerReferences: []v12.OwnerReference{ + { + Kind: "ReplicaSet", + Name: "test-app-xxxx", + }, + }, + }, + }, + labelNameKey: "", + labelTypeKey: "appType", + expectedName: "test-app", + expectedType: "Deployment", + }, + { + name: "by owner rs fail", + pod: &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + OwnerReferences: []v12.OwnerReference{ 
+ { + Kind: "ReplicaSet", + Name: "test", + }, + }, + }, + }, + labelNameKey: "", + labelTypeKey: "appType", + expectedName: "", + expectedType: "", + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + p := &Prediction{ + conf: &controller.OvercommitConfig{}, + } + p.conf.Prediction.TargetReferenceNameKey = tc.labelNameKey + p.conf.Prediction.TargetReferenceTypeKey = tc.labelTypeKey + + name, kind, _ := p.podToWorkloadNameAndType(tc.pod) + assert.Equal(t, tc.expectedName, name) + assert.Equal(t, tc.expectedType, kind) + }) + } +} + +func TestNodePodIndexFunc(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + pod *v1.Pod + err bool + expectRes []string + }{ + { + name: "nil pod", + pod: nil, + err: true, + }, + { + name: "without node name", + pod: &v1.Pod{}, + err: false, + expectRes: []string{}, + }, + { + name: "with node name", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + NodeName: "testNode", + }, + }, + err: false, + expectRes: []string{"testNode"}, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + res, err := nodePodIndexFunc(tc.pod) + if tc.err { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expectRes, res) + } + }) + } +} + +func TestTimeSeriesByRequest(t *testing.T) { + t.Parallel() + + generateExpectTimeSeries := func(value float64) *common.TimeSeries { + res := common.EmptyTimeSeries() + for i := 0; i < 24; i++ { + res.Samples = append(res.Samples, common.Sample{ + Value: value, + }) + } + return res + } + + for _, tc := range []struct { + name string + resourceList v1.ResourceList + scaleFactor float64 + expectCPU *common.TimeSeries + expectMemory *common.TimeSeries + }{ + { + name: "test1", + resourceList: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("4Gi"), + }, + scaleFactor: 1, + expectCPU: generateExpectTimeSeries(2000), + expectMemory: generateExpectTimeSeries(4 * 
1024 * 1024 * 1024), + }, + { + name: "test2", + resourceList: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + scaleFactor: 0.5, + expectCPU: generateExpectTimeSeries(2000), + expectMemory: generateExpectTimeSeries(4 * 1024 * 1024 * 1024), + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + p := &Prediction{ + conf: &controller.OvercommitConfig{ + Prediction: controller.PredictionConfig{ + CPUScaleFactor: tc.scaleFactor, + MemoryScaleFactor: tc.scaleFactor, + }, + }, + } + + cpuTs, memTs := p.timeSeriesByRequest(tc.resourceList) + assert.Equal(t, tc.expectCPU.Samples, cpuTs.Samples) + assert.Equal(t, tc.expectMemory.Samples, memTs.Samples) + }) + } +} + +func TestNewPredictionController(t *testing.T) { + t.Parallel() + controlCtx, err := katalyst_base.GenerateFakeGenericContext([]runtime.Object{}, + []runtime.Object{}, []runtime.Object{ + &v13.Deployment{ + TypeMeta: v12.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + }, + }) + assert.NoError(t, err) + + conf := &controller.OvercommitConfig{ + Prediction: controller.PredictionConfig{ + PromConfig: &prometheus.PromConfig{}, + }, + } + p, err := NewPredictionController(context.TODO(), controlCtx, conf) + assert.NoError(t, err) + assert.NotNil(t, p) +} + +func TestPodNameByWorkload(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + name string + workloadName string + workloadType string + expectRes string + }{ + { + name: "daemonset", + workloadName: "test", + workloadType: "DaemonSet", + expectRes: "^test-[a-z0-9]{5}$", + }, + { + name: "statefulset", + workloadName: "test", + workloadType: "StatefulSet", + expectRes: "^test-[0-9]*$", + }, + { + name: "others", + workloadName: "test", + workloadType: "unknown", + expectRes: "^test-.*", + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + res := podNameByWorkload(tc.workloadName, tc.workloadType) + 
assert.Equal(t, tc.expectRes, res) + }) + } +} + +func TestResourceToOvercommitRatio(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + name string + resourceName string + request float64 + estimateUsage float64 + nodeAllocatable float64 + expectRes float64 + }{ + { + name: "zero request", + request: 0, + expectRes: 0, + }, + { + name: "zero usage", + request: 1, + estimateUsage: 0, + expectRes: 0, + }, + { + name: "zero allocatable", + request: 1, + estimateUsage: 1, + nodeAllocatable: 0, + expectRes: 0, + }, + { + name: "unknown resource", + resourceName: "", + request: 1, + estimateUsage: 1, + nodeAllocatable: 1, + expectRes: 0, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + p := &Prediction{} + res := p.resourceToOvercommitRatio("", tc.resourceName, tc.request, tc.estimateUsage, tc.nodeAllocatable) + assert.Equal(t, tc.expectRes, res) + }) + } +} + +func testAggMetrics(cpu, memory int64, len int) v1alpha1.AggPodMetrics { + now := time.Now() + rand.Seed(now.Unix()) + startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).Add(24 * time.Hour) + + res := v1alpha1.AggPodMetrics{ + Scope: spdPortraitScope, + Items: []v1beta1.PodMetrics{}, + } + + for i := 0; i < len; i++ { + res.Items = append(res.Items, v1beta1.PodMetrics{ + Timestamp: v12.Time{Time: startTime}, + Containers: []v1beta1.ContainerMetrics{ + { + Name: spdPortraitLoadAwareMetricName, + Usage: map[v1.ResourceName]resource.Quantity{ + metricNameCpuUtilizationUsageSecondsMax: *resource.NewMilliQuantity(rand.Int63nRange(0, cpu), resource.DecimalSI), + metricNameMemoryUtilizationMax: *resource.NewQuantity(rand.Int63nRange(0, memory), resource.BinarySI), + }, + }, + }, + }) + startTime = startTime.Add(time.Hour) + } + + return res +} + +func testWorkloadCaches(cpu, memory int64, len int) map[string]*common.TimeSeries { + now := time.Now() + rand.Seed(now.Unix()) + startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, 
now.Location()).Add(24 * time.Hour) + + res := map[string]*common.TimeSeries{ + "cpu": { + Samples: make([]common.Sample, len), + }, + "memory": { + Samples: make([]common.Sample, len), + }, + } + for i := 0; i < len; i++ { + res["cpu"].Samples[i] = common.Sample{ + Timestamp: startTime.Unix(), + Value: float64(rand.Int63nRange(0, cpu)), + } + res["memory"].Samples[i] = common.Sample{ + Timestamp: startTime.Unix(), + Value: float64(rand.Int63nRange(0, memory)), + } + startTime = startTime.Add(time.Hour) + } + return res +} diff --git a/pkg/controller/overcommit/prediction/predictor/interface.go b/pkg/controller/overcommit/prediction/predictor/interface.go new file mode 100644 index 0000000000..1c671d9428 --- /dev/null +++ b/pkg/controller/overcommit/prediction/predictor/interface.go @@ -0,0 +1,47 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package predictor + +import ( + "context" + "fmt" + + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/common" +) + +type Interface interface { + // PredictTimeSeries predict time series data by time series history metrics + PredictTimeSeries(ctx context.Context, args *common.PredictArgs, history *common.TimeSeries) (*common.TimeSeries, error) +} + +type FakePredictor struct { + ts *common.TimeSeries +} + +func NewFakePredictor(timeSeries *common.TimeSeries) *FakePredictor { + return &FakePredictor{ + ts: timeSeries, + } +} + +func (f *FakePredictor) PredictTimeSeries(ctx context.Context, args *common.PredictArgs, history *common.TimeSeries) (*common.TimeSeries, error) { + if f.ts == nil { + return nil, fmt.Errorf("test error") + } + + return f.ts, nil +} diff --git a/pkg/controller/overcommit/prediction/predictor/nsigma/predictor.go b/pkg/controller/overcommit/prediction/predictor/nsigma/predictor.go new file mode 100644 index 0000000000..33ad4b788c --- /dev/null +++ b/pkg/controller/overcommit/prediction/predictor/nsigma/predictor.go @@ -0,0 +1,120 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package nsigma + +import ( + "context" + "math" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/common" + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/predictor" +) + +const NSigmaPredictor = "NSigma" + +type Predictor struct { + N int + Buckets int +} + +func NewPredictor(factor int, buckets int) predictor.Interface { + return &Predictor{ + N: factor, + Buckets: buckets, + } +} + +func (p *Predictor) PredictTimeSeries(_ context.Context, args *common.PredictArgs, history *common.TimeSeries) (*common.TimeSeries, error) { + historyBuckets := make([][]common.Sample, p.Buckets) + location, err := time.LoadLocation("Asia/Shanghai") + if err != nil { + klog.Errorf("load location fail: %v", err) + location = time.Now().Location() + } + + for i := range history.Samples { + hour := time.Unix(history.Samples[i].Timestamp/1000, 0).In(location).Hour() + if historyBuckets[hour%p.Buckets] == nil { + historyBuckets[hour%p.Buckets] = make([]common.Sample, 0) + } + historyBuckets[hour%p.Buckets] = append(historyBuckets[hour%p.Buckets], history.Samples[i]) + } + + res := common.EmptyTimeSeries() + for i := range history.Metadata { + res.Metadata = append(res.Metadata, history.Metadata[i]) + } + + // use next day hours as timestamp + nextDay := truncateToNextDay(location) + for i := range historyBuckets { + peak := p.predictPeak(historyBuckets[i]) + if args.ResourceName == v1.ResourceCPU.String() { + peak = peak * 1000 + } + res.Samples = append(res.Samples, common.Sample{ + Value: peak, + Timestamp: nextDay.Add(time.Duration(i) * time.Hour).Unix(), + }) + } + + return res, nil +} + +func (p *Predictor) predictPeak(samples []common.Sample) float64 { + m := mean(samples) + std := stdev(samples, m) + + return m + float64(p.N)*std +} + +func mean(samples []common.Sample) float64 { + l := len(samples) + if l <= 0 { + return 0 + } + + sum := 0.0 + for i := range samples 
{ + sum += samples[i].Value + } + + return sum / float64(l) +} + +func stdev(samples []common.Sample, mean float64) float64 { + l := len(samples) + if l <= 0 { + return 0 + } + + sd := 0.0 + for i := range samples { + sd += math.Pow(samples[i].Value-mean, 2) + } + sd = math.Sqrt(sd / float64(l)) + return sd +} + +func truncateToNextDay(location *time.Location) time.Time { + now := time.Now() + return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, location).Add(24 * time.Hour) +} diff --git a/pkg/controller/overcommit/prediction/predictor/nsigma/predictor_test.go b/pkg/controller/overcommit/prediction/predictor/nsigma/predictor_test.go new file mode 100644 index 0000000000..b30e2e4c49 --- /dev/null +++ b/pkg/controller/overcommit/prediction/predictor/nsigma/predictor_test.go @@ -0,0 +1,88 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/

package nsigma

import (
	"context"
	"math/rand"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"

	"github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/common"
)

// TestPredictTimeSeries verifies that the predictor emits exactly one
// predicted sample per configured bucket, for both normally-distributed and
// uniform history data.
func TestPredictTimeSeries(t *testing.T) {
	t.Parallel()

	history := generateTestTimeSeries()

	p := NewPredictor(3, 24)
	predictTs, err := p.PredictTimeSeries(context.TODO(), &common.PredictArgs{ResourceName: "cpu"}, history)
	assert.NoError(t, err)
	assert.Equal(t, len(predictTs.Samples), 24)

	predictTs2, err := p.PredictTimeSeries(context.TODO(), &common.PredictArgs{ResourceName: "cpu"}, generateRandTimeSeries())
	assert.NoError(t, err)
	assert.Equal(t, len(predictTs2.Samples), 24)

	// A single bucket degenerates to one overall peak.
	p = NewPredictor(3, 1)
	predictTs3, err := p.PredictTimeSeries(context.TODO(), &common.PredictArgs{ResourceName: "cpu"}, history)
	assert.NoError(t, err)
	assert.Equal(t, len(predictTs3.Samples), 1)
}

// generateTestTimeSeries returns one day of per-minute samples drawn from a
// normal distribution (mean 2.0, stddev 0.5), timestamped in Unix seconds.
// NOTE(review): PredictTimeSeries divides timestamps by 1000 (milliseconds);
// these fixtures use seconds, so the bucketing exercised here is degenerate —
// confirm the intended timestamp unit.
func generateTestTimeSeries() *common.TimeSeries {
	rand.Seed(time.Now().UnixNano())
	now := time.Now()
	day := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
	nextDay := day.Add(24 * time.Hour)
	mean := 2.0
	stddev := 0.5

	res := common.EmptyTimeSeries()
	for day.Before(nextDay) {
		res.Samples = append(res.Samples, common.Sample{
			Value:     rand.NormFloat64()*stddev + mean,
			Timestamp: day.Unix(),
		})

		day = day.Add(1 * time.Minute)
	}

	return res
}

// generateRandTimeSeries returns one day of per-minute samples uniformly
// distributed in [0, 4), timestamped in Unix seconds.
func generateRandTimeSeries() *common.TimeSeries {
	rand.Seed(time.Now().UnixNano())
	now := time.Now()
	day := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
	nextDay := day.Add(24 * time.Hour)

	res := common.EmptyTimeSeries()
	for day.Before(nextDay) {
		res.Samples = append(res.Samples, common.Sample{
			Value:     rand.Float64() * 4,
			Timestamp: day.Unix(),
		})

		day = day.Add(1 * time.Minute)
	}

	return res
}
b/pkg/controller/overcommit/prediction/provider/prom/fake_provider.go new file mode 100644 index 0000000000..e9290361ad --- /dev/null +++ b/pkg/controller/overcommit/prediction/provider/prom/fake_provider.go @@ -0,0 +1,46 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prom + +import ( + "context" + "fmt" + "time" + + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/common" +) + +type FakeProvider struct { + ts []*common.TimeSeries +} + +func NewFakeProvider(timeSeries []*common.TimeSeries) *FakeProvider { + return &FakeProvider{ + ts: timeSeries, + } +} + +func (f *FakeProvider) QueryTimeSeries(_ context.Context, _ string, _, _ time.Time, _ time.Duration) ([]*common.TimeSeries, error) { + if f.ts == nil { + return nil, fmt.Errorf("test error") + } + return f.ts, nil +} + +func (f *FakeProvider) BuildQuery(_ string, _ []common.Metadata) (string, error) { + return "", nil +} diff --git a/pkg/controller/overcommit/prediction/provider/prom/provider.go b/pkg/controller/overcommit/prediction/provider/prom/provider.go new file mode 100644 index 0000000000..4aa8eb9458 --- /dev/null +++ b/pkg/controller/overcommit/prediction/provider/prom/provider.go @@ -0,0 +1,198 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prom + +import ( + "context" + "fmt" + "strings" + "time" + + prometheus "github.com/prometheus/client_golang/api" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + corev1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/common" + datasource "github.com/kubewharf/katalyst-core/pkg/util/datasource/prometheus" +) + +type Interface interface { + QueryTimeSeries(ctx context.Context, query string, startTime, endTime time.Time, step time.Duration) ([]*common.TimeSeries, error) + BuildQuery(metricName string, matchs []common.Metadata) (string, error) +} + +type Provider struct { + client prometheus.Client + api v1.API + maxPointsLimit int +} + +func NewProvider( + config *datasource.PromConfig, +) (*Provider, error) { + client, err := datasource.NewPrometheusClient(config) + if err != nil { + err = fmt.Errorf("new prometheus client fail: %v", err) + klog.Error(err) + return nil, err + } + + return &Provider{ + client: client, + api: v1.NewAPI(client), + maxPointsLimit: config.MaxPointsLimitPerTimeSeries, + }, nil +} + +// QueryTimeSeries query time series metrics from prometheus server +func (p *Provider) QueryTimeSeries(ctx context.Context, query string, startTime, endTime time.Time, step time.Duration) ([]*common.TimeSeries, error) { + klog.V(6).Infof("prom provider QueryTimeSeries, query: %v, startTime: %v, endTime: %v, step: %v", + query, startTime, endTime, step) + + r := v1.Range{ + Start: startTime, + End: endTime, + Step: step, + } + + // 
max resolution per timeSeries of prom server is limited, default 11000 points. + Shards := p.computeWindowShard(query, &r) + + res := make([]*common.TimeSeries, 0) + for i := range Shards.windows { + ts, err := p.queryTimeSeries(ctx, Shards.query, Shards.windows[i]) + if err != nil { + klog.Errorf("prom provider query time series fail, query: %v, startTime: %v, endTime: %v, step: %v, err: %v", + query, startTime, endTime, step, err) + return nil, err + } + res = append(res, ts...) + } + + return res, nil +} + +func (p *Provider) BuildQuery(metricName string, matchs []common.Metadata) (string, error) { + matchExpr := strings.Builder{} + // generate matchs + i := 0 + for _, data := range matchs { + symbol, ok := matchLabels[data.Key] + if !ok { + symbol = "=" + } + if i != 0 { + matchExpr.WriteString(",") + } + matchExpr.WriteString(data.Key) + matchExpr.WriteString(symbol) + matchExpr.WriteString(fmt.Sprintf(`"%s"`, data.Value)) + i++ + } + + switch metricName { + case corev1.ResourceCPU.String(): + return fmt.Sprintf(workloadMaxCPUUsageTemplate, matchExpr.String(), defaultCPUUsageInterval), nil + case corev1.ResourceMemory.String(): + return fmt.Sprintf(workloadMaxMemoryUsageTemplate, matchExpr.String()), nil + default: + return "", fmt.Errorf("metric %v not support yet", metricName) + } +} + +func (p *Provider) queryTimeSeries(ctx context.Context, query string, queryRange *v1.Range) ([]*common.TimeSeries, error) { + timeout, cancel := context.WithTimeout(ctx, time.Second*2) + defer cancel() + + results, warnings, err := p.api.QueryRange(timeout, query, *queryRange) + if len(warnings) != 0 { + klog.V(5).Infof("prom provider queryRange warnings: %v", warnings) + } + + if err != nil { + return nil, err + } + + return promResultsToTimeSeries(results) +} + +func promResultsToTimeSeries(value model.Value) ([]*common.TimeSeries, error) { + var ( + res = []*common.TimeSeries{} + valueType = value.Type() + ) + + switch valueType { + case model.ValMatrix: + matrix, ok := 
value.(model.Matrix) + if !ok { + return nil, fmt.Errorf("prom provider matrix value assert fail") + } + for _, sample := range matrix { + if sample == nil { + continue + } + + ts := common.EmptyTimeSeries() + for key, val := range sample.Metric { + ts.Metadata = append(ts.Metadata, common.Metadata{Key: string(key), Value: string(val)}) + } + for _, pair := range sample.Values { + ts.Samples = append(ts.Samples, common.Sample{Value: float64(pair.Value), Timestamp: int64(pair.Timestamp)}) + } + res = append(res, ts) + } + + return res, nil + default: + return nil, fmt.Errorf("prom provider value type %v not supported yet", valueType.String()) + } +} + +func (p *Provider) computeWindowShard(query string, window *v1.Range) *QueryShards { + shardIndex := 0 + nextPoint := window.Start + prePoint := nextPoint + var shards []*v1.Range + for { + if nextPoint.After(window.End) { + shards = append(shards, &v1.Range{ + Start: prePoint, + End: window.End, + Step: window.Step, + }) + break + } + if shardIndex != 0 && shardIndex%p.maxPointsLimit == 0 { + shards = append(shards, &v1.Range{ + Start: prePoint, + End: nextPoint.Add(-window.Step), + Step: window.Step, + }) + prePoint = nextPoint + } + nextPoint = nextPoint.Add(window.Step) + shardIndex++ + } + + return &QueryShards{ + query: query, + windows: shards, + } +} diff --git a/pkg/controller/overcommit/prediction/provider/prom/provider_test.go b/pkg/controller/overcommit/prediction/provider/prom/provider_test.go new file mode 100644 index 0000000000..8694197222 --- /dev/null +++ b/pkg/controller/overcommit/prediction/provider/prom/provider_test.go @@ -0,0 +1,178 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prom

import (
	"testing"
	"time"

	v12 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/assert"
	v1 "k8s.io/api/core/v1"

	"github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/common"
)

// TestBuildQuery pins the exact PromQL rendered for the supported CPU and
// memory metrics, including the per-label match operators from matchLabels.
func TestBuildQuery(t *testing.T) {
	t.Parallel()

	p := &Provider{}

	for _, tc := range []struct {
		name       string
		metricName string
		matches    []common.Metadata
		expectRes  string
	}{
		{
			name:       "cpu",
			metricName: v1.ResourceCPU.String(),
			matches: []common.Metadata{
				{Key: "namespace", Value: "default"}, {Key: "pod", Value: "katalyst-controller-.*-.*"},
			},
			expectRes: `max(sum(rate(container_cpu_usage_seconds_total{namespace="default",pod=~"katalyst-controller-.*-.*"}[2m])) by (pod))`,
		},
		{
			name:       "memory",
			metricName: v1.ResourceMemory.String(),
			matches: []common.Metadata{
				{Key: "namespace", Value: "default"}, {Key: "pod", Value: "katalyst-controller-.*-.*"},
			},
			expectRes: `max(sum(container_memory_working_set_bytes{namespace="default",pod=~"katalyst-controller-.*-.*"}) by (pod))`,
		},
	} {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			res, err := p.BuildQuery(tc.metricName, tc.matches)
			assert.NoError(t, err)
			assert.Equal(t, tc.expectRes, res)
		})
	}
}

// TestPromResultsToTimeSeries checks the matrix-to-TimeSeries conversion and
// that non-matrix result types are rejected.
func TestPromResultsToTimeSeries(t *testing.T) {
	t.Parallel()

	for _, tc := range []struct {
		name      string
		value     model.Value
		expectRes []*common.TimeSeries
	}{
		{
			name: "matrix type",
			value: model.Matrix{
				&model.SampleStream{
					Metric: map[model.LabelName]model.LabelValue{"test": "test"},
					Values: []model.SamplePair{
						{
							Timestamp: 10,
							Value:     0.5,
						},
						{
							Timestamp: 11,
							Value:     0.6,
						},
						{
							Timestamp: 12,
							Value:     0.7,
						},
					},
				},
			},
			expectRes: []*common.TimeSeries{
				{
					Metadata: []common.Metadata{
						{
							Key:   "test",
							Value: "test",
						},
					},
					Samples: []common.Sample{
						{
							Value:     0.5,
							Timestamp: 10,
						},
						{
							Value:     0.6,
							Timestamp: 11,
						},
						{
							Value:     0.7,
							Timestamp: 12,
						},
					},
				},
			},
		},
		{
			// Vector results are not supported by the converter.
			name:      "unsupported type",
			value:     model.Vector{},
			expectRes: nil,
		},
	} {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			res, err := promResultsToTimeSeries(tc.value)
			if res == nil {
				assert.Error(t, err)
			} else {
				assert.NoError(t, err)
				assert.Equal(t, len(res), len(tc.expectRes))
				for i := range res {
					assert.Equal(t, *res[i], *tc.expectRes[i])
				}
			}
		})
	}
}

// TestComputeWindowShard checks that a 10-point window with a 5-point limit
// is split into 2 shards.
func TestComputeWindowShard(t *testing.T) {
	t.Parallel()

	p := &Provider{}

	for _, tc := range []struct {
		name           string
		query          string
		window         *v12.Range
		maxPointsLimit int
		expectShards   int
	}{
		{
			name:  "test1",
			query: "testQuery",
			window: &v12.Range{
				Step:  time.Second * 60,
				Start: time.Now().Add(-9 * time.Minute),
				End:   time.Now(),
			},
			maxPointsLimit: 5,
			expectShards:   2,
		},
	} {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			p.maxPointsLimit = tc.maxPointsLimit
			// NOTE(review): "shareds" is a typo for "shards".
			shareds := p.computeWindowShard(tc.query, tc.window)
			assert.Equal(t, tc.expectShards, len(shareds.windows))
		})
	}
}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prom

import v1 "github.com/prometheus/client_golang/api/prometheus/v1"

const (
	// workloadMaxCPUUsageTemplate sums container CPU usage rate per pod and
	// takes the max across pods; the %s slots are the label matchers and the
	// rate() interval.
	workloadMaxCPUUsageTemplate = "max(sum(rate(container_cpu_usage_seconds_total{%s}[%s])) by (pod))"

	// workloadMaxMemoryUsageTemplate sums working-set bytes per pod and takes
	// the max across pods; the %s slot is the label matchers.
	workloadMaxMemoryUsageTemplate = "max(sum(container_memory_working_set_bytes{%s}) by (pod))"

	// defaultCPUUsageInterval is the rate() window used in the CPU query.
	defaultCPUUsageInterval = "2m"
)

// matchLabels maps a label key to the PromQL match operator BuildQuery uses
// for it: exact match for namespace, regex match for pod, not-equal for
// container (presumably to exclude the pod-level series where container is
// empty — confirm against the metrics exporter). Unlisted keys default to "=".
var matchLabels = map[string]string{
	"namespace": "=",
	"pod":       "=~",
	"container": "!=",
}

// QueryShards is one PromQL query split into several time windows so each
// window stays under the prometheus per-query resolution limit.
type QueryShards struct {
	// query is the PromQL expression shared by all windows.
	query string
	// windows are the sub-ranges the query is executed over.
	windows []*v1.Range
}
*/

package prediction

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
)

const (
	// Pod-name suffix patterns generated by each workload kind, used to build
	// a regex matching a workload's pods.
	PostRegMatchesPodDeployment  = `[a-z0-9]*-[a-z0-9]{5}$`
	PostRegMatchesPodDaemonSet   = `[a-z0-9]{5}$`
	PostRegMatchesPodStatefulset = `[0-9]*$`
	PostRegMatchesPodDefault     = `.*`
)

const (
	// Label keys used when building provider queries.
	namespaceMatchKey = "namespace"
	podMatchKey       = "pod"
	containerMatchKey = "container"
)

const (
	// defaultStep is the sampling step used for history range queries.
	defaultStep = 60 * time.Second
)

const (
	// nodePodIndex is the pod-indexer name keyed by node.
	nodePodIndex = "overcommit-nodepod-index"

	// workloadUsageCacheNameFmt keys the usage cache:
	// {namespace}-{workloadType}-{workloadName}
	workloadUsageCacheNameFmt = "%s-%s-%s"

	workloadUsageDataLength                 = 24
	spdPortraitLoadAwareMetricName          = "overcommit-predict"
	spdPortraitScope                        = "ResourcePortraitIndicatorPlugin"
	metricNameCpuUtilizationUsageSecondsMax = "cpu_utilization_usage_seconds_max"
	metricNameMemoryUtilizationMax          = "memory_utilization_max"
)

// podNameByWorkload builds the pod-name regex for a workload, anchored on the
// workload name plus the kind-specific suffix pattern.
func podNameByWorkload(workloadName string, workloadType string) string {
	switch workloadType {
	case "Deployment":
		return fmt.Sprintf("^%s-%s", workloadName, PostRegMatchesPodDeployment)
	case "DaemonSet":
		return fmt.Sprintf("^%s-%s", workloadName, PostRegMatchesPodDaemonSet)
	case "StatefulSet":
		return fmt.Sprintf("^%s-%s", workloadName, PostRegMatchesPodStatefulset)
	default:
		return fmt.Sprintf("^%s-%s", workloadName, PostRegMatchesPodDefault)
	}
}

// workloadUsageCacheName builds the usage-cache key for a workload.
func workloadUsageCacheName(namespace, workloadType, workloadName string) string {
	return fmt.Sprintf(workloadUsageCacheNameFmt, namespace, workloadType, workloadName)
}

// resourceToPortraitMetrics maps a resource name to its portrait metric name.
// NOTE(review): the value type v1.ResourceName looks inverted — the values
// are metric-name strings, so map[string]string would better reflect intent;
// only the keys are used within this package, confirm before changing.
var resourceToPortraitMetrics = map[string]v1.ResourceName{
	v1.ResourceCPU.String():    metricNameCpuUtilizationUsageSecondsMax,
	v1.ResourceMemory.String(): metricNameMemoryUtilizationMax,
}
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prediction + +import ( + "fmt" + "time" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog/v2" + + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/common" + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/predictor/nsigma" + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/provider/prom" +) + +func (p *Prediction) initPredictor() error { + switch p.conf.Prediction.Predictor { + case nsigma.NSigmaPredictor: + p.predictor = nsigma.NewPredictor(p.conf.Prediction.Factor, p.conf.Prediction.Buckets) + case "": + klog.Infof("predictor not set, skip") + default: + return fmt.Errorf("predictor %v not support yet", p.conf.Prediction.Predictor) + } + + return nil +} + +func (p *Prediction) initProvider() error { + if p.conf.Prediction.Address == "" { + klog.Warning("provider address not set") + return nil + } + // init provider + promProvider, err := prom.NewProvider(p.conf.Prediction.PromConfig) + if err != nil { + err = fmt.Errorf("new prom provider fail: %v", err) + return err + } + p.provider = promProvider + return nil +} + +// predict workloads cpu and memory usage by history time series +func (p *Prediction) reconcileWorkloads() { + for gvr, lister := range p.workloadLister { + objs, err := lister.List(labels.Everything()) + if err != nil { + klog.Errorf("workload %v 
list fail: %v", gvr.String(), err) + continue + } + klog.V(6).Infof("reconcile %v workload, length: %v", gvr.String(), len(objs)) + + for _, obj := range objs { + err := p.predictWorkload(obj) + if err != nil { + klog.Error(err) + continue + } + } + } + p.firstReconcileWorkload = true +} + +func (p *Prediction) predictWorkload(obj runtime.Object) error { + workload := obj.(*unstructured.Unstructured) + + predictTimeSeriesRes := make(map[string]*common.TimeSeries, 0) + for resource := range resourceToPortraitMetrics { + // generate query + query, err := p.generateQuery(resource, workload) + if err != nil { + klog.Errorf("%v %v generateQuery fail: %v", workload.GetKind(), workload.GetName(), err) + return err + } + + // request metrics + timeSeries, err := p.requestHistoryTimeSeries(query) + if err != nil { + klog.Errorf("%v %v requestHistoryTimeSeries fail: %v", workload.GetKind(), workload.GetName(), err) + return err + } + + // validate timeSeries + ok, err := p.validateTimeSeries(timeSeries) + if !ok { + klog.V(6).Infof("workload %v validateTimeSeries: %v", workload.GetName(), err) + return err + } + + // predict + predictTimeSeries, err := p.predictor.PredictTimeSeries(p.ctx, &common.PredictArgs{WorkloadName: workload.GetName(), ResourceName: resource}, timeSeries[0]) + if err != nil { + klog.Errorf("%v %v PredictTimeSeries fail: %v", workload.GetKind(), workload.GetName(), err) + return err + } + + klog.V(6).Infof("%v %v predict timeSeries: %v", workload.GetKind(), workload.GetName(), predictTimeSeries.Samples) + predictTimeSeriesRes[resource] = predictTimeSeries + } + + err := p.updateWorkloadUsageCache(workload.GetNamespace(), workload.GetKind(), workload.GetName(), predictTimeSeriesRes) + if err != nil { + klog.Errorf("%v %v update resource portrait result fail: %v", workload.GetKind(), workload.GetName(), err) + } + + return nil +} + +func (p *Prediction) generateQuery(resourceName string, workload *unstructured.Unstructured) (string, error) { + matchLabels 
:= []common.Metadata{ + {Key: namespaceMatchKey, Value: workload.GetNamespace()}, + {Key: podMatchKey, Value: podNameByWorkload(workload.GetName(), workload.GetKind())}, + {Key: containerMatchKey, Value: ""}, + } + + return p.provider.BuildQuery(resourceName, matchLabels) +} + +func (p *Prediction) requestHistoryTimeSeries(query string) ([]*common.TimeSeries, error) { + now := time.Now() + endTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local) + startTime := endTime.Add(-p.conf.Prediction.MaxTimeSeriesDuration) + + return p.provider.QueryTimeSeries(p.ctx, query, startTime, endTime, defaultStep) +} + +func (p *Prediction) validateTimeSeries(timeSeries []*common.TimeSeries) (bool, error) { + if len(timeSeries) <= 0 { + return false, fmt.Errorf("timeSeries without data") + } + + if len(timeSeries) > 1 { + return false, fmt.Errorf("more than 1 timeSeries") + } + + startTime := timeSeries[0].Samples[0].Timestamp + endTime := timeSeries[0].Samples[len(timeSeries[0].Samples)-1].Timestamp + + if p.conf.Prediction.MinTimeSeriesDuration > time.Duration(endTime-startTime)*time.Second { + return false, fmt.Errorf("not enough data, startTime: %v, endTime: %v, minDuration: %v", startTime, endTime, p.conf.Prediction.MinTimeSeriesDuration) + } + + return true, nil +} + +func (p *Prediction) updateWorkloadUsageCache(namespace, workloadType, workloadName string, metricTimeSeries map[string]*common.TimeSeries) error { + if len(metricTimeSeries) <= 0 { + return fmt.Errorf("update workload %v usage cache, get null timeSeries", workloadName) + } + + cacheKey := workloadUsageCacheName(namespace, workloadType, workloadName) + + p.Lock() + defer p.Unlock() + + if p.workloadUsageCache == nil { + p.workloadUsageCache = make(map[string]map[string]*common.TimeSeries) + } + p.workloadUsageCache[cacheKey] = metricTimeSeries + + return nil +} diff --git a/pkg/controller/overcommit/prediction/workload_test.go 
b/pkg/controller/overcommit/prediction/workload_test.go new file mode 100644 index 0000000000..7987dcfeb5 --- /dev/null +++ b/pkg/controller/overcommit/prediction/workload_test.go @@ -0,0 +1,338 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prediction + +import ( + "context" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + v13 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" + + katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" + "github.com/kubewharf/katalyst-core/pkg/client/control" + "github.com/kubewharf/katalyst-core/pkg/config/controller" + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/common" + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/predictor" + "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/prediction/provider/prom" +) + +var deploymentGVR = schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"} + +func TestInitPredictor(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + conf *controller.OvercommitConfig + expectErr bool + }{ + { + name: "nsigma", + conf: &controller.OvercommitConfig{ + Prediction: controller.PredictionConfig{ + Predictor: "NSigma", + }, + }, + expectErr: false, + }, + { + name: 
"skip", + conf: &controller.OvercommitConfig{}, + expectErr: false, + }, + { + name: "unknow", + conf: &controller.OvercommitConfig{ + Prediction: controller.PredictionConfig{ + Predictor: "test", + }, + }, + expectErr: true, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + p := &Prediction{} + p.conf = tc.conf + + err := p.initPredictor() + if tc.expectErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestReconcileWorkloads(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + workload interface{} + conf *controller.OvercommitConfig + timeSeries *common.TimeSeries + predictTimeSeries *common.TimeSeries + expectRprLen int + }{ + { + name: "patch resource portrait result", + workload: &v13.Deployment{ + TypeMeta: v12.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + ObjectMeta: v12.ObjectMeta{ + Name: "testDeployment1", + Namespace: "default", + }, + }, + conf: &controller.OvercommitConfig{ + Prediction: controller.PredictionConfig{ + MinTimeSeriesDuration: time.Minute, + NSigmaPredictorConfig: controller.NSigmaPredictorConfig{ + Factor: 3, + Buckets: 24, + }, + }, + }, + timeSeries: generateRandTimeSeries(), + predictTimeSeries: generatePredictTimeSeries(), + expectRprLen: 24, + }, + { + name: "create resource portrait result", + workload: &v13.Deployment{ + TypeMeta: v12.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + ObjectMeta: v12.ObjectMeta{ + Name: "testDeployment2", + Namespace: "default", + }, + }, + conf: &controller.OvercommitConfig{ + Prediction: controller.PredictionConfig{ + MinTimeSeriesDuration: time.Minute, + NSigmaPredictorConfig: controller.NSigmaPredictorConfig{ + Factor: 3, + Buckets: 24, + }, + }, + }, + timeSeries: generateRandTimeSeries(), + predictTimeSeries: generatePredictTimeSeries(), + expectRprLen: 24, + }, + { + name: "validate fail", + workload: &v13.Deployment{ + TypeMeta: v12.TypeMeta{ + Kind: "Deployment", + 
APIVersion: "apps/v1", + }, + ObjectMeta: v12.ObjectMeta{ + Name: "testDeployment3", + Namespace: "default", + }, + }, + conf: &controller.OvercommitConfig{ + Prediction: controller.PredictionConfig{ + MinTimeSeriesDuration: 48 * time.Hour, + NSigmaPredictorConfig: controller.NSigmaPredictorConfig{ + Factor: 3, + Buckets: 24, + }, + }, + }, + timeSeries: generateRandTimeSeries(), + predictTimeSeries: generatePredictTimeSeries(), + expectRprLen: 0, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + workload := tc.workload.(*v13.Deployment) + controlCtx, err := katalyst_base.GenerateFakeGenericContext([]runtime.Object{}, + []runtime.Object{}, []runtime.Object{workload}) + assert.NoError(t, err) + + controller, err := newTestController(context.TODO(), controlCtx, tc.conf) + assert.NoError(t, err) + + controlCtx.StartInformer(context.TODO()) + + controller.provider = prom.NewFakeProvider([]*common.TimeSeries{tc.timeSeries}) + controller.predictor = predictor.NewFakePredictor(tc.predictTimeSeries) + + synced := cache.WaitForCacheSync(context.TODO().Done(), controller.syncedFunc...) 
+ assert.True(t, synced) + + controller.reconcileWorkloads() + + d := tc.workload.(*v13.Deployment) + cacheKey := workloadUsageCacheName(d.Namespace, d.Kind, d.Name) + _, ok := controller.workloadUsageCache[cacheKey] + + if tc.expectRprLen != 0 { + assert.True(t, ok) + assert.NotNil(t, controller.workloadUsageCache[cacheKey][v1.ResourceCPU.String()]) + assert.Equal(t, tc.expectRprLen, len(controller.workloadUsageCache[cacheKey][v1.ResourceCPU.String()].Samples)) + } else { + assert.False(t, ok) + } + }) + } +} + +func TestDeleteWorkloadUsageCache(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + name string + workload *v13.Deployment + conf *controller.OvercommitConfig + }{ + { + name: "test1", + workload: &v13.Deployment{ + TypeMeta: v12.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + ObjectMeta: v12.ObjectMeta{ + Name: "testDeployment", + Namespace: "default", + }, + }, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + workload := tc.workload + controlCtx, err := katalyst_base.GenerateFakeGenericContext([]runtime.Object{}, + []runtime.Object{}, []runtime.Object{workload}) + assert.NoError(t, err) + + controller, err := newTestController(context.TODO(), controlCtx, tc.conf) + assert.NoError(t, err) + controller.provider = prom.NewFakeProvider(nil) + controller.predictor = predictor.NewFakePredictor(nil) + + controller.addDeleteHandler(controlCtx.DynamicResourcesManager.GetDynamicInformers()) + + controlCtx.StartInformer(context.TODO()) + + synced := cache.WaitForCacheSync(context.TODO().Done(), controller.syncedFunc...) 
+ assert.True(t, synced) + + err = controlCtx.Client.DynamicClient.Resource(deploymentGVR).Namespace(workload.Namespace).Delete(context.TODO(), workload.Name, v12.DeleteOptions{}) + assert.NoError(t, err) + + _, err = controlCtx.Client.DynamicClient.Resource(deploymentGVR).Namespace(workload.Namespace).Get(context.TODO(), workload.Name, v12.GetOptions{}) + assert.Error(t, err) + + time.Sleep(time.Second) + + cacheKey := workloadUsageCacheName(workload.Namespace, workload.Kind, workload.Name) + controller.RLock() + _, ok := controller.workloadUsageCache[cacheKey] + controller.RUnlock() + assert.False(t, ok) + }) + } +} + +func newTestController( + ctx context.Context, + controlCtx *katalyst_base.GenericContext, + overcommitConf *controller.OvercommitConfig, +) (*Prediction, error) { + predictionController := &Prediction{ + ctx: ctx, + metricsEmitter: controlCtx.EmitterPool.GetDefaultMetricsEmitter(), + conf: overcommitConf, + workloadLister: map[schema.GroupVersionResource]cache.GenericLister{}, + workloadUsageCache: map[string]map[string]*common.TimeSeries{}, + } + + workloadInformers := controlCtx.DynamicResourcesManager.GetDynamicInformers() + for _, wf := range workloadInformers { + predictionController.workloadLister[wf.GVR] = wf.Informer.Lister() + predictionController.syncedFunc = append(predictionController.syncedFunc, wf.Informer.Informer().HasSynced) + } + + predictionController.spdLister = controlCtx.InternalInformerFactory.Workload().V1alpha1().ServiceProfileDescriptors().Lister() + predictionController.syncedFunc = append(predictionController.syncedFunc, controlCtx.InternalInformerFactory.Workload().V1alpha1().ServiceProfileDescriptors().Informer().HasSynced) + predictionController.nodeLister = controlCtx.KubeInformerFactory.Core().V1().Nodes().Lister() + predictionController.syncedFunc = append(predictionController.syncedFunc, controlCtx.KubeInformerFactory.Core().V1().Nodes().Informer().HasSynced) + predictionController.podIndexer = 
controlCtx.KubeInformerFactory.Core().V1().Pods().Informer().GetIndexer()
+	_ = predictionController.podIndexer.AddIndexers(cache.Indexers{
+		nodePodIndex: nodePodIndexFunc,
+	})
+	predictionController.nodeUpdater = control.NewRealNodeUpdater(controlCtx.Client.KubeClient)
+
+	return predictionController, nil
+}
+
+func generateRandTimeSeries() *common.TimeSeries {
+	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+	now := time.Now()
+	day := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
+	nextDay := day.Add(24 * time.Hour)
+
+	res := common.EmptyTimeSeries()
+	for day.Before(nextDay) {
+		res.Samples = append(res.Samples, common.Sample{
+			Value:     rng.Float64() * 4,
+			Timestamp: day.Unix(),
+		})
+
+		day = day.Add(1 * time.Minute)
+	}
+
+	return res
+}
+
+func generatePredictTimeSeries() *common.TimeSeries {
+	now := time.Now()
+	startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).Add(24 * time.Hour)
+
+	res := common.EmptyTimeSeries()
+	for i := 0; i < 24; i++ {
+		res.Samples = append(res.Samples, common.Sample{
+			Value:     rand.Float64() * 4,
+			Timestamp: startTime.Unix(),
+		})
+
+		startTime = startTime.Add(1 * time.Hour)
+	}
+
+	return res
+}
diff --git a/pkg/scheduler/plugins/nodeovercommitment/cache/cache.go b/pkg/scheduler/plugins/nodeovercommitment/cache/cache.go
index 1a258a3539..8d82fd872b 100644
--- a/pkg/scheduler/plugins/nodeovercommitment/cache/cache.go
+++ b/pkg/scheduler/plugins/nodeovercommitment/cache/cache.go
@@ -21,12 +21,14 @@ import (
 	"sync"
 
 	v1 "k8s.io/api/core/v1"
+	clientCache "k8s.io/client-go/tools/cache"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 
 	"github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
+	v1alpha12 "github.com/kubewharf/katalyst-api/pkg/apis/overcommit/v1alpha1"
 	"github.com/kubewharf/katalyst-api/pkg/consts"
 	"github.com/kubewharf/katalyst-core/pkg/util/native"
 )
 
@@ -44,6 +46,7 @@ func
init() { type overcommitCache struct { sync.RWMutex nodeCaches map[string]*NodeCache + nocIndexer clientCache.Indexer } func GetCache() *overcommitCache { @@ -62,6 +65,29 @@ func (c *overcommitCache) GetNode(name string) (*NodeCache, error) { return node, nil } +func (c *overcommitCache) GetNocByIndexer(labelVal string) (*v1alpha12.NodeOvercommitConfig, error) { + nocList, err := c.nocIndexer.ByIndex(NOCIndexerKey, labelVal) + if err != nil { + return nil, err + } + + if len(nocList) == 0 { + return nil, nil + } + + if len(nocList) > 1 { + return nil, fmt.Errorf("matched more than 1 noc with label value %s", labelVal) + } + + nocInterface := nocList[0] + noc, ok := nocInterface.(*v1alpha12.NodeOvercommitConfig) + if !ok { + return nil, fmt.Errorf("transfer interface to noc fail") + } + + return noc, nil +} + func (c *overcommitCache) AddPod(pod *v1.Pod) error { key, err := framework.GetPodKey(pod) if err != nil { diff --git a/pkg/scheduler/plugins/nodeovercommitment/cache/handler.go b/pkg/scheduler/plugins/nodeovercommitment/cache/handler.go index 010e5f508e..056bd241da 100644 --- a/pkg/scheduler/plugins/nodeovercommitment/cache/handler.go +++ b/pkg/scheduler/plugins/nodeovercommitment/cache/handler.go @@ -26,6 +26,7 @@ import ( "k8s.io/klog/v2" "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + v1alpha12 "github.com/kubewharf/katalyst-api/pkg/apis/overcommit/v1alpha1" "github.com/kubewharf/katalyst-api/pkg/client/informers/externalversions" schedulercache "github.com/kubewharf/katalyst-core/pkg/scheduler/cache" "github.com/kubewharf/katalyst-core/pkg/scheduler/eventhandlers" @@ -35,6 +36,10 @@ import ( const ( OvercommitPodHandler = "OvercommitPodHandler" OvercommitCNRHandler = "OvercommitCNRHandler" + + OvercommitNOCHandler = "OvercommitNOCHandler" + + NOCIndexerKey = "overcommitIndexerKey" ) // RegisterPodHandler register handler to scheduler event handlers @@ -83,6 +88,33 @@ func RegisterCNRHandler() { }) } +func RegisterNOCHandler() { + 
eventhandlers.RegisterEventHandler(OvercommitNOCHandler, func(_ informers.SharedInformerFactory, internalInformerFactory externalversions.SharedInformerFactory) { + nocInformer := internalInformerFactory.Overcommit().V1alpha1().NodeOvercommitConfigs() + + err := nocInformer.Informer().AddIndexers(clientgocache.Indexers{ + NOCIndexerKey: func(obj interface{}) ([]string, error) { + noc, ok := obj.(*v1alpha12.NodeOvercommitConfig) + if !ok { + klog.Warningf("transfer obj to noc fail") + return []string{}, nil + } + + if noc.Spec.NodeOvercommitSelectorVal == "" { + return []string{}, nil + } + + return []string{noc.Spec.NodeOvercommitSelectorVal}, nil + }, + }) + if err != nil { + klog.Fatalf("RegisterNOCHandler fail: %v", err) + } + + cache.nocIndexer = nocInformer.Informer().GetIndexer() + }) +} + func addPod(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { diff --git a/pkg/scheduler/plugins/nodeovercommitment/fit.go b/pkg/scheduler/plugins/nodeovercommitment/fit.go index 1d1cc073ca..b04fabf3f4 100644 --- a/pkg/scheduler/plugins/nodeovercommitment/fit.go +++ b/pkg/scheduler/plugins/nodeovercommitment/fit.go @@ -137,14 +137,19 @@ func (n *NodeOvercommitment) nodeOvercommitRatio(nodeInfo *framework.NodeInfo) ( return } - annotation := nodeInfo.Node().GetAnnotations() - CPUOvercommitRatio, err = overcommitutil.OvercommitRatioValidate(annotation, consts.NodeAnnotationCPUOvercommitRatioKey, consts.NodeAnnotationRealtimeCPUOvercommitRatioKey) + var ( + annotation = nodeInfo.Node().GetAnnotations() + enableDynamicOvercommit = n.enableDynamicOvercommit(nodeInfo.Node()) + ) + klog.V(6).Infof("node %v enable dynamic overcommit: %v", nodeInfo.Node().Name, enableDynamicOvercommit) + + CPUOvercommitRatio, err = overcommitutil.OvercommitRatioValidate(annotation, consts.NodeAnnotationCPUOvercommitRatioKey, consts.NodeAnnotationPredictCPUOvercommitRatioKey, consts.NodeAnnotationRealtimeCPUOvercommitRatioKey, enableDynamicOvercommit) if err != nil { klog.Error(err) return } - 
memoryOvercommitRatio, err = overcommitutil.OvercommitRatioValidate(annotation, consts.NodeAnnotationMemoryOvercommitRatioKey, consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey)
+	memoryOvercommitRatio, err = overcommitutil.OvercommitRatioValidate(annotation, consts.NodeAnnotationMemoryOvercommitRatioKey, consts.NodeAnnotationPredictMemoryOvercommitRatioKey, consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey, enableDynamicOvercommit)
 	if err != nil {
 		klog.Error(err)
 		return
@@ -153,6 +158,36 @@ func (n *NodeOvercommitment) nodeOvercommitRatio(nodeInfo *framework.NodeInfo) (
 	return
 }
 
+// enableDynamicOvercommit reports whether dynamic (predict/realtime) overcommit
+// ratios should be honored for the node: the node must carry the overcommit-pool
+// selector label and the matching NodeOvercommitConfig must enable it.
+func (n *NodeOvercommitment) enableDynamicOvercommit(node *v1.Node) bool {
+	if node == nil || node.Labels == nil {
+		return false
+	}
+
+	overcommitPool, ok := node.Labels[consts.NodeOvercommitSelectorKey]
+	if !ok {
+		return false
+	}
+
+	// get pool matched config
+	noc, err := cache.GetCache().GetNocByIndexer(overcommitPool)
+	if err != nil {
+		klog.Errorf("get node overcommit fail, node: %v, labelValue: %v, err: %v", node.Name, overcommitPool, err)
+		return false
+	}
+	// GetNocByIndexer returns (nil, nil) when no config matches the label value;
+	// without this guard the dereferences below would panic in the scheduler.
+	if noc == nil {
+		klog.V(6).Infof("node %v matched no noc with label value %v", node.Name, overcommitPool)
+		return false
+	}
+	klog.V(6).Infof("node %v matched noc %v with label value %v, enableDynamicOvercommit: %v", node.Name, noc.Name, overcommitPool, noc.Spec.EnableDynamicOvercommit)
+
+	return noc.Spec.EnableDynamicOvercommit
+}
+
 func computePodResourceRequest(pod *v1.Pod) *preFilterState {
 	result := &preFilterState{}
 
diff --git a/pkg/scheduler/plugins/nodeovercommitment/plugin.go b/pkg/scheduler/plugins/nodeovercommitment/plugin.go
index ab5cec5890..2c6100064d 100644
--- a/pkg/scheduler/plugins/nodeovercommitment/plugin.go
+++ b/pkg/scheduler/plugins/nodeovercommitment/plugin.go
@@ -50,5 +50,6 @@ func New(args runtime.Object, h framework.Handle) (framework.Plugin, error) {
 
 	cache.RegisterPodHandler()
 	cache.RegisterCNRHandler()
+	cache.RegisterNOCHandler()
 	return &NodeOvercommitment{}, nil
 }
diff --git a/pkg/util/overcommit/overcommit.go b/pkg/util/overcommit/overcommit.go
index 08cc373411..06c668fe1d 100644
--- 
a/pkg/util/overcommit/overcommit.go
+++ b/pkg/util/overcommit/overcommit.go
@@ -17,6 +17,7 @@ limitations under the License.
 package overcommit
 
 import (
+	"fmt"
 	"strconv"
 
 	"k8s.io/klog/v2"
@@ -24,7 +25,8 @@ import (
 
 func OvercommitRatioValidate(
 	nodeAnnotation map[string]string,
-	setOvercommitKey, realtimeOvercommitKey string,
+	setOvercommitKey, predictOvercommitKey, realtimeOvercommitKey string,
+	enableDynamicOvercommit bool,
 ) (float64, error) {
 	// overcommit is not allowed if overcommitRatio is not set by user
 	setOvercommitVal, ok := nodeAnnotation[setOvercommitKey]
@@ -37,6 +39,22 @@ func OvercommitRatioValidate(
 		return 1.0, err
 	}
 
+	if !enableDynamicOvercommit {
+		return overcommitRatio, nil
+	}
+
+	predictOvercommitVal, ok := nodeAnnotation[predictOvercommitKey]
+	if ok {
+		predictOvercommitRatio, err := strconv.ParseFloat(predictOvercommitVal, 64)
+		if err != nil {
+			// skip an unparsable predict annotation: falling through would compare
+			// the zero value (0) and wrongly clobber the user-set ratio
+			klog.Errorf("predict overcommit %s validate fail: %v", predictOvercommitVal, err)
+		} else if predictOvercommitRatio < overcommitRatio {
+			overcommitRatio = predictOvercommitRatio
+		}
+	}
+
 	realtimeOvercommitVal, ok := nodeAnnotation[realtimeOvercommitKey]
 	if ok {
 		realtimeOvercommitRatio, err := strconv.ParseFloat(realtimeOvercommitVal, 64)
@@ -49,7 +67,8 @@
 	}
 
 	if overcommitRatio < 1.0 {
-		klog.Warningf("overcommitRatio should be greater than 1")
+		err = fmt.Errorf("overcommitRatio should be greater than 1")
+		klog.Error(err)
 		return 1.0, nil
 	}
 
diff --git a/pkg/webhook/mutating/node/allocatable_mutator.go b/pkg/webhook/mutating/node/allocatable_mutator.go
index d81da40e2a..387396a7b4 100644
--- a/pkg/webhook/mutating/node/allocatable_mutator.go
+++ b/pkg/webhook/mutating/node/allocatable_mutator.go
@@ -181,7 +181,9 @@ func cpuOvercommitRatioValidate(nodeAnnotation map[string]string) (float64, erro
 	return overcommitutil.OvercommitRatioValidate(
 		nodeAnnotation,
 		consts.NodeAnnotationCPUOvercommitRatioKey,
+		consts.NodeAnnotationPredictCPUOvercommitRatioKey,
consts.NodeAnnotationRealtimeCPUOvercommitRatioKey, + true, ) } @@ -189,6 +191,8 @@ func memOvercommitRatioValidate(nodeAnnotation map[string]string) (float64, erro return overcommitutil.OvercommitRatioValidate( nodeAnnotation, consts.NodeAnnotationMemoryOvercommitRatioKey, + consts.NodeAnnotationPredictMemoryOvercommitRatioKey, consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey, + true, ) } diff --git a/pkg/webhook/mutating/node/node.go b/pkg/webhook/mutating/node/node.go index 98b14088f5..4ec7530fe5 100644 --- a/pkg/webhook/mutating/node/node.go +++ b/pkg/webhook/mutating/node/node.go @@ -93,7 +93,7 @@ func (wn *WebhookNode) Run() bool { } func (wn *WebhookNode) Mutate(ctx context.Context, obj metav1.Object) (bool, error) { - klog.V(5).Info("webhookNode notice an obj to be mutated") + klog.V(6).Info("webhookNode notice an obj to be mutated") node, ok := obj.(*core.Node) if !ok { @@ -125,7 +125,7 @@ func (wn *WebhookNode) Mutate(ctx context.Context, obj metav1.Object) (bool, err } wn.emitMetrics(true, start, mutator) } - klog.Infof("node %s was mutated", node.Name) + klog.V(6).Infof("node %s was mutated", node.Name) return true, nil }