Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

type ReclaimedResourcesEvictionOptions struct {
EvictionThreshold native.ResourceThreshold
SoftEvictionThreshold native.ResourceThreshold
GracePeriod int64
ThresholdMetToleranceDuration int64
}
Expand All @@ -36,6 +37,10 @@ func NewReclaimedResourcesEvictionOptions() *ReclaimedResourcesEvictionOptions {
consts.ReclaimedResourceMilliCPU: 5.0,
consts.ReclaimedResourceMemory: 5.0,
},
SoftEvictionThreshold: native.ResourceThreshold{
consts.ReclaimedResourceMilliCPU: 1.5,
consts.ReclaimedResourceMemory: 1.2,
},
GracePeriod: 60,
ThresholdMetToleranceDuration: 0,
}
Expand All @@ -54,6 +59,7 @@ func (o *ReclaimedResourcesEvictionOptions) AddFlags(fss *cliflag.NamedFlagSets)

func (o *ReclaimedResourcesEvictionOptions) ApplyTo(c *eviction.ReclaimedResourcesEvictionConfiguration) error {
c.EvictionThreshold = o.EvictionThreshold
c.SoftEvictionThreshold = o.SoftEvictionThreshold
c.DeletionGracePeriod = o.GracePeriod
c.ThresholdMetToleranceDuration = o.ThresholdMetToleranceDuration
return nil
Expand Down
8 changes: 8 additions & 0 deletions cmd/katalyst-agent/app/options/eviction/eviction_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type GenericEvictionOptions struct {

// RecordManager specifies the eviction record manager to use
RecordManager string

// HostPathNotifierPathRoot is the root path for host-path notifier
HostPathNotifierRootPath string
}

// NewGenericEvictionOptions creates a new Options with a default config.
Expand All @@ -69,6 +72,7 @@ func NewGenericEvictionOptions() *GenericEvictionOptions {
EvictionSkippedAnnotationKeys: []string{},
EvictionSkippedLabelKeys: []string{},
EvictionBurst: 3,
HostPathNotifierRootPath: "/opt/katalyst",
PodKiller: consts.KillerNameEvictionKiller,
StrictAuthentication: false,
}
Expand Down Expand Up @@ -111,6 +115,9 @@ func (o *GenericEvictionOptions) AddFlags(fss *cliflag.NamedFlagSets) {

fs.StringVar(&o.RecordManager, "eviction-record-manager", o.RecordManager,
"the eviction record manager to use")

fs.StringVar(&o.HostPathNotifierRootPath, "pod-notifier-root-path", o.HostPathNotifierRootPath,
"root path of host-path notifier")
}

// ApplyTo fills up config with options
Expand All @@ -126,6 +133,7 @@ func (o *GenericEvictionOptions) ApplyTo(c *evictionconfig.GenericEvictionConfig
c.StrictAuthentication = o.StrictAuthentication
c.PodMetricLabels.Insert(o.PodMetricLabels...)
c.RecordManager = o.RecordManager
c.HostPathNotifierRootPath = o.HostPathNotifierRootPath
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/google/uuid v1.3.0
github.com/h2non/gock v1.2.0
github.com/klauspost/cpuid/v2 v2.2.6
github.com/kubewharf/katalyst-api v0.5.8-0.20251212030746-894fa2521a86
github.com/kubewharf/katalyst-api v0.5.9-0.20251226033202-b56ae9d0382b
github.com/moby/sys/mountinfo v0.6.2
github.com/montanaflynn/stats v0.7.1
github.com/opencontainers/runc v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.5.8-0.20251212030746-894fa2521a86 h1:GCqe9PcoTQ7akNDyAmavhnSrPV7sMAoYJ5jKEaJg4Ac=
github.com/kubewharf/katalyst-api v0.5.8-0.20251212030746-894fa2521a86/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/katalyst-api v0.5.9-0.20251226033202-b56ae9d0382b h1:SbneAGWJCpT2hCmhZ9StBKX3XyTcX1lU/Hf7qrJqELU=
github.com/kubewharf/katalyst-api v0.5.9-0.20251226033202-b56ae9d0382b/go.mod h1:BZMVGVl3EP0eCn5xsDgV41/gjYkoh43abIYxrB10e3k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.9 h1:jOTYZt7h/J7I8xQMKMUcJjKf5UFBv37jHWvNp5VRFGc=
github.com/kubewharf/kubelet v1.24.6-kubewharf.9/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down
36 changes: 36 additions & 0 deletions pkg/agent/evictionmanager/eviction_resp_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,42 @@ func (e *evictionRespCollector) collectMetThreshold(dryRunPlugins []string, plug
}
}

func (e *evictionRespCollector) collectTopSoftEvictionPods(dryRunPlugins []string, pluginName string,
threshold *pluginapi.ThresholdMetResponse, resp *pluginapi.GetTopEvictionPodsResponse,
) {
dryRun := e.isDryRun(dryRunPlugins, pluginName)

targetPods := make([]*v1.Pod, 0, len(resp.TargetPods))
for i, pod := range resp.TargetPods {
if pod == nil {
continue
}

general.Infof("%v plugin %v request to notify topN pod %v/%v, reason: met threshold in scope [%v]",
e.getLogPrefix(dryRun), pluginName, pod.Namespace, pod.Name, threshold.EvictionScope)
if dryRun {
metricsPodToEvict(e.emitter, e.conf.GenericConfiguration.QoSConfiguration, pluginName, pod, dryRun, e.conf.GenericEvictionConfiguration.PodMetricLabels)
} else {
targetPods = append(targetPods, resp.TargetPods[i])
}
}

for _, pod := range targetPods {
reason := fmt.Sprintf("plugin %s met threshold in scope %s, target %v, observed %v",
pluginName, threshold.EvictionScope, threshold.ThresholdValue, threshold.ObservedValue)

e.getSoftEvictPods()[string(pod.UID)] = &rule.RuledEvictPod{
EvictPod: &pluginapi.EvictPod{
Pod: pod.DeepCopy(),
Reason: reason,
ForceEvict: false,
EvictionPluginName: pluginName,
},
Scope: threshold.EvictionScope,
}
}
}

func (e *evictionRespCollector) collectTopEvictionPods(dryRunPlugins []string, pluginName string,
threshold *pluginapi.ThresholdMetResponse, resp *pluginapi.GetTopEvictionPodsResponse,
) {
Expand Down
65 changes: 59 additions & 6 deletions pkg/agent/evictionmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
clocks "k8s.io/utils/clock"

"github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-api/pkg/plugins/registration"
pluginapi "github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1"
endpointpkg "github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/endpoint"
Expand All @@ -46,6 +47,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/plugin/resource"
"github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/plugin/rootfs"
"github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/podkiller"
"github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/podnotifier"
"github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/record"
"github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/rule"
"github.com/kubewharf/katalyst-core/pkg/client"
Expand Down Expand Up @@ -109,7 +111,8 @@ type EvictionManger struct {
// easy to test the code.
clock clocks.WithTickerAndDelayedExecution

podKiller podkiller.PodKiller
podNotifier podnotifier.PodNotifier
podKiller podkiller.PodKiller

killQueue rule.EvictionQueue
killStrategy rule.EvictionStrategy
Expand Down Expand Up @@ -236,6 +239,12 @@ func NewEvictionManager(genericClient *client.GenericClientSet, recorder events.

podKiller := podkiller.NewAsynchronizedPodKiller(killer, metaServer.PodFetcher, genericClient.KubeClient)

notifier, err := podnotifier.NewHostPathPodNotifier(conf, genericClient.KubeClient, metaServer, recorder, emitter)
if err != nil {
return nil, fmt.Errorf("failed to create pod notifier: %v", err)
}
podNotifier := podnotifier.NewSynchronizedPodNotifier(notifier)

cnrTaintReporter, err := control.NewGenericReporterPlugin(cnrTaintReporterPluginName, conf, emitter)
if err != nil {
return nil, fmt.Errorf("failed to initialize cnr taint reporter plugin: %v", err)
Expand Down Expand Up @@ -264,6 +273,7 @@ func NewEvictionManager(genericClient *client.GenericClientSet, recorder events.
metaGetter: metaServer,
emitter: emitter,
podKiller: podKiller,
podNotifier: podNotifier,
cnrTaintReporter: cnrTaintReporter,
endpoints: make(map[string]endpointpkg.Endpoint),
conf: conf,
Expand Down Expand Up @@ -317,6 +327,7 @@ func (m *EvictionManger) Run(ctx context.Context) {
general.RegisterHeartbeatCheck(reportTaintHealthCheckName, reportTaintToleration,
general.HealthzCheckStateNotReady, reportTaintToleration)
m.podKiller.Start(ctx)
m.podNotifier.Start(ctx)
for _, endpoint := range m.endpoints {
endpoint.Start()
}
Expand Down Expand Up @@ -360,6 +371,11 @@ func (m *EvictionManger) sync(ctx context.Context) {
}

errList := make([]error, 0)
notifyErr := m.doNotify(collector.getSoftEvictPods())
if notifyErr != nil {
errList = append(errList, notifyErr)
}

evictErr := m.doEvict(collector.getSoftEvictPods(), collector.getForceEvictPods())
if evictErr != nil {
errList = append(errList, evictErr)
Expand Down Expand Up @@ -430,8 +446,8 @@ func (m *EvictionManger) collectEvictionResult(ctx context.Context, pods []*v1.P
records := m.getEvictionRecords(ctx, collector.currentCandidatePods)

for pluginName, threshold := range thresholdsMet {
if threshold.MetType != pluginapi.ThresholdMetType_HARD_MET {
general.Infof(" the type: %s of met threshold from plugin: %s isn't %s", threshold.MetType.String(), pluginName, pluginapi.ThresholdMetType_HARD_MET.String())
if threshold.MetType == pluginapi.ThresholdMetType_NOT_MET {
general.Infof("resp from plugin: %s not met threshold", pluginName)
continue
}

Expand All @@ -453,13 +469,20 @@ func (m *EvictionManger) collectEvictionResult(ctx context.Context, pods []*v1.P
}
}
}

topN := uint64(0)
forceEvict := false
if threshold.MetType == pluginapi.ThresholdMetType_HARD_MET {
topN = 1
forceEvict = true
}

resp, err := m.endpoints[pluginName].GetTopEvictionPods(context.Background(), &pluginapi.GetTopEvictionPodsRequest{
ActivePods: activePods,
TopN: 1,
TopN: topN,
EvictionScope: threshold.EvictionScope,
CandidateEvictionRecords: candidateEvictionRecords,
})

m.endpointLock.RUnlock()
if err != nil {
general.Errorf(" calling GetTopEvictionPods of plugin: %s failed with error: %v", pluginName, err)
Expand All @@ -473,12 +496,38 @@ func (m *EvictionManger) collectEvictionResult(ctx context.Context, pods []*v1.P
continue
}

collector.collectTopEvictionPods(dynamicConfig.DryRun, pluginName, threshold, resp)
if forceEvict {
collector.collectTopEvictionPods(dynamicConfig.DryRun, pluginName, threshold, resp)
} else {
collector.collectTopSoftEvictionPods(dynamicConfig.DryRun, pluginName, threshold, resp)
}

}

return collector, errors.NewAggregate(errList)
}

func (m *EvictionManger) doNotify(softEvictPods map[string]*rule.RuledEvictPod) error {
errList := make([]error, 0)

for _, pod := range softEvictPods {
if pod == nil || pod.EvictPod.Pod == nil {
continue
}

if v, ok := pod.EvictPod.Pod.Annotations[apiconsts.PodAnnotationSoftEvictNotificationKey]; !ok || strings.ToLower(v) != "true" {
continue
}

err := m.podNotifier.NotifyPod(pod)
if err != nil {
errList = append(errList, err)
}
}

return errors.NewAggregate(errList)
}

func (m *EvictionManger) doEvict(softEvictPods, forceEvictPods map[string]*rule.RuledEvictPod) error {
softEvictPods = filterOutCandidatePodsWithForcePods(softEvictPods, forceEvictPods)
bestSuitedCandidate := m.getEvictPodFromCandidates(softEvictPods)
Expand Down Expand Up @@ -637,6 +686,10 @@ func (m *EvictionManger) getEvictPodFromCandidates(candidateEvictPods map[string
for _, rp := range candidateEvictPods {
// only killing pods that pass candidate validation
if rp != nil && rp.Pod != nil && m.killStrategy.CandidateValidate(rp) {
// do NOT select soft evict pod with notification-enable as candidate
if v, ok := rp.Pod.Annotations[apiconsts.PodAnnotationSoftEvictNotificationKey]; ok && strings.ToLower(v) == "true" {
continue
}
rpList = append(rpList, rp)
}
}
Expand Down
53 changes: 47 additions & 6 deletions pkg/agent/evictionmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func makeConf() *config.Configuration {
conf.PodKiller = consts.KillerNameEvictionKiller
conf.GenericConfiguration.AuthConfiguration.AuthType = credential.AuthTypeInsecure
conf.GenericConfiguration.AuthConfiguration.AccessControlType = authorization.AccessControlTypeInsecure
conf.HostPathNotifierRootPath = "/opt/katalyst"

return conf
}
Expand Down Expand Up @@ -232,12 +233,46 @@ func (p plugin2) GetEvictPods(_ context.Context, _ *pluginapi.GetEvictPodsReques
return &pluginapi.GetEvictPodsResponse{EvictPods: []*pluginapi.EvictPod{}}, nil
}

type plugin3 struct {
pluginSkeleton
}

func (p plugin3) ThresholdMet(_ context.Context, _ *pluginapi.GetThresholdMetRequest) (*pluginapi.ThresholdMetResponse, error) {
return &pluginapi.ThresholdMetResponse{
MetType: pluginapi.ThresholdMetType_SOFT_MET,
ThresholdValue: 0.8,
ObservedValue: 0.9,
ThresholdOperator: pluginapi.ThresholdOperator_GREATER_THAN,
EvictionScope: "plugin3_scope",
GracePeriodSeconds: -1,
}, nil
}

func (p plugin3) GetTopEvictionPods(_ context.Context, _ *pluginapi.GetTopEvictionPodsRequest) (*pluginapi.GetTopEvictionPodsResponse, error) {
return &pluginapi.GetTopEvictionPodsResponse{TargetPods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-3",
UID: "pod-3",
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
},
},
}}, nil
}

func (p plugin3) GetEvictPods(_ context.Context, _ *pluginapi.GetEvictPodsRequest) (*pluginapi.GetEvictPodsResponse, error) {
return &pluginapi.GetEvictPodsResponse{EvictPods: []*pluginapi.EvictPod{}}, nil
}

func makeEvictionManager(t *testing.T) *EvictionManger {
mgr, err := NewEvictionManager(&client.GenericClientSet{}, nil, makeMetaServer(), metrics.DummyMetrics{}, makeConf())
assert.NoError(t, err)
mgr.endpoints = map[string]endpointpkg.Endpoint{
"plugin1": &plugin1{},
"plugin2": &plugin2{},
"plugin3": &plugin3{},
}

return mgr
Expand All @@ -258,6 +293,7 @@ func TestEvictionManger_collectEvictionResult(t *testing.T) {
dryrun: []string{},
wantSoftEvictPods: sets.String{
"pod-1": sets.Empty{},
"pod-3": sets.Empty{},
"pod-5": sets.Empty{},
},
wantForceEvictPods: sets.String{
Expand All @@ -269,9 +305,11 @@ func TestEvictionManger_collectEvictionResult(t *testing.T) {
},
},
{
name: "dryrun plugin1",
dryrun: []string{"plugin1"},
wantSoftEvictPods: sets.String{},
name: "dryrun plugin1",
dryrun: []string{"plugin1"},
wantSoftEvictPods: sets.String{
"pod-3": sets.Empty{},
},
wantForceEvictPods: sets.String{
"pod-3": sets.Empty{},
},
Expand All @@ -284,6 +322,7 @@ func TestEvictionManger_collectEvictionResult(t *testing.T) {
dryrun: []string{"plugin2"},
wantSoftEvictPods: sets.String{
"pod-1": sets.Empty{},
"pod-3": sets.Empty{},
"pod-5": sets.Empty{},
},
wantForceEvictPods: sets.String{
Expand All @@ -292,9 +331,11 @@ func TestEvictionManger_collectEvictionResult(t *testing.T) {
wantConditions: sets.String{},
},
{
name: "dryrun plugin1 & plugin2",
dryrun: []string{"plugin1", "plugin2"},
wantSoftEvictPods: sets.String{},
name: "dryrun plugin1 & plugin2",
dryrun: []string{"plugin1", "plugin2"},
wantSoftEvictPods: sets.String{
"pod-3": sets.Empty{},
},
wantForceEvictPods: sets.String{},
wantConditions: sets.String{},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func NewReclaimedNumaResourcesEvictionPlugin(_ *client.GenericClientSet, _ event
}
}

reclaimedSoftThresholdGetter := func(resourceName v1.ResourceName) *float64 {
return nil
}

deletionGracePeriodGetter := func() int64 {
return conf.GetDynamicConfiguration().ReclaimedResourcesEvictionConfiguration.DeletionGracePeriod
}
Expand All @@ -66,6 +70,8 @@ func NewReclaimedNumaResourcesEvictionPlugin(_ *client.GenericClientSet, _ event
emitter,
PodNUMARequestResourcesGetter,
reclaimedThresholdGetter,
reclaimedSoftThresholdGetter,
nil,
deletionGracePeriodGetter,
thresholdMetToleranceDurationGetter,
conf.SkipZeroQuantityResourceNames,
Expand Down
Loading