Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/katalyst-agent/app/options/qrm/cpu_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type CPUDynamicPolicyOptions struct {
CPUNUMAHintPreferLowThreshold float64
SharedCoresNUMABindingResultAnnotationKey string
EnableReserveCPUReversely bool
EnableCPUBurst bool
*irqtuner.IRQTunerOptions
*hintoptimizer.HintOptimizerOptions
}
Expand All @@ -69,6 +70,7 @@ func NewCPUOptions() *CPUOptions {
EnableCPUPressureEviction: false,
EnableSyncingCPUIdle: false,
EnableCPUIdle: false,
EnableCPUBurst: false,
LoadPressureEvictionSkipPools: []string{
commonstate.PoolNameReclaim,
commonstate.PoolNameDedicated,
Expand Down Expand Up @@ -122,6 +124,9 @@ func (o *CPUOptions) AddFlags(fss *cliflag.NamedFlagSets) {
fs.BoolVar(&o.EnableReserveCPUReversely, "enable-reserve-cpu-reversely",
o.EnableReserveCPUReversely, "by default, the reservation of cpu starts from the cpu with lower id,"+
"if set to true, it starts from the cpu with higher id")
fs.BoolVar(&o.EnableCPUBurst, "enable-cpu-burst", o.EnableCPUBurst, "This is a flag that enables the cpu burst handler to sync periodically."+
"However, actually setting cpu burst on a pod must be done through 2 enabling methods, via annotations and via kcc. Shared_cores only "+
"supports enabling via annotations, while dedicated_cores supports enabling via annotations and kcc.")
o.HintOptimizerOptions.AddFlags(fss)
o.IRQTunerOptions.AddFlags(fss)
}
Expand All @@ -141,6 +146,7 @@ func (o *CPUOptions) ApplyTo(conf *qrmconfig.CPUQRMPluginConfig) error {
conf.CPUAllocationOption = o.CPUAllocationOption
conf.SharedCoresNUMABindingResultAnnotationKey = o.SharedCoresNUMABindingResultAnnotationKey
conf.EnableReserveCPUReversely = o.EnableReserveCPUReversely
conf.EnableCPUBurst = o.EnableCPUBurst
if err := o.HintOptimizerOptions.ApplyTo(conf.HintOptimizerConfiguration); err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/google/uuid v1.3.0
github.com/h2non/gock v1.2.0
github.com/klauspost/cpuid/v2 v2.2.6
github.com/kubewharf/katalyst-api v0.5.9-0.20251226033202-b56ae9d0382b
github.com/kubewharf/katalyst-api v0.5.9-0.20260105114159-f6b016ef885e
github.com/moby/sys/mountinfo v0.6.2
github.com/montanaflynn/stats v0.7.1
github.com/opencontainers/runc v1.1.6
Expand Down Expand Up @@ -67,7 +67,7 @@ require (
k8s.io/kubelet v0.24.6
k8s.io/kubernetes v1.24.16
k8s.io/metrics v0.25.0
k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
sigs.k8s.io/controller-runtime v0.11.2
sigs.k8s.io/custom-metrics-apiserver v1.24.0
sigs.k8s.io/descheduler v0.24.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.5.9-0.20251226033202-b56ae9d0382b h1:SbneAGWJCpT2hCmhZ9StBKX3XyTcX1lU/Hf7qrJqELU=
github.com/kubewharf/katalyst-api v0.5.9-0.20251226033202-b56ae9d0382b/go.mod h1:BZMVGVl3EP0eCn5xsDgV41/gjYkoh43abIYxrB10e3k=
github.com/kubewharf/katalyst-api v0.5.9-0.20260105114159-f6b016ef885e h1:jYLnowuJ0msddtg4yw1p7klNvSgdK/vEBckvtYYClC4=
github.com/kubewharf/katalyst-api v0.5.9-0.20260105114159-f6b016ef885e/go.mod h1:BZMVGVl3EP0eCn5xsDgV41/gjYkoh43abIYxrB10e3k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.9 h1:jOTYZt7h/J7I8xQMKMUcJjKf5UFBv37jHWvNp5VRFGc=
github.com/kubewharf/kubelet v1.24.6-kubewharf.9/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down Expand Up @@ -1652,8 +1652,8 @@ k8s.io/system-validators v1.7.0/go.mod h1:gP1Ky+R9wtrSiFbrpEPwWMeYz9yqyy1S/KOh0V
k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 h1:GfD9OzL11kvZN5iArC6oTS7RTj7oJOIfnislxYlqTj8=
k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A=
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk=
modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k=
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/qrm-plugins/cpu/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
SyncCPUIdle = CPUPluginDynamicPolicyName + "_sync_cpu_idle"
IRQTuning = CPUPluginDynamicPolicyName + "_irq_tuning"
CommunicateWithAdvisor = CPUPluginDynamicPolicyName + "_communicate_with_advisor"
SyncCPUBurst = CPUPluginDynamicPolicyName + "_sync_cpu_burst"
)

const (
Expand Down
166 changes: 166 additions & 0 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuburst/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cpuburst

import (
"context"
"fmt"
"sync"

v1 "k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"

"github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/util"
"github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
"github.com/kubewharf/katalyst-core/pkg/config/generic"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
"github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/native"
)

type Manager interface {
UpdateCPUBurst(qosConf *generic.QoSConfiguration, dynamicConfig *dynamic.DynamicAgentConfiguration) error
}

type managerImpl struct {
metaServer *metaserver.MetaServer
}

var (
instance *managerImpl
once sync.Once
)

// GetManager returns a single global instance of the cpu burst manager
func GetManager(metaServer *metaserver.MetaServer) Manager {
once.Do(func() {
instance = newManager(metaServer)
})
return instance
}

func newManager(metaServer *metaserver.MetaServer) *managerImpl {
return &managerImpl{
metaServer: metaServer,
}
}

// UpdateCPUBurst calculates the value of cpu burst and sets it to the cgroup.
func (m *managerImpl) UpdateCPUBurst(qosConf *generic.QoSConfiguration, dynamicConfig *dynamic.DynamicAgentConfiguration) error {
if m.metaServer == nil {
return fmt.Errorf("nil metaServer")
}

ctx := context.Background()
podList, err := m.metaServer.GetPodList(ctx, native.PodIsActive)
if err != nil {
return fmt.Errorf("error getting pod list: %v", err)
}

var errList []error

for _, pod := range podList {
cpuBurstPolicy, err := util.GetPodCPUBurstPolicy(qosConf, pod, dynamicConfig)
if err != nil {
errList = append(errList, fmt.Errorf("error getting cpu burst policy for pod %s: %v", pod.Name, err))
continue
}

switch cpuBurstPolicy {
case consts.PodAnnotationCPUEnhancementCPUBurstPolicyClosed:
// For closed policy, we just set the cpu burst value to be 0.
if err = m.updateCPUBurstByPercent(0, pod); err != nil {
errList = append(errList, fmt.Errorf("error setting cpu burst for policy %s for pod %s: %v",
consts.PodAnnotationCPUEnhancementCPUBurstPolicyClosed, pod.Name, err))
}
case consts.PodAnnotationCPUEnhancementCPUBurstPolicyDefault:
continue
case consts.PodAnnotationCPUEnhancementCPUBurstPolicyStatic:
// For static policy, calculate the cpu burst value that we need to set.
cpuBurstPercent, err := util.GetPodCPUBurstPercent(qosConf, pod, dynamicConfig)
if err != nil {
errList = append(errList, fmt.Errorf("error getting cpu burst percent for pod %s: %v", pod.Name, err))
continue
}

if err = m.updateCPUBurstByPercent(cpuBurstPercent, pod); err != nil {
errList = append(errList, fmt.Errorf("error setting cpu burst for policy %s for pod %s: %v",
consts.PodAnnotationCPUEnhancementCPUBurstPolicyStatic, pod.Name, err))
}
case consts.PodAnnotationCPUEnhancementCPUBurstPolicyDynamic:
errList = append(errList, fmt.Errorf("dynamic cpu burst policy is not supported yet"))
default:
errList = append(errList, fmt.Errorf("cpu burst policy %s is not supported", cpuBurstPolicy))
}
}

return utilerrors.NewAggregate(errList)
}

// updateCPUBurstByPercent updates the value of cpu burst for static policy by taking the
// cpu quota from cgroup and calculating the cpu burst value by taking cpu quota * percent / 100.
func (m *managerImpl) updateCPUBurstByPercent(percent float64, pod *v1.Pod) error {
var errList []error
podUID := string(pod.GetUID())
podName := pod.Name

for _, container := range pod.Spec.Containers {
containerName := container.Name
containerID, err := m.metaServer.GetContainerID(podUID, containerName)
if err != nil {
general.Errorf("get container id failed, pod: %s, podName: %s, container: %s(%s), err: %v", podUID, podName, containerName, containerID, err)
continue
}

if exist, err := common.IsContainerCgroupExist(podUID, containerID); err != nil {
general.Errorf("check if container cgroup exists failed, pod: %s, podName: %s, container: %s(%s), err: %v",
podUID, podName, containerName, containerID, err)
continue
} else if !exist {
general.Infof("container cgroup does not exist, pod: %s, podName: %s, container: %s(%s)", podUID, podName, containerName, containerID)
continue
}

containerAbsoluteCgroupPath, err := common.GetContainerAbsCgroupPath(common.CgroupSubsysCPU, podUID, containerID)
if err != nil {
general.Errorf("get container absolute cgroup path failed, pod: %s, podName: %s, container: %s(%s), err: %v", podUID, podName, containerName, containerID, err)
errList = append(errList, err)
continue
}

cpuStats, err := manager.GetCPUWithAbsolutePath(containerAbsoluteCgroupPath)
if err != nil {
general.Errorf("get container cpu stats failed, pod: %s, podName: %s, container: %s(%s), err: %v", podUID, podName, containerName, containerID, err)
errList = append(errList, err)
continue
}

cpuBurstValue := util.CalculateCPUBurstFromPercent(percent, cpuStats.CpuQuota)
if err = manager.ApplyCPUWithAbsolutePath(containerAbsoluteCgroupPath, &common.CPUData{CpuBurst: cpuBurstValue}); err != nil {
general.Errorf("apply container cpu burst failed, pod: %s, podName: %s, container: %s(%s), err: %v", podUID, podName, containerName, containerID, err)
errList = append(errList, err)
continue
}

general.Infof("apply container cpu burst successfully, pod: %s, podName: %s, container: %s(%s), cpu burst: %d", podUID, podName, containerName, containerID, cpuBurstValue)
}

return utilerrors.NewAggregate(errList)
}
Loading