124 changes: 62 additions & 62 deletions cluster-autoscaler/clusterstate/clusterstate.go
@@ -29,11 +29,11 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"

@@ -115,13 +115,10 @@ type UnregisteredNode struct {
type ScaleUpFailure struct {
NodeGroup cloudprovider.NodeGroup
Reason metrics.FailedScaleUpReason
ErrorInfo cloudprovider.InstanceErrorInfo
Time time.Time
}

type metricObserver interface {
RegisterFailedScaleUp(reason metrics.FailedScaleUpReason, gpuResourceName, gpuType string)
}

// ClusterStateRegistry is a structure to keep track the current state of the cluster.
type ClusterStateRegistry struct {
sync.Mutex
@@ -148,11 +145,11 @@ type ClusterStateRegistry struct {
interrupt chan struct{}
nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor
asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker
metrics metricObserver
Contributor:
I would leave a blank line here; scaleStateNotifier shouldn't require locking, so let's be clear about that: https://dmitri.shuralyov.com/idiomatic-go#mutex-hat

Contributor (author):
Left a blank line before scaleStateNotifier and placed it after scaleUpFailures, since scaleUpFailures does require the lock everywhere it is used.

(A minimal sketch of this "mutex hat" layout follows the struct below.)

// scaleUpFailures contains information about scale-up failures for each node group. It should be
// cleared periodically to avoid unnecessary accumulation.
scaleUpFailures map[string][]ScaleUpFailure

scaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList
}
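
To make the convention from the thread above concrete, here is a minimal, hypothetical sketch of the "mutex hat" layout. The struct, field, and function names are illustrative stand-ins, not the actual ClusterStateRegistry code: fields guarded by the embedded mutex sit directly under it, and a blank line separates the field that does not need the lock.

```go
package example

import "sync"

// registry is a minimal, hypothetical sketch of the "mutex hat" layout:
// fields guarded by the mutex sit directly under it, and fields that do
// not need the lock are separated from it by a blank line.
type registry struct {
	sync.Mutex
	// scaleUpFailures is read and written from multiple goroutines, so it
	// stays under the mutex "hat" and is only touched while holding the lock.
	scaleUpFailures map[string][]string

	// notifier handles its own synchronization (or is immutable after
	// construction), so the blank line signals that the mutex does not cover it.
	notifier func(nodeGroup, reason string)
}

func (r *registry) recordFailure(nodeGroup, reason string) {
	r.Lock()
	r.scaleUpFailures[nodeGroup] = append(r.scaleUpFailures[nodeGroup], reason)
	r.Unlock()

	// Notify outside the critical section; the notifier is not guarded state.
	if r.notifier != nil {
		r.notifier(nodeGroup, reason)
	}
}
```

This mirrors the ordering the diff settles on: scaleUpFailures stays with the guarded state, while scaleStateNotifier is only invoked from UpdateNodes after updateClusterStateRegistry has released the lock.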

// NodeGroupScalingSafety contains information about the safety of the node group to scale up/down.
@@ -163,11 +160,7 @@ type NodeGroupScalingSafety struct {
}

// NewClusterStateRegistry creates new ClusterStateRegistry.
func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff, nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor, asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker) *ClusterStateRegistry {
return newClusterStateRegistry(cloudProvider, config, logRecorder, backoff, nodeGroupConfigProcessor, asyncNodeGroupStateChecker, metrics.DefaultMetrics)
}

func newClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff, nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor, asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker, metrics metricObserver) *ClusterStateRegistry {
func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff, nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor, asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker, scaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList) *ClusterStateRegistry {
return &ClusterStateRegistry{
scaleUpRequests: make(map[string]*ScaleUpRequest),
scaleDownRequests: make([]*ScaleDownRequest, 0),
@@ -188,7 +181,7 @@ func newClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config C
scaleUpFailures: make(map[string][]ScaleUpFailure),
nodeGroupConfigProcessor: nodeGroupConfigProcessor,
asyncNodeGroupStateChecker: asyncNodeGroupStateChecker,
metrics: metrics,
scaleStateNotifier: scaleStateNotifier,
}
}

@@ -282,10 +275,11 @@ func (csr *ClusterStateRegistry) RegisterScaleDown(nodeGroup cloudprovider.NodeG
}

// To be executed under a lock.
func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) []ScaleUpFailure {
// clean up stale backoff info
csr.backoff.RemoveStaleBackoffData(currentTime)

var failedScaleUps []ScaleUpFailure
for nodeGroupName, scaleUpRequest := range csr.scaleUpRequests {
if csr.asyncNodeGroupStateChecker.IsUpcoming(scaleUpRequest.NodeGroup) {
continue
@@ -304,19 +298,16 @@ func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
csr.logRecorder.Eventf(apiv1.EventTypeWarning, "ScaleUpTimedOut",
"Nodes added to group %s failed to register within %v",
scaleUpRequest.NodeGroup.Id(), currentTime.Sub(scaleUpRequest.Time))
availableGPUTypes := csr.cloudProvider.GetAvailableGPUTypes()
gpuResource, gpuType := "", ""
nodeInfo, err := scaleUpRequest.NodeGroup.TemplateNodeInfo()
if err != nil {
klog.Warningf("Failed to get template node info for a node group: %s", err)
} else {
gpuResource, gpuType = gpu.GetGpuInfoForMetrics(csr.cloudProvider.GetNodeGpuConfig(nodeInfo.Node()), availableGPUTypes, nodeInfo.Node(), scaleUpRequest.NodeGroup)
}
csr.registerFailedScaleUpNoLock(scaleUpRequest.NodeGroup, metrics.Timeout, cloudprovider.InstanceErrorInfo{
ErrorClass: cloudprovider.OtherErrorClass,
ErrorCode: "timeout",
ErrorMessage: fmt.Sprintf("Scale-up timed out for node group %v after %v", nodeGroupName, currentTime.Sub(scaleUpRequest.Time)),
}, gpuResource, gpuType, currentTime)
failedScaleUps = append(failedScaleUps, ScaleUpFailure{
NodeGroup: scaleUpRequest.NodeGroup,
Reason: metrics.Timeout,
ErrorInfo: cloudprovider.InstanceErrorInfo{
ErrorClass: cloudprovider.OtherErrorClass,
ErrorCode: string(metrics.Timeout),
ErrorMessage: fmt.Sprintf("Scale-up timed out for node group %v after %v", nodeGroupName, currentTime.Sub(scaleUpRequest.Time)),
},
Time: currentTime,
})
delete(csr.scaleUpRequests, nodeGroupName)
}
}
@@ -328,6 +319,7 @@ func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
}
}
csr.scaleDownRequests = newScaleDownRequests
return failedScaleUps
}

// To be executed under a lock.
@@ -340,27 +332,21 @@ func (csr *ClusterStateRegistry) backoffNodeGroup(nodeGroup cloudprovider.NodeGr
// RegisterFailedScaleUp should be called after getting error from cloudprovider
// when trying to scale-up node group. It will mark this group as not safe to autoscale
// for some time.
func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason string, errorMessage, gpuResourceName, gpuType string, currentTime time.Time) {
func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, errorInfo cloudprovider.InstanceErrorInfo, currentTime time.Time) {
csr.Lock()
defer csr.Unlock()
csr.registerFailedScaleUpNoLock(nodeGroup, metrics.FailedScaleUpReason(reason), cloudprovider.InstanceErrorInfo{
ErrorClass: cloudprovider.OtherErrorClass,
ErrorCode: string(reason),
ErrorMessage: errorMessage,
}, gpuResourceName, gpuType, currentTime)
csr.scaleUpFailures[nodeGroup.Id()] = append(csr.scaleUpFailures[nodeGroup.Id()], ScaleUpFailure{
NodeGroup: nodeGroup,
Reason: metrics.FailedScaleUpReason(errorInfo.ErrorCode),
Time: currentTime})
csr.backoffNodeGroup(nodeGroup, errorInfo, currentTime)
Contributor:
Does backoff require locking? From what I see, we are not locking when calling BackoffStatusForNodeGroup().

Contributor (author):
I think we need locking for exponentialBackoff.Backoff() to properly back off all failing node groups, and it has been implemented like this since the beginning. I am not 100% sure it is safe to remove it.

(A simplified sketch of why the caller-side lock matters follows the function below.)

}
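
To illustrate the locking concern from the thread above, here is a simplified, hypothetical sketch that assumes the backoff implementation keeps per-node-group state in a plain map with no internal synchronization. The names, fields, and durations are illustrative only, not the actual utils/backoff code:

```go
package example

import (
	"sync"
	"time"
)

// simpleBackoff is a hypothetical stand-in for an exponential backoff that
// tracks per-node-group wait durations in an unsynchronized map. Concurrent
// Backoff calls would race on this map, so callers must serialize access.
type simpleBackoff struct {
	duration map[string]time.Duration // doubled on every failure
	maxWait  time.Duration
}

func (b *simpleBackoff) Backoff(nodeGroup string, now time.Time) time.Time {
	d := b.duration[nodeGroup]
	switch {
	case d == 0:
		d = 5 * time.Minute
	case d*2 > b.maxWait:
		d = b.maxWait
	default:
		d *= 2
	}
	b.duration[nodeGroup] = d // unsynchronized write: needs external locking
	return now.Add(d)
}

// stateRegistry shows the caller-side locking discussed above: the mutex taken
// in RegisterFailedScaleUp is what makes the map mutation in Backoff safe.
type stateRegistry struct {
	sync.Mutex
	backoff *simpleBackoff
}

func (r *stateRegistry) registerFailure(nodeGroup string, now time.Time) time.Time {
	r.Lock()
	defer r.Unlock()
	return r.backoff.Backoff(nodeGroup, now)
}
```

The sketch only shows why concurrent writers need serialization; whether read-only calls such as BackoffStatusForNodeGroup() also need the lock depends on whether they can run concurrently with a writer, which is the open question in the thread above.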

// RegisterFailedScaleDown records failed scale-down for a nodegroup.
// We don't need to implement this function for cluster state registry
func (csr *ClusterStateRegistry) RegisterFailedScaleDown(_ cloudprovider.NodeGroup, _ string, _ time.Time) {
}

func (csr *ClusterStateRegistry) registerFailedScaleUpNoLock(nodeGroup cloudprovider.NodeGroup, reason metrics.FailedScaleUpReason, errorInfo cloudprovider.InstanceErrorInfo, gpuResourceName, gpuType string, currentTime time.Time) {
csr.scaleUpFailures[nodeGroup.Id()] = append(csr.scaleUpFailures[nodeGroup.Id()], ScaleUpFailure{NodeGroup: nodeGroup, Reason: reason, Time: currentTime})
csr.metrics.RegisterFailedScaleUp(reason, gpuResourceName, gpuType)
csr.backoffNodeGroup(nodeGroup, errorInfo, currentTime)
}

// UpdateNodes updates the state of the nodes in the ClusterStateRegistry and recalculates the stats
func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGroups map[string]*framework.NodeInfo, currentTime time.Time) error {
csr.updateNodeGroupMetrics()
@@ -374,12 +360,27 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGr
if err != nil {
return err
}
scaleUpFailures := csr.updateClusterStateRegistry(
nodes,
nodeInfosForGroups,
cloudProviderNodeInstances,
currentTime,
targetSizes,
)
for _, failure := range scaleUpFailures {
csr.scaleStateNotifier.RegisterFailedScaleUp(failure.NodeGroup, failure.ErrorInfo, failure.Time)
}
return nil
}

func (csr *ClusterStateRegistry) updateClusterStateRegistry(nodes []*apiv1.Node,
nodeInfosForGroups map[string]*framework.NodeInfo,
cloudProviderNodeInstances map[string][]cloudprovider.Instance, currentTime time.Time, targetSizes map[string]int) []ScaleUpFailure {
cloudProviderNodesRemoved := csr.getCloudProviderDeletedNodes(nodes)
notRegistered := getNotRegisteredNodes(nodes, cloudProviderNodeInstances, currentTime)

csr.Lock()
defer csr.Unlock()

csr.nodes = nodes
csr.nodeInfosForGroups = nodeInfosForGroups
csr.previousCloudProviderNodeInstances = csr.cloudProviderNodeInstances
@@ -392,12 +393,12 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGr
// update acceptable ranges based on requests from last loop and targetSizes
// updateScaleRequests relies on acceptableRanges being up to date
csr.updateAcceptableRanges(targetSizes)
csr.updateScaleRequests(currentTime)
csr.handleInstanceCreationErrors(currentTime)
scaleUpFailures := csr.updateScaleRequests(currentTime)
scaleUpFailures = append(scaleUpFailures, csr.handleInstanceCreationErrors(currentTime)...)
// recalculate acceptable ranges after removing timed out requests
csr.updateAcceptableRanges(targetSizes)
csr.updateIncorrectNodeGroupSizes(currentTime)
return nil
return scaleUpFailures
}

// Recalculate cluster state after scale-ups or scale-downs were registered.
@@ -1132,23 +1133,25 @@ func (csr *ClusterStateRegistry) GetAutoscaledNodesCount() (currentSize, targetS
return currentSize, targetSize
}

func (csr *ClusterStateRegistry) handleInstanceCreationErrors(currentTime time.Time) {
func (csr *ClusterStateRegistry) handleInstanceCreationErrors(currentTime time.Time) []ScaleUpFailure {
nodeGroups := csr.getRunningNodeGroups()

var failedScaleUps []ScaleUpFailure
for _, nodeGroup := range nodeGroups {
csr.handleInstanceCreationErrorsForNodeGroup(
failedScaleUps = append(failedScaleUps, csr.handleInstanceCreationErrorsForNodeGroup(
nodeGroup,
csr.cloudProviderNodeInstances[nodeGroup.Id()],
csr.previousCloudProviderNodeInstances[nodeGroup.Id()],
currentTime)
currentTime)...)
}
return failedScaleUps
}

func (csr *ClusterStateRegistry) handleInstanceCreationErrorsForNodeGroup(
nodeGroup cloudprovider.NodeGroup,
currentInstances []cloudprovider.Instance,
previousInstances []cloudprovider.Instance,
currentTime time.Time) {
currentTime time.Time) []ScaleUpFailure {

_, currentUniqueErrorMessagesForErrorCode, currentErrorCodeToInstance := csr.buildInstanceToErrorCodeMappings(currentInstances)
previousInstanceToErrorCode, _, _ := csr.buildInstanceToErrorCodeMappings(previousInstances)
@@ -1159,6 +1162,7 @@ func (csr *ClusterStateRegistry) handleInstanceCreationErrorsForNodeGroup(
}
}

var failedScaleUps []ScaleUpFailure
// If node group is scaling up and there are new node-create requests which cannot be satisfied because of
// out-of-resources errors we:
// - emit event
@@ -1183,25 +1187,21 @@ func (csr *ClusterStateRegistry) handleInstanceCreationErrorsForNodeGroup(
nodeGroup.Id(),
errorCode,
csr.buildErrorMessageEventString(currentUniqueErrorMessagesForErrorCode[errorCode]))

availableGPUTypes := csr.cloudProvider.GetAvailableGPUTypes()
gpuResource, gpuType := "", ""
nodeInfo, err := nodeGroup.TemplateNodeInfo()
if err != nil {
klog.Warningf("Failed to get template node info for a node group: %s", err)
} else {
gpuResource, gpuType = gpu.GetGpuInfoForMetrics(csr.cloudProvider.GetNodeGpuConfig(nodeInfo.Node()), availableGPUTypes, nodeInfo.Node(), nodeGroup)
}
// Decrease the scale up request by the number of deleted nodes
csr.registerOrUpdateScaleUpNoLock(nodeGroup, -len(unseenInstanceIds), currentTime)

csr.registerFailedScaleUpNoLock(nodeGroup, metrics.FailedScaleUpReason(errorCode.code), cloudprovider.InstanceErrorInfo{
ErrorClass: errorCode.class,
ErrorCode: errorCode.code,
ErrorMessage: csr.buildErrorMessageEventString(currentUniqueErrorMessagesForErrorCode[errorCode]),
}, gpuResource, gpuType, currentTime)
failedScaleUps = append(failedScaleUps, ScaleUpFailure{
NodeGroup: nodeGroup,
Reason: metrics.FailedScaleUpReason(errorCode.code),
ErrorInfo: cloudprovider.InstanceErrorInfo{
ErrorClass: errorCode.class,
ErrorCode: errorCode.code,
ErrorMessage: csr.buildErrorMessageEventString(currentUniqueErrorMessagesForErrorCode[errorCode]),
},
Time: currentTime,
})
}
}
return failedScaleUps
}

func (csr *ClusterStateRegistry) buildErrorMessageEventString(uniqErrorMessages []string) string {