perf: respect multinode consolidation timeout in all cases #2025

Open · wants to merge 23 commits into main · changes shown from 14 commits
22 changes: 15 additions & 7 deletions pkg/controllers/disruption/multinodeconsolidation.go
@@ -18,6 +18,7 @@ package disruption

import (
"context"
"errors"
"fmt"
"math"
"time"
@@ -107,6 +108,7 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB

// firstNConsolidationOption looks at the first N NodeClaims to determine if they can all be consolidated at once. The
// NodeClaims are sorted by increasing disruption order, which correlates with the likelihood of being able to consolidate the node
// nolint:gocyclo
func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context, candidates []*Candidate, max int) (Command, scheduling.Results, error) {
// we always operate on at least two NodeClaims at once; for single NodeClaims, standard consolidation will find all solutions
if len(candidates) < 2 {
@@ -120,22 +122,27 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
lastSavedCommand := Command{}
lastSavedResults := scheduling.Results{}
// Set a timeout
// binary search to find the maximum number of NodeClaims we can terminate
timeoutCtx, cancel := context.WithDeadline(ctx, m.clock.Now().Add(MultiNodeConsolidationTimeoutDuration))
defer cancel()
for min <= max {
// Check for timeout
if timeoutCtx.Err() != nil {
ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: m.ConsolidationType()})
if lastSavedCommand.candidates == nil {
return Command{}, scheduling.Results{}, fmt.Errorf("multi-node consolidation timed out while considering %d nodes without finding a valid command", (min+max)/2)
}
log.FromContext(ctx).V(1).WithValues(lastSavedCommand.LogValues()...).Info("stopping multi-node consolidation after timeout, returning last valid command")
return lastSavedCommand, lastSavedResults, nil
}
mid := (min + max) / 2
candidatesToConsolidate := candidates[0 : mid+1]

// Pass the timeout context to ensure sub-operations can be canceled
cmd, results, err := m.computeConsolidation(timeoutCtx, candidatesToConsolidate...)
// a deadline exceeded error here returns to the top of the loop, which either returns an error or the last saved command
if errors.Is(err, context.DeadlineExceeded) {
continue
}
if err != nil {
return Command{}, scheduling.Results{}, err
}
@@ -149,6 +156,7 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
}

// replacementHasValidInstanceTypes will be false if the replacement action has no valid instance types remaining after filtering.
if replacementHasValidInstanceTypes || cmd.Decision() == DeleteDecision {
// We can consolidate NodeClaims [0,mid]
lastSavedCommand = cmd
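For readers skimming the hunk above, here is a minimal, self-contained sketch of the pattern it introduces: a binary search that checks a deadline-bound context at the top of every iteration and falls back to the last command already proven valid. solve, firstN, and the string "command" are hypothetical stand-ins, not Karpenter APIs.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// solve stands in for the expensive feasibility check (computeConsolidation).
func solve(ctx context.Context, n int) (string, error) {
	if ctx.Err() != nil {
		return "", ctx.Err()
	}
	time.Sleep(10 * time.Millisecond) // simulated work
	return fmt.Sprintf("command over %d candidates", n), nil
}

func firstN(ctx context.Context, max int) (string, error) {
	// Bound the whole search with a deadline, as the diff does.
	timeoutCtx, cancel := context.WithDeadline(ctx, time.Now().Add(25*time.Millisecond))
	defer cancel()

	lastSaved := ""
	min := 1
	for min <= max {
		// Deadline check at the top of every iteration.
		if timeoutCtx.Err() != nil {
			if lastSaved == "" {
				return "", fmt.Errorf("timed out while considering %d candidates without finding a valid command", (min+max)/2)
			}
			// Return the last valid command rather than nothing.
			return lastSaved, nil
		}
		mid := (min + max) / 2
		cmd, err := solve(timeoutCtx, mid+1)
		// A deadline hit inside solve loops back so the branch above decides what to return.
		if errors.Is(err, context.DeadlineExceeded) {
			continue
		}
		if err != nil {
			return "", err
		}
		lastSaved = cmd
		min = mid + 1 // this batch worked; try a larger one
	}
	return lastSaved, nil
}

func main() {
	cmd, err := firstN(context.Background(), 100)
	fmt.Println(cmd, err)
}
```

The continue on context.DeadlineExceeded is the key design choice: instead of surfacing a raw error from deep inside the search, the timeout is routed back to the one branch that can still return a usable command.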
56 changes: 56 additions & 0 deletions pkg/controllers/disruption/suite_test.go
@@ -2022,6 +2022,62 @@ var _ = Describe("Metrics", func() {
"consolidation_type": "multi",
})
})
It("should stop multi-node consolidation after context deadline is reached", func() {
nodeClaims, nodes = test.NodeClaimsAndNodes(3, v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name,
v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(),
corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
},
},
Status: v1.NodeClaimStatus{
Allocatable: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("32"),
corev1.ResourcePods: resource.MustParse("100"),
},
},
})
for _, nc := range nodeClaims {
nc.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable)
}
// create our RS so we can link a pod to it
rs := test.ReplicaSet()
ExpectApplied(ctx, env.Client, rs)
pods := test.Pods(4, test.PodOptions{
ObjectMeta: metav1.ObjectMeta{Labels: labels,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
Name: rs.Name,
UID: rs.UID,
Controller: lo.ToPtr(true),
BlockOwnerDeletion: lo.ToPtr(true),
},
},
},
})

ExpectApplied(ctx, env.Client, rs, pods[0], pods[1], pods[2], pods[3], nodeClaims[0], nodes[0], nodeClaims[1], nodes[1], nodeClaims[2], nodes[2], nodePool)

// bind pods to nodes
ExpectManualBinding(ctx, env.Client, pods[0], nodes[0])
ExpectManualBinding(ctx, env.Client, pods[1], nodes[1])
ExpectManualBinding(ctx, env.Client, pods[2], nodes[2])
ExpectManualBinding(ctx, env.Client, pods[3], nodes[2])

// inform cluster state about nodes and nodeclaims
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{nodes[0], nodes[1], nodes[2]}, []*v1.NodeClaim{nodeClaims[0], nodeClaims[1], nodeClaims[2]})
// create deadline in the past
deadlineCtx, cancel := context.WithDeadline(ctx, fakeClock.Now().Add(-disruption.MultiNodeConsolidationTimeoutDuration))
defer cancel()

ExpectSingletonReconciled(deadlineCtx, disruptionController)
// expect that due to timeout zero nodes were tainted in consolidation
ExpectTaintedNodeCount(ctx, env.Client, 0)
})
})

func leastExpensiveInstanceWithZone(zone string) *cloudprovider.InstanceType {
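The test above leans on a stdlib property worth making explicit: a context whose deadline is already in the past reports context.DeadlineExceeded immediately, so the reconciler under test hits its timeout branch on the first loop iteration. A tiny sketch of that behavior, plain stdlib only:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// A deadline one minute in the past: the context is born expired.
	deadlineCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Minute))
	defer cancel()

	// Err() is non-nil without any waiting: "context deadline exceeded".
	fmt.Println(deadlineCtx.Err())
}
```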
3 changes: 3 additions & 0 deletions pkg/controllers/provisioning/provisioner.go
@@ -236,6 +236,9 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat

instanceTypes := map[string][]*cloudprovider.InstanceType{}
for _, np := range nodePools {
if ctx.Err() != nil {
[review thread on this line]
Member: Does this section of code take so much time that we think we need to handle this error at this level? I get that there's a trade-off between the number of times that we write this and how quickly we can respond.
Contributor (Author): We probably don't need to check here. It probably makes the most sense to just check the timeout between pods in scheduling.
Contributor (Author): Except that this context is timed out and continues on to cloudProvider.GetInstanceTypes. It's less that we're handling the error and more that we're silencing spurious logging.
Member: That doesn't completely solve it, right? I think we just have to handle it generally, because we can still race past this check and fire spurious errors.
[end review thread]
return nil, fmt.Errorf("context error while getting instance types, %w", ctx.Err())
}
// Get instance type options
its, err := p.cloudProvider.GetInstanceTypes(ctx, np)
if err != nil {
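The race the reviewers are describing is easy to reproduce in isolation: an upfront ctx.Err() check passes, and the deadline then expires before the expensive call completes, so the call site still has to handle the error. In this sketch, getInstanceTypes is a hypothetical stand-in for cloudProvider.GetInstanceTypes:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// getInstanceTypes simulates a slow cloud call that honors cancellation.
func getInstanceTypes(ctx context.Context) ([]string, error) {
	select {
	case <-time.After(50 * time.Millisecond): // simulated network latency
		return []string{"m5.large"}, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	if ctx.Err() != nil { // passes: the deadline has not expired yet
		return
	}
	// ...but it expires mid-call, so this error path still fires.
	if _, err := getInstanceTypes(ctx); errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("deadline exceeded despite the upfront check:", err)
	}
}
```

The upfront check only trims spurious work and logging; it cannot replace error handling after the call.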
10 changes: 10 additions & 0 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -244,6 +244,12 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
break
}

// when the context has been canceled or its deadline exceeded, stop attempting to schedule pods and mark the current pod as unschedulable
if ctx.Err() != nil {
errors[pod] = ctx.Err()
break
}

// Schedule to existing nodes or create a new node
if errors[pod] = s.add(ctx, pod); errors[pod] == nil {
delete(errors, pod)
@@ -289,6 +295,10 @@ func (s *Scheduler) updateCachedPodData(p *corev1.Pod) {
}

func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
// Check if context has been canceled or deadline exceeded
if ctx.Err() != nil {
return ctx.Err()
}
// first try to schedule against an in-flight real node
for _, node := range s.existingNodes {
if err := node.Add(ctx, s.kubeClient, pod, s.cachedPodData[pod.UID]); err == nil {
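Taken together, the two scheduler hunks apply one pattern: check the context between pods, record ctx.Err() against the pod currently being attempted, and stop trying to place the rest. A hedged, self-contained sketch, where Pod and the error map are simplified stand-ins for the scheduler's real types:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type Pod struct{ Name string }

// add stands in for Scheduler.add: it re-checks the context before doing work.
func add(ctx context.Context, p Pod) error {
	if ctx.Err() != nil {
		return ctx.Err()
	}
	time.Sleep(5 * time.Millisecond) // simulated placement work
	return nil
}

// solve stands in for Scheduler.Solve's per-pod loop.
func solve(ctx context.Context, pods []Pod) map[string]error {
	errs := map[string]error{}
	for _, pod := range pods {
		// Stop scheduling once the context is canceled or past its deadline,
		// marking the current pod with the context error.
		if ctx.Err() != nil {
			errs[pod.Name] = ctx.Err()
			break
		}
		if err := add(ctx, pod); err != nil {
			errs[pod.Name] = err
		}
	}
	return errs
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 12*time.Millisecond)
	defer cancel()
	// The first pods schedule; once the deadline passes, the rest are skipped.
	fmt.Println(solve(ctx, []Pod{{"a"}, {"b"}, {"c"}, {"d"}}))
}
```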