@@ -20,14 +20,17 @@ import (
20
20
"context"
21
21
"fmt"
22
22
"reflect"
23
+ "sort"
23
24
"time"
24
25
25
26
"github.com/pkg/errors"
27
+ "golang.org/x/exp/slices"
26
28
corev1 "k8s.io/api/core/v1"
27
29
apierrors "k8s.io/apimachinery/pkg/api/errors"
28
30
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
31
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
30
32
kerrors "k8s.io/apimachinery/pkg/util/errors"
33
+ "k8s.io/apimachinery/pkg/util/intstr"
31
34
"k8s.io/apimachinery/pkg/util/wait"
32
35
"k8s.io/klog/v2"
33
36
"k8s.io/utils/ptr"
@@ -44,6 +47,7 @@ import (
44
47
"sigs.k8s.io/cluster-api/internal/util/ssa"
45
48
"sigs.k8s.io/cluster-api/util"
46
49
"sigs.k8s.io/cluster-api/util/annotations"
50
+ "sigs.k8s.io/cluster-api/util/collections"
47
51
"sigs.k8s.io/cluster-api/util/conditions"
48
52
utilconversion "sigs.k8s.io/cluster-api/util/conversion"
49
53
"sigs.k8s.io/cluster-api/util/labels"
@@ -279,15 +283,18 @@ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, s *
279
283
// Get the nodeRefsMap from the cluster.
280
284
s .nodeRefMap , getNodeRefsErr = r .getNodeRefMap (ctx , clusterClient )
281
285
282
- err = r .reconcileMachines (ctx , s , infraConfig )
286
+ res := ctrl.Result {}
287
+
288
+ reconcileMachinesRes , err := r .reconcileMachines (ctx , s , infraConfig )
289
+ res = util .LowestNonZeroResult (res , reconcileMachinesRes )
283
290
284
291
if err != nil || getNodeRefsErr != nil {
285
292
return ctrl.Result {}, kerrors .NewAggregate ([]error {errors .Wrapf (err , "failed to reconcile Machines for MachinePool %s" , klog .KObj (mp )), errors .Wrapf (getNodeRefsErr , "failed to get nodeRefs for MachinePool %s" , klog .KObj (mp ))})
286
293
}
287
294
288
295
if ! mp .Status .InfrastructureReady {
289
296
log .Info ("Infrastructure provider is not yet ready" , infraConfig .GetKind (), klog .KObj (infraConfig ))
290
- return ctrl. Result {} , nil
297
+ return res , nil
291
298
}
292
299
293
300
var providerIDList []string
@@ -306,7 +313,7 @@ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, s *
306
313
307
314
if len (providerIDList ) == 0 && mp .Status .Replicas != 0 {
308
315
log .Info ("Retrieved empty spec.providerIDList from infrastructure provider but status.replicas is not zero." , "replicas" , mp .Status .Replicas )
309
- return ctrl. Result {} , nil
316
+ return res , nil
310
317
}
311
318
312
319
if ! reflect .DeepEqual (mp .Spec .ProviderIDList , providerIDList ) {
@@ -316,7 +323,7 @@ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, s *
316
323
mp .Status .UnavailableReplicas = mp .Status .Replicas
317
324
}
318
325
319
- return ctrl. Result {} , nil
326
+ return res , nil
320
327
}
321
328
322
329
// reconcileMachines reconciles Machines associated with a MachinePool.
@@ -326,18 +333,18 @@ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, s *
326
333
// infrastructure is created accordingly.
327
334
// Note: When supported by the cloud provider implementation of the MachinePool, machines will provide a means to interact
328
335
// with the corresponding infrastructure (e.g. delete a specific machine in case MachineHealthCheck detects it is unhealthy).
329
- func (r * MachinePoolReconciler ) reconcileMachines (ctx context.Context , s * scope , infraMachinePool * unstructured.Unstructured ) error {
336
+ func (r * MachinePoolReconciler ) reconcileMachines (ctx context.Context , s * scope , infraMachinePool * unstructured.Unstructured ) (ctrl. Result , error ) {
330
337
log := ctrl .LoggerFrom (ctx )
331
338
mp := s .machinePool
332
339
333
340
var infraMachineKind string
334
341
if err := util .UnstructuredUnmarshalField (infraMachinePool , & infraMachineKind , "status" , "infrastructureMachineKind" ); err != nil {
335
342
if errors .Is (err , util .ErrUnstructuredFieldNotFound ) {
336
343
log .V (4 ).Info ("MachinePool Machines not supported, no infraMachineKind found" )
337
- return nil
344
+ return ctrl. Result {}, nil
338
345
}
339
346
340
- return errors .Wrapf (err , "failed to retrieve infraMachineKind from infrastructure provider for MachinePool %s" , klog .KObj (mp ))
347
+ return ctrl. Result {}, errors .Wrapf (err , "failed to retrieve infraMachineKind from infrastructure provider for MachinePool %s" , klog .KObj (mp ))
341
348
}
342
349
343
350
infraMachineSelector := metav1.LabelSelector {
@@ -354,7 +361,7 @@ func (r *MachinePoolReconciler) reconcileMachines(ctx context.Context, s *scope,
354
361
infraMachineList .SetAPIVersion (infraMachinePool .GetAPIVersion ())
355
362
infraMachineList .SetKind (infraMachineKind + "List" )
356
363
if err := r .Client .List (ctx , & infraMachineList , client .InNamespace (mp .Namespace ), client .MatchingLabels (infraMachineSelector .MatchLabels )); err != nil {
357
- return errors .Wrapf (err , "failed to list infra machines for MachinePool %q in namespace %q" , mp .Name , mp .Namespace )
364
+ return ctrl. Result {}, errors .Wrapf (err , "failed to list infra machines for MachinePool %q in namespace %q" , mp .Name , mp .Namespace )
358
365
}
359
366
360
367
// Add watcher for infraMachine, if there isn't one already; this will allow this controller to reconcile
@@ -365,21 +372,26 @@ func (r *MachinePoolReconciler) reconcileMachines(ctx context.Context, s *scope,
365
372
366
373
// Add watcher for infraMachine, if there isn't one already.
367
374
if err := r .externalTracker .Watch (log , sampleInfraMachine , handler .EnqueueRequestsFromMapFunc (r .infraMachineToMachinePoolMapper )); err != nil {
368
- return err
375
+ return ctrl. Result {}, err
369
376
}
370
377
371
378
// Get the list of machines managed by this controller, and align it with the infra machines managed by
372
379
// the InfraMachinePool controller.
373
380
machineList := & clusterv1.MachineList {}
374
381
if err := r .Client .List (ctx , machineList , client .InNamespace (mp .Namespace ), client .MatchingLabels (infraMachineSelector .MatchLabels )); err != nil {
375
- return err
382
+ return ctrl. Result {}, err
376
383
}
377
384
378
385
if err := r .createOrUpdateMachines (ctx , s , machineList .Items , infraMachineList .Items ); err != nil {
379
- return errors .Wrapf (err , "failed to create machines for MachinePool %q in namespace %q" , mp .Name , mp .Namespace )
386
+ return ctrl. Result {}, errors .Wrapf (err , "failed to create machines for MachinePool %q in namespace %q" , mp .Name , mp .Namespace )
380
387
}
381
388
382
- return nil
389
+ res , err := r .reconcileUnhealthyMachines (ctx , s , machineList .Items )
390
+ if err != nil {
391
+ return ctrl.Result {}, errors .Wrapf (err , "failed to reconcile unhealthy machines for MachinePool %s" , klog .KObj (mp ))
392
+ }
393
+
394
+ return res , nil
383
395
}
384
396
385
397
// createOrUpdateMachines creates a MachinePool Machine for each infraMachine if it doesn't already exist and sets the owner reference and infraRef.
@@ -579,3 +591,121 @@ func (r *MachinePoolReconciler) getNodeRefMap(ctx context.Context, c client.Clie
579
591
580
592
return nodeRefsMap , nil
581
593
}
594
+
595
+ func (r * MachinePoolReconciler ) reconcileUnhealthyMachines (ctx context.Context , s * scope , machines []clusterv1.Machine ) (ctrl.Result , error ) {
596
+ if len (machines ) == 0 {
597
+ return ctrl.Result {}, nil
598
+ }
599
+
600
+ log := ctrl .LoggerFrom (ctx )
601
+ mp := s .machinePool
602
+
603
+ machinesWithHealthCheck := slices .DeleteFunc (slices .Clone (machines ), func (machine clusterv1.Machine ) bool {
604
+ return ! conditions .Has (& machine , clusterv1 .MachineHealthCheckSucceededCondition )
605
+ })
606
+ if len (machinesWithHealthCheck ) == 0 {
607
+ // This means there is no MachineHealthCheck selecting any machines
608
+ // of this machine pool. In this case, do not requeue so often,
609
+ // but still check regularly in case a MachineHealthCheck became
610
+ // deployed or activated. This long interval shouldn't be a problem
611
+ // at cluster creation, since newly-created nodes should anyway
612
+ // trigger MachinePool reconciliation as the infrastructure provider
613
+ // creates the InfraMachines.
614
+ log .V (4 ).Info ("Skipping reconciliation of unhealthy MachinePool machines because there are no health-checked machines" )
615
+ return ctrl.Result {RequeueAfter : 10 * time .Minute }, nil
616
+ }
617
+
618
+ unhealthyMachines := slices .DeleteFunc (slices .Clone (machines ), func (machine clusterv1.Machine ) bool {
619
+ return ! collections .IsUnhealthyAndOwnerRemediated (& machine )
620
+ })
621
+ log .V (4 ).Info ("Reconciling unhealthy MachinePool machines" , "unhealthyMachines" , len (unhealthyMachines ))
622
+
623
+ // Calculate how many in flight machines we should remediate.
624
+ // By default, we allow all machines to be remediated at the same time.
625
+ maxInFlight := len (unhealthyMachines )
626
+ if mp .Spec .Strategy != nil && mp .Spec .Strategy .Remediation != nil {
627
+ if mp .Spec .Strategy .Remediation .MaxInFlight != nil {
628
+ var err error
629
+ replicas := int (ptr .Deref (mp .Spec .Replicas , 1 ))
630
+ maxInFlight , err = intstr .GetScaledValueFromIntOrPercent (mp .Spec .Strategy .Remediation .MaxInFlight , replicas , true )
631
+ if err != nil {
632
+ return ctrl.Result {}, fmt .Errorf ("failed to calculate maxInFlight to remediate machines: %v" , err )
633
+ }
634
+ log = log .WithValues ("maxInFlight" , maxInFlight , "replicas" , replicas )
635
+ }
636
+ }
637
+
638
+ machinesToRemediate := make ([]* clusterv1.Machine , 0 , len (unhealthyMachines ))
639
+ inFlight := 0
640
+ for _ , m := range unhealthyMachines {
641
+ if ! m .DeletionTimestamp .IsZero () {
642
+ if conditions .IsTrue (& m , clusterv1 .MachineOwnerRemediatedCondition ) {
643
+ // Machine has been remediated by this controller and still in flight.
644
+ inFlight ++
645
+ }
646
+ continue
647
+ }
648
+ if conditions .IsFalse (& m , clusterv1 .MachineOwnerRemediatedCondition ) {
649
+ machinesToRemediate = append (machinesToRemediate , & m )
650
+ }
651
+ }
652
+ log = log .WithValues ("inFlight" , inFlight )
653
+
654
+ if len (machinesToRemediate ) == 0 {
655
+ // There's a MachineHealthCheck monitoring the machines, but currently
656
+ // no action to be taken. A machine could require remediation at any
657
+ // time, so use a short interval until next reconciliation.
658
+ return ctrl.Result {RequeueAfter : 30 * time .Second }, nil
659
+ }
660
+
661
+ if inFlight >= maxInFlight {
662
+ log .V (3 ).Info ("Remediation strategy is set, and maximum in flight has been reached" , "machinesToBeRemediated" , len (machinesToRemediate ))
663
+
664
+ // Check soon again whether the already-remediating (= deleting) machines are gone
665
+ // so that more machines can be remediated
666
+ return ctrl.Result {RequeueAfter : 15 * time .Second }, nil
667
+ }
668
+
669
+ // Sort the machines from newest to oldest.
670
+ // We are trying to remediate machines failing to come up first because
671
+ // there is a chance that they are not hosting any workloads (minimize disruption).
672
+ sort .SliceStable (machinesToRemediate , func (i , j int ) bool {
673
+ return machinesToRemediate [i ].CreationTimestamp .After (machinesToRemediate [j ].CreationTimestamp .Time )
674
+ })
675
+
676
+ haveMoreMachinesToRemediate := false
677
+ if len (machinesToRemediate ) > (maxInFlight - inFlight ) {
678
+ haveMoreMachinesToRemediate = true
679
+ log .V (5 ).Info ("Remediation strategy is set, limiting in flight operations" , "machinesToBeRemediated" , len (machinesToRemediate ))
680
+ machinesToRemediate = machinesToRemediate [:(maxInFlight - inFlight )]
681
+ }
682
+
683
+ // Remediate unhealthy machines by deleting them
684
+ var errs []error
685
+ for _ , m := range machinesToRemediate {
686
+ log .Info ("Deleting unhealthy Machine" , "Machine" , klog .KObj (m ))
687
+ patch := client .MergeFrom (m .DeepCopy ())
688
+ if err := r .Client .Delete (ctx , m ); err != nil {
689
+ if apierrors .IsNotFound (err ) {
690
+ continue
691
+ }
692
+ errs = append (errs , errors .Wrapf (err , "failed to delete Machine %s" , klog .KObj (m )))
693
+ continue
694
+ }
695
+ conditions .MarkTrue (m , clusterv1 .MachineOwnerRemediatedCondition )
696
+ if err := r .Client .Status ().Patch (ctx , m , patch ); err != nil && ! apierrors .IsNotFound (err ) {
697
+ errs = append (errs , errors .Wrapf (err , "failed to update status of Machine %s" , klog .KObj (m )))
698
+ }
699
+ }
700
+
701
+ if len (errs ) > 0 {
702
+ return ctrl.Result {}, errors .Wrapf (kerrors .NewAggregate (errs ), "failed to delete unhealthy Machines" )
703
+ }
704
+
705
+ if haveMoreMachinesToRemediate {
706
+ // More machines need remediation, so reconcile again sooner
707
+ return ctrl.Result {RequeueAfter : 15 * time .Second }, nil
708
+ }
709
+
710
+ return ctrl.Result {RequeueAfter : 30 * time .Second }, nil
711
+ }
0 commit comments