
Commit 5ea084b

simplify cache
1 parent f34e1f5 commit 5ea084b

1 file changed (+51, −10 lines)

pkg/controller/queuejob/queuejob_controller_ex.go

@@ -108,6 +108,7 @@ type XController struct {
 	// QJ queue that needs to be allocated
 	qjqueue SchedulingQueue
 
+	//TODO: Do we need this local cache?
 	// our own local cache, used for computing total amount of resources
 	cache clusterstatecache.Cache
 
@@ -154,6 +155,46 @@ func GetQueueJobKey(obj interface{}) (string, error) {
 	return fmt.Sprintf("%s/%s", qj.Namespace, qj.Name), nil
 }
 
+//allocatableCapacity calculates the capacity available on each node by subtracting resources
+//consumed by existing pods.
+//For a large cluster with thousands of nodes and hundreds of thousands of pods this
+//method could be a performance bottleneck.
+//We could then move this method to a separate thread that runs every X interval and
+//provides resources available to the next AW that needs to be dispatched.
+//Obviously the thread would need locking and a timer to expire the cache.
+//Maybe a move to controller-runtime can help.
+func (qjm *XController) allocatableCapacity() *clusterstateapi.Resource {
+	capacity := clusterstateapi.EmptyResource()
+	nodes, _ := qjm.clients.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
+	startTime := time.Now()
+	for _, node := range nodes.Items {
+		// skip unschedulable nodes
+		if node.Spec.Unschedulable {
+			continue
+		}
+		nodeResource := clusterstateapi.NewResource(node.Status.Allocatable)
+		capacity.Add(nodeResource)
+		var specNodeName = "spec.nodeName"
+		labelSelector := fmt.Sprintf("%s=%s", specNodeName, node.Name)
+		podList, err := qjm.clients.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{FieldSelector: labelSelector})
+		//TODO: when no pods are listed, do we report the entire node capacity as available?
+		//This would cause a false-positive dispatch.
+		if err != nil {
+			klog.Errorf("[allocatableCapacity] Error listing pods %v", err)
+		}
+		for _, pod := range podList.Items {
+			if _, ok := pod.GetLabels()["appwrappers.mcad.ibm.com"]; !ok && pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded {
+				for _, container := range pod.Spec.Containers {
+					usedResource := clusterstateapi.NewResource(container.Resources.Requests)
+					capacity.Sub(usedResource)
+				}
+			}
+		}
+	}
+	klog.Infof("[allocatableCapacity] The available capacity to dispatch appwrappers is %v and the time taken to calculate it is %v", capacity, time.Now().Sub(startTime))
+	return capacity
+}
+
 // NewJobController create new AppWrapper Controller
 func NewJobController(config *rest.Config, serverOption *options.ServerOption) *XController {
 	cc := &XController{
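The doc comment on allocatableCapacity sketches moving this node-and-pod walk off the dispatch path into a periodically refreshed, lock-protected snapshot. A minimal sketch of that idea, assuming the file's existing imports plus `sync`; the `capacityCache` helper is hypothetical and not part of this commit:

```go
// capacityCache is a hypothetical helper (not in this commit) illustrating the
// "separate thread" idea from the comment above: recompute allocatable capacity
// every interval and hand the latest snapshot to dispatch cycles.
type capacityCache struct {
	mu       sync.RWMutex
	snapshot *clusterstateapi.Resource
}

// run recomputes the snapshot every interval until stopCh is closed.
func (c *capacityCache) run(qjm *XController, interval time.Duration, stopCh <-chan struct{}) {
	wait.Until(func() {
		fresh := qjm.allocatableCapacity()
		c.mu.Lock()
		c.snapshot = fresh
		c.mu.Unlock()
	}, interval, stopCh)
}

// get returns the last snapshot, which may be stale by up to one interval.
func (c *capacityCache) get() *clusterstateapi.Resource {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.snapshot
}
```

ScheduleNext could then read `c.get()` instead of calling allocatableCapacity() inline, trading a bounded amount of staleness for a predictable cost per dispatch.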
@@ -166,8 +207,9 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *XController {
 		initQueue:    cache.NewFIFO(GetQueueJobKey),
 		updateQueue:  cache.NewFIFO(GetQueueJobKey),
 		qjqueue:      NewSchedulingQueue(),
-		cache:        clusterstatecache.New(config),
-		schedulingAW: nil,
+		//TODO: do we still need cache to be initialized?
+		cache:        clusterstatecache.New(config),
+		schedulingAW: nil,
 	}
 	//TODO: work on enabling metrics adapter for correct MCAD mode
 	//metrics adapter is implemented through dynamic client which looks at all the
@@ -1098,26 +1140,24 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
 	if qjm.serverOption.DynamicPriority {
 		priorityindex = -math.MaxFloat64
 	}
-	//cache.go updatestate method fails resulting in an empty resource object
-	//A cache update failure is costly, as it will put the current AW in the backoff queue plus take another dispatch cycle
-	//In the worst case the cache update could fail for subsequent dispatch cycles, causing test cases to fail or the AW to never get dispatched
-	//To avoid non-determinism the code below is a workaround. This issue should be fixed: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/550
+	//cache is now a method inside the controller.
+	//The reimplementation should fix issue: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/550
 	var unallocatedResources = clusterstateapi.EmptyResource()
-	unallocatedResources = qjm.cache.GetUnallocatedResources()
+	unallocatedResources = qjm.allocatableCapacity()
 	for unallocatedResources.IsEmpty() {
-		unallocatedResources.Add(qjm.cache.GetUnallocatedResources())
+		unallocatedResources.Add(qjm.allocatableCapacity())
 		if !unallocatedResources.IsEmpty() {
 			break
 		}
 	}
-
 	resources, proposedPreemptions := qjm.getAggregatedAvailableResourcesPriority(
 		unallocatedResources, priorityindex, qj, "")
 	klog.Infof("[ScheduleNext] [Agent Mode] Appwrapper '%s/%s' with resources %v to be scheduled on aggregated idle resources %v", qj.Namespace, qj.Name, aggqj, resources)
 
 	// Assume preemption will remove low priority AWs in the system, optimistically dispatch such AWs
 
 	if aggqj.LessEqual(resources) {
+		//TODO: should we turn off histograms?
 		unallocatedHistogramMap := qjm.cache.GetUnallocatedHistograms()
 		if !qjm.nodeChecks(unallocatedHistogramMap, qj) {
 			klog.Infof("[ScheduleNext] [Agent Mode] Optimistic dispatch for AW '%s/%s' requesting aggregated resources %v histogram for point in-time fragmented resources are available in the cluster %s",
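The retry loop above simply calls allocatableCapacity() again until it returns a non-empty resource. A roughly equivalent formulation, shown only for illustration (the commit keeps the plain for-loop), using the polling helper from k8s.io/apimachinery with an assumed one-second interval:

```go
// Illustrative only: poll until allocatableCapacity() returns a non-empty
// resource, re-checking every second. The interval is an assumption; the
// committed code retries immediately with no explicit delay.
unallocatedResources := clusterstateapi.EmptyResource()
_ = wait.PollImmediateInfinite(time.Second, func() (done bool, err error) {
	unallocatedResources = qjm.allocatableCapacity()
	return !unallocatedResources.IsEmpty(), nil
})
```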
@@ -1424,8 +1464,9 @@ func (cc *XController) Run(stopCh <-chan struct{}) {
 
 	cache.WaitForCacheSync(stopCh, cc.appWrapperSynced)
 
+	//TODO: do we still need to run cache every second?
 	// update snapshot of ClientStateCache every second
-	cc.cache.Run(stopCh)
+	//cc.cache.Run(stopCh)
 
 	// start preempt thread is used to preempt AWs that have partial pods or have reached dispatch duration
 	go wait.Until(cc.PreemptQueueJobs, 60*time.Second, stopCh)
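If the answer to the TODO above turns out to be "yes, sometimes", one option would be to gate the legacy cache refresh behind a server option instead of commenting it out; `UseClusterStateCache` below is a hypothetical flag, not a field that exists in ServerOption in this commit:

```go
// Hypothetical: keep the old per-second cache refresh available behind a flag.
// UseClusterStateCache is NOT a real ServerOption field in this commit.
if cc.serverOption.UseClusterStateCache {
	cc.cache.Run(stopCh)
}
```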
