Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,42 +90,55 @@ func (t *NodeLatencyTracker) Process(autoscalingCtx *ca_context.AutoscalingConte
if t.wrapped != nil {
t.wrapped.Process(autoscalingCtx, status)
}

for _, unremovableNode := range status.UnremovableNodes {
nodeName := unremovableNode.Node.Name
if info, exists := t.unneededNodes[nodeName]; exists {
duration := time.Since(info.unneededSince)
metrics.UpdateScaleDownNodeRemovalLatency(false, duration)
klog.V(4).Infof("Node %q is unremovable, became needed again (unneeded for %s).", nodeName, duration)
delete(t.unneededNodes, nodeName)
}
t.recordAndCleanup(unremovableNode.Node.Name, false)
}
for _, scaledDownNode := range status.ScaledDownNodes {
nodeName := scaledDownNode.Node.Name
if info, exists := t.unneededNodes[nodeName]; exists {
duration := time.Since(info.unneededSince)
latency := duration - info.removalThreshold
metrics.UpdateScaleDownNodeRemovalLatency(true, latency)
if latency > scaleDownLatencyLogThreshold {
klog.V(2).Infof(
"Observing deletion for node %s, unneeded for %s (removal threshold was %s).",
nodeName, duration, info.removalThreshold,
)
} else {
klog.V(6).Infof(
"Observing deletion for node %s, unneeded for %s (removal threshold was %s).",
nodeName, duration, info.removalThreshold,
)
}
delete(t.unneededNodes, nodeName)
}
for _, node := range status.ScaledDownNodes {
t.recordAndCleanup(node.Node.Name, true)
}

if klog.V(6).Enabled() {
for nodeName := range t.unneededNodes {
klog.Infof("Node %q remains in unneeded list (not scaled down). Continuing to track latency.", nodeName)
}
}
}

// recordAndCleanup calculates the time a node spent in the "unneeded" state, updates
// relevant Prometheus metrics, and removes the node from internal tracking.
func (t *NodeLatencyTracker) recordAndCleanup(nodeName string, isRemoved bool) {
info, exists := t.unneededNodes[nodeName]
if !exists {
return
}
defer delete(t.unneededNodes, nodeName)

duration := time.Since(info.unneededSince)
latency := duration - info.removalThreshold

if isRemoved || latency > 0 {
metrics.UpdateScaleDownNodeRemovalLatency(isRemoved, latency)
}
if isRemoved {
t.logDeletion(nodeName, duration, info.removalThreshold, latency)
} else {
klog.V(4).Infof("Node %q is unremovable, became needed again (unneeded for %s).",
nodeName, duration)
}
}

// logDeletion handles the logging for scaled-down nodes,
// using a higher verbosity (V2) if the latency exceeds the configured threshold.
func (t *NodeLatencyTracker) logDeletion(nodeName string, duration, threshold, latency time.Duration) {
level := klog.Level(6)
if latency > scaleDownLatencyLogThreshold {
level = klog.Level(2)
}
klog.V(level).Infof("Observing deletion for node %s, unneeded for %s (removal threshold was %s).",
nodeName, duration, threshold)
}

// getTrackedNodes returns the names of all nodes currently tracked as unneeded.
func (t *NodeLatencyTracker) getTrackedNodes() []string {
return slices.Collect(maps.Keys(t.unneededNodes))
Expand Down