@@ -22,17 +22,14 @@ import (
2222 "errors"
2323 "math"
2424 "strconv"
25- "time"
2625
2726 "go.uber.org/zap"
2827 v1 "k8s.io/api/core/v1"
2928 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3029 "k8s.io/apimachinery/pkg/labels"
3130 "k8s.io/apimachinery/pkg/types"
3231 "k8s.io/apimachinery/pkg/util/sets"
33- "k8s.io/apimachinery/pkg/util/wait"
3432 corev1 "k8s.io/client-go/listers/core/v1"
35-
3633 "knative.dev/pkg/logging"
3734
3835 "knative.dev/eventing/pkg/scheduler"
@@ -42,7 +39,7 @@ type StateAccessor interface {
4239 // State returns the current state (snapshot) about placed vpods
4340 // Take into account reserved vreplicas and update `reserved` to reflect
4441 // the current state.
45- State(reserved map[types.NamespacedName]map[string]int32) (*State, error)
42+ State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error)
4643}
4744
4845// state provides information about the current scheduling of all vpods
@@ -152,8 +149,6 @@ func (s *State) IsSchedulablePod(ordinal int32) bool {
152149
153150// stateBuilder reconstruct the state from scratch, by listing vpods
154151type stateBuilder struct {
155- ctx context.Context
156- logger * zap.SugaredLogger
157152 vpodLister scheduler.VPodLister
158153 capacity int32
159154 schedulerPolicy scheduler.SchedulerPolicyType
@@ -166,11 +161,9 @@ type stateBuilder struct {
166161}
167162
168163// NewStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested
169- func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy *scheduler.SchedulerPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor {
164+ func NewStateBuilder(sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor {
170165
171166 return & stateBuilder {
172- ctx : ctx ,
173- logger : logging .FromContext (ctx ),
174167 vpodLister : lister ,
175168 capacity : podCapacity ,
176169 schedulerPolicy : schedulerPolicy ,
@@ -183,15 +176,18 @@ func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister sche
183176 }
184177}
185178
186- func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) (*State, error) {
179+ func (s *stateBuilder) State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error) {
187180 vpods , err := s .vpodLister ()
188181 if err != nil {
189182 return nil , err
190183 }
191184
192- scale , err := s .statefulSetCache .GetScale (s .ctx , s .statefulSetName , metav1.GetOptions {})
185+ logger := logging .FromContext (ctx ).With ("subcomponent" , "statebuilder" )
186+ ctx = logging .WithLogger (ctx , logger )
187+
188+ scale , err := s .statefulSetCache .GetScale (ctx , s .statefulSetName , metav1.GetOptions {})
193189 if err != nil {
194- s . logger .Infow ("failed to get statefulset" , zap .Error (err ))
190+ logger .Infow ("failed to get statefulset" , zap .Error (err ))
195191 return nil , err
196192 }
197193
@@ -235,36 +231,35 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
235231 }
236232
237233 for podId := int32 (0 ); podId < scale .Spec .Replicas && s .podLister != nil ; podId ++ {
238- var pod * v1.Pod
239- wait .PollUntilContextTimeout (context .Background (), 50 * time .Millisecond , 5 * time .Second , true , func (ctx context.Context ) (bool , error ) {
240- pod , err = s .podLister .Get (PodNameFromOrdinal (s .statefulSetName , podId ))
241- return err == nil , nil
242- })
243-
244- if pod != nil {
245- if isPodUnschedulable (pod ) {
246- // Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod.
247- continue
248- }
249-
250- node , err := s .nodeLister .Get (pod .Spec .NodeName )
251- if err != nil {
252- return nil , err
253- }
234+ pod , err := s .podLister .Get (PodNameFromOrdinal (s .statefulSetName , podId ))
235+ if err != nil {
236+ logger .Warnw ("Failed to get pod" , zap .Int32 ("ordinal" , podId ), zap .Error (err ))
237+ continue
238+ }
239+ if isPodUnschedulable (pod ) {
240+ // Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod.
241+ logger .Debugw ("Pod is unschedulable" , zap .Any ("pod" , pod ))
242+ continue
243+ }
254244
255- if isNodeUnschedulable(node) {
256- // Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node.
257- continue
258- }
245+ node, err := s.nodeLister.Get(pod.Spec.NodeName)
246+ if err != nil {
247+ return nil, err
248+ }
259249
260- // Pod has no annotation or not annotated as unschedulable and
261- // not on an unschedulable node, so add to feasible
262- schedulablePods .Insert (podId )
250+ if isNodeUnschedulable (node ) {
251+ // Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node.
252+ logger .Debugw ("Pod is on an unschedulable node" , zap .Any ("pod" , node ))
253+ continue
263254 }
255+
256+ // Pod has no annotation or not annotated as unschedulable and
257+ // not on an unschedulable node, so add to feasible
258+ schedulablePods .Insert (podId )
264259 }
265260
266261 for _ , p := range schedulablePods .List () {
267- free , last = s .updateFreeCapacity (free , last , PodNameFromOrdinal (s .statefulSetName , p ), 0 )
262+ free , last = s .updateFreeCapacity (logger , free , last , PodNameFromOrdinal (s .statefulSetName , p ), 0 )
268263 }
269264
270265 // Getting current state from existing placements for all vpods
@@ -286,15 +281,14 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
286281 // Account for reserved vreplicas
287282 vreplicas = withReserved (vpod .GetKey (), podName , vreplicas , reserved )
288283
289- free , last = s .updateFreeCapacity (free , last , podName , vreplicas )
284+ free , last = s .updateFreeCapacity (logger , free , last , podName , vreplicas )
290285
291286 withPlacement [vpod .GetKey ()][podName ] = true
292287
293- var pod * v1.Pod
294- wait .PollUntilContextTimeout (context .Background (), 50 * time .Millisecond , 5 * time .Second , true , func (ctx context.Context ) (bool , error ) {
295- pod , err = s .podLister .Get (podName )
296- return err == nil , nil
297- })
288+ pod , err := s .podLister .Get (podName )
289+ if err != nil {
290+ logger .Warnw ("Failed to get pod" , zap .String ("podName" , podName ), zap .Error (err ))
291+ }
298292
299293 if pod != nil && schedulablePods .Has (OrdinalFromPodName (pod .GetName ())) {
300294 nodeName := pod .Spec .NodeName //node name for this pod
@@ -315,11 +309,10 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
315309 continue
316310 }
317311
318- var pod * v1.Pod
319- wait .PollUntilContextTimeout (context .Background (), 50 * time .Millisecond , 5 * time .Second , true , func (ctx context.Context ) (bool , error ) {
320- pod , err = s .podLister .Get (podName )
321- return err == nil , nil
322- })
312+ pod , err := s .podLister .Get (podName )
313+ if err != nil {
314+ logger .Warnw ("Failed to get pod" , zap .String ("podName" , podName ), zap .Error (err ))
315+ }
323316
324317 if pod != nil && schedulablePods .Has (OrdinalFromPodName (pod .GetName ())) {
325318 nodeName := pod .Spec .NodeName //node name for this pod
@@ -330,15 +323,15 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
330323 }
331324 }
332325
333- free , last = s .updateFreeCapacity (free , last , podName , rvreplicas )
326+ free , last = s .updateFreeCapacity (logger , free , last , podName , rvreplicas )
334327 }
335328 }
336329
337330 state := & State {FreeCap : free , SchedulablePods : schedulablePods .List (), LastOrdinal : last , Capacity : s .capacity , Replicas : scale .Spec .Replicas , NumZones : int32 (len (zoneMap )), NumNodes : int32 (len (nodeToZoneMap )),
338331 SchedulerPolicy : s .schedulerPolicy , SchedPolicy : s .schedPolicy , DeschedPolicy : s .deschedPolicy , NodeToZoneMap : nodeToZoneMap , StatefulSetName : s .statefulSetName , PodLister : s .podLister ,
339332 PodSpread : podSpread , NodeSpread : nodeSpread , ZoneSpread : zoneSpread , Pending : pending , ExpectedVReplicaByVPod : expectedVReplicasByVPod }
340333
341- s . logger .Infow ("cluster state info" , zap .Any ("state" , state ), zap .Any ("reserved" , toJSONable (reserved )))
334+ logger .Infow ("cluster state info" , zap .Any ("state" , state ), zap .Any ("reserved" , toJSONable (reserved )))
342335
343336 return state , nil
344337}
@@ -350,7 +343,7 @@ func pendingFromVPod(vpod scheduler.VPod) int32 {
350343 return int32 (math .Max (float64 (0 ), float64 (expected - scheduled )))
351344}
352345
353- func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) {
346+ func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) {
354347 ordinal := OrdinalFromPodName (podName )
355348 free = grow (free , ordinal , s .capacity )
356349
@@ -359,7 +352,7 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri
359352 // Assert the pod is not overcommitted
360353 if free [ordinal ] < 0 {
361354 // This should not happen anymore. Log as an error but do not interrupt the current scheduling.
362- s . logger .Warnw ("pod is overcommitted" , zap .String ("podName" , podName ), zap .Int32 ("free" , free [ordinal ]))
355+ logger .Warnw ("pod is overcommitted" , zap .String ("podName" , podName ), zap .Int32 ("free" , free [ordinal ]))
363356 }
364357
365358 if ordinal > last {
0 commit comments