@@ -45,6 +45,7 @@ import (
4545 "github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/plugin/resource"
4646 "github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/plugin/rootfs"
4747 "github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/podkiller"
48+ "github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/podnotifier"
4849 "github.com/kubewharf/katalyst-core/pkg/agent/evictionmanager/rule"
4950 "github.com/kubewharf/katalyst-core/pkg/client"
5051 "github.com/kubewharf/katalyst-core/pkg/client/control"
@@ -104,7 +105,8 @@ type EvictionManger struct {
104105 // easy to test the code.
105106 clock clocks.WithTickerAndDelayedExecution
106107
107- podKiller podkiller.PodKiller
108+ podNotifier podnotifier.PodNotifier
109+ podKiller podkiller.PodKiller
108110
109111 killQueue rule.EvictionQueue
110112 killStrategy rule.EvictionStrategy
@@ -171,6 +173,12 @@ func NewEvictionManager(genericClient *client.GenericClientSet, recorder events.
171173
172174 podKiller := podkiller .NewAsynchronizedPodKiller (killer , metaServer .PodFetcher , genericClient .KubeClient )
173175
176+ notifier , err := podnotifier .NewHostPathPodNotifier (conf , genericClient .KubeClient , metaServer , recorder , emitter )
177+ if err != nil {
178+ return nil , fmt .Errorf ("failed to create pod notifier: %v" , err )
179+ }
180+ podNotifier := podnotifier .NewSynchronizedPodNotifier (notifier )
181+
174182 cnrTaintReporter , err := control .NewGenericReporterPlugin (cnrTaintReporterPluginName , conf , emitter )
175183 if err != nil {
176184 return nil , fmt .Errorf ("failed to initialize cnr taint reporter plugin: %v" , err )
@@ -183,6 +191,7 @@ func NewEvictionManager(genericClient *client.GenericClientSet, recorder events.
183191 metaGetter : metaServer ,
184192 emitter : emitter ,
185193 podKiller : podKiller ,
194+ podNotifier : podNotifier ,
186195 cnrTaintReporter : cnrTaintReporter ,
187196 endpoints : make (map [string ]endpointpkg.Endpoint ),
188197 conf : conf ,
@@ -235,6 +244,7 @@ func (m *EvictionManger) Run(ctx context.Context) {
235244 general .RegisterHeartbeatCheck (reportTaintHealthCheckName , reportTaintToleration ,
236245 general .HealthzCheckStateNotReady , reportTaintToleration )
237246 m .podKiller .Start (ctx )
247+ m .podNotifier .Start (ctx )
238248 for _ , endpoint := range m .endpoints {
239249 endpoint .Start ()
240250 }
@@ -277,6 +287,11 @@ func (m *EvictionManger) sync(ctx context.Context) {
277287 }
278288
279289 errList := make ([]error , 0 )
290+ notifyErr := m .doNotify (collector .getSoftEvictPods ())
291+ if notifyErr != nil {
292+ errList = append (errList , notifyErr )
293+ }
294+
280295 evictErr := m .doEvict (collector .getSoftEvictPods (), collector .getForceEvictPods ())
281296 if evictErr != nil {
282297 errList = append (errList , evictErr )
@@ -342,8 +357,8 @@ func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) (*evictionRespCol
342357 m .conditionLock .Unlock ()
343358
344359 for pluginName , threshold := range thresholdsMet {
345- if threshold .MetType != pluginapi .ThresholdMetType_HARD_MET {
346- general .Infof (" the type: %s of met threshold from plugin: %s isn't %s " , threshold . MetType . String (), pluginName , pluginapi . ThresholdMetType_HARD_MET . String () )
360+ if threshold .MetType == pluginapi .ThresholdMetType_NOT_MET {
361+ general .Infof ("resp from plugin: %s not met threshold " , pluginName )
347362 continue
348363 }
349364
@@ -352,12 +367,18 @@ func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) (*evictionRespCol
352367 general .Errorf (" pluginName points to nil endpoint, can't handle threshold from it" )
353368 }
354369
370+ topN := uint64 (0 )
371+ forceEvict := false
372+ if threshold .MetType == pluginapi .ThresholdMetType_HARD_MET {
373+ topN = 1
374+ forceEvict = true
375+ }
376+
355377 resp , err := m .endpoints [pluginName ].GetTopEvictionPods (context .Background (), & pluginapi.GetTopEvictionPodsRequest {
356378 ActivePods : pods ,
357- TopN : 1 ,
379+ TopN : topN ,
358380 EvictionScope : threshold .EvictionScope ,
359381 })
360-
361382 m .endpointLock .RUnlock ()
362383 if err != nil {
363384 general .Errorf (" calling GetTopEvictionPods of plugin: %s failed with error: %v" , pluginName , err )
@@ -371,12 +392,40 @@ func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) (*evictionRespCol
371392 continue
372393 }
373394
374- collector .collectTopEvictionPods (dynamicConfig .DryRun , pluginName , threshold , resp )
395+ if forceEvict {
396+ collector .collectTopEvictionPods (dynamicConfig .DryRun , pluginName , threshold , resp )
397+ } else {
398+ collector .collectTopSoftEvictionPods (dynamicConfig .DryRun , pluginName , threshold , resp )
399+ }
400+
375401 }
376402
377403 return collector , errors .NewAggregate (errList )
378404}
379405
406+ func (m * EvictionManger ) doNotify (softEvictPods map [string ]* rule.RuledEvictPod ) error {
407+ errList := make ([]error , 0 )
408+
409+ for _ , pod := range softEvictPods {
410+ if pod == nil || pod .EvictPod .Pod == nil {
411+ continue
412+ }
413+
414+ // TODO
415+ // if _, ok := pod.EvictPod.Pod.Annotations[constapi.PodAnnotationSoftEvictNotificationKey]; !ok {
416+ if _ , ok := pod .EvictPod .Pod .Annotations ["katalyst.kubewharf.io/soft_evict_notify" ]; ! ok {
417+ continue
418+ }
419+
420+ err := m .podNotifier .NotifyPod (pod )
421+ if err != nil {
422+ errList = append (errList , err )
423+ }
424+ }
425+
426+ return errors .NewAggregate (errList )
427+ }
428+
380429func (m * EvictionManger ) doEvict (softEvictPods , forceEvictPods map [string ]* rule.RuledEvictPod ) error {
381430 softEvictPods = filterOutCandidatePodsWithForcePods (softEvictPods , forceEvictPods )
382431 bestSuitedCandidate := m .getEvictPodFromCandidates (softEvictPods )
0 commit comments