diff --git a/cmd/katalyst-agent/app/options/dynamic/adminqos/eviction/reclaimed_resources_eviction.go b/cmd/katalyst-agent/app/options/dynamic/adminqos/eviction/reclaimed_resources_eviction.go index e969c9374d..bcc0207ea3 100644 --- a/cmd/katalyst-agent/app/options/dynamic/adminqos/eviction/reclaimed_resources_eviction.go +++ b/cmd/katalyst-agent/app/options/dynamic/adminqos/eviction/reclaimed_resources_eviction.go @@ -26,6 +26,7 @@ import ( type ReclaimedResourcesEvictionOptions struct { EvictionThreshold native.ResourceThreshold + SoftEvictionThreshold native.ResourceThreshold GracePeriod int64 ThresholdMetToleranceDuration int64 } @@ -36,6 +37,10 @@ func NewReclaimedResourcesEvictionOptions() *ReclaimedResourcesEvictionOptions { consts.ReclaimedResourceMilliCPU: 5.0, consts.ReclaimedResourceMemory: 5.0, }, + SoftEvictionThreshold: native.ResourceThreshold{ + consts.ReclaimedResourceMilliCPU: 1.5, + consts.ReclaimedResourceMemory: 1.2, + }, GracePeriod: 60, ThresholdMetToleranceDuration: 0, } @@ -54,6 +59,7 @@ func (o *ReclaimedResourcesEvictionOptions) AddFlags(fss *cliflag.NamedFlagSets) func (o *ReclaimedResourcesEvictionOptions) ApplyTo(c *eviction.ReclaimedResourcesEvictionConfiguration) error { c.EvictionThreshold = o.EvictionThreshold + c.SoftEvictionThreshold = o.SoftEvictionThreshold c.DeletionGracePeriod = o.GracePeriod c.ThresholdMetToleranceDuration = o.ThresholdMetToleranceDuration return nil diff --git a/cmd/katalyst-agent/app/options/eviction/eviction_base.go b/cmd/katalyst-agent/app/options/eviction/eviction_base.go index 034d2737b5..bc0672c791 100644 --- a/cmd/katalyst-agent/app/options/eviction/eviction_base.go +++ b/cmd/katalyst-agent/app/options/eviction/eviction_base.go @@ -58,6 +58,9 @@ type GenericEvictionOptions struct { // RecordManager specifies the eviction record manager to use RecordManager string + + // HostPathNotifierRootPath is the root path for host-path notifier + HostPathNotifierRootPath string } // NewGenericEvictionOptions 
creates a new Options with a default config. @@ -69,6 +72,7 @@ func NewGenericEvictionOptions() *GenericEvictionOptions { EvictionSkippedAnnotationKeys: []string{}, EvictionSkippedLabelKeys: []string{}, EvictionBurst: 3, + HostPathNotifierRootPath: "/opt/katalyst", PodKiller: consts.KillerNameEvictionKiller, StrictAuthentication: false, } @@ -111,6 +115,9 @@ func (o *GenericEvictionOptions) AddFlags(fss *cliflag.NamedFlagSets) { fs.StringVar(&o.RecordManager, "eviction-record-manager", o.RecordManager, "the eviction record manager to use") + + fs.StringVar(&o.HostPathNotifierRootPath, "pod-notifier-root-path", o.HostPathNotifierRootPath, + "root path of host-path notifier") } // ApplyTo fills up config with options @@ -126,6 +133,7 @@ func (o *GenericEvictionOptions) ApplyTo(c *evictionconfig.GenericEvictionConfig c.StrictAuthentication = o.StrictAuthentication c.PodMetricLabels.Insert(o.PodMetricLabels...) c.RecordManager = o.RecordManager + c.HostPathNotifierRootPath = o.HostPathNotifierRootPath return nil } diff --git a/go.mod b/go.mod index 56953cf18e..a98e568407 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/google/uuid v1.3.0 github.com/h2non/gock v1.2.0 github.com/klauspost/cpuid/v2 v2.2.6 - github.com/kubewharf/katalyst-api v0.5.8-0.20251212030746-894fa2521a86 + github.com/kubewharf/katalyst-api v0.5.9-0.20251226033202-b56ae9d0382b github.com/moby/sys/mountinfo v0.6.2 github.com/montanaflynn/stats v0.7.1 github.com/opencontainers/runc v1.1.6 diff --git a/go.sum b/go.sum index 816e098731..5f24bf65b7 100644 --- a/go.sum +++ b/go.sum @@ -574,8 +574,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kubewharf/katalyst-api v0.5.8-0.20251212030746-894fa2521a86 
h1:GCqe9PcoTQ7akNDyAmavhnSrPV7sMAoYJ5jKEaJg4Ac= -github.com/kubewharf/katalyst-api v0.5.8-0.20251212030746-894fa2521a86/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k= +github.com/kubewharf/katalyst-api v0.5.9-0.20251226033202-b56ae9d0382b h1:SbneAGWJCpT2hCmhZ9StBKX3XyTcX1lU/Hf7qrJqELU= +github.com/kubewharf/katalyst-api v0.5.9-0.20251226033202-b56ae9d0382b/go.mod h1:BZMVGVl3EP0eCn5xsDgV41/gjYkoh43abIYxrB10e3k= github.com/kubewharf/kubelet v1.24.6-kubewharf.9 h1:jOTYZt7h/J7I8xQMKMUcJjKf5UFBv37jHWvNp5VRFGc= github.com/kubewharf/kubelet v1.24.6-kubewharf.9/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c= github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8= diff --git a/pkg/agent/evictionmanager/eviction_resp_collector.go b/pkg/agent/evictionmanager/eviction_resp_collector.go index 1738ff30ee..52402f57d2 100644 --- a/pkg/agent/evictionmanager/eviction_resp_collector.go +++ b/pkg/agent/evictionmanager/eviction_resp_collector.go @@ -169,6 +169,42 @@ func (e *evictionRespCollector) collectMetThreshold(dryRunPlugins []string, plug } } +func (e *evictionRespCollector) collectTopSoftEvictionPods(dryRunPlugins []string, pluginName string, + threshold *pluginapi.ThresholdMetResponse, resp *pluginapi.GetTopEvictionPodsResponse, +) { + dryRun := e.isDryRun(dryRunPlugins, pluginName) + + targetPods := make([]*v1.Pod, 0, len(resp.TargetPods)) + for i, pod := range resp.TargetPods { + if pod == nil { + continue + } + + general.Infof("%v plugin %v request to notify topN pod %v/%v, reason: met threshold in scope [%v]", + e.getLogPrefix(dryRun), pluginName, pod.Namespace, pod.Name, threshold.EvictionScope) + if dryRun { + metricsPodToEvict(e.emitter, e.conf.GenericConfiguration.QoSConfiguration, pluginName, pod, dryRun, e.conf.GenericEvictionConfiguration.PodMetricLabels) + } else { + targetPods = append(targetPods, resp.TargetPods[i]) + } + } + + for _, pod := range targetPods { + reason := fmt.Sprintf("plugin %s met threshold in 
scope %s, target %v, observed %v", + pluginName, threshold.EvictionScope, threshold.ThresholdValue, threshold.ObservedValue) + + e.getSoftEvictPods()[string(pod.UID)] = &rule.RuledEvictPod{ + EvictPod: &pluginapi.EvictPod{ + Pod: pod.DeepCopy(), + Reason: reason, + ForceEvict: false, + EvictionPluginName: pluginName, + }, + Scope: threshold.EvictionScope, + } + } +} + func (e *evictionRespCollector) collectTopEvictionPods(dryRunPlugins []string, pluginName string, threshold *pluginapi.ThresholdMetResponse, resp *pluginapi.GetTopEvictionPodsResponse, ) { diff --git a/pkg/agent/evictionmanager/manager.go b/pkg/agent/evictionmanager/manager.go index 48c8ed3eb3..8c1cef2784 100644 --- a/pkg/agent/evictionmanager/manager.go +++ b/pkg/agent/evictionmanager/manager.go @@ -37,6 +37,7 @@ import ( clocks "k8s.io/utils/clock" "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-api/pkg/plugins/registration" pluginapi "github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1" endpointpkg "github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/endpoint" @@ -46,6 +47,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/plugin/resource" "github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/plugin/rootfs" "github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/podkiller" + "github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/podnotifier" "github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/record" "github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/rule" "github.com/kubewharf/katalyst-core/pkg/client" @@ -109,7 +111,8 @@ type EvictionManger struct { // easy to test the code. 
clock clocks.WithTickerAndDelayedExecution - podKiller podkiller.PodKiller + podNotifier podnotifier.PodNotifier + podKiller podkiller.PodKiller killQueue rule.EvictionQueue killStrategy rule.EvictionStrategy @@ -236,6 +239,12 @@ func NewEvictionManager(genericClient *client.GenericClientSet, recorder events. podKiller := podkiller.NewAsynchronizedPodKiller(killer, metaServer.PodFetcher, genericClient.KubeClient) + notifier, err := podnotifier.NewHostPathPodNotifier(conf, genericClient.KubeClient, metaServer, recorder, emitter) + if err != nil { + return nil, fmt.Errorf("failed to create pod notifier: %v", err) + } + podNotifier := podnotifier.NewSynchronizedPodNotifier(notifier) + cnrTaintReporter, err := control.NewGenericReporterPlugin(cnrTaintReporterPluginName, conf, emitter) if err != nil { return nil, fmt.Errorf("failed to initialize cnr taint reporter plugin: %v", err) @@ -264,6 +273,7 @@ func NewEvictionManager(genericClient *client.GenericClientSet, recorder events. metaGetter: metaServer, emitter: emitter, podKiller: podKiller, + podNotifier: podNotifier, cnrTaintReporter: cnrTaintReporter, endpoints: make(map[string]endpointpkg.Endpoint), conf: conf, @@ -317,6 +327,7 @@ func (m *EvictionManger) Run(ctx context.Context) { general.RegisterHeartbeatCheck(reportTaintHealthCheckName, reportTaintToleration, general.HealthzCheckStateNotReady, reportTaintToleration) m.podKiller.Start(ctx) + m.podNotifier.Start(ctx) for _, endpoint := range m.endpoints { endpoint.Start() } @@ -360,6 +371,11 @@ func (m *EvictionManger) sync(ctx context.Context) { } errList := make([]error, 0) + notifyErr := m.doNotify(collector.getSoftEvictPods()) + if notifyErr != nil { + errList = append(errList, notifyErr) + } + evictErr := m.doEvict(collector.getSoftEvictPods(), collector.getForceEvictPods()) if evictErr != nil { errList = append(errList, evictErr) @@ -430,8 +446,8 @@ func (m *EvictionManger) collectEvictionResult(ctx context.Context, pods []*v1.P records := 
m.getEvictionRecords(ctx, collector.currentCandidatePods) for pluginName, threshold := range thresholdsMet { - if threshold.MetType != pluginapi.ThresholdMetType_HARD_MET { - general.Infof(" the type: %s of met threshold from plugin: %s isn't %s", threshold.MetType.String(), pluginName, pluginapi.ThresholdMetType_HARD_MET.String()) + if threshold.MetType == pluginapi.ThresholdMetType_NOT_MET { + general.Infof("resp from plugin: %s not met threshold", pluginName) continue } @@ -453,13 +469,20 @@ func (m *EvictionManger) collectEvictionResult(ctx context.Context, pods []*v1.P } } } + + topN := uint64(0) + forceEvict := false + if threshold.MetType == pluginapi.ThresholdMetType_HARD_MET { + topN = 1 + forceEvict = true + } + resp, err := m.endpoints[pluginName].GetTopEvictionPods(context.Background(), &pluginapi.GetTopEvictionPodsRequest{ ActivePods: activePods, - TopN: 1, + TopN: topN, EvictionScope: threshold.EvictionScope, CandidateEvictionRecords: candidateEvictionRecords, }) - m.endpointLock.RUnlock() if err != nil { general.Errorf(" calling GetTopEvictionPods of plugin: %s failed with error: %v", pluginName, err) @@ -473,12 +496,38 @@ func (m *EvictionManger) collectEvictionResult(ctx context.Context, pods []*v1.P continue } - collector.collectTopEvictionPods(dynamicConfig.DryRun, pluginName, threshold, resp) + if forceEvict { + collector.collectTopEvictionPods(dynamicConfig.DryRun, pluginName, threshold, resp) + } else { + collector.collectTopSoftEvictionPods(dynamicConfig.DryRun, pluginName, threshold, resp) + } + } return collector, errors.NewAggregate(errList) } +func (m *EvictionManger) doNotify(softEvictPods map[string]*rule.RuledEvictPod) error { + errList := make([]error, 0) + + for _, pod := range softEvictPods { + if pod == nil || pod.EvictPod.Pod == nil { + continue + } + + if v, ok := pod.EvictPod.Pod.Annotations[apiconsts.PodAnnotationSoftEvictNotificationKey]; !ok || strings.ToLower(v) != "true" { + continue + } + + err := 
m.podNotifier.NotifyPod(pod) + if err != nil { + errList = append(errList, err) + } + } + + return errors.NewAggregate(errList) +} + func (m *EvictionManger) doEvict(softEvictPods, forceEvictPods map[string]*rule.RuledEvictPod) error { softEvictPods = filterOutCandidatePodsWithForcePods(softEvictPods, forceEvictPods) bestSuitedCandidate := m.getEvictPodFromCandidates(softEvictPods) @@ -637,6 +686,10 @@ func (m *EvictionManger) getEvictPodFromCandidates(candidateEvictPods map[string for _, rp := range candidateEvictPods { // only killing pods that pass candidate validation if rp != nil && rp.Pod != nil && m.killStrategy.CandidateValidate(rp) { + // do NOT select soft evict pod with notification-enable as candidate + if v, ok := rp.Pod.Annotations[apiconsts.PodAnnotationSoftEvictNotificationKey]; ok && strings.ToLower(v) == "true" { + continue + } rpList = append(rpList, rp) } } diff --git a/pkg/agent/evictionmanager/manager_test.go b/pkg/agent/evictionmanager/manager_test.go index c0fbe47195..1cf1f640ef 100644 --- a/pkg/agent/evictionmanager/manager_test.go +++ b/pkg/agent/evictionmanager/manager_test.go @@ -108,6 +108,7 @@ func makeConf() *config.Configuration { conf.PodKiller = consts.KillerNameEvictionKiller conf.GenericConfiguration.AuthConfiguration.AuthType = credential.AuthTypeInsecure conf.GenericConfiguration.AuthConfiguration.AccessControlType = authorization.AccessControlTypeInsecure + conf.HostPathNotifierRootPath = "/opt/katalyst" return conf } @@ -232,12 +233,46 @@ func (p plugin2) GetEvictPods(_ context.Context, _ *pluginapi.GetEvictPodsReques return &pluginapi.GetEvictPodsResponse{EvictPods: []*pluginapi.EvictPod{}}, nil } +type plugin3 struct { + pluginSkeleton +} + +func (p plugin3) ThresholdMet(_ context.Context, _ *pluginapi.GetThresholdMetRequest) (*pluginapi.ThresholdMetResponse, error) { + return &pluginapi.ThresholdMetResponse{ + MetType: pluginapi.ThresholdMetType_SOFT_MET, + ThresholdValue: 0.8, + ObservedValue: 0.9, + ThresholdOperator: 
pluginapi.ThresholdOperator_GREATER_THAN, + EvictionScope: "plugin3_scope", + GracePeriodSeconds: -1, + }, nil +} + +func (p plugin3) GetTopEvictionPods(_ context.Context, _ *pluginapi.GetTopEvictionPodsRequest) (*pluginapi.GetTopEvictionPodsResponse, error) { + return &pluginapi.GetTopEvictionPodsResponse{TargetPods: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-3", + UID: "pod-3", + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + }, + }}, nil +} + +func (p plugin3) GetEvictPods(_ context.Context, _ *pluginapi.GetEvictPodsRequest) (*pluginapi.GetEvictPodsResponse, error) { + return &pluginapi.GetEvictPodsResponse{EvictPods: []*pluginapi.EvictPod{}}, nil +} + func makeEvictionManager(t *testing.T) *EvictionManger { mgr, err := NewEvictionManager(&client.GenericClientSet{}, nil, makeMetaServer(), metrics.DummyMetrics{}, makeConf()) assert.NoError(t, err) mgr.endpoints = map[string]endpointpkg.Endpoint{ "plugin1": &plugin1{}, "plugin2": &plugin2{}, + "plugin3": &plugin3{}, } return mgr @@ -258,6 +293,7 @@ func TestEvictionManger_collectEvictionResult(t *testing.T) { dryrun: []string{}, wantSoftEvictPods: sets.String{ "pod-1": sets.Empty{}, + "pod-3": sets.Empty{}, "pod-5": sets.Empty{}, }, wantForceEvictPods: sets.String{ @@ -269,9 +305,11 @@ func TestEvictionManger_collectEvictionResult(t *testing.T) { }, }, { - name: "dryrun plugin1", - dryrun: []string{"plugin1"}, - wantSoftEvictPods: sets.String{}, + name: "dryrun plugin1", + dryrun: []string{"plugin1"}, + wantSoftEvictPods: sets.String{ + "pod-3": sets.Empty{}, + }, wantForceEvictPods: sets.String{ "pod-3": sets.Empty{}, }, @@ -284,6 +322,7 @@ func TestEvictionManger_collectEvictionResult(t *testing.T) { dryrun: []string{"plugin2"}, wantSoftEvictPods: sets.String{ "pod-1": sets.Empty{}, + "pod-3": sets.Empty{}, "pod-5": sets.Empty{}, }, wantForceEvictPods: sets.String{ @@ -292,9 +331,11 @@ func TestEvictionManger_collectEvictionResult(t *testing.T) { wantConditions: sets.String{}, }, { - 
name: "dryrun plugin1 & plugin2", - dryrun: []string{"plugin1", "plugin2"}, - wantSoftEvictPods: sets.String{}, + name: "dryrun plugin1 & plugin2", + dryrun: []string{"plugin1", "plugin2"}, + wantSoftEvictPods: sets.String{ + "pod-3": sets.Empty{}, + }, wantForceEvictPods: sets.String{}, wantConditions: sets.String{}, }, diff --git a/pkg/agent/evictionmanager/plugin/resource/reclaimed_numa_resources.go b/pkg/agent/evictionmanager/plugin/resource/reclaimed_numa_resources.go index 075813398e..d03930263f 100644 --- a/pkg/agent/evictionmanager/plugin/resource/reclaimed_numa_resources.go +++ b/pkg/agent/evictionmanager/plugin/resource/reclaimed_numa_resources.go @@ -52,6 +52,10 @@ func NewReclaimedNumaResourcesEvictionPlugin(_ *client.GenericClientSet, _ event } } + reclaimedSoftThresholdGetter := func(resourceName v1.ResourceName) *float64 { + return nil + } + deletionGracePeriodGetter := func() int64 { return conf.GetDynamicConfiguration().ReclaimedResourcesEvictionConfiguration.DeletionGracePeriod } @@ -66,6 +70,8 @@ func NewReclaimedNumaResourcesEvictionPlugin(_ *client.GenericClientSet, _ event emitter, PodNUMARequestResourcesGetter, reclaimedThresholdGetter, + reclaimedSoftThresholdGetter, + nil, deletionGracePeriodGetter, thresholdMetToleranceDurationGetter, conf.SkipZeroQuantityResourceNames, diff --git a/pkg/agent/evictionmanager/plugin/resource/reclaimed_resources.go b/pkg/agent/evictionmanager/plugin/resource/reclaimed_resources.go index 8a2337f4a5..410b1d937a 100644 --- a/pkg/agent/evictionmanager/plugin/resource/reclaimed_resources.go +++ b/pkg/agent/evictionmanager/plugin/resource/reclaimed_resources.go @@ -62,6 +62,10 @@ func NewReclaimedResourcesEvictionPlugin(_ *client.GenericClientSet, _ events.Ev } } + reclaimedSoftThresholdGetter := func(resourceName v1.ResourceName) *float64 { + return nil + } + deletionGracePeriodGetter := func() int64 { return conf.GetDynamicConfiguration().ReclaimedResourcesEvictionConfiguration.DeletionGracePeriod } @@ -76,6 
+80,8 @@ func NewReclaimedResourcesEvictionPlugin(_ *client.GenericClientSet, _ events.Ev reclaimedResourcesGetter, native.SumUpPodRequestResources, reclaimedThresholdGetter, + reclaimedSoftThresholdGetter, + nil, deletionGracePeriodGetter, thresholdMetToleranceDurationGetter, conf.SkipZeroQuantityResourceNames, diff --git a/pkg/agent/evictionmanager/plugin/resource/resources.go b/pkg/agent/evictionmanager/plugin/resource/resources.go index 5908a757dd..31498e4b88 100644 --- a/pkg/agent/evictionmanager/plugin/resource/resources.go +++ b/pkg/agent/evictionmanager/plugin/resource/resources.go @@ -19,13 +19,17 @@ package resource import ( "context" "fmt" + "math" "sort" + "strings" "time" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" + apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" pluginapi "github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1" "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metrics" @@ -47,6 +51,8 @@ type PodRequestResourcesGetter func(pod *v1.Pod) v1.ResourceList type ThresholdGetter func(resourceName v1.ResourceName) *float64 +type EvictScoreGetter func(pod *v1.Pod) *float64 + type GracePeriodGetter func() int64 // ResourcesEvictionPlugin implements EvictPlugin interface it trigger @@ -57,10 +63,12 @@ type ResourcesEvictionPlugin struct { // thresholdGetter is used to get the threshold of resources. 
thresholdGetter ThresholdGetter + softThresholdGetter ThresholdGetter resourcesGetter ResourcesGetter podRequestResourcesGetter PodRequestResourcesGetter deletionGracePeriodGetter GracePeriodGetter thresholdMetToleranceDurationGetter GracePeriodGetter + evictScoreGetter EvictScoreGetter skipZeroQuantityResourceNames sets.String podFilter func(pod *v1.Pod) (bool, error) @@ -72,6 +80,7 @@ type ResourcesEvictionPlugin struct { func NewResourcesEvictionPlugin(pluginName string, metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter, resourcesGetter ResourcesGetter, podRequestResourcesGetter PodRequestResourcesGetter, thresholdGetter ThresholdGetter, + softThresholdGetter ThresholdGetter, evictScoreGetter EvictScoreGetter, deletionGracePeriodGetter GracePeriodGetter, thresholdMetToleranceDurationGetter GracePeriodGetter, skipZeroQuantityResourceNames sets.String, podFilter func(pod *v1.Pod) (bool, error), @@ -84,6 +93,8 @@ func NewResourcesEvictionPlugin(pluginName string, metaServer *metaserver.MetaSe resourcesGetter: resourcesGetter, podRequestResourcesGetter: podRequestResourcesGetter, thresholdGetter: thresholdGetter, + softThresholdGetter: softThresholdGetter, + evictScoreGetter: evictScoreGetter, deletionGracePeriodGetter: deletionGracePeriodGetter, thresholdMetToleranceDurationGetter: thresholdMetToleranceDurationGetter, skipZeroQuantityResourceNames: skipZeroQuantityResourceNames, @@ -210,6 +221,30 @@ func (b *ResourcesEvictionPlugin) ThresholdMet(ctx context.Context, _ *pluginapi GracePeriodSeconds: b.thresholdMetToleranceDurationGetter(), }, nil } + + softThresholdRate := b.softThresholdGetter(resourceName) + if softThresholdRate == nil { + continue + } + + thresholdValue = *softThresholdRate * total + klog.Infof("[%s] resources %v: total %v, used %v, softThresholdRate %v, softThresholdValue: %v", b.pluginName, + resourceName, total, used, *softThresholdRate, thresholdValue) + + exceededValue = thresholdValue - used + if exceededValue < 0 { + 
klog.Infof("[%s] resources %v exceeded: total %v, used %v, softThresholdRate %v, thresholdValue: %v", b.pluginName, + resourceName, total, used, *softThresholdRate, thresholdValue) + + return &pluginapi.ThresholdMetResponse{ + ThresholdValue: thresholdValue, + ObservedValue: used, + ThresholdOperator: pluginapi.ThresholdOperator_GREATER_THAN, + MetType: pluginapi.ThresholdMetType_SOFT_MET, + EvictionScope: string(resourceName), + GracePeriodSeconds: b.thresholdMetToleranceDurationGetter(), + }, nil + } } return &pluginapi.ThresholdMetResponse{ @@ -217,32 +252,174 @@ func (b *ResourcesEvictionPlugin) ThresholdMet(ctx context.Context, _ *pluginapi }, nil } -func (b *ResourcesEvictionPlugin) GetTopEvictionPods(_ context.Context, request *pluginapi.GetTopEvictionPodsRequest) (*pluginapi.GetTopEvictionPodsResponse, error) { - if request == nil { - return nil, fmt.Errorf("GetTopEvictionPods got nil request") +func (b *ResourcesEvictionPlugin) getTopHardEvictionPods(ctx context.Context, request *pluginapi.GetTopEvictionPodsRequest) (*pluginapi.GetTopEvictionPodsResponse, error) { + activeFilteredPods := native.FilterPods(request.ActivePods, b.podFilter) + if len(activeFilteredPods) == 0 { + klog.Infof("GetTopEvictionPods got empty active pods after filtering: %d", len(request.ActivePods)) + return &pluginapi.GetTopEvictionPodsResponse{}, nil } - if len(request.ActivePods) == 0 { - klog.Warningf("[%s] GetTopEvictionPods got empty active pods list", b.pluginName) + // try to get pod score if score getter exist + scores := make(map[string]float64, len(activeFilteredPods)) + if b.evictScoreGetter != nil { + for _, pod := range activeFilteredPods { + if s := b.evictScoreGetter(pod); s != nil { + scores[string(pod.UID)] = *s + } + } + } + + // sort pods + sort.Slice(activeFilteredPods, func(i, j int) bool { + // sort by score first + valueI, valueJ := math.MaxFloat64, math.MaxFloat64 + if s, ok := scores[string(activeFilteredPods[i].UID)]; ok { + valueI = s + } + if s, ok := 
scores[string(activeFilteredPods[j].UID)]; ok { + valueJ = s + } + if valueI != valueJ { + return valueI < valueJ + } + + // sort by request if with same score + reqI, reqJ := int64(0), int64(0) + resourceI, resourceJ := b.podRequestResourcesGetter(activeFilteredPods[i]), b.podRequestResourcesGetter(activeFilteredPods[j]) + if quantity, ok := resourceI[v1.ResourceName(request.EvictionScope)]; ok { + reqI = (&quantity).Value() + } + if quantity, ok := resourceJ[v1.ResourceName(request.EvictionScope)]; ok { + reqJ = (&quantity).Value() + } + return reqI > reqJ + }) + + retLen := general.MinUInt64(request.TopN, uint64(len(activeFilteredPods))) + if retLen == 0 { return &pluginapi.GetTopEvictionPodsResponse{}, nil } + var deletionOptions *pluginapi.DeletionOptions + if gracePeriod := b.deletionGracePeriodGetter(); gracePeriod > 0 { + deletionOptions = &pluginapi.DeletionOptions{ + GracePeriodSeconds: gracePeriod, + } + } + + return &pluginapi.GetTopEvictionPodsResponse{ + TargetPods: activeFilteredPods[:retLen], + DeletionOptions: deletionOptions, + }, nil +} + +func (b *ResourcesEvictionPlugin) getTopSoftEvictionPods(ctx context.Context, request *pluginapi.GetTopEvictionPodsRequest) (*pluginapi.GetTopEvictionPodsResponse, error) { activeFilteredPods := native.FilterPods(request.ActivePods, b.podFilter) + if len(activeFilteredPods) == 0 { + klog.Infof("GetTopEvictionPods got empty active pods after filtering: %d", len(request.ActivePods)) + return &pluginapi.GetTopEvictionPodsResponse{}, nil + } + // soft evict handle notification-enabled pod only + var notifyFilteredPods []*v1.Pod + for _, pod := range activeFilteredPods { + if v, ok := pod.Annotations[apiconsts.PodAnnotationSoftEvictNotificationKey]; ok && strings.ToLower(v) == "true" { + notifyFilteredPods = append(notifyFilteredPods, pod) + } + } + activeFilteredPods = notifyFilteredPods + if len(activeFilteredPods) == 0 { + klog.Infof("GetTopEvictionPods got empty active pods after filtering: %d", 
len(request.ActivePods)) + return &pluginapi.GetTopEvictionPodsResponse{}, nil + } + + // check resource type + if request.EvictionScope == "" { + klog.Infof("[%s]: not assign eviction scope", b.pluginName) + return &pluginapi.GetTopEvictionPodsResponse{}, nil + } + resourceName := v1.ResourceName(request.EvictionScope) + + // cal thresholdValue + allocatable, err := b.resourcesGetter(ctx) + if err != nil { + klog.Errorf("[%s] failed to get resources %v", b.pluginName, err) + return &pluginapi.GetTopEvictionPodsResponse{}, nil + } + totalQuantity, ok := allocatable[resourceName] + if !ok { + klog.Warningf("[%s] used resource: %s doesn't exist in allocatable", b.pluginName, resourceName) + return &pluginapi.GetTopEvictionPodsResponse{}, nil + } + + softThresholdRate := b.softThresholdGetter(resourceName) + if softThresholdRate == nil { + klog.Infof("[%s] failed to get soft threshold", b.pluginName) + return &pluginapi.GetTopEvictionPodsResponse{}, nil + } + thresholdValue := *softThresholdRate * float64((&totalQuantity).Value()) + + // get pod score if score getter exist + scores := make(map[string]float64, len(activeFilteredPods)) + if b.evictScoreGetter != nil { + for _, pod := range activeFilteredPods { + if s := b.evictScoreGetter(pod); s != nil { + scores[string(pod.UID)] = *s + } + } + } + + // sort pods sort.Slice(activeFilteredPods, func(i, j int) bool { - valueI, valueJ := int64(0), int64(0) + // sort by score first + valueI, valueJ := math.MaxFloat64, math.MaxFloat64 + if s, ok := scores[string(activeFilteredPods[i].UID)]; ok { + valueI = s + } + if s, ok := scores[string(activeFilteredPods[j].UID)]; ok { + valueJ = s + } + if valueI != valueJ { + return valueI > valueJ + } + // sort by request if with same score + reqI, reqJ := int64(0), int64(0) resourceI, resourceJ := b.podRequestResourcesGetter(activeFilteredPods[i]), b.podRequestResourcesGetter(activeFilteredPods[j]) if quantity, ok := resourceI[v1.ResourceName(request.EvictionScope)]; ok { - valueI = 
(&quantity).Value() + reqI = (&quantity).Value() } if quantity, ok := resourceJ[v1.ResourceName(request.EvictionScope)]; ok { - valueJ = (&quantity).Value() + reqJ = (&quantity).Value() } - return valueI > valueJ + return reqI < reqJ }) - retLen := general.MinUInt64(request.TopN, uint64(len(activeFilteredPods))) + // get pods that can keep running + usedQuantity := resource.NewQuantity(0, resource.DecimalSI) + cnt := 0 + for _, pod := range activeFilteredPods { + q := native.GetPodRequestResources(pod, resourceName) + if q == nil { + klog.Warningf("[%s] GetTopEvictionPods got empty pod resources with pod %s", b.pluginName, pod.Name) + continue + } + + usedQuantity.Add(*q) + if float64(usedQuantity.Value()) >= thresholdValue { + break + } + cnt++ + } + + // cal topN soft evict pods + general.SliceReverse(activeFilteredPods) + podCnt := general.Max(len(activeFilteredPods)-cnt, 1) + retLen := uint64(general.Min(len(activeFilteredPods), podCnt)) + klog.Infof("[%s] GetTopEvictionPods get %d soft pods", b.pluginName, retLen) + if retLen == 0 { + return &pluginapi.GetTopEvictionPodsResponse{}, nil + } var deletionOptions *pluginapi.DeletionOptions if gracePeriod := b.deletionGracePeriodGetter(); gracePeriod > 0 { @@ -257,6 +434,28 @@ func (b *ResourcesEvictionPlugin) GetTopEvictionPods(_ context.Context, request }, nil } +func (b *ResourcesEvictionPlugin) GetTopEvictionPods(ctx context.Context, request *pluginapi.GetTopEvictionPodsRequest) (*pluginapi.GetTopEvictionPodsResponse, error) { + if request == nil { + return nil, fmt.Errorf("GetTopEvictionPods got nil request") + } + + if len(request.ActivePods) == 0 { + klog.Warningf("[%s] GetTopEvictionPods got empty active pods list", b.pluginName) + return &pluginapi.GetTopEvictionPodsResponse{}, nil + } + + isHardEvict := true + if request.TopN == 0 { + isHardEvict = false + } + + if !isHardEvict { + return b.getTopSoftEvictionPods(ctx, request) + } else { + return b.getTopHardEvictionPods(ctx, request) + } +} + func (b 
*ResourcesEvictionPlugin) GetEvictPods(_ context.Context, request *pluginapi.GetEvictPodsRequest) (*pluginapi.GetEvictPodsResponse, error) { if request == nil { return nil, fmt.Errorf("GetEvictPods got nil request") diff --git a/pkg/agent/evictionmanager/plugin/resource/resources_test.go b/pkg/agent/evictionmanager/plugin/resource/resources_test.go new file mode 100644 index 0000000000..024ed0a76c --- /dev/null +++ b/pkg/agent/evictionmanager/plugin/resource/resources_test.go @@ -0,0 +1,188 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package resource + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + "github.com/kubewharf/katalyst-api/pkg/consts" + pluginapi "github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1" + katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/native" +) + +func TestNewResourcesEvictionPlugin(t *testing.T) { + t.Parallel() + + testNodeName := "test-node" + testConf := generateTestConfiguration(t, testNodeName) + pods := []*corev1.Pod{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "test-pod-1", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelReclaimedCores, + consts.PodAnnotationSoftEvictNotificationKey: "true", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container-1", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + consts.ReclaimedResourceMilliCPU: resource.MustParse("100"), + }, + }, + }, + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "test-pod-2", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelReclaimedCores, + consts.PodAnnotationSoftEvictNotificationKey: "true", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container-2", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + consts.ReclaimedResourceMilliCPU: resource.MustParse("50"), + }, + }, + }, + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "test-pod-2", + }, + }, + } + ctx, err := katalyst_base.GenerateFakeGenericContext(nil, []runtime.Object{ + &v1alpha1.CustomNodeResource{ + ObjectMeta: v1.ObjectMeta{ + 
Name: testNodeName, + }, + Status: v1alpha1.CustomNodeResourceStatus{ + Resources: v1alpha1.Resources{ + Allocatable: &corev1.ResourceList{ + consts.ReclaimedResourceMilliCPU: resource.MustParse("90"), + }, + }, + }, + }, + }, nil) + assert.NoError(t, err) + + testMetaServer := generateTestMetaServer(ctx.Client, testConf, pods) + + reclaimedResourcesGetter := func(ctx context.Context) (corev1.ResourceList, error) { + cnr, err := testMetaServer.GetCNR(ctx) + if err != nil { + return nil, err + } + + allocatable := make(corev1.ResourceList) + if cnr != nil && cnr.Status.Resources.Allocatable != nil { + allocatable = *cnr.Status.Resources.Allocatable + } + return allocatable, nil + } + + reclaimedThresholdGetter := func(resourceName corev1.ResourceName) *float64 { + if threshold, ok := testConf.GetDynamicConfiguration().EvictionThreshold[resourceName]; !ok { + return nil + } else { + return &threshold + } + } + + reclaimedSoftThresholdGetter := func(resourceName corev1.ResourceName) *float64 { + if threshold, ok := testConf.GetDynamicConfiguration().SoftEvictionThreshold[resourceName]; !ok { + return nil + } else { + return &threshold + } + } + + scoreGetter := func(pod *corev1.Pod) *float64 { + score := 100.0 + return &score + } + + deletionGracePeriodGetter := func() int64 { + return testConf.GetDynamicConfiguration().ReclaimedResourcesEvictionConfiguration.DeletionGracePeriod + } + thresholdMetToleranceDurationGetter := func() int64 { + return testConf.GetDynamicConfiguration().ThresholdMetToleranceDuration + } + + p := NewResourcesEvictionPlugin( + "test", + testMetaServer, + metrics.DummyMetrics{}, + reclaimedResourcesGetter, + native.SumUpPodRequestResources, + reclaimedThresholdGetter, + reclaimedSoftThresholdGetter, + scoreGetter, + deletionGracePeriodGetter, + thresholdMetToleranceDurationGetter, + testConf.SkipZeroQuantityResourceNames, + testConf.CheckReclaimedQoSForPod, + ) + + resp, err := p.ThresholdMet(context.Background(), nil) + assert.NoError(t, err) 
+ + _, err = p.GetTopEvictionPods(context.Background(), &pluginapi.GetTopEvictionPodsRequest{ + ActivePods: []*corev1.Pod{}, + TopN: 0, + EvictionScope: resp.EvictionScope, + }) + assert.NoError(t, err) + + _, err = p.GetTopEvictionPods(context.Background(), &pluginapi.GetTopEvictionPodsRequest{ + ActivePods: pods, + TopN: 0, + EvictionScope: resp.EvictionScope, + }) + assert.NoError(t, err) + + _, err = p.GetTopEvictionPods(context.Background(), &pluginapi.GetTopEvictionPodsRequest{ + ActivePods: pods, + TopN: 1, + EvictionScope: resp.EvictionScope, + }) + assert.NoError(t, err) +} diff --git a/pkg/agent/evictionmanager/plugin/resource/zone_resources.go b/pkg/agent/evictionmanager/plugin/resource/zone_resources.go index 0e17453fcf..dc83622ba3 100644 --- a/pkg/agent/evictionmanager/plugin/resource/zone_resources.go +++ b/pkg/agent/evictionmanager/plugin/resource/zone_resources.go @@ -19,6 +19,7 @@ package resource import ( "context" "fmt" + "math" "sort" "strings" "sync" @@ -28,6 +29,7 @@ import ( "k8s.io/klog/v2" "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" pluginapi "github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1" "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metrics" @@ -53,8 +55,10 @@ type ZoneResourcesPlugin struct { podZoneRequestResourcesGetter PodZoneRequestResourcesGetter // thresholdGetter is used to get the threshold of resources. 
thresholdGetter ThresholdGetter + softThresholdGetter ThresholdGetter deletionGracePeriodGetter GracePeriodGetter thresholdMetToleranceDurationGetter GracePeriodGetter + evictScoreGetter EvictScoreGetter zoneType v1alpha1.TopologyType pluginName string @@ -70,6 +74,8 @@ func NewZoneResourcesPlugin( emitter metrics.MetricEmitter, podZoneRequestResourcesGetter PodZoneRequestResourcesGetter, thresholdGetter ThresholdGetter, + softThresholdGetter ThresholdGetter, + evictScoreGetter EvictScoreGetter, deletionGracePeriodGetter GracePeriodGetter, thresholdMetToleranceDurationGetter GracePeriodGetter, skipZeroQuantityResourceNames sets.String, @@ -84,6 +90,8 @@ func NewZoneResourcesPlugin( metaServer: metaServer, podZoneRequestResourcesGetter: podZoneRequestResourcesGetter, thresholdGetter: thresholdGetter, + softThresholdGetter: softThresholdGetter, + evictScoreGetter: evictScoreGetter, deletionGracePeriodGetter: deletionGracePeriodGetter, thresholdMetToleranceDurationGetter: thresholdMetToleranceDurationGetter, zoneType: zoneType, @@ -244,6 +252,31 @@ func (p *ZoneResourcesPlugin) ThresholdMet(ctx context.Context, _ *pluginapi.Get GracePeriodSeconds: p.thresholdMetToleranceDurationGetter(), }, nil } + + softThresholdRate := p.softThresholdGetter(resourceName) + if softThresholdRate == nil { + klog.Warningf("[%s] skip %s resource soft eviction because threshold is empty", p.pluginName, resourceName) + continue + } + + thresholdValue = *softThresholdRate * total + klog.V(4).Infof("[%s] zone %s resource %v: total %v, used %v, softThresholdRate %v, softThresholdValue %v", p.pluginName, zoneID, + resourceName, total, used, *softThresholdRate, thresholdValue) + + exceededValue = thresholdValue - used + if exceededValue < 0 { + klog.Infof("[%s] zone %s resource %v exceeded: total %v, used %v, softThresholdRate %v, softThresholdValue %v", p.pluginName, zoneID, + resourceName, total, used, *softThresholdRate, thresholdValue) + + return &pluginapi.ThresholdMetResponse{ + 
ThresholdValue: thresholdValue, + ObservedValue: used, + ThresholdOperator: pluginapi.ThresholdOperator_GREATER_THAN, + MetType: pluginapi.ThresholdMetType_SOFT_MET, + EvictionScope: fmt.Sprintf("zone%s%s%s", zoneID, evictionScopeSplitter, string(resourceName)), + GracePeriodSeconds: p.thresholdMetToleranceDurationGetter(), + }, nil + } } } @@ -277,6 +310,13 @@ func (p *ZoneResourcesPlugin) GetTopEvictionPods(ctx context.Context, request *p return nil, fmt.Errorf("failed to get cnr from metaServer: %v", err) } + allocatable, err := p.getZoneAllocatable(cnr) + if err != nil { + errMsg := fmt.Sprintf("failed to get allocatable: %v", err) + klog.Errorf("[%s] %s", p.pluginName, errMsg) + return nil, fmt.Errorf(errMsg) + } + zoneAllocations, err := p.getZoneAllocation(cnr) if err != nil { errMsg := fmt.Sprintf("failed to get resources: %v", err) @@ -341,11 +381,31 @@ func (p *ZoneResourcesPlugin) GetTopEvictionPods(ctx context.Context, request *p }(pod) } wg.Wait() - sort.Slice(candidateEvictionPods, func(i, j int) bool { - return values[string(candidateEvictionPods[i].UID)] > values[string(candidateEvictionPods[j].UID)] - }) - retLen := general.MinUInt64(request.TopN, uint64(len(candidateEvictionPods))) + isHardEvict := true + if request.TopN == 0 { + isHardEvict = false + } + + var targetPods []*v1.Pod + if isHardEvict { + targetPods = p.getTopHardEvictionPods(candidateEvictionPods, values, request.TopN) + } else { + // get soft threshold + totalQuantity, ok := allocatable[evictionZoneID][v1.ResourceName(resourceName)] + if !ok { + klog.Warningf("used resource: %s doesn't exist in allocatable", resourceName) + return &pluginapi.GetTopEvictionPodsResponse{}, nil + } + + softThresholdRate := p.softThresholdGetter(v1.ResourceName(resourceName)) + if softThresholdRate == nil { + klog.Warningf("failed to get soft threshold: %v", resourceName) + return &pluginapi.GetTopEvictionPodsResponse{}, nil + } + + targetPods = p.getTopSoftEvictionPods(candidateEvictionPods, values, 
*softThresholdRate*float64(totalQuantity.Value())) + } var deletionOptions *pluginapi.DeletionOptions if gracePeriod := p.deletionGracePeriodGetter(); gracePeriod >= 0 { @@ -355,11 +415,89 @@ func (p *ZoneResourcesPlugin) GetTopEvictionPods(ctx context.Context, request *p } return &pluginapi.GetTopEvictionPodsResponse{ - TargetPods: candidateEvictionPods[:retLen], + TargetPods: targetPods, DeletionOptions: deletionOptions, }, nil } +func (p *ZoneResourcesPlugin) getTopSoftEvictionPods(candidateEvictionPods []*v1.Pod, resReq map[string]int64, threshold float64) (targetPods []*v1.Pod) { + // soft evict handle notification-enabled pod only + var notifyFilteredPods []*v1.Pod + for _, pod := range candidateEvictionPods { + if v, ok := pod.Annotations[apiconsts.PodAnnotationSoftEvictNotificationKey]; ok && strings.ToLower(v) == "true" { + notifyFilteredPods = append(notifyFilteredPods, pod) + } + } + + if len(notifyFilteredPods) == 0 { + klog.Infof("GetTopEvictionPods got empty active pods after filtering: %d", len(candidateEvictionPods)) + return + } + + // get pod score if score getter exist + scores := make(map[string]float64, len(notifyFilteredPods)) + if p.evictScoreGetter != nil { + for _, pod := range notifyFilteredPods { + if s := p.evictScoreGetter(pod); s != nil { + scores[string(pod.UID)] = *s + } + } + } + + // sort pods + sort.Slice(notifyFilteredPods, func(i, j int) bool { + // sort by score first + valueI, valueJ := math.MaxFloat64, math.MaxFloat64 + if s, ok := scores[string(notifyFilteredPods[i].UID)]; ok { + valueI = s + } + + if s, ok := scores[string(notifyFilteredPods[j].UID)]; ok { + valueJ = s + } + + if valueI != valueJ { + return valueI > valueJ + } + + // sort by request if with same score + return resReq[string(notifyFilteredPods[i].UID)] < resReq[string(notifyFilteredPods[j].UID)] + }) + + // get pods that can keep running + var usedRequest int64 + cnt := 0 + for _, pod := range notifyFilteredPods { + req, ok := resReq[string(pod.UID)] + if 
!ok { + klog.Warningf("pod resource request not exist: %s", pod.Name) + continue + } + + usedRequest += req + if float64(usedRequest) >= threshold { + break + } + cnt++ + } + + // get topN pod to eviction + general.SliceReverse(notifyFilteredPods) + podCnt := general.Max(len(notifyFilteredPods)-cnt, 1) + retLen := uint64(general.Min(len(notifyFilteredPods), podCnt)) + klog.Infof("getTopSoftEvictionPods get %d soft pods", retLen) + + return notifyFilteredPods[:retLen] +} + +func (p *ZoneResourcesPlugin) getTopHardEvictionPods(candidateEvictionPods []*v1.Pod, resReq map[string]int64, topN uint64) []*v1.Pod { + sort.Slice(candidateEvictionPods, func(i, j int) bool { + return resReq[string(candidateEvictionPods[i].UID)] > resReq[string(candidateEvictionPods[j].UID)] + }) + + return candidateEvictionPods[:general.MinUInt64(topN, uint64(len(candidateEvictionPods)))] +} + // getZoneAllocatable traverses the topology tree and collects allocatable resources // for zones matching the plugin's zoneType. Children are recursively visited. 
func (p *ZoneResourcesPlugin) getZoneAllocatable( diff --git a/pkg/agent/evictionmanager/plugin/resource/zone_resources_test.go b/pkg/agent/evictionmanager/plugin/resource/zone_resources_test.go index 9dba02787f..d250d1a197 100644 --- a/pkg/agent/evictionmanager/plugin/resource/zone_resources_test.go +++ b/pkg/agent/evictionmanager/plugin/resource/zone_resources_test.go @@ -51,6 +51,8 @@ func TestZoneResourcesPlugin_BasicAPIs(t *testing.T) { metrics.DummyMetrics{}, nil, func(corev1.ResourceName) *float64 { return nil }, + func(corev1.ResourceName) *float64 { return nil }, + nil, func() int64 { return 0 }, func() int64 { return 0 }, nil, @@ -92,6 +94,8 @@ func TestZoneResourcesPlugin_GetTopEvictionPods_EmptyActive(t *testing.T) { metrics.DummyMetrics{}, nil, func(corev1.ResourceName) *float64 { return nil }, + func(corev1.ResourceName) *float64 { return nil }, + nil, func() int64 { return 1 }, func() int64 { return 0 }, nil, @@ -130,6 +134,8 @@ func TestZoneResourcesPlugin_ThresholdMet_NoFilteredPods(t *testing.T) { metrics.DummyMetrics{}, nil, func(corev1.ResourceName) *float64 { r := 0.5; return &r }, + func(corev1.ResourceName) *float64 { return nil }, + nil, func() int64 { return 0 }, func() int64 { return 0 }, nil, diff --git a/pkg/agent/evictionmanager/plugin/resource/zone_resources_threshold_test.go b/pkg/agent/evictionmanager/plugin/resource/zone_resources_threshold_test.go index f2fe4ce23d..ab3be38c55 100644 --- a/pkg/agent/evictionmanager/plugin/resource/zone_resources_threshold_test.go +++ b/pkg/agent/evictionmanager/plugin/resource/zone_resources_threshold_test.go @@ -76,7 +76,7 @@ func TestZoneResourcesPlugin_ThresholdMet_GPU(t *testing.T) { ctx.Client.InternalClient.NodeV1alpha1().CustomNodeResources()), }} p := NewZoneResourcesPlugin("test", nodev1alpha1.TopologyTypeGPU, ms, metrics.DummyMetrics{}, nil, - func(corev1.ResourceName) *float64 { r := 0.5; return &r }, func() int64 { return 0 }, func() int64 { return 0 }, + func(corev1.ResourceName) 
*float64 { r := 0.5; return &r }, func(corev1.ResourceName) *float64 { return nil }, nil, func() int64 { return 0 }, func() int64 { return 0 }, nil, func(*corev1.Pod) (bool, error) { return true, nil }) met, err := p.ThresholdMet(context.TODO(), nil) if err != nil { @@ -125,7 +125,9 @@ func TestZoneResourcesPlugin_ThresholdMet_SkipZero(t *testing.T) { }} skip := sets.NewString("nvidia.com/gpu") p := NewZoneResourcesPlugin("test", nodev1alpha1.TopologyTypeGPU, ms, metrics.DummyMetrics{}, nil, - func(corev1.ResourceName) *float64 { r := 0.5; return &r }, func() int64 { return 0 }, func() int64 { return 0 }, + func(corev1.ResourceName) *float64 { r := 0.5; return &r }, + func(corev1.ResourceName) *float64 { return nil }, nil, + func() int64 { return 0 }, func() int64 { return 0 }, skip, func(*corev1.Pod) (bool, error) { return true, nil }) met, err := p.ThresholdMet(context.TODO(), nil) if err != nil { diff --git a/pkg/agent/evictionmanager/podnotifier/notifier.go b/pkg/agent/evictionmanager/podnotifier/notifier.go new file mode 100644 index 0000000000..81d0bc939a --- /dev/null +++ b/pkg/agent/evictionmanager/podnotifier/notifier.go @@ -0,0 +1,232 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package podnotifier + +import ( + "context" + "fmt" + "os" + "path" + "path/filepath" + "strings" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/events" + "k8s.io/klog/v2" + + apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/native" +) + +const ( + MetricsNameNotifyPod = "notify_pod" + + HostPathNotifyFileName = "soft_eviction_notify" + HostPathNotifyFileSeparator = "@@__@@" +) + +var ( + SyncHostPathIntv = time.Minute + CleanHostPathIntv = 10 * time.Minute +) + +// Notifier implements pod notify logic +type Notifier interface { + // Name returns name as identifier for a specific Notifier. + Name() string + + // Run + Run(ctx context.Context) + + // Notify a pod that eviction maybe happened. + Notify(ctx context.Context, pod *v1.Pod, reason, plugin string) error +} + +// HostPathPodNotifier implements Notifier interface by hostpath. +type HostPathPodNotifier struct { + rootPath string + client kubernetes.Interface + metaServer *metaserver.MetaServer + emitter metrics.MetricEmitter + recorder events.EventRecorder +} + +// NewHostPathPodNotifier returns a new updater Object. 
+func NewHostPathPodNotifier(cfg *config.Configuration, client kubernetes.Interface, metaServer *metaserver.MetaServer, recorder events.EventRecorder, emitter metrics.MetricEmitter) (Notifier, error) { + if cfg.HostPathNotifierRootPath == "" { + return nil, fmt.Errorf("HostPathNotifierRootPath must be set") + } + + return &HostPathPodNotifier{ + client: client, + metaServer: metaServer, + emitter: emitter, + recorder: recorder, + rootPath: cfg.HostPathNotifierRootPath, + }, nil +} + +func (n *HostPathPodNotifier) Name() string { return consts.NotifierNameHostPath } + +func (n *HostPathPodNotifier) Run(ctx context.Context) { + go wait.Until(n.cleanHostPath, CleanHostPathIntv, ctx.Done()) + go wait.Until(n.createHostPath, SyncHostPathIntv, ctx.Done()) +} + +func (n *HostPathPodNotifier) genPodNotifyPath(pod *v1.Pod) string { + if pod == nil { + return "" + } + + return filepath.Join(n.rootPath, pod.Name+HostPathNotifyFileSeparator+pod.Namespace) +} + +func (n *HostPathPodNotifier) createHostPath() { + klog.Infof("[createHostPath] creating host path") + podFilter := func(pod *v1.Pod) bool { + if pod == nil { + return false + } + + if !native.PodIsActive(pod) { + return false + } + + v, ok := pod.Annotations[apiconsts.PodAnnotationSoftEvictNotificationKey] + if !ok { + return false + } + + return strings.ToLower(v) == "true" + } + + notifyPods, err := n.metaServer.GetPodList(context.TODO(), podFilter) + if err != nil { + klog.Errorf("get notify pods failed: %s", err) + return + } + + for _, pod := range notifyPods { + podPath := n.genPodNotifyPath(pod) + _, err := os.Stat(podPath) + if err == nil { + continue + } else if os.IsNotExist(err) { + err := os.MkdirAll(podPath, os.ModePerm) + if err != nil { + klog.Errorf("create pod path failed: %s, %s", podPath, err) + continue + } + } + } +} + +func (n *HostPathPodNotifier) cleanHostPath() { + klog.Infof("[cleanHostPath] cleaning host path") + dirs, err := os.ReadDir(n.rootPath) + if err != nil { + klog.Errorf("failed to 
read dir %s: %v", n.rootPath, err) + return + } + + podList, err := n.metaServer.GetPodList(context.Background(), nil) + if err != nil { + klog.Errorf("get pod list failed: %s", err) + return + } + + existingPods := make(map[string]bool) + for _, pod := range podList { + existingPods[path.Base(n.genPodNotifyPath(pod))] = true + } + + for _, dir := range dirs { + if !dir.IsDir() { + continue + } + + dirName := dir.Name() + if !strings.Contains(dirName, HostPathNotifyFileSeparator) { + continue + } + + if _, ok := existingPods[dirName]; !ok { + err = os.RemoveAll(filepath.Join(n.rootPath, dirName)) + klog.Infof("remove non-exists pod dir %s: %v", filepath.Join(n.rootPath, dirName), err) + } + } +} + +func (n *HostPathPodNotifier) Notify(_ context.Context, pod *v1.Pod, reason, plugin string) error { + notifyPod := func(pod *v1.Pod) error { + klog.Infof("[host-path-notifier] send notify to pod %v/%v", pod.Namespace, pod.Name) + + notifyPath := filepath.Join(n.genPodNotifyPath(pod), HostPathNotifyFileName) + f, err := os.OpenFile(notifyPath, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + return fmt.Errorf("[host-path-notifier] failed to open notify file: %s, %v", notifyPath, err) + } + + _, err = f.WriteString(time.Now().String()) + if err != nil { + return fmt.Errorf("[host-path-notifier] failed to write notify file: %s, %v", notifyPath, err) + } + + return nil + } + + return notify(n.recorder, n.emitter, pod, reason, plugin, notifyPod) +} + +func notify(recorder events.EventRecorder, emitter metrics.MetricEmitter, pod *v1.Pod, + reason, plugin string, notifyPod func(_ *v1.Pod) error, +) error { + if pod == nil { + return fmt.Errorf("[notifier] invalid pod") + } + + klog.Infof("[notifier] notify pod %v/%v", pod.Namespace, pod.Name) + if err := notifyPod(pod); err != nil { + recorder.Eventf(pod, nil, v1.EventTypeNormal, consts.EventReasonNotifyFailed, consts.EventActionNotifying, + fmt.Sprintf("notify failed: %s", err)) + _ = 
emitter.StoreInt64(MetricsNameNotifyPod, 1, metrics.MetricTypeNameRaw, + metrics.MetricTag{Key: "state", Val: "failed"}, + metrics.MetricTag{Key: "pod_ns", Val: pod.Namespace}, + metrics.MetricTag{Key: "pod_name", Val: pod.Name}, + metrics.MetricTag{Key: "plugin_name", Val: plugin}) + + return fmt.Errorf("[notifier] notify pod failed: %s, %v", pod.Name, err) + } + + recorder.Eventf(pod, nil, v1.EventTypeNormal, consts.EventReasonNotifySuccess, consts.EventActionNotifying, + "notify pod successfully; reason: %s", reason) + _ = emitter.StoreInt64(MetricsNameNotifyPod, 1, metrics.MetricTypeNameRaw, + metrics.MetricTag{Key: "state", Val: "succeeded"}, + metrics.MetricTag{Key: "pod_ns", Val: pod.Namespace}, + metrics.MetricTag{Key: "pod_name", Val: pod.Name}, + metrics.MetricTag{Key: "plugin_name", Val: plugin}) + klog.Infof("[notifier] successfully notify pod %v/%v", pod.Namespace, pod.Name) + + return nil +} diff --git a/pkg/agent/evictionmanager/podnotifier/notifier_test.go b/pkg/agent/evictionmanager/podnotifier/notifier_test.go new file mode 100644 index 0000000000..74fef28350 --- /dev/null +++ b/pkg/agent/evictionmanager/podnotifier/notifier_test.go @@ -0,0 +1,109 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package podnotifier + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/events" + + apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" + "github.com/kubewharf/katalyst-core/pkg/metrics" +) + +var pods = []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + UID: "pod-1", + Annotations: map[string]string{apiconsts.PodAnnotationSoftEvictNotificationKey: "true"}, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + UID: "pod-2", + Annotations: map[string]string{apiconsts.PodAnnotationSoftEvictNotificationKey: "true"}, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-3", + UID: "pod-3", + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + }, +} + +func makeConf() *config.Configuration { + conf := config.NewConfiguration() + conf.HostPathNotifierRootPath = "/tmp/katalyst" + os.MkdirAll(conf.HostPathNotifierRootPath, os.ModePerm) + return conf +} + +func makeMetaServer() *metaserver.MetaServer { + return &metaserver.MetaServer{ + MetaAgent: &agent.MetaAgent{ + PodFetcher: &pod.PodFetcherStub{PodList: pods}, + }, + } +} + +func TestHostPathPodNotifier(t *testing.T) { + t.Parallel() + + client := fake.NewSimpleClientset( + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + UID: "pod-1", + }, + }, + ) + + notifier, err := NewHostPathPodNotifier(makeConf(), client, makeMetaServer(), events.NewFakeRecorder(4096), metrics.DummyMetrics{}) + require.NoError(t, err) + SyncHostPathIntv = time.Second + 
notifier.Run(context.Background()) + time.Sleep(5 * SyncHostPathIntv) + + err = notifier.Notify(context.Background(), pods[0], "test", "test") + require.NoError(t, err) + + err = notifier.Notify(context.Background(), pods[2], "test", "test") + require.Error(t, err) +} diff --git a/pkg/agent/evictionmanager/podnotifier/podnotifier.go b/pkg/agent/evictionmanager/podnotifier/podnotifier.go new file mode 100644 index 0000000000..fb512d2e66 --- /dev/null +++ b/pkg/agent/evictionmanager/podnotifier/podnotifier.go @@ -0,0 +1,102 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podnotifier + +import ( + "context" + "fmt" + "sync" + + "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/rule" +) + +const ( + SynchronizedPodNotifierName = "synchronized-pod-notifier" +) + +// PodNotifier implements the notify soft eviction actions for given pods. +type PodNotifier interface { + // Name returns name as identifier for a specific notifier. + Name() string + + // Start pod notifier logic. + Start(ctx context.Context) + + // NotifyPods notify a list pods. + NotifyPods(rpList rule.RuledEvictPodList) error + + // NotifyPod notify a pod. + NotifyPod(rp *rule.RuledEvictPod) error +} + +// SynchronizedPodNotifier trigger notify actions immediately after +// receiving notify requests; only returns true if all pods are +// successfully notified. 
+type SynchronizedPodNotifier struct { + notifier Notifier +} + +func NewSynchronizedPodNotifier(notifier Notifier) PodNotifier { + return &SynchronizedPodNotifier{ + notifier: notifier, + } +} + +func (s *SynchronizedPodNotifier) Name() string { return SynchronizedPodNotifierName } + +func (s *SynchronizedPodNotifier) Start(ctx context.Context) { + klog.Infof("[synchronized] pod-notifier run with notifier %v", s.notifier.Name()) + s.notifier.Run(ctx) + defer klog.Infof("[synchronized] pod-notifier started") +} + +func (s *SynchronizedPodNotifier) NotifyPod(rp *rule.RuledEvictPod) error { + if rp == nil || rp.Pod == nil { + return fmt.Errorf("NotifyPod got nil pod") + } + + err := s.notifier.Notify(context.Background(), rp.Pod, rp.Reason, rp.EvictionPluginName) + if err != nil { + return fmt.Errorf("notify pod: %s/%s failed with error: %v", rp.Pod.Namespace, rp.Pod.Name, err) + } + + return nil +} + +func (s *SynchronizedPodNotifier) NotifyPods(rpList rule.RuledEvictPodList) error { + var errList []error + var mtx sync.Mutex + + klog.Infof("[synchronized] pod-notifier evict %d totally", len(rpList)) + syncNodeUtilizationAndAdjust := func(i int) { + err := s.NotifyPod(rpList[i]) + + mtx.Lock() + if err != nil { + errList = append(errList, err) + } + mtx.Unlock() + } + workqueue.ParallelizeUntil(context.Background(), 3, len(rpList), syncNodeUtilizationAndAdjust) + + klog.Infof("[synchronized] successfully evict %d totally", len(rpList)-len(errList)) + return errors.NewAggregate(errList) +} diff --git a/pkg/agent/evictionmanager/podnotifier/podnotifier_test.go b/pkg/agent/evictionmanager/podnotifier/podnotifier_test.go new file mode 100644 index 0000000000..cf707272df --- /dev/null +++ b/pkg/agent/evictionmanager/podnotifier/podnotifier_test.go @@ -0,0 +1,74 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podnotifier + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + pluginapi "github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1" + "github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/rule" +) + +// mock implementations for interfaces + +type mockNotifier struct { + NotifyFunc func(ctx context.Context, pod *v1.Pod, reason, plugin string) error +} + +func (m *mockNotifier) Name() string { + return "mock-notifier" +} + +func (m *mockNotifier) Run(_ context.Context) { + return +} + +func (m *mockNotifier) Notify(ctx context.Context, pod *v1.Pod, reason, plugin string) error { + if m.NotifyFunc != nil { + return m.NotifyFunc(ctx, pod, reason, plugin) + } + return nil +} + +func TestSynchronizedPodNotifier(t *testing.T) { + t.Parallel() + + testPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + UID: "uid-1", + }, + } + + notifyPod := &pluginapi.EvictPod{ + Pod: testPod, + Reason: "test", + ForceEvict: false, + EvictionPluginName: "test", + } + + notifier := NewSynchronizedPodNotifier(&mockNotifier{}) + notifier.Name() + notifier.Start(context.Background()) + err := notifier.NotifyPods(rule.RuledEvictPodList{&rule.RuledEvictPod{EvictPod: notifyPod}}) + require.NoError(t, err) +} diff --git a/pkg/config/agent/dynamic/adminqos/eviction/reclaimed_resources_eviction.go b/pkg/config/agent/dynamic/adminqos/eviction/reclaimed_resources_eviction.go index c1ac410f55..144062eeb2 100644 --- 
a/pkg/config/agent/dynamic/adminqos/eviction/reclaimed_resources_eviction.go +++ b/pkg/config/agent/dynamic/adminqos/eviction/reclaimed_resources_eviction.go @@ -23,13 +23,15 @@ import ( type ReclaimedResourcesEvictionConfiguration struct { EvictionThreshold native.ResourceThreshold + SoftEvictionThreshold native.ResourceThreshold DeletionGracePeriod int64 ThresholdMetToleranceDuration int64 } func NewReclaimedResourcesEvictionConfiguration() *ReclaimedResourcesEvictionConfiguration { return &ReclaimedResourcesEvictionConfiguration{ - EvictionThreshold: native.ResourceThreshold{}, + EvictionThreshold: native.ResourceThreshold{}, + SoftEvictionThreshold: native.ResourceThreshold{}, } } @@ -41,6 +43,10 @@ func (c *ReclaimedResourcesEvictionConfiguration) ApplyConfiguration(conf *crd.D c.EvictionThreshold[resourceName] = value } + for resourceName, value := range config.SoftEvictionThreshold { + c.SoftEvictionThreshold[resourceName] = value + } + if config.GracePeriod != nil { c.DeletionGracePeriod = *config.GracePeriod } diff --git a/pkg/config/agent/eviction/eviciton_base.go b/pkg/config/agent/eviction/eviciton_base.go index 2e7f1791ba..22765ed172 100644 --- a/pkg/config/agent/eviction/eviciton_base.go +++ b/pkg/config/agent/eviction/eviciton_base.go @@ -57,6 +57,9 @@ type GenericEvictionConfiguration struct { // RecordManager specifies the eviction record manager to use RecordManager string + + // HostPathNotifierRootPath + HostPathNotifierRootPath string } type EvictionConfiguration struct { diff --git a/pkg/consts/common.go b/pkg/consts/common.go index 31295d2e60..97286def60 100644 --- a/pkg/consts/common.go +++ b/pkg/consts/common.go @@ -41,12 +41,16 @@ const ( EventReasonEvictExceededGracePeriod = "EvictExceededGracePeriod" EventReasonEvictSucceeded = "EvictSucceeded" + EventReasonNotifyFailed = "NotifyFailed" + EventReasonNotifySuccess = "NotifySuccess" + EventReasonContainerStopped = "ContainerStopped" ) // const variable for pod eviction action identifier in 
event. const ( EventActionEvicting = "Evicting" + EventActionNotifying = "Notifying" EventActionContainerStopping = "ContainerStopping" ) diff --git a/pkg/consts/evictionmanager.go b/pkg/consts/evictionmanager.go index 6dc5577a05..e28ca52bf0 100644 --- a/pkg/consts/evictionmanager.go +++ b/pkg/consts/evictionmanager.go @@ -21,6 +21,8 @@ const ( KillerNameEvictionKiller = "eviction-api-killer" KillerNameDeletionKiller = "deletion-api-killer" KillerNameContainerKiller = "container-killer" + + NotifierNameHostPath = "host-path-notifier" ) const ( diff --git a/pkg/controller/kcc/kcct_integration_test.go b/pkg/controller/kcc/kcct_integration_test.go index 6e11739c65..17bfd39544 100644 --- a/pkg/controller/kcc/kcct_integration_test.go +++ b/pkg/controller/kcc/kcct_integration_test.go @@ -86,6 +86,9 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ v1.ResourceCPU: 5.0, }, + SoftEvictionThreshold: map[v1.ResourceName]float64{ + v1.ResourceCPU: 1.5, + }, }, }, }, @@ -107,6 +110,9 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ v1.ResourceCPU: 10.0, }, + SoftEvictionThreshold: map[v1.ResourceName]float64{ + v1.ResourceCPU: 6.0, + }, }, }, }, @@ -136,6 +142,9 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ v1.ResourceCPU: 15.0, }, + SoftEvictionThreshold: map[v1.ResourceName]float64{ + v1.ResourceCPU: 10.0, + }, }, }, }, @@ -177,6 +186,9 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ v1.ResourceCPU: 5.0, }, + SoftEvictionThreshold: map[v1.ResourceName]float64{ + v1.ResourceCPU: 1.5, + }, }, }, }, @@ -186,7 +198,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { CanaryNodes: 1, UpdatedTargetNodes: 1, UpdatedNodes: 1, - CurrentHash: "b3ffe223d46e", + CurrentHash: "190b30322065", 
ObservedGeneration: 1, Conditions: []v1alpha1.GenericConfigCondition{ { @@ -213,6 +225,9 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ v1.ResourceCPU: 10.0, }, + SoftEvictionThreshold: map[v1.ResourceName]float64{ + v1.ResourceCPU: 6.0, + }, }, }, }, @@ -222,7 +237,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { CanaryNodes: 1, UpdatedTargetNodes: 1, UpdatedNodes: 1, - CurrentHash: "8c1738cd5743", + CurrentHash: "71cf0929a297", ObservedGeneration: 1, Conditions: []v1alpha1.GenericConfigCondition{ { @@ -257,6 +272,9 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ v1.ResourceCPU: 15.0, }, + SoftEvictionThreshold: map[v1.ResourceName]float64{ + v1.ResourceCPU: 10.0, + }, }, }, }, @@ -266,7 +284,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { CanaryNodes: 1, UpdatedTargetNodes: 1, UpdatedNodes: 1, - CurrentHash: "a7f48154abe2", + CurrentHash: "d4b70d79b95f", ObservedGeneration: 1, Conditions: []v1alpha1.GenericConfigCondition{ { @@ -289,7 +307,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { ConfigType: crd.AdminQoSConfigurationGVR, ConfigNamespace: "default", ConfigName: "node-1", - Hash: "a7f48154abe2", + Hash: "d4b70d79b95f", }, }, }, @@ -307,7 +325,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { ConfigType: crd.AdminQoSConfigurationGVR, ConfigNamespace: "default", ConfigName: "aa-bb", - Hash: "8c1738cd5743", + Hash: "71cf0929a297", }, }, }, @@ -322,7 +340,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { ConfigType: crd.AdminQoSConfigurationGVR, ConfigNamespace: "default", ConfigName: "default", - Hash: "b3ffe223d46e", + Hash: "190b30322065", }, }, }, @@ -363,6 +381,9 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ v1.ResourceCPU: 5.0, }, + 
SoftEvictionThreshold: map[v1.ResourceName]float64{ + v1.ResourceCPU: 1.5, + }, }, }, }, @@ -384,6 +405,9 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ v1.ResourceCPU: 10.0, }, + SoftEvictionThreshold: map[v1.ResourceName]float64{ + v1.ResourceCPU: 6.0, + }, }, }, }, @@ -406,6 +430,9 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ v1.ResourceCPU: 15.0, }, + SoftEvictionThreshold: map[v1.ResourceName]float64{ + v1.ResourceCPU: 10.0, + }, }, }, }, @@ -477,6 +504,9 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ v1.ResourceCPU: 5.0, }, + SoftEvictionThreshold: map[v1.ResourceName]float64{ + v1.ResourceCPU: 1.5, + }, }, }, }, @@ -501,6 +531,9 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ v1.ResourceCPU: 10.0, }, + SoftEvictionThreshold: map[v1.ResourceName]float64{ + v1.ResourceCPU: 6.0, + }, }, }, }, @@ -534,6 +567,9 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ v1.ResourceCPU: 15.0, }, + SoftEvictionThreshold: map[v1.ResourceName]float64{ + v1.ResourceCPU: 10.0, + }, }, }, }, @@ -636,6 +672,9 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ v1.ResourceCPU: 5.0, }, + SoftEvictionThreshold: map[v1.ResourceName]float64{ + v1.ResourceCPU: 1.5, + }, }, }, }, @@ -645,7 +684,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { CanaryNodes: 1, UpdatedTargetNodes: 1, UpdatedNodes: 1, - CurrentHash: "b3ffe223d46e", + CurrentHash: "190b30322065", ObservedGeneration: 1, Conditions: []v1alpha1.GenericConfigCondition{ { @@ -681,6 +720,9 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ 
v1.ResourceCPU: 15.0, }, + SoftEvictionThreshold: map[v1.ResourceName]float64{ + v1.ResourceCPU: 10.0, + }, }, }, }, @@ -690,7 +732,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { CanaryNodes: 2, UpdatedTargetNodes: 2, UpdatedNodes: 2, - CurrentHash: "8c1738cd5743", + CurrentHash: "d4b70d79b95f", ObservedGeneration: 1, Conditions: []v1alpha1.GenericConfigCondition{ { @@ -713,7 +755,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { ConfigType: crd.AdminQoSConfigurationGVR, ConfigNamespace: "default", ConfigName: "default", - Hash: "b3ffe223d46e", + Hash: "190b30322065", }, }, }, @@ -770,6 +812,9 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ v1.ResourceCPU: 5.0, }, + SoftEvictionThreshold: map[v1.ResourceName]float64{ + v1.ResourceCPU: 1.5, + }, }, }, }, @@ -779,7 +824,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { CanaryNodes: 1, UpdatedTargetNodes: 1, UpdatedNodes: 1, - CurrentHash: "b3ffe223d46e", + CurrentHash: "190b30322065", ObservedGeneration: 1, Conditions: []v1alpha1.GenericConfigCondition{ { @@ -815,6 +860,9 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ v1.ResourceCPU: 15.0, }, + SoftEvictionThreshold: map[v1.ResourceName]float64{ + v1.ResourceCPU: 10.0, + }, }, }, }, @@ -824,7 +872,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { CanaryNodes: 1, UpdatedTargetNodes: 1, UpdatedNodes: 1, - CurrentHash: "a7f48154abe2", + CurrentHash: "d4b70d79b95f", ObservedGeneration: 2, Conditions: []v1alpha1.GenericConfigCondition{ { @@ -847,7 +895,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { ConfigType: crd.AdminQoSConfigurationGVR, ConfigNamespace: "default", ConfigName: "default", - Hash: "b3ffe223d46e", + Hash: "190b30322065", }, }, }, @@ -865,7 +913,7 @@ func TestKatalystCustomConfigTargetController_Run(t 
*testing.T) { ConfigType: crd.AdminQoSConfigurationGVR, ConfigNamespace: "default", ConfigName: "aa-bb", - Hash: "a7f48154abe2", + Hash: "d4b70d79b95f", }, }, }, @@ -934,7 +982,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { CanaryNodes: 1, UpdatedTargetNodes: 1, UpdatedNodes: 1, - CurrentHash: "b3ffe223d46e", + CurrentHash: "d66c6afd9fc2", ObservedGeneration: 1, Conditions: []v1alpha1.GenericConfigCondition{ { @@ -998,7 +1046,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { ConfigType: crd.AdminQoSConfigurationGVR, ConfigNamespace: "default", ConfigName: "default", - Hash: "b3ffe223d46e", + Hash: "d66c6afd9fc2", }, }, }, @@ -1046,7 +1094,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { CanaryNodes: 2, UpdatedTargetNodes: 2, UpdatedNodes: 2, - CurrentHash: "b3ffe223d46e", + CurrentHash: "d66c6afd9fc2", ObservedGeneration: 1, Conditions: []v1alpha1.GenericConfigCondition{ { @@ -1108,7 +1156,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { ConfigType: crd.AdminQoSConfigurationGVR, ConfigNamespace: "default", ConfigName: "default", - Hash: "b3ffe223d46e", + Hash: "d66c6afd9fc2", }, }, }, @@ -1126,7 +1174,7 @@ func TestKatalystCustomConfigTargetController_Run(t *testing.T) { ConfigType: crd.AdminQoSConfigurationGVR, ConfigNamespace: "default", ConfigName: "default", - Hash: "b3ffe223d46e", + Hash: "d66c6afd9fc2", }, }, }, diff --git a/pkg/util/general/slice.go b/pkg/util/general/slice.go index a0412b5d97..f3c28f65ac 100644 --- a/pkg/util/general/slice.go +++ b/pkg/util/general/slice.go @@ -165,3 +165,11 @@ func ConvertIntSliceToBitmapString(nums []int64) (string, error) { } return bitmapStr, nil } + +func SliceReverse(s interface{}) { + n := reflect.ValueOf(s).Len() + swap := reflect.Swapper(s) + for i, j := 0, n-1; i < j; i, j = i+1, j-1 { + swap(i, j) + } +} diff --git a/pkg/util/general/slice_test.go b/pkg/util/general/slice_test.go index e1712433c0..45b682a695 100644 --- 
a/pkg/util/general/slice_test.go +++ b/pkg/util/general/slice_test.go @@ -565,6 +565,50 @@ func TestConvertIntSliceToBitmapString(t *testing.T) { } } +func TestSliceReverse(t *testing.T) { + t.Parallel() + + type args struct { + a []int + } + tests := []struct { + name string + args args + want []int + }{ + { + name: "empty slice", + args: args{ + a: []int{}, + }, + want: []int{}, + }, { + name: "positive numbers", + args: args{ + a: []int{1, 2, 3}, + }, + want: []int{3, 2, 1}, + }, { + name: "negative numbers", + args: args{ + a: []int{-1, -2, -3}, + }, + want: []int{-3, -2, -1}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + SliceReverse(tt.args.a) + if !equalIntSlices(tt.args.a, tt.want) { + t.Errorf("SliceReverse() = %v, want %v", tt.args.a, tt.want) + } + }) + } +} + // Helper functions for comparing slices func equalInt64Slices(a, b []int64) bool { if len(a) != len(b) { diff --git a/pkg/util/kcct_test.go b/pkg/util/kcct_test.go index 3c84905e49..c8d2faacbb 100644 --- a/pkg/util/kcct_test.go +++ b/pkg/util/kcct_test.go @@ -918,6 +918,9 @@ func Test_kccTargetResource_GenerateConfigHash(t *testing.T) { EvictionThreshold: map[v1.ResourceName]float64{ "aa": 11, }, + SoftEvictionThreshold: map[v1.ResourceName]float64{ + "bb": 22, + }, }, MemoryPressureEvictionConfig: &v1alpha1.MemoryPressureEvictionConfig{ NumaFreeBelowWatermarkTimesThreshold: &nonDefaultNumaFreeBelowWatermarkTimesThreshold, @@ -931,7 +934,7 @@ func Test_kccTargetResource_GenerateConfigHash(t *testing.T) { }, }), }, - want: "c16ed236c692", + want: "07f339db4d03", }, { name: "test-2", diff --git a/pkg/util/native/pods.go b/pkg/util/native/pods.go index 079580e5b0..524b475799 100644 --- a/pkg/util/native/pods.go +++ b/pkg/util/native/pods.go @@ -21,6 +21,7 @@ import ( "strings" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" @@ 
-77,6 +78,29 @@ func FilterPods(pods []*v1.Pod, filterFunc func(*v1.Pod) (bool, error)) []*v1.Po return filtered } +// GetPodRequestResources sums the requested quantity of the given resource over regular containers, pod overhead, and init containers (NOTE(review): init-container requests are summed here, whereas SumUpPodRequestResources counts the max across init containers — confirm the sum is intended) +func GetPodRequestResources(pod *v1.Pod, name v1.ResourceName) *resource.Quantity { + ret := resource.NewQuantity(0, resource.DecimalSI) + + for _, container := range pod.Spec.Containers { + if q, ok := container.Resources.Requests[name]; ok { + ret.Add(q) + } + } + + if q, ok := pod.Spec.Overhead[name]; ok { + ret.Add(q) + } + + for _, container := range pod.Spec.InitContainers { + if q, ok := container.Resources.Requests[name]; ok { + ret.Add(q) + } + } + + return ret +} + // SumUpPodRequestResources sum up resources in all containers request // init container is included (count on the max request of all init containers) func SumUpPodRequestResources(pod *v1.Pod) v1.ResourceList { diff --git a/pkg/util/native/pods_test.go b/pkg/util/native/pods_test.go index 04f3a0f442..4e9878b011 100644 --- a/pkg/util/native/pods_test.go +++ b/pkg/util/native/pods_test.go @@ -22,7 +22,10 @@ import ( "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/kubewharf/katalyst-api/pkg/consts" ) func TestFilterPodAnnotations(t *testing.T) { @@ -329,3 +332,61 @@ func TestPodIsPending(t *testing.T) { }) } } + +func TestGetPodRequestResources(t *testing.T) { + t.Parallel() + + pods := []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-1", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelReclaimedCores, + consts.PodAnnotationSoftEvictNotificationKey: "hostpath", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "test-container-1", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + consts.ReclaimedResourceMilliCPU: resource.MustParse("100"), + }, + }, + }, + }, + }, + }, + { + ObjectMeta:
metav1.ObjectMeta{ + Name: "test-pod-2", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelReclaimedCores, + consts.PodAnnotationSoftEvictNotificationKey: "hostpath", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "test-container-2", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + consts.ReclaimedResourceMilliCPU: resource.MustParse("50"), + }, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-3", + }, + }, + } + + res := GetPodRequestResources(pods[0], consts.ReclaimedResourceMilliCPU) + assert.NotEqual(t, res, (*resource.Quantity)(nil)) + assert.Equal(t, int64(100), res.Value()) +}