Reduce Logging noise #678

Open · wants to merge 6 commits into base: main
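
The pattern throughout this PR: unleveled klog.Infof calls are demoted to leveled klog.V(2)/V(4)/V(6).Infof so routine messages only appear when the operator raises klog verbosity, and &qj=%p pointer noise is dropped from log lines. A minimal, self-contained sketch of how klog gates leveled output (illustrative only, not code from this repo):

package main

import (
	"flag"

	"k8s.io/klog/v2"
)

func main() {
	klog.InitFlags(nil) // register -v and the other klog flags on the default FlagSet
	flag.Set("v", "4")  // same effect as passing -v=4 on the command line
	flag.Parse()
	defer klog.Flush()

	klog.Infof("always printed")            // unleveled; shown regardless of -v
	klog.V(2).Infof("printed when -v >= 2") // e.g. the capacity summary in this PR
	klog.V(4).Infof("printed when -v >= 4") // most messages demoted by this PR
	klog.V(6).Infof("hidden at -v=4")       // detailed tracing messages
}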
128 changes: 98 additions & 30 deletions pkg/controller/queuejob/queuejob_controller_ex.go
@@ -24,8 +24,10 @@ import (
"math"
"math/rand"
"reflect"
"regexp"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
"time"
@@ -64,6 +66,13 @@ import (
// defaultBackoffTime is the default backoff time in seconds
const defaultBackoffTime = 20

var (
// Regexes for extracting CPU, memory, and GPU quantities from a resource's string form
cpuRegex = regexp.MustCompile(`cpu (\d+(\.\d+)?)`)
memoryRegex = regexp.MustCompile(`memory (\d+(\.\d+)?)`)
gpuRegex = regexp.MustCompile(`GPU (\d+)`)
)
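
// Illustrative matches, assuming input in the shape the patterns above expect:
//	cpuRegex.FindStringSubmatch("cpu 6.5")[1]        == "6.5"
//	memoryRegex.FindStringSubmatch("memory 2048")[1] == "2048"
//	gpuRegex.FindStringSubmatch("GPU 2")[1]          == "2"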

// XController the AppWrapper Controller type
type XController struct {
// MCAD configuration
@@ -216,7 +225,7 @@ func (qjm *XController) allocatableCapacity() *clusterstateapi.Resource {
}
}
}
klog.Infof("[allocatableCapacity] The available capacity to dispatch appwrapper is %v and time took to calculate is %v", capacity, time.Since(startTime))
klog.V(2).Infof("[allocatableCapacity] The available capacity to dispatch appwrapper is %v and time took to calculate is %v", capacity, time.Since(startTime))
return capacity
}

@@ -445,7 +454,7 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
} else {
// Only back off AWs that are running and not in state Failed
if updateNewJob.Status.State != arbv1.AppWrapperStateFailed {
klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Namespace, newjob.Name)
klog.V(4).Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Namespace, newjob.Name)
qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message))
}
}
@@ -921,6 +930,63 @@ func (qjm *XController) chooseAgent(ctx context.Context, qj *arbv1.AppWrapper) s
return ""
}

// calculateRequiredResources computes how much the AppWrapper's requested resources exceed the currently available resources and logs any shortfall.
func calculateRequiredResources(namespace string, appwrapperName string, requestedResources string, totalAvailable string) {
// Requested resources values //
requestedCPU := cpuRegex.FindStringSubmatch(requestedResources)
requestedMemory := memoryRegex.FindStringSubmatch(requestedResources)
requestedGPU := gpuRegex.FindStringSubmatch(requestedResources)

// Available resources values //
availableCPU := cpuRegex.FindStringSubmatch(totalAvailable)
availableMemory := memoryRegex.FindStringSubmatch(totalAvailable)
availableGPU := gpuRegex.FindStringSubmatch(totalAvailable)

// Guard against non-matching input to avoid index-out-of-range panics //
if requestedCPU == nil || requestedMemory == nil || requestedGPU == nil || availableCPU == nil || availableMemory == nil || availableGPU == nil {
klog.Warningf("[calculateRequiredResources] Unable to parse resource strings for '%s/%s'", namespace, appwrapperName)
return
}

// Convert values to float //
requestedCPUValue, err := strconv.ParseFloat(requestedCPU[1], 64)
if err != nil {
klog.Warningf("Error parsing CPU value: %v", err)
}
availableCPUValue, err := strconv.ParseFloat(availableCPU[1], 64)
if err != nil {
klog.Warningf("Error parsing CPU value: %v", err)
}
requestedMemoryValue, err := strconv.ParseFloat(requestedMemory[1], 64)
if err != nil {
klog.Warningf("Error parsing Memory value: %v", err)
}
availableMemoryValue, err := strconv.ParseFloat(availableMemory[1], 64)
if err != nil {
klog.Warningf("Error parsing Memory value: %v", err)
}
requestedGPUValue, err := strconv.ParseFloat(requestedGPU[1], 64)
if err != nil {
klog.Warningf("Error parsing GPU value: %v", err)
}
availableGPUValue, err := strconv.ParseFloat(availableGPU[1], 64)
if err != nil {
klog.Warningf("Error parsing GPU value: %v", err)
}

// Get required resources; a negative difference means enough is already available //
requiredCPU := requestedCPUValue - availableCPUValue
if requiredCPU < 0 {
requiredCPU = 0
}
requiredMemory := requestedMemoryValue - availableMemoryValue
if requiredMemory < 0 {
requiredMemory = 0
}
requiredGPU := requestedGPUValue - availableGPUValue
if requiredGPU < 0 {
requiredGPU = 0
}
if requiredCPU != 0 || requiredMemory != 0 || requiredGPU != 0 {
// This is important operator-facing information, hence no verbosity level
klog.Infof("[ScheduleNext] AppWrapper '%s/%s' requires additional resources CPU: %f, Memory: %f, GPU: %f", namespace, appwrapperName, requiredCPU, requiredMemory, requiredGPU)
}
}
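
// Hypothetical usage sketch (illustration only, not part of this PR), assuming
// the Resource string form matches the regexes above:
//
//	calculateRequiredResources("default", "my-appwrapper",
//		"cpu 6.5 memory 2048 GPU 2", // requested (assumed format)
//		"cpu 4 memory 4096 GPU 0")   // available (assumed format)
//
// logs: [ScheduleNext] AppWrapper 'default/my-appwrapper' requires additional resources CPU: 2.500000, Memory: 0.000000, GPU: 2.000000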

// ScheduleNext is the thread that finds the next queue job (QJ) to schedule
func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
ctx := context.Background()
@@ -937,7 +1003,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
scheduleNextRetrier.SetJitter(0.05)
// Retry the execution
err = scheduleNextRetrier.Run(func() error {
klog.Infof("[ScheduleNext] activeQ.Pop %s/%s *Delay=%.6f seconds RemainingLength=%d &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.Length(), qj,
klog.V(6).Infof("[ScheduleNext] activeQ.Pop %s/%s *Delay=%.6f seconds RemainingLength=%d Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.Length(),
qj.ResourceVersion, qj.Status)

apiCacheAWJob, retryErr := qjm.getAppWrapper(qj.Namespace, qj.Name, "[ScheduleNext] -- get fresh copy after queue pop")
@@ -995,7 +1061,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
klog.V(3).Infof("[ScheduleNext] Cannot pop QueueJob from qjqueue! err=%#v", retryErr)
return err
}
klog.V(3).Infof("[ScheduleNext] activeQ.Pop_afterPriorityUpdate %s/%s *Delay=%.6f seconds RemainingLength=%d &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.Length(), qj, qj.ResourceVersion, qj.Status)
klog.V(4).Infof("[ScheduleNext] activeQ.Pop_afterPriorityUpdate %s/%s *Delay=%.6f seconds RemainingLength=%d Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.Length(), qj.ResourceVersion, qj.Status)
apiCacheAWJob, retryErr := qjm.getAppWrapper(qj.Namespace, qj.Name, "[ScheduleNext] -- after dynamic priority pop")
if retryErr != nil {
if apierrors.IsNotFound(retryErr) {
@@ -1005,7 +1071,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
return err
}
if apiCacheAWJob.Status.CanRun {
klog.Infof("[ScheduleNext] AppWrapper job: %s/%s from API is already scheduled. Ignoring request: Status=%+v", qj.Namespace, qj.Name, qj.Status)
klog.V(4).Infof("[ScheduleNext] AppWrapper job: %s/%s from API is already scheduled. Ignoring request: Status=%+v", qj.Namespace, qj.Name, qj.Status)
return nil
}
apiCacheAWJob.DeepCopyInto(qj)
@@ -1030,9 +1096,9 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {

klog.V(4).Infof("[ScheduleNext] after Pop qjqLength=%d qj %s/%s Version=%s activeQ=%t Unsched=%t Status=%v", qjm.qjqueue.Length(), qj.Namespace, qj.Name, qj.ResourceVersion, qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.Status)
if qjm.isDispatcher {
klog.Infof("[ScheduleNext] [Dispatcher Mode] Attempting to dispatch next appwrapper: '%s/%s Status=%v", qj.Namespace, qj.Name, qj.Status)
klog.V(4).Infof("[ScheduleNext] [Dispatcher Mode] Attempting to dispatch next appwrapper: '%s/%s Status=%v", qj.Namespace, qj.Name, qj.Status)
} else {
klog.Infof("[ScheduleNext] [Agent Mode] Attempting to dispatch next appwrapper: '%s/%s' Status=%v", qj.Namespace, qj.Name, qj.Status)
klog.V(4).Infof("[ScheduleNext] [Agent Mode] Attempting to dispatch next appwrapper: '%s/%s' Status=%v", qj.Namespace, qj.Name, qj.Status)
}

dispatchFailedReason := "AppWrapperNotRunnable."
@@ -1110,6 +1176,8 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
resources, proposedPreemptions := qjm.getAggregatedAvailableResourcesPriority(
unallocatedResources, priorityindex, qj, "")
klog.Infof("[ScheduleNext] [Agent Mode] Appwrapper '%s/%s' with resources %v to be scheduled on aggregated idle resources %v", qj.Namespace, qj.Name, aggqj, resources)
// call calculateRequiredResources to log the remaining required resources for the app wrapper
calculateRequiredResources(qj.Namespace, qj.Name, aggqj.String(), resources.String())

// Jobs dispatched with quota management may be borrowing quota from other tree nodes making those jobs preemptable, regardless of their priority.
// Cluster resources need to be considered to determine if both quota and resources (after deleting borrowing AppWrappers) are available for the new AppWrapper
@@ -1167,15 +1235,15 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
klog.Info("%s %s %s", quotaFits, preemptAWs, msg)

if quotaFits {
klog.Infof("[ScheduleNext] [Agent mode] quota evaluation successful for app wrapper '%s/%s' activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v",
qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
klog.V(4).Infof("[ScheduleNext] [Agent mode] quota evaluation successful for app wrapper '%s/%s' activeQ=%t Unsched=%t Version=%s Status=%+v",
qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status)
// Set any jobs that are marked for preemption
qjm.preemptAWJobs(ctx, preemptAWs)
} else { // Not enough free quota to dispatch appwrapper
dispatchFailedMessage = "Insufficient quota and/or resources to dispatch AppWrapper."
dispatchFailedReason = "quota limit exceeded"
klog.Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v msg=%s",
qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status, msg)
klog.Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t Version=%s Status=%+v msg=%s",
qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status, msg)
// Call update etcd here to retrigger AW execution for failed quota
// TODO: quota management tests fail if this is converted into a go-routine; need to investigate why
qjm.backoff(context.Background(), qj, dispatchFailedReason, dispatchFailedMessage)
@@ -1188,19 +1256,19 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
fits = false
}
} else {
klog.V(4).Infof("[ScheduleNext] [Agent Mode] Quota evaluation not enabled for '%s/%s' at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Namespace,
qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
klog.V(4).Infof("[ScheduleNext] [Agent Mode] Quota evaluation not enabled for '%s/%s' at %s activeQ=%t Unsched=%t Version=%s Status=%+v", qj.Namespace,
qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status)

if aggqj.LessEqual(resources) { // Check if enough resources to dispatch
fits = true
klog.Infof("[ScheduleNext] [Agent Mode] available resource successful check for '%s/%s' at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v.",
qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
klog.V(4).Infof("[ScheduleNext] [Agent Mode] available resource successful check for '%s/%s' at %s activeQ=%t Unsched=%t Version=%s Status=%+v.",
qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status)
} else { // Not enough free resources to dispatch HOL
fits = false
dispatchFailedMessage = "Insufficient resources to dispatch AppWrapper."
klog.Infof("[ScheduleNext] [Agent Mode] Failed to dispatch app wrapper '%s/%s' due to insufficient resources, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v",
klog.Infof("[ScheduleNext] [Agent Mode] Failed to dispatch app wrapper '%s/%s' due to insufficient resources, activeQ=%t Unsched=%t Version=%s Status=%+v",
qj.Namespace, qj.Name, qjm.qjqueue.IfExistActiveQ(qj),
qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status)
// TODO: Remove forwarded logic as a big AW will never be forwarded
forwarded = true
// should we call backoff or update etcd?
@@ -1262,8 +1330,8 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
fowardingLoopCount += 1
}
if !forwarded { // start thread to backoff
klog.Infof("[ScheduleNext] [Agent Mode] backing off app wrapper '%s/%s' after waiting for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v",
qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
klog.Infof("[ScheduleNext] [Agent Mode] backing off app wrapper '%s/%s' after waiting for %s activeQ=%t Unsched=%t Version=%s Status=%+v",
qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status)
if qjm.quotaManager != nil && quotaFits {
qjm.quotaManager.Release(qj)
}
@@ -1401,8 +1469,8 @@ func (qjm *XController) backoff(ctx context.Context, q *arbv1.AppWrapper, reason
klog.Errorf("[backoff] Failed to update status for %s/%s. Continuing with possible stale object without updating conditions. err=%s", q.Namespace, q.Name, err)
}
qjm.qjqueue.AddUnschedulableIfNotPresent(q)
klog.V(3).Infof("[backoff] %s/%s move to unschedulableQ before sleep for %d seconds. activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", q.Namespace, q.Name,
qjm.config.BackoffTimeOrDefault(defaultBackoffTime), qjm.qjqueue.IfExistActiveQ(q), qjm.qjqueue.IfExistUnschedulableQ(q), q, q.ResourceVersion, q.Status)
klog.V(4).Infof("[backoff] %s/%s move to unschedulableQ before sleep for %d seconds. activeQ=%t Unsched=%t Version=%s Status=%+v", q.Namespace, q.Name,
qjm.config.BackoffTimeOrDefault(defaultBackoffTime), qjm.qjqueue.IfExistActiveQ(q), qjm.qjqueue.IfExistUnschedulableQ(q), q.ResourceVersion, q.Status)
time.Sleep(time.Duration(qjm.config.BackoffTimeOrDefault(defaultBackoffTime)) * time.Second)
qjm.qjqueue.MoveToActiveQueueIfExists(q)

@@ -1447,7 +1515,7 @@ func (qjm *XController) UpdateQueueJobs(newjob *arbv1.AppWrapper) {
// TODO: should we really return?
return
}
klog.V(6).Infof("[UpdateQueueJobs] %s/%s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Namespace, newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status)
klog.V(6).Infof("[UpdateQueueJobs] %s/%s: qjqueue=%t Version=%s Status=%+v", newjob.Namespace, newjob.Name, qjm.qjqueue.IfExist(newjob), newjob.ResourceVersion, newjob.Status)
// set appwrapper status to Complete or RunningHoldCompletion
derivedAwStatus := qjm.getAppWrapperCompletionStatus(newjob)

@@ -1509,7 +1577,7 @@ func (qjm *XController) UpdateQueueJobs(newjob *arbv1.AppWrapper) {
qjm.eventQueue.Delete(updateQj)
qjm.qjqueue.Delete(updateQj)
}
klog.Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", newjob.Namespace, newjob.Name, newjob.ResourceVersion,
klog.V(4).Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", newjob.Namespace, newjob.Name, newjob.ResourceVersion,
newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed)
}
}
@@ -1546,7 +1614,7 @@ func (cc *XController) addQueueJob(obj interface{}) {
klog.V(6).Infof("[Informer-addQJ] %s/%s Delay=%.6f seconds CreationTimestamp=%s ControllerFirstTimestamp=%s",
qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.CreationTimestamp, qj.Status.ControllerFirstTimestamp)

klog.V(6).Infof("[Informer-addQJ] enqueue %s/%s &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, qj, qj.ResourceVersion, qj.Status)
klog.V(6).Infof("[Informer-addQJ] enqueue %s/%s Version=%s Status=%+v", qj.Namespace, qj.Name, qj.ResourceVersion, qj.Status)

// Requeue the item to be processed again in 30 seconds.
// TODO: tune the frequency of reprocessing an AW
@@ -1759,7 +1827,7 @@ func (cc *XController) agentEventQueueWorker() {

return nil
}
klog.V(3).Infof("[Controller: Dispatcher Mode] XQJ Status Update from AGENT: Name:%s, Namespace:%s, Status: %+v\n", queuejob.Name, queuejob.Namespace, queuejob.Status)
klog.V(4).Infof("[Controller: Dispatcher Mode] XQJ Status Update from AGENT: Name:%s, Namespace:%s, Status: %+v\n", queuejob.Name, queuejob.Namespace, queuejob.Status)

// sync AppWrapper
if err := cc.updateQueueJobStatus(ctx, queuejob); err != nil {
@@ -1849,7 +1917,7 @@ func (cc *XController) worker() {
}

queuejob.Status.State = arbv1.AppWrapperStateEnqueued
klog.V(10).Infof("[worker] before add to activeQ %s/%s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", queuejob.Namespace, queuejob.Name, cc.qjqueue.IfExistActiveQ(queuejob), cc.qjqueue.IfExistUnschedulableQ(queuejob), queuejob, queuejob.ResourceVersion, queuejob.Status)
klog.V(10).Infof("[worker] before add to activeQ %s/%s activeQ=%t Unsched=%t Version=%s Status=%+v", queuejob.Namespace, queuejob.Name, cc.qjqueue.IfExistActiveQ(queuejob), cc.qjqueue.IfExistUnschedulableQ(queuejob), queuejob.ResourceVersion, queuejob.Status)
index := getIndexOfMatchedCondition(queuejob, arbv1.AppWrapperCondQueueing, "AwaitingHeadOfLine")
if index < 0 {
queuejob.Status.QueueJobState = arbv1.AppWrapperCondQueueing
@@ -1947,7 +2015,7 @@ func (cc *XController) syncQueueJob(ctx context.Context, qj *arbv1.AppWrapper) e
klog.Errorf("[syncQueueJob] Error updating pod status counts for AppWrapper job: %s/%s, err=%+v", qj.Namespace, qj.Name, err)
return err
}
klog.Infof("[syncQueueJob] Pod counts updated for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", awNew.Namespace, awNew.Name, awNew.ResourceVersion,
klog.V(4).Infof("[syncQueueJob] Pod counts updated for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", awNew.Namespace, awNew.Name, awNew.ResourceVersion,
awNew.Status.CanRun, awNew.Status.State, awNew.Status.Pending, awNew.Status.Running, awNew.Status.Succeeded, awNew.Status.Failed)

// Update etcd conditions if AppWrapper Job has at least 1 running pod and transitioning from dispatched to running.
@@ -2080,7 +2148,7 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper,
}
return nil
} else if qj.Status.CanRun && qj.Status.State == arbv1.AppWrapperStateActive {
klog.Infof("[manageQueueJob] Getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", qj.Namespace, qj.Name, qj.ResourceVersion,
klog.V(4).Infof("[manageQueueJob] Getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", qj.Namespace, qj.Name, qj.ResourceVersion,
qj.Status.CanRun, qj.Status.State, qj.Status.Pending, qj.Status.Running, qj.Status.Succeeded, qj.Status.Failed)

} else if podPhaseChanges { // Continued bug fix
@@ -2106,9 +2174,9 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper,

qj.Status.State = arbv1.AppWrapperStateEnqueued
if cc.qjqueue.IfExistUnschedulableQ(qj) {
klog.V(10).Infof("[manageQueueJob] [Dispatcher] leaving '%s/%s' to qjqueue.UnschedulableQ activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
klog.V(10).Infof("[manageQueueJob] [Dispatcher] leaving '%s/%s' to qjqueue.UnschedulableQ activeQ=%t Unsched=%t Version=%s Status=%+v", qj.Namespace, qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status)
} else {
klog.V(10).Infof("[manageQueueJob] [Dispatcher] before add to activeQ '%s/%s' activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
klog.V(10).Infof("[manageQueueJob] [Dispatcher] before add to activeQ '%s/%s' activeQ=%t Unsched=%t Version=%s Status=%+v", qj.Namespace, qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status)
qj.Status.QueueJobState = arbv1.AppWrapperCondQueueing
qj.Status.FilterIgnore = true // Update Queueing status, add to qjqueue for ScheduleNext
err := cc.updateStatusInEtcdWithRetry(ctx, qj, "manageQueueJob - setQueueing")
@@ -193,9 +193,9 @@ func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperG
return name, gvk, err
}

//SyncQueueJob uses dynamic clients to unwrap (spawn) items inside genericItems block, it is used to create resources inside etcd and return errors when
//unwrapping fails.
//More context here: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/598
// SyncQueueJob uses dynamic clients to unwrap (spawn) the items inside the genericItems block; it creates the resources inside etcd and returns an error when
// unwrapping fails.
// More context here: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/598
func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperGenericResource) (podList []*v1.Pod, err error) {
startTime := time.Now()
defer func() {
@@ -329,7 +329,6 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra
if errors.IsAlreadyExists(err) {
klog.V(4).Infof("%v\n", err.Error())
} else {
klog.Errorf("Error creating the object `%v`, the error is `%v`", newName, errors.ReasonForError(err))
return []*v1.Pod{}, err
}
}
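
// For context, the dynamic-client pattern SyncQueueJob relies on to spawn the
// wrapped items looks roughly like this (hypothetical, simplified sketch; the
// helper name and arguments are illustrative, not code from this repo):
//
//	import (
//		"context"
//
//		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
//		"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
//		"k8s.io/apimachinery/pkg/runtime/schema"
//		"k8s.io/client-go/dynamic"
//	)
//
//	// createUnwrapped creates one unwrapped genericItem via the dynamic client.
//	func createUnwrapped(dc dynamic.Interface, gvr schema.GroupVersionResource,
//		namespace string, obj *unstructured.Unstructured) error {
//		_, err := dc.Resource(gvr).Namespace(namespace).Create(context.TODO(), obj, metav1.CreateOptions{})
//		return err
//	}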
@@ -437,7 +436,7 @@ func hasFields(obj runtime.RawExtension) (hasFields bool, replica float64, conta

containerList, isFound, _ := unstructured.NestedSlice(subspec, "containers")
if !isFound {
klog.Warningf("[hasFields] No containers field found in raw object: %#v", subspec)
klog.V(6).Infof("[hasFields] No containers field found in raw object: %#v", subspec)
return false, 0, nil
}
objContainers := make([]v1.Container, len(containerList))