diff --git a/cluster-autoscaler/clusterstate/clusterstate.go b/cluster-autoscaler/clusterstate/clusterstate.go
index c89def9c80eb..344e463e5fe3 100644
--- a/cluster-autoscaler/clusterstate/clusterstate.go
+++ b/cluster-autoscaler/clusterstate/clusterstate.go
@@ -29,11 +29,11 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
+	"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
-	"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
 	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
@@ -115,13 +115,10 @@ type UnregisteredNode struct {
 type ScaleUpFailure struct {
 	NodeGroup cloudprovider.NodeGroup
 	Reason    metrics.FailedScaleUpReason
+	ErrorInfo cloudprovider.InstanceErrorInfo
 	Time      time.Time
 }
 
-type metricObserver interface {
-	RegisterFailedScaleUp(reason metrics.FailedScaleUpReason, gpuResourceName, gpuType string)
-}
-
 // ClusterStateRegistry is a structure to keep track the current state of the cluster.
 type ClusterStateRegistry struct {
 	sync.Mutex
@@ -148,11 +145,11 @@ type ClusterStateRegistry struct {
 	interrupt                  chan struct{}
 	nodeGroupConfigProcessor   nodegroupconfig.NodeGroupConfigProcessor
 	asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker
-	metrics                    metricObserver
-
 	// scaleUpFailures contains information about scale-up failures for each node group. It should be
 	// cleared periodically to avoid unnecessary accumulation.
 	scaleUpFailures map[string][]ScaleUpFailure
+
+	scaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList
 }
 
 // NodeGroupScalingSafety contains information about the safety of the node group to scale up/down.
@@ -163,11 +160,7 @@ type NodeGroupScalingSafety struct {
 }
 
 // NewClusterStateRegistry creates new ClusterStateRegistry.
-func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff, nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor, asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker) *ClusterStateRegistry {
-	return newClusterStateRegistry(cloudProvider, config, logRecorder, backoff, nodeGroupConfigProcessor, asyncNodeGroupStateChecker, metrics.DefaultMetrics)
-}
-
-func newClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff, nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor, asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker, metrics metricObserver) *ClusterStateRegistry {
+func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff, nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor, asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker, scaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList) *ClusterStateRegistry {
 	return &ClusterStateRegistry{
 		scaleUpRequests:   make(map[string]*ScaleUpRequest),
 		scaleDownRequests: make([]*ScaleDownRequest, 0),
@@ -188,7 +181,7 @@ func newClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config C
 		scaleUpFailures:            make(map[string][]ScaleUpFailure),
 		nodeGroupConfigProcessor:   nodeGroupConfigProcessor,
 		asyncNodeGroupStateChecker: asyncNodeGroupStateChecker,
-		metrics:                    metrics,
+		scaleStateNotifier:         scaleStateNotifier,
 	}
 }
 
@@ -282,10 +275,11 @@ func (csr *ClusterStateRegistry) RegisterScaleDown(nodeGroup cloudprovider.NodeG
 }
 
 // To be executed under a lock.
-func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
+func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) []ScaleUpFailure {
 	// clean up stale backoff info
 	csr.backoff.RemoveStaleBackoffData(currentTime)
 
+	var failedScaleUps []ScaleUpFailure
 	for nodeGroupName, scaleUpRequest := range csr.scaleUpRequests {
 		if csr.asyncNodeGroupStateChecker.IsUpcoming(scaleUpRequest.NodeGroup) {
 			continue
@@ -304,19 +298,16 @@ func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
 			csr.logRecorder.Eventf(apiv1.EventTypeWarning, "ScaleUpTimedOut",
 				"Nodes added to group %s failed to register within %v",
 				scaleUpRequest.NodeGroup.Id(), currentTime.Sub(scaleUpRequest.Time))
-			availableGPUTypes := csr.cloudProvider.GetAvailableGPUTypes()
-			gpuResource, gpuType := "", ""
-			nodeInfo, err := scaleUpRequest.NodeGroup.TemplateNodeInfo()
-			if err != nil {
-				klog.Warningf("Failed to get template node info for a node group: %s", err)
-			} else {
-				gpuResource, gpuType = gpu.GetGpuInfoForMetrics(csr.cloudProvider.GetNodeGpuConfig(nodeInfo.Node()), availableGPUTypes, nodeInfo.Node(), scaleUpRequest.NodeGroup)
-			}
-			csr.registerFailedScaleUpNoLock(scaleUpRequest.NodeGroup, metrics.Timeout, cloudprovider.InstanceErrorInfo{
-				ErrorClass:   cloudprovider.OtherErrorClass,
-				ErrorCode:    "timeout",
-				ErrorMessage: fmt.Sprintf("Scale-up timed out for node group %v after %v", nodeGroupName, currentTime.Sub(scaleUpRequest.Time)),
-			}, gpuResource, gpuType, currentTime)
+			failedScaleUps = append(failedScaleUps, ScaleUpFailure{
+				NodeGroup: scaleUpRequest.NodeGroup,
+				Reason:    metrics.Timeout,
+				ErrorInfo: cloudprovider.InstanceErrorInfo{
+					ErrorClass:   cloudprovider.OtherErrorClass,
+					ErrorCode:    string(metrics.Timeout),
+					ErrorMessage: fmt.Sprintf("Scale-up timed out for node group %v after %v", nodeGroupName, currentTime.Sub(scaleUpRequest.Time)),
+				},
+				Time: currentTime,
+			})
 			delete(csr.scaleUpRequests, nodeGroupName)
 		}
 	}
@@ -328,6 +319,7 @@ func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
 		}
 	}
 	csr.scaleDownRequests = newScaleDownRequests
+	return failedScaleUps
 }
 
 // To be executed under a lock.
@@ -340,14 +332,14 @@ func (csr *ClusterStateRegistry) backoffNodeGroup(nodeGroup cloudprovider.NodeGr
 // RegisterFailedScaleUp should be called after getting error from cloudprovider
 // when trying to scale-up node group. It will mark this group as not safe to autoscale
 // for some time.
-func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason string, errorMessage, gpuResourceName, gpuType string, currentTime time.Time) {
+func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, errorInfo cloudprovider.InstanceErrorInfo, currentTime time.Time) {
 	csr.Lock()
 	defer csr.Unlock()
-	csr.registerFailedScaleUpNoLock(nodeGroup, metrics.FailedScaleUpReason(reason), cloudprovider.InstanceErrorInfo{
-		ErrorClass:   cloudprovider.OtherErrorClass,
-		ErrorCode:    string(reason),
-		ErrorMessage: errorMessage,
-	}, gpuResourceName, gpuType, currentTime)
+	csr.scaleUpFailures[nodeGroup.Id()] = append(csr.scaleUpFailures[nodeGroup.Id()], ScaleUpFailure{
+		NodeGroup: nodeGroup,
+		Reason:    metrics.FailedScaleUpReason(errorInfo.ErrorCode),
+		Time:      currentTime})
+	csr.backoffNodeGroup(nodeGroup, errorInfo, currentTime)
 }
 
 // RegisterFailedScaleDown records failed scale-down for a nodegroup.
@@ -355,12 +347,6 @@ func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.N
 func (csr *ClusterStateRegistry) RegisterFailedScaleDown(_ cloudprovider.NodeGroup, _ string, _ time.Time) {
 }
 
-func (csr *ClusterStateRegistry) registerFailedScaleUpNoLock(nodeGroup cloudprovider.NodeGroup, reason metrics.FailedScaleUpReason, errorInfo cloudprovider.InstanceErrorInfo, gpuResourceName, gpuType string, currentTime time.Time) {
-	csr.scaleUpFailures[nodeGroup.Id()] = append(csr.scaleUpFailures[nodeGroup.Id()], ScaleUpFailure{NodeGroup: nodeGroup, Reason: reason, Time: currentTime})
-	csr.metrics.RegisterFailedScaleUp(reason, gpuResourceName, gpuType)
-	csr.backoffNodeGroup(nodeGroup, errorInfo, currentTime)
-}
-
 // UpdateNodes updates the state of the nodes in the ClusterStateRegistry and recalculates the stats
 func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGroups map[string]*framework.NodeInfo, currentTime time.Time) error {
 	csr.updateNodeGroupMetrics()
@@ -374,12 +360,27 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGr
 	if err != nil {
 		return err
 	}
+	scaleUpFailures := csr.updateClusterStateRegistry(
+		nodes,
+		nodeInfosForGroups,
+		cloudProviderNodeInstances,
+		currentTime,
+		targetSizes,
+	)
+	for _, failure := range scaleUpFailures {
+		csr.scaleStateNotifier.RegisterFailedScaleUp(failure.NodeGroup, failure.ErrorInfo, failure.Time)
+	}
+	return nil
+}
+
+func (csr *ClusterStateRegistry) updateClusterStateRegistry(nodes []*apiv1.Node,
+	nodeInfosForGroups map[string]*framework.NodeInfo,
+	cloudProviderNodeInstances map[string][]cloudprovider.Instance, currentTime time.Time, targetSizes map[string]int) []ScaleUpFailure {
 	cloudProviderNodesRemoved := csr.getCloudProviderDeletedNodes(nodes)
 	notRegistered := getNotRegisteredNodes(nodes, cloudProviderNodeInstances, currentTime)
 
 	csr.Lock()
 	defer csr.Unlock()
-
 	csr.nodes = nodes
 	csr.nodeInfosForGroups = nodeInfosForGroups
 	csr.previousCloudProviderNodeInstances = csr.cloudProviderNodeInstances
@@ -392,12 +393,12 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGr
 	// update acceptable ranges based on requests from last loop and targetSizes
 	// updateScaleRequests relies on acceptableRanges being up to date
 	csr.updateAcceptableRanges(targetSizes)
-	csr.updateScaleRequests(currentTime)
-	csr.handleInstanceCreationErrors(currentTime)
+	scaleUpFailures := csr.updateScaleRequests(currentTime)
+	scaleUpFailures = append(scaleUpFailures, csr.handleInstanceCreationErrors(currentTime)...)
 	// recalculate acceptable ranges after removing timed out requests
 	csr.updateAcceptableRanges(targetSizes)
 	csr.updateIncorrectNodeGroupSizes(currentTime)
-	return nil
+	return scaleUpFailures
 }
 
 // Recalculate cluster state after scale-ups or scale-downs were registered.
@@ -1132,23 +1133,25 @@ func (csr *ClusterStateRegistry) GetAutoscaledNodesCount() (currentSize, targetS
 	return currentSize, targetSize
 }
 
-func (csr *ClusterStateRegistry) handleInstanceCreationErrors(currentTime time.Time) {
+func (csr *ClusterStateRegistry) handleInstanceCreationErrors(currentTime time.Time) []ScaleUpFailure {
 	nodeGroups := csr.getRunningNodeGroups()
 
+	var failedScaleUps []ScaleUpFailure
 	for _, nodeGroup := range nodeGroups {
-		csr.handleInstanceCreationErrorsForNodeGroup(
+		failedScaleUps = append(failedScaleUps, csr.handleInstanceCreationErrorsForNodeGroup(
 			nodeGroup,
 			csr.cloudProviderNodeInstances[nodeGroup.Id()],
 			csr.previousCloudProviderNodeInstances[nodeGroup.Id()],
-			currentTime)
+			currentTime)...)
 	}
+	return failedScaleUps
 }
 
 func (csr *ClusterStateRegistry) handleInstanceCreationErrorsForNodeGroup(
 	nodeGroup cloudprovider.NodeGroup,
 	currentInstances []cloudprovider.Instance,
 	previousInstances []cloudprovider.Instance,
-	currentTime time.Time) {
+	currentTime time.Time) []ScaleUpFailure {
 	_, currentUniqueErrorMessagesForErrorCode, currentErrorCodeToInstance := csr.buildInstanceToErrorCodeMappings(currentInstances)
 	previousInstanceToErrorCode, _, _ := csr.buildInstanceToErrorCodeMappings(previousInstances)
@@ -1159,6 +1162,7 @@ func (csr *ClusterStateRegistry) handleInstanceCreationErrorsForNodeGroup(
 		}
 	}
 
+	var failedScaleUps []ScaleUpFailure
 	// If node group is scaling up and there are new node-create requests which cannot be satisfied because of
 	// out-of-resources errors we:
 	//  - emit event
@@ -1183,25 +1187,21 @@ func (csr *ClusterStateRegistry) handleInstanceCreationErrorsForNodeGroup(
 				nodeGroup.Id(),
 				errorCode,
 				csr.buildErrorMessageEventString(currentUniqueErrorMessagesForErrorCode[errorCode]))
-
-			availableGPUTypes := csr.cloudProvider.GetAvailableGPUTypes()
-			gpuResource, gpuType := "", ""
-			nodeInfo, err := nodeGroup.TemplateNodeInfo()
-			if err != nil {
-				klog.Warningf("Failed to get template node info for a node group: %s", err)
-			} else {
-				gpuResource, gpuType = gpu.GetGpuInfoForMetrics(csr.cloudProvider.GetNodeGpuConfig(nodeInfo.Node()), availableGPUTypes, nodeInfo.Node(), nodeGroup)
-			}
 			// Decrease the scale up request by the number of deleted nodes
 			csr.registerOrUpdateScaleUpNoLock(nodeGroup, -len(unseenInstanceIds), currentTime)
-
-			csr.registerFailedScaleUpNoLock(nodeGroup, metrics.FailedScaleUpReason(errorCode.code), cloudprovider.InstanceErrorInfo{
-				ErrorClass:   errorCode.class,
-				ErrorCode:    errorCode.code,
-				ErrorMessage: csr.buildErrorMessageEventString(currentUniqueErrorMessagesForErrorCode[errorCode]),
-			}, gpuResource, gpuType, currentTime)
+			failedScaleUps = append(failedScaleUps, ScaleUpFailure{
+				NodeGroup: nodeGroup,
+				Reason:    metrics.FailedScaleUpReason(errorCode.code),
+				ErrorInfo: cloudprovider.InstanceErrorInfo{
+					ErrorClass:   errorCode.class,
+					ErrorCode:    errorCode.code,
+					ErrorMessage: csr.buildErrorMessageEventString(currentUniqueErrorMessagesForErrorCode[errorCode]),
+				},
+				Time: currentTime,
+			})
 		}
 	}
+	return failedScaleUps
 }
 
 func (csr *ClusterStateRegistry) buildErrorMessageEventString(uniqErrorMessages []string) string {
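With `registerFailedScaleUpNoLock` and the private `metricObserver` interface gone, every scale-up failure — whether reported synchronously by a caller or detected asynchronously inside `UpdateNodes` — now travels through the `scaleStateNotifier` observer list as a structured `cloudprovider.InstanceErrorInfo`. A minimal sketch of what a custom observer could look like, assuming the observer interface mirrors the method signatures used on the list in this diff (the canonical interface lives in `observers/nodegroupchange` and is not shown here):

```go
package observers

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/klog/v2"
)

// loggingObserver is a hypothetical observer that only logs failed scale-ups.
// Its method set is inferred from the calls this diff makes on the observers
// list; treat it as a sketch, not the canonical interface definition.
type loggingObserver struct{}

func (o *loggingObserver) RegisterScaleUp(ng cloudprovider.NodeGroup, delta int, currentTime time.Time) {
}

func (o *loggingObserver) RegisterScaleDown(ng cloudprovider.NodeGroup, nodeName string, currentTime time.Time, expectedDeleteTime time.Time) {
}

func (o *loggingObserver) RegisterFailedScaleUp(ng cloudprovider.NodeGroup, errorInfo cloudprovider.InstanceErrorInfo, currentTime time.Time) {
	// The structured error travels with the notification, so observers can
	// react to the error class/code without re-deriving them.
	klog.Warningf("scale-up of %s failed: class=%v code=%s msg=%s",
		ng.Id(), errorInfo.ErrorClass, errorInfo.ErrorCode, errorInfo.ErrorMessage)
}

func (o *loggingObserver) RegisterFailedScaleDown(ng cloudprovider.NodeGroup, reason string, currentTime time.Time) {
}
```

Registering such an observer is one line at wiring time, e.g. `scaleStateNotifier.Register(&loggingObserver{})`.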
"k8s.io/autoscaler/cluster-autoscaler/metrics" + "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" @@ -79,7 +80,7 @@ func TestOKWithScaleUp(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now()) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) @@ -117,7 +118,7 @@ func TestEmptyOK(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) err := clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now.Add(-5*time.Second)) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) @@ -159,7 +160,7 @@ func TestHasNodeGroupStartedScaleUp(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) err := clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now.Add(-5*time.Second)) assert.NoError(t, err) assert.False(t, clusterstate.IsNodeGroupScalingUp("ng1")) @@ -234,7 +235,7 @@ func TestRecalculateStateAfterNodeGroupSizeChanged(t *testing.T) { fakeLogRecorder, _ := utils.NewStatusMapRecorder(&fake.Clientset{}, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap") clusterState := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{}, fakeLogRecorder, - newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) clusterState.acceptableRanges = 
map[string]AcceptableRange{ngName: tc.acceptableRange} clusterState.perNodeGroupReadiness = map[string]Readiness{ngName: tc.readiness} if tc.scaleUpRequest != nil { @@ -271,7 +272,7 @@ func TestOKOneUnreadyNode(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) @@ -306,7 +307,7 @@ func TestNodeWithoutNodeGroupDontCrash(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) err := clusterstate.UpdateNodes([]*apiv1.Node{noNgNode}, nil, now) assert.NoError(t, err) assert.Empty(t, clusterstate.GetScaleUpFailures()) @@ -333,7 +334,7 @@ func TestOKOneUnreadyNodeWithScaleDownCandidate(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) clusterstate.UpdateScaleDownCandidates([]*scaledown.UnneededNode{{Node: ng1_1}}, now) @@ -387,7 +388,7 @@ func TestMissingNodes(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) @@ -427,7 +428,7 @@ func 
TestTooManyUnready(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute, MaxNodeStartupTime: 35 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute, MaxNodeStartupTime: 35 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) assert.False(t, clusterstate.IsClusterHealthy()) @@ -456,7 +457,7 @@ func TestUnreadyLongAfterCreation(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Unready)) @@ -487,7 +488,7 @@ func TestUnreadyAfterCreationWithIncreasedStartupTime(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute, MaxNodeStartupTime: 35 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute, MaxNodeStartupTime: 35 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) assert.Equal(t, 0, len(clusterstate.GetClusterReadiness().Unready)) @@ -519,7 +520,7 @@ func TestNotStarted(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute, MaxNodeStartupTime: 35 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute, MaxNodeStartupTime: 35 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) assert.Equal(t, 1, 
len(clusterstate.GetClusterReadiness().NotStarted)) @@ -555,10 +556,11 @@ func TestExpiredScaleUp(t *testing.T) { mockMetrics := &mockMetrics{} mockMetrics.On("RegisterFailedScaleUp", mock.Anything, mock.Anything, mock.Anything).Return() fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap") - clusterstate := newClusterStateRegistry(provider, ClusterStateRegistryConfig{ + clusterstate := newClusterStateRegistryWithMetrics(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 2 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), mockMetrics) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 2 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList(), mockMetrics) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute)) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now) assert.NoError(t, err) @@ -584,7 +586,7 @@ func TestRegisterScaleDown(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute, MaxNodeStartupTime: 35 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute, MaxNodeStartupTime: 35 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) now := time.Now() clusterstate.RegisterScaleDown(provider.GetNodeGroup("ng1"), "ng1-1", now.Add(time.Minute), now) assert.Equal(t, 1, len(clusterstate.scaleDownRequests)) @@ -602,7 +604,7 @@ func TestNodeGroupScaleUpTime(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) // nil node group _, err := clusterstate.NodeGroupScaleUpTime(nil) @@ -677,7 +679,7 @@ func TestUpcomingNodes(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute, MaxNodeStartupTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), 
nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute, MaxNodeStartupTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, ng3_1, ng4_1, ng5_1, ng5_2}, nil, now) assert.NoError(t, err) assert.Empty(t, clusterstate.GetScaleUpFailures()) @@ -723,7 +725,7 @@ func TestTaintBasedNodeDeletion(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2}, nil, now) assert.NoError(t, err) assert.Empty(t, clusterstate.GetScaleUpFailures()) @@ -744,7 +746,7 @@ func TestIncorrectSize(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) now := time.Now() clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now.Add(-5*time.Minute)) incorrect := clusterstate.incorrectNodeGroupSizes["ng1"] @@ -780,7 +782,7 @@ func TestUnregisteredNodes(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 10 * time.Second}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 10 * time.Second}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, time.Now().Add(-time.Minute)) assert.NoError(t, err) @@ -829,7 +831,7 @@ func TestCloudProviderDeletedNodes(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 10 * time.Second}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 10 * time.Second}), 
asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) now.Add(time.Minute) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, noNgNode}, nil, now) @@ -924,12 +926,11 @@ func TestScaleUpBackoff(t *testing.T) { mockMetrics := &mockMetrics{} mockMetrics.On("RegisterFailedScaleUp", mock.Anything, mock.Anything, mock.Anything).Return() fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap") - clusterstate := newClusterStateRegistry( - provider, ClusterStateRegistryConfig{ - MaxTotalUnreadyPercentage: 10, - OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 120 * time.Second}), - asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), mockMetrics) + clusterstate := newClusterStateRegistryWithMetrics(provider, ClusterStateRegistryConfig{ + MaxTotalUnreadyPercentage: 10, + OkTotalUnreadyCount: 1, + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 120 * time.Second}), + asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList(), mockMetrics) // After failed scale-up, node group should be still healthy, but should backoff from scale-ups clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-180*time.Second)) @@ -1054,7 +1055,7 @@ func TestGetClusterSize(t *testing.T) { clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, - }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) // There are 2 actual nodes in 2 node groups with target sizes of 5 and 1. 
clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, notAutoscaledNode}, nil, now) @@ -1104,6 +1105,7 @@ func TestUpdateScaleUp(t *testing.T) { newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 10 * time.Second}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), + nodegroupchange.NewNodeGroupChangeObserversList(), ) // Test cases for `RegisterScaleUp` @@ -1145,13 +1147,13 @@ func TestScaleUpFailures(t *testing.T) { fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap") mockMetrics := &mockMetrics{} mockMetrics.On("RegisterFailedScaleUp", mock.Anything, mock.Anything, mock.Anything).Return() - clusterstate := newClusterStateRegistry(provider, ClusterStateRegistryConfig{}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), mockMetrics) + clusterstate := newClusterStateRegistryWithMetrics(provider, ClusterStateRegistryConfig{}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList(), mockMetrics) - clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), string(metrics.Timeout), "", "", "", now) + clusterstate.scaleStateNotifier.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), cloudprovider.InstanceErrorInfo{ErrorClass: cloudprovider.OtherErrorClass, ErrorCode: string(metrics.Timeout), ErrorMessage: ""}, now) mockMetrics.AssertCalled(t, "RegisterFailedScaleUp", metrics.Timeout, "", "") - clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng2"), string(metrics.Timeout), "", "", "", now) + clusterstate.scaleStateNotifier.RegisterFailedScaleUp(provider.GetNodeGroup("ng2"), cloudprovider.InstanceErrorInfo{ErrorClass: cloudprovider.OtherErrorClass, ErrorCode: string(metrics.Timeout), ErrorMessage: ""}, now) mockMetrics.AssertCalled(t, "RegisterFailedScaleUp", metrics.Timeout, "", "") - clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), string(metrics.APIError), "", "", "", now.Add(time.Minute)) + clusterstate.scaleStateNotifier.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), cloudprovider.InstanceErrorInfo{ErrorClass: cloudprovider.OtherErrorClass, ErrorCode: string(metrics.APIError), ErrorMessage: ""}, now.Add(time.Minute)) mockMetrics.AssertCalled(t, "RegisterFailedScaleUp", metrics.APIError, "", "") failures := clusterstate.GetScaleUpFailures() @@ -1546,6 +1548,7 @@ func TestIsNodeGroupRegistered(t *testing.T) { newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), + nodegroupchange.NewNodeGroupChangeObserversList(), ) clusterstate.Recalculate() @@ -1629,6 +1632,7 @@ func TestUpcomingNodesFromUpcomingNodeGroups(t *testing.T) { newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), &asyncnodegroups.MockAsyncNodeGroupStateChecker{IsUpcomingNodeGroup: tc.isUpcomingMockMap}, + nodegroupchange.NewNodeGroupChangeObserversList(), ) if tc.updateNodes { err := clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now) @@ 
-1676,7 +1680,7 @@ func TestHandleInstanceCreationErrors(t *testing.T) { fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap") mockMetrics := &mockMetrics{} mockMetrics.On("RegisterFailedScaleUp", mock.Anything, mock.Anything, mock.Anything).Return() - clusterstate := newClusterStateRegistry(provider, ClusterStateRegistryConfig{}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), mockMetrics) + clusterstate := newClusterStateRegistryWithMetrics(provider, ClusterStateRegistryConfig{}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList(), mockMetrics) clusterstate.RegisterScaleUp(mockedNodeGroup, 1, now) // UpdateNodes will trigger handleInstanceCreationErrors @@ -1692,3 +1696,11 @@ type mockMetrics struct { func (m *mockMetrics) RegisterFailedScaleUp(reason metrics.FailedScaleUpReason, gpuResourceName, gpuType string) { m.Called(reason, gpuResourceName, gpuType) } + +// newClusterStateRegistryWithMetrics creates new ClusterStateRegistry object and registers all necessary observers for metric mocking. +func newClusterStateRegistryWithMetrics(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff, nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor, asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker, scaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList, metrics *mockMetrics) *ClusterStateRegistry { + clusterstate := NewClusterStateRegistry(cloudProvider, config, logRecorder, backoff, nodeGroupConfigProcessor, asyncNodeGroupStateChecker, scaleStateNotifier) + clusterstate.scaleStateNotifier.Register(clusterstate) + clusterstate.scaleStateNotifier.Register(nodegroupchange.NewNodeGroupChangeMetricsProducer(cloudProvider, metrics)) + return clusterstate +} diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go index e92e2d7f6cf5..13e1c145516d 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go @@ -1233,7 +1233,7 @@ func runStartDeletionTest(t *testing.T, tc startDeletionTestCase, force bool) { if err != nil { t.Fatalf("Couldn't set up autoscaling context: %v", err) } - csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) + csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList()) for _, bucket := range emptyNodeGroupViews { for _, node := range bucket.Nodes { err := 
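The tests keep satisfying the old three-argument metrics hook via `mockMetrics` because the GPU-label lookup has moved out of the registry and into a dedicated observer, `NewNodeGroupChangeMetricsProducer`. A sketch of that producer's failed-scale-up path, reconstructed on the assumption that it reuses the inline GPU lookup this diff deletes from clusterstate.go (the real implementation lives in `observers/nodegroupchange`; names below are illustrative):

```go
package example

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/metrics"
	"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
	"k8s.io/klog/v2"
)

// metricsObserver matches the metricObserver interface deleted from
// clusterstate.go; mockMetrics in the tests above still implements it.
type metricsObserver interface {
	RegisterFailedScaleUp(reason metrics.FailedScaleUpReason, gpuResourceName, gpuType string)
}

// metricsProducer is an assumed shape for the NodeGroupChangeMetricsProducer.
type metricsProducer struct {
	cloudProvider cloudprovider.CloudProvider
	metrics       metricsObserver
}

// RegisterFailedScaleUp derives GPU labels from the group's template node and
// forwards the failure to the metrics backend -- the same work the removed
// inline code in updateScaleRequests/handleInstanceCreationErrors used to do.
func (p *metricsProducer) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, errorInfo cloudprovider.InstanceErrorInfo, currentTime time.Time) {
	gpuResource, gpuType := "", ""
	nodeInfo, err := nodeGroup.TemplateNodeInfo()
	if err != nil {
		klog.Warningf("Failed to get template node info for a node group: %s", err)
	} else {
		availableGPUTypes := p.cloudProvider.GetAvailableGPUTypes()
		gpuResource, gpuType = gpu.GetGpuInfoForMetrics(p.cloudProvider.GetNodeGpuConfig(nodeInfo.Node()), availableGPUTypes, nodeInfo.Node(), nodeGroup)
	}
	p.metrics.RegisterFailedScaleUp(metrics.FailedScaleUpReason(errorInfo.ErrorCode), gpuResource, gpuType)
}
```

Note the side effect of this move: the `TemplateNodeInfo()` call is now only paid by the metrics observer, not by every consumer of the failure notification.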
diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go
index e92e2d7f6cf5..13e1c145516d 100644
--- a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go
+++ b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go
@@ -1233,7 +1233,7 @@ func runStartDeletionTest(t *testing.T, tc startDeletionTestCase, force bool) {
 	if err != nil {
 		t.Fatalf("Couldn't set up autoscaling context: %v", err)
 	}
-	csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
+	csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList())
 	for _, bucket := range emptyNodeGroupViews {
 		for _, node := range bucket.Nodes {
 			err := autoscalingCtx.ClusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.pods[node.Name]...))
@@ -1546,7 +1546,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Couldn't set up autoscaling context: %v", err)
 	}
-	csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
+	csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList())
 	scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
 	scaleStateNotifier.Register(csr)
 	ndt := deletiontracker.NewNodeDeletionTracker(0)
diff --git a/cluster-autoscaler/core/scaleup/orchestrator/executor.go b/cluster-autoscaler/core/scaleup/orchestrator/executor.go
index 19bcdbb6ff70..3d0c83046e17 100644
--- a/cluster-autoscaler/core/scaleup/orchestrator/executor.go
+++ b/cluster-autoscaler/core/scaleup/orchestrator/executor.go
@@ -159,8 +159,6 @@ func (e *scaleUpExecutor) executeScaleUp(
 	now time.Time,
 	atomic bool,
 ) errors.AutoscalerError {
-	gpuConfig := e.autoscalingCtx.CloudProvider.GetNodeGpuConfig(nodeInfo.Node())
-	gpuResourceName, gpuType := gpu.GetGpuInfoForMetrics(gpuConfig, availableGPUTypes, nodeInfo.Node(), nil)
 	klog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize)
 	e.autoscalingCtx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
 		"Scale-up: setting group %s size to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize)
@@ -168,7 +166,11 @@ func (e *scaleUpExecutor) executeScaleUp(
 	if err := e.increaseSize(info.Group, increase, atomic); err != nil {
 		e.autoscalingCtx.LogRecorder.Eventf(apiv1.EventTypeWarning, "FailedToScaleUpGroup", "Scale-up failed for group %s: %v", info.Group.Id(), err)
 		aerr := errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to increase node group size: ")
-		e.scaleStateNotifier.RegisterFailedScaleUp(info.Group, string(aerr.Type()), aerr.Error(), gpuResourceName, gpuType, now)
+		e.scaleStateNotifier.RegisterFailedScaleUp(info.Group, cloudprovider.InstanceErrorInfo{
+			ErrorClass:   cloudprovider.OtherErrorClass,
+			ErrorCode:    string(aerr.Type()),
+			ErrorMessage: aerr.Error(),
+		}, now)
 		return aerr
 	}
 	if increase < 0 {
@@ -180,6 +182,8 @@ func (e *scaleUpExecutor) executeScaleUp(
 		return nil
 	}
 	e.scaleStateNotifier.RegisterScaleUp(info.Group, increase, time.Now())
+	gpuConfig := e.autoscalingCtx.CloudProvider.GetNodeGpuConfig(nodeInfo.Node())
+	gpuResourceName, gpuType := gpu.GetGpuInfoForMetrics(gpuConfig, availableGPUTypes, nodeInfo.Node(), nil)
 	metrics.RegisterScaleUp(increase, gpuResourceName, gpuType)
 	e.autoscalingCtx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
 		"Scale-up: group %s size set to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize)
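Two things change in the executor: the failure notification now carries a structured `cloudprovider.InstanceErrorInfo` instead of loose strings, and the GPU lookup runs only on the success path, so a failed `increaseSize` no longer triggers it. A minimal wiring sketch of the resulting fan-out, using only names that appear in this diff (the interface name and the error values below are assumptions for illustration):

```go
package example

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/metrics"
	"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
)

// wireScaleStateObservers shows the fan-out this diff relies on: the executor
// publishes one structured failure and every registered observer sees it.
// csr stands in for the *ClusterStateRegistry, which the tests above register
// directly; NodeGroupChangeObserver is the assumed observer interface name.
func wireScaleStateObservers(csr nodegroupchange.NodeGroupChangeObserver, cloudProvider cloudprovider.CloudProvider, failedGroup cloudprovider.NodeGroup) {
	notifier := nodegroupchange.NewNodeGroupChangeObserversList()
	notifier.Register(csr) // records the failure and backs the group off
	notifier.Register(nodegroupchange.NewNodeGroupChangeMetricsProducer(cloudProvider, metrics.DefaultMetrics))

	// Roughly what executeScaleUp now does on a failed increaseSize (the
	// error code and message here are illustrative, not from a real run):
	notifier.RegisterFailedScaleUp(failedGroup, cloudprovider.InstanceErrorInfo{
		ErrorClass:   cloudprovider.OtherErrorClass,
		ErrorCode:    "cloudProviderError",
		ErrorMessage: "failed to increase node group size: quota exceeded",
	}, time.Now())
}
```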
diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
index dd3ab8958c42..7bee89167797 100644
--- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
+++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
@@ -39,6 +39,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/core/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/estimator"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
+	"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
 	"k8s.io/autoscaler/cluster-autoscaler/processors"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
@@ -1211,9 +1212,10 @@ func runSimpleScaleUpTest(t *testing.T, testConfig *ScaleUpTestConfig) *ScaleUpT
 	err = autoscalingCtx.TemplateNodeInfoRegistry.Recompute(&autoscalingCtx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
 	assert.NoError(t, err)
 	nodeInfos := autoscalingCtx.TemplateNodeInfoRegistry.GetNodeInfos()
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
-	clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), processors.ScaleStateNotifier)
 	processors.ScaleStateNotifier.Register(clusterState)
+	processors.ScaleStateNotifier.Register(nodegroupchange.NewNodeGroupChangeMetricsProducer(provider, metrics.DefaultMetrics))
+	clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
 	processors.NodeGroupSetProcessor = nodegroupset.NewDefaultNodeGroupSetProcessor([]string{nodeGroupLabel}, config.NodeGroupDifferenceRatios{})
 	if testConfig.EnableAutoprovisioning {
 		processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
@@ -1328,7 +1330,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
 	assert.NoError(t, err)
 	_ = autoscalingCtx.TemplateNodeInfoRegistry.Recompute(&autoscalingCtx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
 	nodeInfos := autoscalingCtx.TemplateNodeInfoRegistry.GetNodeInfos()
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList())
 	clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
 
 	p3 := BuildTestPod("p-new", 550, 0)
@@ -1379,7 +1381,7 @@ func TestBinpackingLimiter(t *testing.T) {
 	assert.NoError(t, err)
 
 	nodeInfos := autoscalingCtx.TemplateNodeInfoRegistry.GetNodeInfos()
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList())
 	clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
 
 	extraPod := BuildTestPod("p-new", 500, 0)
@@ -1442,7 +1444,7 @@ func TestScaleUpNoHelp(t *testing.T) {
 	assert.NoError(t, err)
 	_ = autoscalingCtx.TemplateNodeInfoRegistry.Recompute(&autoscalingCtx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
 	nodeInfos := autoscalingCtx.TemplateNodeInfoRegistry.GetNodeInfos()
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList())
 	clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
 
 	p3 := BuildTestPod("p-new", 500, 0)
@@ -1603,7 +1605,7 @@ func TestComputeSimilarNodeGroups(t *testing.T) {
 	assert.NoError(t, err)
 	_ = autoscalingCtx.TemplateNodeInfoRegistry.Recompute(&autoscalingCtx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
 	nodeInfos := autoscalingCtx.TemplateNodeInfoRegistry.GetNodeInfos()
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList())
 	assert.NoError(t, clusterState.UpdateNodes(nodes, nodeInfos, time.Now()))
 
 	suOrchestrator := &ScaleUpOrchestrator{}
@@ -1690,7 +1692,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
 	assert.NoError(t, err)
 	_ = autoscalingCtx.TemplateNodeInfoRegistry.Recompute(&autoscalingCtx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
 	nodeInfos := autoscalingCtx.TemplateNodeInfoRegistry.GetNodeInfos()
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList())
 	clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
 
 	pods := make([]*apiv1.Pod, 0)
@@ -1759,7 +1761,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
 	autoscalingCtx, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil, nil, templateNodeInfoRegistry)
 	assert.NoError(t, err)
 
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList())
 
 	processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
 	processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 0}
@@ -1817,7 +1819,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
 	autoscalingCtx, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil, nil, templateNodeInfoRegistry)
 	assert.NoError(t, err)
 
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList())
 
 	processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
 	processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 2}
@@ -1887,7 +1889,7 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {
 	assert.NoError(t, err)
 	_ = autoscalingCtx.TemplateNodeInfoRegistry.Recompute(&autoscalingCtx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
 	nodeInfos := autoscalingCtx.TemplateNodeInfoRegistry.GetNodeInfos()
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList())
 	clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
 
 	quotasProvider := resourcequotas.NewCloudQuotasProvider(provider)
@@ -1979,7 +1981,7 @@ func TestScaleupAsyncNodeGroupsEnabled(t *testing.T) {
 	autoscalingCtx, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil, nil, templateNodeInfoRegistry)
 	assert.NoError(t, err)
 
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList())
 
 	processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
 	processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 1}
diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index b7509fae09c5..e060ede5ffa2 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -42,6 +42,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/expander"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
 	"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
+	"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
 	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
@@ -156,7 +157,7 @@ func NewStaticAutoscaler(
 		MaxTotalUnreadyPercentage: opts.MaxTotalUnreadyPercentage,
 		OkTotalUnreadyCount:       opts.OkTotalUnreadyCount,
 	}
-	clusterStateRegistry := clusterstate.NewClusterStateRegistry(cloudProvider, clusterStateConfig, autoscalingKubeClients.LogRecorder, backoff, processors.NodeGroupConfigProcessor, processors.AsyncNodeGroupStateChecker)
+	clusterStateRegistry := clusterstate.NewClusterStateRegistry(cloudProvider, clusterStateConfig, autoscalingKubeClients.LogRecorder, backoff, processors.NodeGroupConfigProcessor, processors.AsyncNodeGroupStateChecker, processors.ScaleStateNotifier)
 	processorCallbacks := newStaticAutoscalerProcessorCallbacks()
 
 	templateNodeInfoRegistry := nodeinfosprovider.NewTemplateNodeInfoRegistry(processors.TemplateNodeInfoProvider)
@@ -179,6 +180,7 @@ func NewStaticAutoscaler(
 	taintConfig := taints.NewTaintConfig(opts)
 	processors.ScaleDownCandidatesNotifier.Register(clusterStateRegistry)
 	processors.ScaleStateNotifier.Register(clusterStateRegistry)
+	processors.ScaleStateNotifier.Register(nodegroupchange.NewNodeGroupChangeMetricsProducer(cloudProvider, metrics.DefaultMetrics))
 
 	// TODO: Populate the ScaleDownActuator/Planner fields in AutoscalingContext
 	// during the struct creation rather than here.
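In `NewStaticAutoscaler` the registry is now both a publisher and a subscriber on `processors.ScaleStateNotifier`: it hands timeout failures to the notifier at the end of `UpdateNodes`, and, being registered on the same list, receives them back to record the failure and back the group off. A sketch of that feedback loop in isolation, modeled on `TestExpiredScaleUp` above (the helper function and its parameters are illustrative; all calls come from this diff):

```go
package example

import (
	"time"

	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
	"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
	"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
)

// timeoutFeedbackExample shows the loop the new wiring creates: a scale-up
// request older than the group's MaxNodeProvisionTime is detected as timed
// out inside UpdateNodes, published through the notifier, and received back
// by the registry itself, which records it and backs the group off.
func timeoutFeedbackExample(
	provider cloudprovider.CloudProvider,
	nodeGroup cloudprovider.NodeGroup,
	logRecorder *utils.LogEventRecorder,
	bo backoff.Backoff,
	ngConfigProcessor nodegroupconfig.NodeGroupConfigProcessor,
	asyncChecker asyncnodegroups.AsyncNodeGroupStateChecker,
	nodes []*apiv1.Node,
	now time.Time,
) {
	notifier := nodegroupchange.NewNodeGroupChangeObserversList()
	csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{},
		logRecorder, bo, ngConfigProcessor, asyncChecker, notifier)
	notifier.Register(csr)

	// A scale-up registered longer ago than MaxNodeProvisionTime...
	csr.RegisterScaleUp(nodeGroup, 4, now.Add(-3*time.Minute))
	// ...times out during UpdateNodes and round-trips through the notifier;
	// afterwards csr.GetScaleUpFailures() reports it as metrics.Timeout.
	_ = csr.UpdateNodes(nodes, nil, now)
}
```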
diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go
index 78b2ae635283..9b1a7615ec60 100644
--- a/cluster-autoscaler/core/static_autoscaler_test.go
+++ b/cluster-autoscaler/core/static_autoscaler_test.go
@@ -56,6 +56,7 @@ import (
 	core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/estimator"
 	"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
+	"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
 	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
@@ -317,7 +318,7 @@ func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) {
 		return nil, err
 	}
 
-	clusterState := clusterstate.NewClusterStateRegistry(provider, config.clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), processors.NodeGroupConfigProcessor, processors.AsyncNodeGroupStateChecker)
+	clusterState := clusterstate.NewClusterStateRegistry(provider, config.clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), processors.NodeGroupConfigProcessor, processors.AsyncNodeGroupStateChecker, processors.ScaleStateNotifier)
 	clusterState.UpdateNodes(allNodes, nil, config.nodeStateUpdateTime)
 	processors.ScaleStateNotifier.Register(clusterState)
 	quotasTrackerFactory := newQuotasTrackerFactory(&autoscalingCtx, processors)
@@ -420,7 +421,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
 	clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
 		OkTotalUnreadyCount: 1,
 	}
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker, processors.ScaleStateNotifier)
 	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&autoscalingCtx, processors, clusterState, nil)
 	quotasTrackerFactory := newQuotasTrackerFactory(&autoscalingCtx, processors)
 	suOrchestrator := orchestrator.New()
@@ -688,8 +689,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) {
 	cp.Register(scaledowncandidates.NewScaleDownCandidatesSortingProcessor(scaleDownCandidatesComparers))
 	cp.Register(sddProcessor)
 	processors.ScaleDownNodeProcessor = cp
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
-	processors.ScaleStateNotifier.Register(clusterState)
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker, processors.ScaleStateNotifier)
 
 	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&autoscalingCtx, processors, clusterState, nil)
 	quotasTrackerFactory := newQuotasTrackerFactory(&autoscalingCtx, processors)
@@ -835,7 +835,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
 	clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
 		OkTotalUnreadyCount: 0,
 	}
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker, processors.ScaleStateNotifier)
 	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&autoscalingCtx, processors, clusterState, nil)
 	quotasTrackerFactory := newQuotasTrackerFactory(&autoscalingCtx, processors)
@@ -982,7 +982,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
 		OkTotalUnreadyCount: 1,
 	}
 
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker, processors.ScaleStateNotifier)
 
 	// broken node detected as unregistered
 	nodes := []*apiv1.Node{n1}
@@ -1147,7 +1147,8 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
 	clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
 		OkTotalUnreadyCount: 1,
 	}
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
+
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker, processors.ScaleStateNotifier)
 	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&autoscalingCtx, processors, clusterState, nil)
 	quotasTrackerFactory := newQuotasTrackerFactory(&autoscalingCtx, processors)
 	suOrchestrator := orchestrator.New()
@@ -1279,7 +1280,8 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T)
 	clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
 		OkTotalUnreadyCount: 1,
 	}
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
+
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker, processors.ScaleStateNotifier)
 	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&autoscalingCtx, processors, clusterState, nil)
 
 	autoscaler := &StaticAutoscaler{
@@ -1377,7 +1379,8 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
 	clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
 		OkTotalUnreadyCount: 1,
 	}
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
+
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker, processors.ScaleStateNotifier)
 	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&autoscalingCtx, processors, clusterState, nil)
 
 	autoscaler := &StaticAutoscaler{
@@ -1728,7 +1731,7 @@ func TestStaticAutoscalerRunOnceWithExistingDeletionCandidateNodes(t *testing.T)
 	clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
 		OkTotalUnreadyCount: 1,
 	}
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker, processors.ScaleStateNotifier)
 	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&autoscalingCtx, processors, clusterState, nil)
 	quotasTrackerFactory := newQuotasTrackerFactory(&autoscalingCtx, processors)
 	suOrchestrator := orchestrator.New()
@@ -1840,7 +1843,8 @@ func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) {
 	nodeGroupConfigProcessor := nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)
 	asyncNodeGroupStateChecker := asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()
 
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodeGroupConfigProcessor, asyncNodeGroupStateChecker)
+	scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodeGroupConfigProcessor, asyncNodeGroupStateChecker, scaleStateNotifier)
 	autoscaler := &StaticAutoscaler{
 		AutoscalingContext:   &autoscalingCtx,
 		clusterStateRegistry: clusterState,
@@ -2100,7 +2104,7 @@ func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) {
 		return false
 	}, nil)
 
-	clusterState = clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodeGroupConfigProcessor, asyncNodeGroupStateChecker)
+	clusterState = clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodeGroupConfigProcessor, asyncNodeGroupStateChecker, scaleStateNotifier)
 	clusterState.RefreshCloudProviderNodeInstancesCache()
 	autoscaler.clusterStateRegistry = clusterState
@@ -2149,7 +2153,7 @@ func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) {
 		return nil
 	}, nil).Times(2)
 
-	clusterState = clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodeGroupConfigProcessor, asyncNodeGroupStateChecker)
+	clusterState = clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodeGroupConfigProcessor, asyncNodeGroupStateChecker, scaleStateNotifier)
 	clusterState.RefreshCloudProviderNodeInstancesCache()
 	autoscaler.CloudProvider = provider
 	autoscaler.clusterStateRegistry = clusterState
@@ -2208,7 +2212,8 @@ func setupTestStaticAutoscalerInstanceCreationErrorsForZeroOrMaxScaling(t *testi
 	nodeGroupConfigProcessor := nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)
 	asyncNodeGroupStateChecker := asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()
 
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodeGroupConfigProcessor, asyncNodeGroupStateChecker)
+	scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodeGroupConfigProcessor, asyncNodeGroupStateChecker, scaleStateNotifier)
 	autoscaler := &StaticAutoscaler{
 		AutoscalingContext:   &autoscalingCtx,
 		clusterStateRegistry: clusterState,
@@ -2240,7 +2245,7 @@ func setupTestStaticAutoscalerInstanceCreationErrorsForZeroOrMaxScaling(t *testi
 		return nil
 	}, nil)
 
-	clusterState = clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodeGroupConfigProcessor, asyncNodeGroupStateChecker)
+	clusterState = clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodeGroupConfigProcessor, asyncNodeGroupStateChecker, scaleStateNotifier)
 	clusterState.RefreshCloudProviderNodeInstancesCache()
 	autoscaler.CloudProvider = provider
 	autoscaler.clusterStateRegistry = clusterState
@@ -2489,7 +2494,7 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
 			// Create CSR with unhealthy cluster protection effectively disabled, to guarantee we reach the tested logic.
 			csrConfig := clusterstate.ClusterStateRegistryConfig{OkTotalUnreadyCount: nodeGroupCount * unreadyNodesCount}
-			csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute, MaxNodeStartupTime: 15 * time.Minute}), processors.AsyncNodeGroupStateChecker)
+			csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute, MaxNodeStartupTime: 15 * time.Minute}), processors.AsyncNodeGroupStateChecker, processors.ScaleStateNotifier)
 
 			// Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test.
 			actuator := actuation.NewActuator(&autoscalingCtx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processors.NodeGroupConfigProcessor)
@@ -2587,7 +2592,7 @@ func TestRemoveFixNodeTargetSize(t *testing.T) {
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{
 		MaxTotalUnreadyPercentage: 10,
 		OkTotalUnreadyCount:       1,
-	}, fakeLogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingCtx.AutoscalingOptions.NodeGroupDefaults), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
+	}, fakeLogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingCtx.AutoscalingOptions.NodeGroupDefaults), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList())
 
 	err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now.Add(-time.Hour))
 	assert.NoError(t, err)
@@ -2635,7 +2640,7 @@ func TestRemoveOldUnregisteredNodes(t *testing.T) {
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{
 		MaxTotalUnreadyPercentage: 10,
 		OkTotalUnreadyCount:       1,
-	}, fakeLogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingCtx.AutoscalingOptions.NodeGroupDefaults), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
+	}, fakeLogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingCtx.AutoscalingOptions.NodeGroupDefaults), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList())
 
 	err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now.Add(-time.Hour))
 	assert.NoError(t, err)
@@ -2695,7 +2700,7 @@ func setupTestRemoveOldUnregisteredNodesAtomic(t *testing.T, now time.Time, allo
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{
 		MaxTotalUnreadyPercentage: 10,
 		OkTotalUnreadyCount:       1,
-	}, fakeLogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingCtx.AutoscalingOptions.NodeGroupDefaults), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
+	}, fakeLogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingCtx.AutoscalingOptions.NodeGroupDefaults), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), nodegroupchange.NewNodeGroupChangeObserversList())
 
 	err := clusterState.UpdateNodes([]*apiv1.Node{regNode}, nil, now.Add(-time.Hour))
 	assert.NoError(t, err)
@@ -3293,7 +3298,7 @@ func buildStaticAutoscaler(t *testing.T, provider cloudprovider.CloudProvider, a
 	cp.Register(scaledowncandidates.NewScaleDownCandidatesSortingProcessor([]scaledowncandidates.CandidatesComparer{}))
 	processors.ScaleDownNodeProcessor = cp
 
-	csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{OkTotalUnreadyCount: 1}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), processors.AsyncNodeGroupStateChecker)
+	csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{OkTotalUnreadyCount: 1}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), processors.AsyncNodeGroupStateChecker, processors.ScaleStateNotifier)
 	actuator := actuation.NewActuator(&autoscalingCtx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processors.NodeGroupConfigProcessor)
 	autoscalingCtx.ScaleDownActuator = actuator
@@ -3572,7 +3577,7 @@ func TestStaticAutoscalerWithNodeDeclaredFeatures(t *testing.T) {
 	}
 	clusterStateConfig := clusterstate.ClusterStateRegistryConfig{OkTotalUnreadyCount: 2}
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), processors.NodeGroupConfigProcessor, processors.AsyncNodeGroupStateChecker)
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, autoscalingCtx.LogRecorder, NewBackoff(), processors.NodeGroupConfigProcessor, processors.AsyncNodeGroupStateChecker, processors.ScaleStateNotifier)
 	sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&autoscalingCtx, processors, clusterState, nil)
 	autoscalingCtx.ScaleDownActuator = sdActuator
diff --git a/cluster-autoscaler/observers/nodegroupchange/scale_state_observer.go b/cluster-autoscaler/observers/nodegroupchange/scale_state_observer.go
index 0f771746c917..370597e9a977 100644
--- a/cluster-autoscaler/observers/nodegroupchange/scale_state_observer.go
+++ b/cluster-autoscaler/observers/nodegroupchange/scale_state_observer.go
@@ -21,6 +21,9 @@ import (
 	"time"
 
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+	"k8s.io/autoscaler/cluster-autoscaler/metrics"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
+	"k8s.io/klog/v2"
 )
 
 // NodeGroupChangeObserver is an observer of:
@@ -34,9 +37,8 @@ type NodeGroupChangeObserver interface {
 	// RegisterScaleDowns records scale down for a nodegroup.
 	RegisterScaleDown(nodeGroup cloudprovider.NodeGroup, nodeName string, currentTime time.Time, expectedDeleteTime time.Time)
 	// RegisterFailedScaleUp records failed scale-up for a nodegroup.
-	// reason denotes optional reason for failed scale-up
-	// errMsg denotes the actual error message
-	RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason string, errMsg string, gpuResourceName, gpuType string, currentTime time.Time)
+	// errorInfo is a wrapper containing the reason for failed scale-up and the actual error message
+	RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, errorInfo cloudprovider.InstanceErrorInfo, currentTime time.Time)
 	// RegisterFailedScaleDown records failed scale-down for a nodegroup.
 	RegisterFailedScaleDown(nodeGroup cloudprovider.NodeGroup, reason string, currentTime time.Time)
 }
@@ -76,11 +78,11 @@ func (l *NodeGroupChangeObserversList) RegisterScaleDown(nodeGroup cloudprovider
 
 // RegisterFailedScaleUp calls RegisterFailedScaleUp for each observer.
 func (l *NodeGroupChangeObserversList) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup,
-	reason string, errMsg, gpuResourceName, gpuType string, currentTime time.Time) {
+	errorInfo cloudprovider.InstanceErrorInfo, currentTime time.Time) {
 	l.mutex.Lock()
 	defer l.mutex.Unlock()
 	for _, observer := range l.observers {
-		observer.RegisterFailedScaleUp(nodeGroup, reason, errMsg, gpuResourceName, gpuType, currentTime)
+		observer.RegisterFailedScaleUp(nodeGroup, errorInfo, currentTime)
 	}
 }
@@ -98,3 +100,48 @@ func (l *NodeGroupChangeObserversList) RegisterFailedScaleDown(nodeGroup cloudpr
 func NewNodeGroupChangeObserversList() *NodeGroupChangeObserversList {
 	return &NodeGroupChangeObserversList{}
 }
+
+type metricObserver interface {
+	RegisterFailedScaleUp(reason metrics.FailedScaleUpReason, gpuResourceName, gpuType string)
+}
+
+// NodeGroupChangeMetricsProducer is an implementation of NodeGroupChangeObserver that reports scale-up/down metrics.
+type NodeGroupChangeMetricsProducer struct {
+	cloudProvider cloudprovider.CloudProvider
+	// metrics is a metricObserver implementation, which allows the nodegroupchange metrics to be mocked in tests.
+	metrics metricObserver
+}
+
+// RegisterScaleUp is a no-op; the producer only emits metrics for failed scale-ups.
+func (p *NodeGroupChangeMetricsProducer) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup,
+	delta int, currentTime time.Time) {
+}
+
+// RegisterScaleDown is a no-op; the producer only emits metrics for failed scale-ups.
+func (p *NodeGroupChangeMetricsProducer) RegisterScaleDown(nodeGroup cloudprovider.NodeGroup,
+	nodeName string, currentTime time.Time, expectedDeleteTime time.Time) {
+}
+
+// RegisterFailedScaleUp emits the failed scale-up metric.
+func (p *NodeGroupChangeMetricsProducer) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup,
+	errorInfo cloudprovider.InstanceErrorInfo, currentTime time.Time) {
+	availableGPUTypes := p.cloudProvider.GetAvailableGPUTypes()
+	gpuResourceName, gpuType := "", ""
+	nodeInfo, err := nodeGroup.TemplateNodeInfo()
+	if err != nil {
+		klog.Warningf("Failed to get template node info for a node group: %s", err)
+	} else {
+		gpuResourceName, gpuType = gpu.GetGpuInfoForMetrics(p.cloudProvider.GetNodeGpuConfig(nodeInfo.Node()), availableGPUTypes, nodeInfo.Node(), nodeGroup)
+	}
+	p.metrics.RegisterFailedScaleUp(metrics.FailedScaleUpReason(errorInfo.ErrorCode), gpuResourceName, gpuType)
+}
+
+// RegisterFailedScaleDown is a no-op; the producer only emits metrics for failed scale-ups.
+func (p *NodeGroupChangeMetricsProducer) RegisterFailedScaleDown(nodeGroup cloudprovider.NodeGroup,
+	reason string, currentTime time.Time) {
+}
+
+// NewNodeGroupChangeMetricsProducer returns a new NodeGroupChangeMetricsProducer.
+func NewNodeGroupChangeMetricsProducer(cloudProvider cloudprovider.CloudProvider, metrics metricObserver) *NodeGroupChangeMetricsProducer {
+	return &NodeGroupChangeMetricsProducer{cloudProvider: cloudProvider, metrics: metrics}
+}
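Review note: because metricObserver is satisfied structurally, the new producer can be unit-tested without touching the global metrics state. A possible sketch; fakeMetrics and checkFailedScaleUpMetric are hypothetical helpers, the "STOCKOUT" code is illustrative only, and the provider/node group would come from the project's existing cloud provider test fakes:

```go
package nodegroupchange_test

import (
	"testing"
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/metrics"
	"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
)

// fakeMetrics records the last reported failure; it satisfies the package's
// unexported metricObserver interface because the method set matches.
type fakeMetrics struct {
	reason           metrics.FailedScaleUpReason
	gpuName, gpuType string
}

func (f *fakeMetrics) RegisterFailedScaleUp(reason metrics.FailedScaleUpReason, gpuResourceName, gpuType string) {
	f.reason, f.gpuName, f.gpuType = reason, gpuResourceName, gpuType
}

// checkFailedScaleUpMetric drives the producer once and asserts that the
// error code is forwarded verbatim as the metric reason.
func checkFailedScaleUpMetric(t *testing.T, cp cloudprovider.CloudProvider, ng cloudprovider.NodeGroup) {
	fake := &fakeMetrics{}
	producer := nodegroupchange.NewNodeGroupChangeMetricsProducer(cp, fake)
	producer.RegisterFailedScaleUp(ng, cloudprovider.InstanceErrorInfo{
		ErrorClass:   cloudprovider.OtherErrorClass,
		ErrorCode:    "STOCKOUT", // hypothetical error code, used only in this sketch
		ErrorMessage: "no capacity in zone",
	}, time.Now())
	if fake.reason != metrics.FailedScaleUpReason("STOCKOUT") {
		t.Errorf("reason = %q, want %q", fake.reason, "STOCKOUT")
	}
}
```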
diff --git a/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor.go b/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor.go
index 59f1eea06160..e3a8c19e96e3 100644
--- a/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor.go
+++ b/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor.go
@@ -105,7 +105,7 @@ func (p *ScaleDownCandidatesDelayProcessor) RegisterScaleDown(nodeGroup cloudpro
 
 // RegisterFailedScaleUp records when the last scale up failed for a nodegroup.
 func (p *ScaleDownCandidatesDelayProcessor) RegisterFailedScaleUp(_ cloudprovider.NodeGroup,
-	_ string, _ string, _ string, _ string, _ time.Time) {
+	_ cloudprovider.InstanceErrorInfo, _ time.Time) {
 }
 
 // RegisterFailedScaleDown records failed scale-down for a nodegroup.
diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
index 3ebf8f586240..1a46976435f7 100644
--- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
+++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
@@ -511,7 +511,7 @@ func setupTest(t *testing.T, client *provreqclient.ProvisioningRequestClient, no
 		nil,
 	)
 
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingCtx.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingCtx.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker, processors.ScaleStateNotifier)
 	clusterState.UpdateNodes(nodes, nodeInfos, now)
 
 	var injector *provreq.ProvisioningRequestPodsInjector
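Migration note for out-of-tree implementers: any custom NodeGroupChangeObserver must mirror the signature change shown for ScaleDownCandidatesDelayProcessor above. A hypothetical logging-only observer, shown here purely as a sketch of the new method set, would look like this after migration:

```go
package example

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/klog/v2"
)

// loggingObserver is a hypothetical downstream implementation of
// nodegroupchange.NodeGroupChangeObserver; only RegisterFailedScaleUp's
// signature changes with this patch, the other methods keep theirs.
type loggingObserver struct{}

func (o *loggingObserver) RegisterScaleUp(ng cloudprovider.NodeGroup, delta int, currentTime time.Time) {
}

func (o *loggingObserver) RegisterScaleDown(ng cloudprovider.NodeGroup, nodeName string, currentTime, expectedDeleteTime time.Time) {
}

// Previously the reason, message, and GPU labels arrived as separate string
// arguments; now the reason and message travel together in InstanceErrorInfo,
// and GPU labels are derived by the metrics producer itself.
func (o *loggingObserver) RegisterFailedScaleUp(ng cloudprovider.NodeGroup, errorInfo cloudprovider.InstanceErrorInfo, currentTime time.Time) {
	klog.Warningf("scale-up of %s failed: %s (%s)", ng.Id(), errorInfo.ErrorCode, errorInfo.ErrorMessage)
}

func (o *loggingObserver) RegisterFailedScaleDown(ng cloudprovider.NodeGroup, reason string, currentTime time.Time) {
}
```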