
Commit 88725c0

Extract the RegisterFailedScaleUp metric generation into a separate NodeGroupChangeObserver instance
Parent: 7978c9e
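The metric itself is now emitted by a NodeGroupChangeObserver registered on the scaleStateNotifier rather than by ClusterStateRegistry. That observer is defined in one of the other nine changed files, which are not shown below. A minimal sketch of what such an observer could look like, with invented package and type names, and assuming a package-level metrics.RegisterFailedScaleUp helper with the same shape as the removed metricObserver method:

// Hypothetical sketch, not code from this commit: the real observer lives in one
// of the other changed files. Package and type names here are invented.
package metricsobserver

import (
    "time"

    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
    "k8s.io/autoscaler/cluster-autoscaler/metrics"
)

// FailedScaleUpMetricsObserver emits the failed-scale-up metric that
// ClusterStateRegistry used to emit inline via registerFailedScaleUpNoLock.
// Only the method whose signature is visible in this diff is shown; the other
// NodeGroupChangeObserver methods would be no-ops.
type FailedScaleUpMetricsObserver struct{}

// RegisterFailedScaleUp matches the signature used by
// scaleStateNotifier.RegisterFailedScaleUp in UpdateNodes below. It assumes a
// package-level metrics.RegisterFailedScaleUp helper with the same shape as the
// removed metricObserver method.
func (o *FailedScaleUpMetricsObserver) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, errorInfo cloudprovider.InstanceErrorInfo, gpuResourceName, gpuType string, currentTime time.Time) {
    metrics.RegisterFailedScaleUp(metrics.FailedScaleUpReason(errorInfo.ErrorCode), gpuResourceName, gpuType)
}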

10 files changed: +235 additions, -148 deletions


cluster-autoscaler/clusterstate/clusterstate.go

Lines changed: 80 additions & 54 deletions
@@ -29,6 +29,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
+	"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
@@ -111,15 +112,14 @@ type UnregisteredNode struct {
 	UnregisteredSince time.Time
 }
 
-// ScaleUpFailure contains information about a failure of a scale-up.
-type ScaleUpFailure struct {
-	NodeGroup cloudprovider.NodeGroup
-	Reason    metrics.FailedScaleUpReason
-	Time      time.Time
-}
-
-type metricObserver interface {
-	RegisterFailedScaleUp(reason metrics.FailedScaleUpReason, gpuResourceName, gpuType string)
+// scaleUpFailure contains information about a failure of a scale-up.
+type scaleUpFailure struct {
+	nodeGroup       cloudprovider.NodeGroup
+	reason          metrics.FailedScaleUpReason
+	errorInfo       cloudprovider.InstanceErrorInfo
+	gpuResourceName string
+	gpuType         string
+	time            time.Time
 }
 
 // ClusterStateRegistry is a structure to keep track the current state of the cluster.
@@ -148,11 +148,11 @@ type ClusterStateRegistry struct {
 	interrupt                  chan struct{}
 	nodeGroupConfigProcessor   nodegroupconfig.NodeGroupConfigProcessor
 	asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker
-	metrics                    metricObserver
-
 	// scaleUpFailures contains information about scale-up failures for each node group. It should be
 	// cleared periodically to avoid unnecessary accumulation.
-	scaleUpFailures map[string][]ScaleUpFailure
+	scaleUpFailures map[string][]scaleUpFailure
+
+	scaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList
 }
 
 // NodeGroupScalingSafety contains information about the safety of the node group to scale up/down.
@@ -163,11 +163,7 @@ type NodeGroupScalingSafety struct {
 }
 
 // NewClusterStateRegistry creates new ClusterStateRegistry.
-func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff, nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor, asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker) *ClusterStateRegistry {
-	return newClusterStateRegistry(cloudProvider, config, logRecorder, backoff, nodeGroupConfigProcessor, asyncNodeGroupStateChecker, metrics.DefaultMetrics)
-}
-
-func newClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff, nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor, asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker, metrics metricObserver) *ClusterStateRegistry {
+func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff, nodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor, asyncNodeGroupStateChecker asyncnodegroups.AsyncNodeGroupStateChecker, scaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList) *ClusterStateRegistry {
 	return &ClusterStateRegistry{
 		scaleUpRequests:   make(map[string]*ScaleUpRequest),
 		scaleDownRequests: make([]*ScaleDownRequest, 0),
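Callers of NewClusterStateRegistry now supply the *nodegroupchange.NodeGroupChangeObserversList themselves instead of a metricObserver (the old metrics.DefaultMetrics default is gone). The sketch below is hypothetical wiring, not code from this commit; NewNodeGroupChangeObserversList, Register, and the NodeGroupChangeObserver interface are assumed from the nodegroupchange package and are not part of this file's diff:

// Hypothetical wiring sketch, not code from this commit.
package wiring

import (
    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
    "k8s.io/autoscaler/cluster-autoscaler/clusterstate"
    "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
    "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
    "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
    "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
    "k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
)

// buildClusterState shows the new constructor shape: the caller builds the
// observers list, registers whichever observers it wants (including the
// metrics-emitting one), and hands the list to NewClusterStateRegistry.
func buildClusterState(
    provider cloudprovider.CloudProvider,
    cfg clusterstate.ClusterStateRegistryConfig,
    logRecorder *utils.LogEventRecorder,
    backoffStrategy backoff.Backoff,
    configProcessor nodegroupconfig.NodeGroupConfigProcessor,
    asyncChecker asyncnodegroups.AsyncNodeGroupStateChecker,
    metricsObserver nodegroupchange.NodeGroupChangeObserver,
) *clusterstate.ClusterStateRegistry {
    // Assumed helpers: NewNodeGroupChangeObserversList and Register are not shown in this diff.
    scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
    scaleStateNotifier.Register(metricsObserver)
    return clusterstate.NewClusterStateRegistry(provider, cfg, logRecorder, backoffStrategy, configProcessor, asyncChecker, scaleStateNotifier)
}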
@@ -185,10 +181,10 @@ func newClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config C
 		logRecorder:                     logRecorder,
 		cloudProviderNodeInstancesCache: utils.NewCloudProviderNodeInstancesCache(cloudProvider),
 		interrupt:                       make(chan struct{}),
-		scaleUpFailures:                 make(map[string][]ScaleUpFailure),
+		scaleUpFailures:                 make(map[string][]scaleUpFailure),
 		nodeGroupConfigProcessor:        nodeGroupConfigProcessor,
 		asyncNodeGroupStateChecker:      asyncNodeGroupStateChecker,
-		metrics:                         metrics,
+		scaleStateNotifier:              scaleStateNotifier,
 	}
 }
 
@@ -282,10 +278,11 @@ func (csr *ClusterStateRegistry) RegisterScaleDown(nodeGroup cloudprovider.NodeG
 }
 
 // To be executed under a lock.
-func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
+func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) []scaleUpFailure {
 	// clean up stale backoff info
 	csr.backoff.RemoveStaleBackoffData(currentTime)
 
+	var failedScaleUps []scaleUpFailure
 	for nodeGroupName, scaleUpRequest := range csr.scaleUpRequests {
 		if csr.asyncNodeGroupStateChecker.IsUpcoming(scaleUpRequest.NodeGroup) {
 			continue
@@ -312,11 +309,19 @@ func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
 		} else {
 			gpuResource, gpuType = gpu.GetGpuInfoForMetrics(csr.cloudProvider.GetNodeGpuConfig(nodeInfo.Node()), availableGPUTypes, nodeInfo.Node(), scaleUpRequest.NodeGroup)
 		}
-		csr.registerFailedScaleUpNoLock(scaleUpRequest.NodeGroup, metrics.Timeout, cloudprovider.InstanceErrorInfo{
-			ErrorClass:   cloudprovider.OtherErrorClass,
-			ErrorCode:    "timeout",
-			ErrorMessage: fmt.Sprintf("Scale-up timed out for node group %v after %v", nodeGroupName, currentTime.Sub(scaleUpRequest.Time)),
-		}, gpuResource, gpuType, currentTime)
+
+		failedScaleUps = append(failedScaleUps, scaleUpFailure{
+			nodeGroup: scaleUpRequest.NodeGroup,
+			reason:    metrics.Timeout,
+			errorInfo: cloudprovider.InstanceErrorInfo{
+				ErrorClass:   cloudprovider.OtherErrorClass,
+				ErrorCode:    "timeout",
+				ErrorMessage: fmt.Sprintf("Scale-up timed out for node group %v after %v", nodeGroupName, currentTime.Sub(scaleUpRequest.Time)),
+			},
+			gpuResourceName: gpuResource,
+			gpuType:         gpuType,
+			time:            currentTime,
+		})
 		delete(csr.scaleUpRequests, nodeGroupName)
 	}
 }
@@ -328,6 +333,7 @@ func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
 		}
 	}
 	csr.scaleDownRequests = newScaleDownRequests
+	return failedScaleUps
 }
 
 // To be executed under a lock.
@@ -340,27 +346,21 @@ func (csr *ClusterStateRegistry) backoffNodeGroup(nodeGroup cloudprovider.NodeGr
 // RegisterFailedScaleUp should be called after getting error from cloudprovider
 // when trying to scale-up node group. It will mark this group as not safe to autoscale
 // for some time.
-func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason string, errorMessage, gpuResourceName, gpuType string, currentTime time.Time) {
+func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, errorInfo cloudprovider.InstanceErrorInfo, gpuResourceName, gpuType string, currentTime time.Time) {
 	csr.Lock()
 	defer csr.Unlock()
-	csr.registerFailedScaleUpNoLock(nodeGroup, metrics.FailedScaleUpReason(reason), cloudprovider.InstanceErrorInfo{
-		ErrorClass:   cloudprovider.OtherErrorClass,
-		ErrorCode:    string(reason),
-		ErrorMessage: errorMessage,
-	}, gpuResourceName, gpuType, currentTime)
+	csr.scaleUpFailures[nodeGroup.Id()] = append(csr.scaleUpFailures[nodeGroup.Id()], scaleUpFailure{
+		nodeGroup: nodeGroup,
+		reason:    metrics.FailedScaleUpReason(errorInfo.ErrorCode),
+		time:      currentTime})
+	csr.backoffNodeGroup(nodeGroup, errorInfo, currentTime)
 }
 
 // RegisterFailedScaleDown records failed scale-down for a nodegroup.
 // We don't need to implement this function for cluster state registry
 func (csr *ClusterStateRegistry) RegisterFailedScaleDown(_ cloudprovider.NodeGroup, _ string, _ time.Time) {
 }
 
-func (csr *ClusterStateRegistry) registerFailedScaleUpNoLock(nodeGroup cloudprovider.NodeGroup, reason metrics.FailedScaleUpReason, errorInfo cloudprovider.InstanceErrorInfo, gpuResourceName, gpuType string, currentTime time.Time) {
-	csr.scaleUpFailures[nodeGroup.Id()] = append(csr.scaleUpFailures[nodeGroup.Id()], ScaleUpFailure{NodeGroup: nodeGroup, Reason: reason, Time: currentTime})
-	csr.metrics.RegisterFailedScaleUp(reason, gpuResourceName, gpuType)
-	csr.backoffNodeGroup(nodeGroup, errorInfo, currentTime)
-}
-
 // UpdateNodes updates the state of the nodes in the ClusterStateRegistry and recalculates the stats
 func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGroups map[string]*framework.NodeInfo, currentTime time.Time) error {
 	csr.updateNodeGroupMetrics()
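RegisterFailedScaleUp now takes a full cloudprovider.InstanceErrorInfo, and the recorded FailedScaleUpReason comes from its ErrorCode; the separate reason/errorMessage string pair is gone, and the metric emission moved out of this method entirely. The call sites live in the other changed files and are not shown; a hypothetical adaptation of one could look like this:

// Hypothetical call-site adaptation, not code from this commit.
package callsite

import (
    "time"

    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
    "k8s.io/autoscaler/cluster-autoscaler/clusterstate"
)

// reportScaleUpError is an invented helper name. "cloudProviderError" is an
// illustrative error code; whatever ErrorCode is passed here becomes the
// FailedScaleUpReason recorded for the node group.
func reportScaleUpError(csr *clusterstate.ClusterStateRegistry, nodeGroup cloudprovider.NodeGroup, gpuResourceName, gpuType, message string) {
    errorInfo := cloudprovider.InstanceErrorInfo{
        ErrorClass:   cloudprovider.OtherErrorClass,
        ErrorCode:    "cloudProviderError",
        ErrorMessage: message,
    }
    csr.RegisterFailedScaleUp(nodeGroup, errorInfo, gpuResourceName, gpuType, time.Now())
}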
@@ -374,12 +374,27 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGr
 	if err != nil {
 		return err
 	}
+	scaleUpFailures := csr.updateClusterStateRegistry(
+		nodes,
+		nodeInfosForGroups,
+		cloudProviderNodeInstances,
+		currentTime,
+		targetSizes,
+	)
+	for _, failure := range scaleUpFailures {
+		csr.scaleStateNotifier.RegisterFailedScaleUp(failure.nodeGroup, failure.errorInfo, failure.gpuResourceName, failure.gpuType, failure.time)
+	}
+	return nil
+}
+
+func (csr *ClusterStateRegistry) updateClusterStateRegistry(nodes []*apiv1.Node,
+	nodeInfosForGroups map[string]*framework.NodeInfo,
+	cloudProviderNodeInstances map[string][]cloudprovider.Instance, currentTime time.Time, targetSizes map[string]int) []scaleUpFailure {
 	cloudProviderNodesRemoved := csr.getCloudProviderDeletedNodes(nodes)
 	notRegistered := getNotRegisteredNodes(nodes, cloudProviderNodeInstances, currentTime)
 
 	csr.Lock()
 	defer csr.Unlock()
-
 	csr.nodes = nodes
 	csr.nodeInfosForGroups = nodeInfosForGroups
 	csr.previousCloudProviderNodeInstances = csr.cloudProviderNodeInstances
@@ -392,12 +407,12 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGr
 	// update acceptable ranges based on requests from last loop and targetSizes
 	// updateScaleRequests relies on acceptableRanges being up to date
 	csr.updateAcceptableRanges(targetSizes)
-	csr.updateScaleRequests(currentTime)
-	csr.handleInstanceCreationErrors(currentTime)
+	scaleUpFailures := csr.updateScaleRequests(currentTime)
+	scaleUpFailures = append(scaleUpFailures, csr.handleInstanceCreationErrors(currentTime)...)
 	// recalculate acceptable ranges after removing timed out requests
 	csr.updateAcceptableRanges(targetSizes)
 	csr.updateIncorrectNodeGroupSizes(currentTime)
-	return nil
+	return scaleUpFailures
 }
 
 // Recalculate cluster state after scale-ups or scale-downs were registered.
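One detail worth calling out in the two hunks above: updateClusterStateRegistry returns the collected scaleUpFailure records instead of reporting them, and UpdateNodes notifies scaleStateNotifier only after the deferred csr.Unlock has run. That ordering matters if the registry itself is registered as an observer on the list, since its own RegisterFailedScaleUp re-acquires the lock. A generic, self-contained Go sketch of that collect-then-notify shape:

// Minimal, generic sketch of the pattern; none of these names come from the commit.
package main

import (
    "fmt"
    "sync"
)

type registry struct {
    sync.Mutex
    failures  []string
    observers []func(string)
}

// update mirrors UpdateNodes: the locked helper returns the collected failures,
// and observers are only called once the lock has been released.
func (r *registry) update(incoming []string) {
    failures := r.collectLocked(incoming)
    for _, f := range failures {
        for _, obs := range r.observers {
            obs(f) // lock is not held here
        }
    }
}

// collectLocked mirrors updateClusterStateRegistry: all state changes happen
// under the lock, and the failures are returned instead of being reported inline.
func (r *registry) collectLocked(incoming []string) []string {
    r.Lock()
    defer r.Unlock()
    r.failures = append(r.failures, incoming...)
    return incoming
}

func main() {
    r := &registry{}
    // The registry is also one of its own observers, much as ClusterStateRegistry
    // can be when registered on the NodeGroupChangeObserversList.
    r.observers = append(r.observers, func(f string) {
        r.Lock()
        defer r.Unlock()
        fmt.Println("observed failure:", f)
    })
    r.update([]string{"timeout"})
}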
@@ -1132,23 +1147,25 @@ func (csr *ClusterStateRegistry) GetAutoscaledNodesCount() (currentSize, targetS
 	return currentSize, targetSize
 }
 
-func (csr *ClusterStateRegistry) handleInstanceCreationErrors(currentTime time.Time) {
+func (csr *ClusterStateRegistry) handleInstanceCreationErrors(currentTime time.Time) []scaleUpFailure {
 	nodeGroups := csr.getRunningNodeGroups()
 
+	var failedScaleUps []scaleUpFailure
 	for _, nodeGroup := range nodeGroups {
-		csr.handleInstanceCreationErrorsForNodeGroup(
+		failedScaleUps = append(failedScaleUps, csr.handleInstanceCreationErrorsForNodeGroup(
 			nodeGroup,
 			csr.cloudProviderNodeInstances[nodeGroup.Id()],
 			csr.previousCloudProviderNodeInstances[nodeGroup.Id()],
-			currentTime)
+			currentTime)...)
 	}
+	return failedScaleUps
 }
 
 func (csr *ClusterStateRegistry) handleInstanceCreationErrorsForNodeGroup(
 	nodeGroup cloudprovider.NodeGroup,
 	currentInstances []cloudprovider.Instance,
 	previousInstances []cloudprovider.Instance,
-	currentTime time.Time) {
+	currentTime time.Time) []scaleUpFailure {
 
 	_, currentUniqueErrorMessagesForErrorCode, currentErrorCodeToInstance := csr.buildInstanceToErrorCodeMappings(currentInstances)
 	previousInstanceToErrorCode, _, _ := csr.buildInstanceToErrorCodeMappings(previousInstances)
@@ -1159,6 +1176,7 @@ func (csr *ClusterStateRegistry) handleInstanceCreationErrorsForNodeGroup(
 		}
 	}
 
+	var failedScaleUps []scaleUpFailure
 	// If node group is scaling up and there are new node-create requests which cannot be satisfied because of
 	// out-of-resources errors we:
 	// - emit event
@@ -1195,13 +1213,21 @@ func (csr *ClusterStateRegistry) handleInstanceCreationErrorsForNodeGroup(
 			// Decrease the scale up request by the number of deleted nodes
 			csr.registerOrUpdateScaleUpNoLock(nodeGroup, -len(unseenInstanceIds), currentTime)
 
-			csr.registerFailedScaleUpNoLock(nodeGroup, metrics.FailedScaleUpReason(errorCode.code), cloudprovider.InstanceErrorInfo{
-				ErrorClass:   errorCode.class,
-				ErrorCode:    errorCode.code,
-				ErrorMessage: csr.buildErrorMessageEventString(currentUniqueErrorMessagesForErrorCode[errorCode]),
-			}, gpuResource, gpuType, currentTime)
+			failedScaleUps = append(failedScaleUps, scaleUpFailure{
+				nodeGroup: nodeGroup,
+				reason:    metrics.FailedScaleUpReason(errorCode.code),
+				errorInfo: cloudprovider.InstanceErrorInfo{
+					ErrorClass:   errorCode.class,
+					ErrorCode:    errorCode.code,
+					ErrorMessage: csr.buildErrorMessageEventString(currentUniqueErrorMessagesForErrorCode[errorCode]),
+				},
+				gpuResourceName: gpuResource,
+				gpuType:         gpuType,
+				time:            currentTime,
+			})
 		}
 	}
+	return failedScaleUps
 }
 
 func (csr *ClusterStateRegistry) buildErrorMessageEventString(uniqErrorMessages []string) string {
@@ -1311,14 +1337,14 @@ func (csr *ClusterStateRegistry) PeriodicCleanup() {
 func (csr *ClusterStateRegistry) clearScaleUpFailures() {
 	csr.Lock()
 	defer csr.Unlock()
-	csr.scaleUpFailures = make(map[string][]ScaleUpFailure)
+	csr.scaleUpFailures = make(map[string][]scaleUpFailure)
 }
 
-// GetScaleUpFailures returns the scale-up failures map.
-func (csr *ClusterStateRegistry) GetScaleUpFailures() map[string][]ScaleUpFailure {
+// getScaleUpFailures returns the scale-up failures map.
+func (csr *ClusterStateRegistry) getScaleUpFailures() map[string][]scaleUpFailure {
 	csr.Lock()
 	defer csr.Unlock()
-	result := make(map[string][]ScaleUpFailure)
+	result := make(map[string][]scaleUpFailure)
 	for nodeGroupId, failures := range csr.scaleUpFailures {
 		result[nodeGroupId] = failures
 	}
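GetScaleUpFailures is now unexported, so only code inside the clusterstate package (in practice its tests, which are among the other changed files) can read the map. A hypothetical in-package test helper using the new accessor, not a test from this commit:

// Hypothetical in-package test sketch; the helper name is invented.
package clusterstate

import (
    "testing"

    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

func assertFailureRecorded(t *testing.T, csr *ClusterStateRegistry, nodeGroup cloudprovider.NodeGroup) {
    t.Helper()
    failures := csr.getScaleUpFailures()[nodeGroup.Id()]
    if len(failures) != 1 {
        t.Fatalf("expected exactly one recorded scale-up failure, got %d", len(failures))
    }
    if failures[0].time.IsZero() {
        t.Errorf("expected the failure time to be set")
    }
}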
