perf: respect multinode consolidation timeout in all cases #2025

Open
wants to merge 23 commits into base: main
Changes from 18 commits
21 changes: 14 additions & 7 deletions pkg/controllers/disruption/multinodeconsolidation.go
@@ -18,6 +18,7 @@ package disruption

import (
"context"
"errors"
"fmt"
"math"
"time"
@@ -107,6 +108,7 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB

// firstNConsolidationOption looks at the first N NodeClaims to determine if they can all be consolidated at once. The
// NodeClaims are sorted by increasing disruption order which correlates to likelihood of being able to consolidate the node
// nolint:gocyclo
func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context, candidates []*Candidate, max int) (Command, scheduling.Results, error) {
// we always operate on at least two NodeClaims at once, for single NodeClaims standard consolidation will find all solutions
if len(candidates) < 2 {
@@ -120,23 +122,27 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
lastSavedCommand := Command{}
lastSavedResults := scheduling.Results{}
// Set a timeout
timeout := m.clock.Now().Add(MultiNodeConsolidationTimeoutDuration)
// binary search to find the maximum number of NodeClaims we can terminate
timeoutCtx, cancel := context.WithTimeout(ctx, MultiNodeConsolidationTimeoutDuration)
defer cancel()
for min <= max {
if m.clock.Now().After(timeout) {
if timeoutCtx.Err() != nil {
ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: m.ConsolidationType()})
if lastSavedCommand.candidates == nil {
log.FromContext(ctx).V(1).Info(fmt.Sprintf("failed to find a multi-node consolidation after timeout, last considered batch had %d", (min+max)/2))
} else {
log.FromContext(ctx).V(1).WithValues(lastSavedCommand.LogValues()...).Info("stopping multi-node consolidation after timeout, returning last valid command")
return Command{}, scheduling.Results{}, fmt.Errorf("multi-node consolidation timed out while considering %d nodes without finding a valid command", (min+max)/2)
}
log.FromContext(ctx).V(1).WithValues(lastSavedCommand.LogValues()...).Info("stopping multi-node consolidation after timeout, returning last valid command")
return lastSavedCommand, lastSavedResults, nil
}
mid := (min + max) / 2
candidatesToConsolidate := candidates[0 : mid+1]

cmd, results, err := m.computeConsolidation(ctx, candidatesToConsolidate...)
// Pass the timeout context to ensure sub-operations can be canceled
cmd, results, err := m.computeConsolidation(timeoutCtx, candidatesToConsolidate...)
// context deadline exceeded will return to the top of the loop and either return nothing or the last saved command
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
continue
}
return Command{}, scheduling.Results{}, err
}

@@ -149,6 +155,7 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
}

// replacementHasValidInstanceTypes will be false if the replacement action has no valid instance types remaining after filtering.
if replacementHasValidInstanceTypes || cmd.Decision() == DeleteDecision {
// We can consolidate NodeClaims [0,mid]
lastSavedCommand = cmd
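The core pattern in this hunk — bounding the whole binary search with a single context deadline instead of a wall-clock check, and returning the last valid command when the deadline fires — can be sketched in isolation. This is a minimal standalone sketch under stated assumptions, not Karpenter's implementation: attemptConsolidation and firstNOption are illustrative stand-ins for computeConsolidation and firstNConsolidationOption, and the "shrink the batch" branch of the real binary search is omitted for brevity.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// attemptConsolidation stands in for computeConsolidation: it honors the context
// and reports context.DeadlineExceeded if the deadline fires mid-call.
func attemptConsolidation(ctx context.Context, batch int) (string, error) {
	select {
	case <-time.After(40 * time.Millisecond): // simulate a costly scheduling simulation
		return fmt.Sprintf("consolidate %d nodes", batch), nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// firstNOption mirrors the shape of firstNConsolidationOption: one deadline bounds
// the whole search, and the last valid result is returned once it expires.
func firstNOption(ctx context.Context, max int, budget time.Duration) (string, error) {
	timeoutCtx, cancel := context.WithTimeout(ctx, budget)
	defer cancel()

	lastSaved := ""
	min := 1
	for min <= max {
		// Deadline check at the top of every iteration.
		if timeoutCtx.Err() != nil {
			if lastSaved == "" {
				return "", fmt.Errorf("timed out while considering %d nodes without finding a valid command", (min+max)/2)
			}
			return lastSaved, nil
		}
		mid := (min + max) / 2
		result, err := attemptConsolidation(timeoutCtx, mid)
		if err != nil {
			// A deadline hit inside the attempt loops back to the check above.
			if errors.Is(err, context.DeadlineExceeded) {
				continue
			}
			return "", err
		}
		lastSaved = result
		min = mid + 1 // a larger batch might still work; keep searching upward
	}
	return lastSaved, nil
}

func main() {
	cmd, err := firstNOption(context.Background(), 16, 100*time.Millisecond)
	fmt.Println(cmd, err)
}

Passing timeoutCtx into the attempt itself is what lets sub-operations abort mid-call, which the previous clock-based comparison could not do.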
56 changes: 56 additions & 0 deletions pkg/controllers/disruption/suite_test.go
@@ -2022,6 +2022,62 @@ var _ = Describe("Metrics", func() {
"consolidation_type": "multi",
})
})
It("should stop multi-node consolidation after context deadline is reached", func() {
nodeClaims, nodes = test.NodeClaimsAndNodes(3, v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name,
v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(),
corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
},
},
Status: v1.NodeClaimStatus{
Allocatable: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("32"),
corev1.ResourcePods: resource.MustParse("100"),
},
},
})
for _, nc := range nodeClaims {
nc.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable)
}
// create our RS so we can link a pod to it
rs := test.ReplicaSet()
ExpectApplied(ctx, env.Client, rs)
pods := test.Pods(4, test.PodOptions{
ObjectMeta: metav1.ObjectMeta{Labels: labels,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
Name: rs.Name,
UID: rs.UID,
Controller: lo.ToPtr(true),
BlockOwnerDeletion: lo.ToPtr(true),
},
},
},
})

ExpectApplied(ctx, env.Client, rs, pods[0], pods[1], pods[2], pods[3], nodeClaims[0], nodes[0], nodeClaims[1], nodes[1], nodeClaims[2], nodes[2], nodePool)

// bind pods to nodes
ExpectManualBinding(ctx, env.Client, pods[0], nodes[0])
ExpectManualBinding(ctx, env.Client, pods[1], nodes[1])
ExpectManualBinding(ctx, env.Client, pods[2], nodes[2])
ExpectManualBinding(ctx, env.Client, pods[3], nodes[2])

// inform cluster state about nodes and nodeclaims
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{nodes[0], nodes[1], nodes[2]}, []*v1.NodeClaim{nodeClaims[0], nodeClaims[1], nodeClaims[2]})
// create timeout in the past
timeoutCtx, cancel := context.WithTimeout(ctx, -disruption.MultiNodeConsolidationTimeoutDuration)
defer cancel()

ExpectSingletonReconciled(timeoutCtx, disruptionController)
// expect that due to timeout zero nodes were tainted in consolidation
ExpectTaintedNodeCount(ctx, env.Client, 0)
})
})

func leastExpensiveInstanceWithZone(zone string) *cloudprovider.InstanceType {
14 changes: 13 additions & 1 deletion pkg/controllers/provisioning/provisioner.go
@@ -241,6 +241,10 @@ func (p *Provisioner) NewScheduler(

instanceTypes := map[string][]*cloudprovider.InstanceType{}
for _, np := range nodePools {
// stop looping through nodepools if context has been canceled to avoid spurious errors from GetInstanceTypes
if ctx.Err() != nil {
Member: Does this section of code take so much time that we think we need to handle this error at this level? I get that there's a trade-off between the number of times we write this check and how quickly we can respond.

Contributor (author): We probably don't need to check here. It probably makes the most sense to just check the timeout between pods in scheduling.

Contributor (author): Except that this context is timed out and continues on to cloudProvider.GetInstanceTypes. It's less that we're handling the error and more that we're silencing spurious logging.

Member: That doesn't completely solve it, right? I think we just have to handle it generally, because we can still race past this check and fire spurious errors.

return nil, fmt.Errorf("context error while getting instance types, %w", ctx.Err())
}
// Get instance type options
its, err := p.cloudProvider.GetInstanceTypes(ctx, np)
if err != nil {
@@ -326,9 +330,17 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
}), 5),
).Info("deferring scheduling decision for provisionable pod(s) to future simulation due to limited reserved offering capacity")
}
contextErrors := results.ContextDeadlineExceededErrors()
if len(contextErrors) != 0 {
log.FromContext(ctx).V(1).WithValues(
"Pods", pretty.Slice(lo.Map(lo.Keys(contextErrors), func(p *corev1.Pod, _ int) string {
return klog.KRef(p.Namespace, p.Name).String()
}), 5),
).Info("deferring scheduling decision for provisionable pod(s) to future simulation due to hitting provisioning loop timeout")
}
scheduler.UnschedulablePodsCount.Set(
// A reserved offering error doesn't indicate a pod is unschedulable, just that the scheduling decision was deferred.
float64(len(results.PodErrors)-len(reservedOfferingErrors)),
float64(len(results.PodErrors)-len(reservedOfferingErrors)-len(contextErrors)),
map[string]string{
scheduler.ControllerLabel: injection.GetControllerName(ctx),
},
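On the race discussed in the review thread above: a ctx.Err() pre-check only avoids starting new provider calls after the deadline has passed, but the deadline can still expire between the check and the call, so the call's own context.DeadlineExceeded must still be tolerated. A hedged sketch of that shape — getInstanceTypes and listInstanceTypes here are illustrative stand-ins, not the real cloud-provider interface:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// getInstanceTypes is a stand-in for a provider call that respects the context.
func getInstanceTypes(ctx context.Context, nodePool string) ([]string, error) {
	select {
	case <-time.After(10 * time.Millisecond):
		return []string{"m5.large", "m5.xlarge"}, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func listInstanceTypes(ctx context.Context, nodePools []string) (map[string][]string, error) {
	out := map[string][]string{}
	for _, np := range nodePools {
		// Pre-check: avoid starting new provider calls once the deadline has passed.
		if ctx.Err() != nil {
			return nil, fmt.Errorf("context error while getting instance types, %w", ctx.Err())
		}
		its, err := getInstanceTypes(ctx, np)
		if err != nil {
			// The deadline can still fire between the check above and here, so the
			// caller has to handle context.DeadlineExceeded from the call as well.
			if errors.Is(err, context.DeadlineExceeded) {
				return nil, fmt.Errorf("interrupted while getting instance types for %q, %w", np, err)
			}
			return nil, err
		}
		out[np] = its
	}
	return out, nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond)
	defer cancel()
	_, err := listInstanceTypes(ctx, []string{"default", "gpu", "spot"})
	fmt.Println(err) // with a 15ms budget and ~10ms per call, a later call hits the deadline
}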
13 changes: 13 additions & 0 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -19,6 +19,7 @@ package scheduling
import (
"bytes"
"context"
"errors"
"fmt"
"sort"
"time"
@@ -211,6 +212,12 @@ func (r Results) ReservedOfferingErrors() map[*corev1.Pod]error {
})
}

func (r Results) ContextDeadlineExceededErrors() map[*corev1.Pod]error {
return lo.PickBy(r.PodErrors, func(_ *corev1.Pod, err error) bool {
return errors.Is(err, context.DeadlineExceeded)
})
}

// AllNonPendingPodsScheduled returns true if all pods scheduled.
// We don't care if a pod was pending before consolidation and will still be pending after. It may be a pod that we can't
// schedule at all and don't want it to block consolidation.
@@ -299,6 +306,12 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
break
}

// if a context error is encountered, stop attempting to schedule pods and add error to remaining pods
if ctx.Err() != nil {
errors[pod] = ctx.Err()
continue
}

// Schedule to existing nodes or create a new node
if errors[pod] = s.add(ctx, pod); errors[pod] == nil {
delete(errors, pod)
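For reference, a standalone sketch of the partition that ContextDeadlineExceededErrors performs: errors.Is sees through wrapped errors, so pods whose scheduling was interrupted by the deadline can be split out and reported as deferred rather than counted as unschedulable. Plain string keys stand in for *corev1.Pod to keep the example self-contained.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/samber/lo"
)

func main() {
	// Hypothetical per-pod error map, shaped like Results.PodErrors.
	podErrors := map[string]error{
		"default/pod-a": fmt.Errorf("simulation interrupted, %w", context.DeadlineExceeded),
		"default/pod-b": errors.New("no instance type satisfied requirements"),
	}

	// Same partition as ContextDeadlineExceededErrors: keep only deadline-caused failures.
	deferred := lo.PickBy(podErrors, func(_ string, err error) bool {
		return errors.Is(err, context.DeadlineExceeded)
	})

	// Deferred pods are retried in a future simulation instead of inflating the unschedulable count.
	fmt.Println(len(podErrors)-len(deferred), "unschedulable,", len(deferred), "deferred")
}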