
Commit 8bd628f

Add generic CombinedStatus type
1 parent 9c91496 commit 8bd628f

File tree

1 file changed: +240 -0 lines changed


cluster-autoscaler/processors/status/scale_up_status_processor.go

@@ -17,6 +17,10 @@ limitations under the License.
 package status
 
 import (
+    "fmt"
+    "sort"
+    "strings"
+
     apiv1 "k8s.io/api/core/v1"
     "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
 
@@ -143,3 +147,239 @@ func UpdateScaleUpError(s *ScaleUpStatus, err errors.AutoscalerError) (*ScaleUpS
     s.Result = ScaleUpError
     return s, err
 }
+
+// combinedStatusSet is a helper struct to combine multiple ScaleUpStatuses into one. It keeps track of the best result and all errors that occurred during the ScaleUp process.
+type combinedStatusSet struct {
+    Result                      ScaleUpResult
+    ScaleupErrors               map[*errors.AutoscalerError]bool
+    ScaleUpInfosSet             map[nodegroupset.ScaleUpInfo]bool
+    PodsTriggeredScaleUpSet     map[*apiv1.Pod]bool
+    PodsRemainUnschedulableSet  map[*NoScaleUpInfo]bool
+    PodsAwaitEvaluationSet      map[*apiv1.Pod]bool
+    CreateNodeGroupResultsSet   map[*nodegroups.CreateNodeGroupResult]bool
+    ConsideredNodeGroupsSet     map[cloudprovider.NodeGroup]bool
+    FailedCreationNodeGroupsSet map[cloudprovider.NodeGroup]bool
+    FailedResizeNodeGroupsSet   map[cloudprovider.NodeGroup]bool
+}
+
+// Add adds a ScaleUpStatus to the combinedStatusSet.
+func (c *combinedStatusSet) Add(status *ScaleUpStatus) {
+    resultPriority := map[ScaleUpResult]int{
+        ScaleUpNotTried:           0,
+        ScaleUpNotNeeded:          1,
+        ScaleUpNoOptionsAvailable: 2,
+        ScaleUpError:              3,
+        ScaleUpSuccessful:         4,
+    }
+
+    // If even one ScaleUpSuccessful is present, the final result is ScaleUpSuccessful.
+    // If no ScaleUpSuccessful is present, and even one ScaleUpError is present, the final result is ScaleUpError.
+    // If no ScaleUpSuccessful or ScaleUpError is present, and even one ScaleUpNoOptionsAvailable is present, the final result is ScaleUpNoOptionsAvailable.
+    // If no ScaleUpSuccessful, ScaleUpError or ScaleUpNoOptionsAvailable is present, the final result is ScaleUpNotTried.
+    if resultPriority[c.Result] < resultPriority[status.Result] {
+        c.Result = status.Result
+    }
+    if status.ScaleUpError != nil {
+        if _, found := c.ScaleupErrors[status.ScaleUpError]; !found {
+            c.ScaleupErrors[status.ScaleUpError] = true
+        }
+    }
+    if status.ScaleUpInfos != nil {
+        for _, scaleUpInfo := range status.ScaleUpInfos {
+            if _, found := c.ScaleUpInfosSet[scaleUpInfo]; !found {
+                c.ScaleUpInfosSet[scaleUpInfo] = true
+            }
+        }
+    }
+    if status.PodsTriggeredScaleUp != nil {
+        for _, pod := range status.PodsTriggeredScaleUp {
+            if _, found := c.PodsTriggeredScaleUpSet[pod]; !found {
+                c.PodsTriggeredScaleUpSet[pod] = true
+            }
+        }
+    }
+    if status.PodsRemainUnschedulable != nil {
+        for _, pod := range status.PodsRemainUnschedulable {
+            if _, found := c.PodsRemainUnschedulableSet[&pod]; !found {
+                c.PodsRemainUnschedulableSet[&pod] = true
+            }
+        }
+    }
+    if status.PodsAwaitEvaluation != nil {
+        for _, pod := range status.PodsAwaitEvaluation {
+            if _, found := c.PodsAwaitEvaluationSet[pod]; !found {
+                c.PodsAwaitEvaluationSet[pod] = true
+            }
+        }
+    }
+    if status.CreateNodeGroupResults != nil {
+        for _, createNodeGroupResult := range status.CreateNodeGroupResults {
+            if _, found := c.CreateNodeGroupResultsSet[&createNodeGroupResult]; !found {
+                c.CreateNodeGroupResultsSet[&createNodeGroupResult] = true
+            }
+        }
+    }
+    if status.ConsideredNodeGroups != nil {
+        for _, nodeGroup := range status.ConsideredNodeGroups {
+            if _, found := c.ConsideredNodeGroupsSet[nodeGroup]; !found {
+                c.ConsideredNodeGroupsSet[nodeGroup] = true
+            }
+        }
+    }
+    if status.FailedCreationNodeGroups != nil {
+        for _, nodeGroup := range status.FailedCreationNodeGroups {
+            if _, found := c.FailedCreationNodeGroupsSet[nodeGroup]; !found {
+                c.FailedCreationNodeGroupsSet[nodeGroup] = true
+            }
+        }
+    }
+    if status.FailedResizeNodeGroups != nil {
+        for _, nodeGroup := range status.FailedResizeNodeGroups {
+            if _, found := c.FailedResizeNodeGroupsSet[nodeGroup]; !found {
+                c.FailedResizeNodeGroupsSet[nodeGroup] = true
+            }
+        }
+    }
+}
+
+// formatMessageFromBatchErrors formats a message from a list of errors.
+func (c *combinedStatusSet) formatMessageFromBatchErrors(errs []errors.AutoscalerError, printErrorTypes bool) string {
+    firstErr := errs[0]
+    var builder strings.Builder
+    builder.WriteString(firstErr.Error())
+    builder.WriteString(" ...and other concurrent errors: [")
+    formattedErrs := map[errors.AutoscalerError]bool{
+        firstErr: true,
+    }
+    for _, err := range errs {
+        if _, has := formattedErrs[err]; has {
+            continue
+        }
+        formattedErrs[err] = true
+        var message string
+        if printErrorTypes {
+            message = fmt.Sprintf("[%s] %s", err.Type(), err.Error())
+        } else {
+            message = err.Error()
+        }
+        if len(formattedErrs) > 2 {
+            builder.WriteString(", ")
+        }
+        builder.WriteString(fmt.Sprintf("%q", message))
+    }
+    builder.WriteString("]")
+    return builder.String()
+}
+
+// combineBatchScaleUpErrors combines multiple errors into one. If there is only one error, it returns that error. If there are multiple errors, it combines them into one error with a message that contains all the errors.
+func (c *combinedStatusSet) combineBatchScaleUpErrors() *errors.AutoscalerError {
+    if len(c.ScaleupErrors) == 0 {
+        return nil
+    }
+    if len(c.ScaleupErrors) == 1 {
+        for err := range c.ScaleupErrors {
+            return err
+        }
+    }
+    uniqueMessages := make(map[string]bool)
+    uniqueTypes := make(map[errors.AutoscalerErrorType]bool)
+    for err := range c.ScaleupErrors {
+        uniqueTypes[(*err).Type()] = true
+        uniqueMessages[(*err).Error()] = true
+    }
+    if len(uniqueTypes) == 1 && len(uniqueMessages) == 1 {
+        for err := range c.ScaleupErrors {
+            return err
+        }
+    }
+    // Sort to stabilize the results and make log aggregation easier.
+    errs := make([]errors.AutoscalerError, 0, len(c.ScaleupErrors))
+    for err := range c.ScaleupErrors {
+        errs = append(errs, *err)
+    }
+    sort.Slice(errs, func(i, j int) bool {
+        errA := errs[i]
+        errB := errs[j]
+        if errA.Type() == errB.Type() {
+            return errs[i].Error() < errs[j].Error()
+        }
+        return errA.Type() < errB.Type()
+    })
+    firstErr := errs[0]
+    printErrorTypes := len(uniqueTypes) > 1
+    message := c.formatMessageFromBatchErrors(errs, printErrorTypes)
+    combinedErr := errors.NewAutoscalerError(firstErr.Type(), message)
+    return &combinedErr
+}
+
+// Export converts the combinedStatusSet into a ScaleUpStatus.
+func (c *combinedStatusSet) Export() (*ScaleUpStatus, errors.AutoscalerError) {
+    result := &ScaleUpStatus{Result: c.Result}
+    if len(c.ScaleupErrors) > 0 {
+        result.ScaleUpError = c.combineBatchScaleUpErrors()
+    }
+    if len(c.ScaleUpInfosSet) > 0 {
+        for scaleUpInfo := range c.ScaleUpInfosSet {
+            result.ScaleUpInfos = append(result.ScaleUpInfos, scaleUpInfo)
+        }
+    }
+    if len(c.PodsTriggeredScaleUpSet) > 0 {
+        for pod := range c.PodsTriggeredScaleUpSet {
+            result.PodsTriggeredScaleUp = append(result.PodsTriggeredScaleUp, pod)
+        }
+    }
+    if len(c.PodsRemainUnschedulableSet) > 0 {
+        for pod := range c.PodsRemainUnschedulableSet {
+            result.PodsRemainUnschedulable = append(result.PodsRemainUnschedulable, *pod)
+        }
+    }
+    if len(c.PodsAwaitEvaluationSet) > 0 {
+        for pod := range c.PodsAwaitEvaluationSet {
+            result.PodsAwaitEvaluation = append(result.PodsAwaitEvaluation, pod)
+        }
+    }
+    if len(c.CreateNodeGroupResultsSet) > 0 {
+        for createNodeGroupResult := range c.CreateNodeGroupResultsSet {
+            result.CreateNodeGroupResults = append(result.CreateNodeGroupResults, *createNodeGroupResult)
+        }
+    }
+    if len(c.ConsideredNodeGroupsSet) > 0 {
+        for nodeGroup := range c.ConsideredNodeGroupsSet {
+            result.ConsideredNodeGroups = append(result.ConsideredNodeGroups, nodeGroup)
+        }
+    }
+    if len(c.FailedCreationNodeGroupsSet) > 0 {
+        for nodeGroup := range c.FailedCreationNodeGroupsSet {
+            result.FailedCreationNodeGroups = append(result.FailedCreationNodeGroups, nodeGroup)
+        }
+    }
+    if len(c.FailedResizeNodeGroupsSet) > 0 {
+        for nodeGroup := range c.FailedResizeNodeGroupsSet {
+            result.FailedResizeNodeGroups = append(result.FailedResizeNodeGroups, nodeGroup)
        }
+    }
+
+    var resErr errors.AutoscalerError
+
+    if result.Result == ScaleUpError {
+        resErr = *result.ScaleUpError
+    }
+
+    return result, resErr
+}
+
+// NewCombinedStatusSet creates a new combinedStatusSet.
+func NewCombinedStatusSet() combinedStatusSet {
+    return combinedStatusSet{
+        Result:                      ScaleUpNotTried,
+        ScaleupErrors:               make(map[*errors.AutoscalerError]bool),
+        ScaleUpInfosSet:             make(map[nodegroupset.ScaleUpInfo]bool),
+        PodsTriggeredScaleUpSet:     make(map[*apiv1.Pod]bool),
+        PodsRemainUnschedulableSet:  make(map[*NoScaleUpInfo]bool),
+        PodsAwaitEvaluationSet:      make(map[*apiv1.Pod]bool),
+        CreateNodeGroupResultsSet:   make(map[*nodegroups.CreateNodeGroupResult]bool),
+        ConsideredNodeGroupsSet:     make(map[cloudprovider.NodeGroup]bool),
+        FailedCreationNodeGroupsSet: make(map[cloudprovider.NodeGroup]bool),
+        FailedResizeNodeGroupsSet:   make(map[cloudprovider.NodeGroup]bool),
+    }
+}
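
For context, a minimal sketch of how the new type is meant to be driven: collect the per-attempt statuses with Add and flatten them into a single status with Export. The combineScaleUpStatuses helper below is hypothetical and not part of this commit; only NewCombinedStatusSet, Add, and Export come from the diff above.

package status

import "k8s.io/autoscaler/cluster-autoscaler/utils/errors"

// combineScaleUpStatuses is a hypothetical caller-side helper illustrating the
// Add/Export flow for merging several ScaleUpStatuses into one.
func combineScaleUpStatuses(statuses []*ScaleUpStatus) (*ScaleUpStatus, errors.AutoscalerError) {
    combined := NewCombinedStatusSet()
    for _, s := range statuses {
        // Add merges each status into the set: the highest-priority Result wins
        // and the slice fields are deduplicated through the internal maps.
        combined.Add(s)
    }
    // Export flattens the set back into a single ScaleUpStatus. The returned
    // error is only set when the combined Result is ScaleUpError, in which case
    // combineBatchScaleUpErrors folds all collected errors into one message.
    return combined.Export()
}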
