126 changes: 76 additions & 50 deletions controlplane/kubeadm/internal/controllers/controller.go
@@ -444,14 +444,18 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, controlPl
return result, err
}

if err := r.syncMachines(ctx, controlPlane); err != nil {
stopReconcile, err := r.syncMachines(ctx, controlPlane)
if err != nil {
// Note: If any of the calls got a NotFound error, it means that at least one Machine got deleted.
// Let's return here so that the next Reconcile will get the updated list of Machines.
if apierrors.IsNotFound(err) {
return ctrl.Result{}, nil // Note: Requeue is not needed, changes to Machines trigger another reconcile.
}
return ctrl.Result{}, errors.Wrap(err, "failed to sync Machines")
}
if stopReconcile {
return ctrl.Result{RequeueAfter: 1 * time.Second}, nil // Explicitly requeue as we are not watching all objects.
}

// Aggregate the operational state of all the machines; while aggregating we are adding the
// source ref (reason@machine/name) so the problem can be easily tracked down to its source machine.
@@ -845,16 +849,17 @@ func (r *KubeadmControlPlaneReconciler) ClusterToKubeadmControlPlane(_ context.C
}

// syncMachines updates Machines, InfrastructureMachines and KubeadmConfigs to propagate in-place mutable fields from KCP.
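// It returns true (stopReconcile) when a managedFields mitigation was applied to any object, in which case the caller stops the current reconcile and requeues.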
func (r *KubeadmControlPlaneReconciler) syncMachines(ctx context.Context, controlPlane *internal.ControlPlane) error {
func (r *KubeadmControlPlaneReconciler) syncMachines(ctx context.Context, controlPlane *internal.ControlPlane) (bool, error) {
patchHelpers := map[string]*patch.Helper{}
var anyManagedFieldIssueMitigated bool
for machineName := range controlPlane.Machines {
m := controlPlane.Machines[machineName]
// If the Machine is already being deleted, we only need to sync
// the subset of fields that impact tearing down the Machine.
if !m.DeletionTimestamp.IsZero() {
patchHelper, err := patch.NewHelper(m, r.Client)
if err != nil {
return err
return true, err
}

// Set all other in-place mutable fields that impact the ability to tear down existing machines.
@@ -865,84 +870,105 @@ func (r *KubeadmControlPlaneReconciler) syncMachines(ctx context.Context, contro

// Note: We intentionally don't set "minReadySeconds" on Machines because we consider it enough to have machine availability driven by readiness of control plane components.
if err := patchHelper.Patch(ctx, m); err != nil {
return err
return true, err
}

controlPlane.Machines[machineName] = m
patchHelper, err = patch.NewHelper(m, r.Client)
if err != nil {
return err
return true, err
}
patchHelpers[machineName] = patchHelper
continue
}

// Update Machine to propagate in-place mutable fields from KCP.
updatedMachine, err := r.updateMachine(ctx, m, controlPlane.KCP, controlPlane.Cluster)
if err != nil {
return errors.Wrapf(err, "failed to update Machine: %s", klog.KObj(m))
}
// Note: Ensure ControlPlane has the latest version of the Machine. This is required because
// e.g. the in-place update code that is called later has to use the latest version of the Machine.
controlPlane.Machines[machineName] = updatedMachine
if _, ok := controlPlane.MachinesNotUpToDate[machineName]; ok {
controlPlane.MachinesNotUpToDate[machineName] = updatedMachine
}
// Since the machine is updated, re-create the patch helper so that any subsequent
// Patch calls use the correct base machine object to calculate the diffs.
// Example: reconcileControlPlaneAndMachinesConditions patches the machine objects in a subsequent call
// and it should use the updated machine to calculate the diff.
// Note: If the patchHelpers are not re-computed based on the new updated machines, subsequent
// Patch calls will fail because the patch will be calculated based on an outdated machine and will error
// because of outdated resourceVersion.
// TODO: This should be cleaned up to have a more streamlined way of constructing and using patchHelpers.
patchHelper, err := patch.NewHelper(updatedMachine, r.Client)
managedFieldIssueMitigated, err := ssa.MitigateManagedFieldsIssue(ctx, r.Client, m, kcpManagerName)
if err != nil {
return err
return true, err
}
anyManagedFieldIssueMitigated = anyManagedFieldIssueMitigated || managedFieldIssueMitigated
if !anyManagedFieldIssueMitigated {
// Update Machine to propagate in-place mutable fields from KCP.
updatedMachine, err := r.updateMachine(ctx, m, controlPlane.KCP, controlPlane.Cluster)
if err != nil {
return true, errors.Wrapf(err, "failed to update Machine: %s", klog.KObj(m))
}
// Note: Ensure ControlPlane has the latest version of the Machine. This is required because
// e.g. the in-place update code that is called later has to use the latest version of the Machine.
controlPlane.Machines[machineName] = updatedMachine
if _, ok := controlPlane.MachinesNotUpToDate[machineName]; ok {
controlPlane.MachinesNotUpToDate[machineName] = updatedMachine
}
// Since the machine is updated, re-create the patch helper so that any subsequent
// Patch calls use the correct base machine object to calculate the diffs.
// Example: reconcileControlPlaneAndMachinesConditions patches the machine objects in a subsequent call
// and it should use the updated machine to calculate the diff.
// Note: If the patchHelpers are not re-computed based on the new updated machines, subsequent
// Patch calls will fail because the patch will be calculated based on an outdated machine and will error
// because of outdated resourceVersion.
// TODO: This should be cleaned up to have a more streamlined way of constructing and using patchHelpers.
patchHelper, err := patch.NewHelper(updatedMachine, r.Client)
if err != nil {
return true, err
}
patchHelpers[machineName] = patchHelper
}
patchHelpers[machineName] = patchHelper

infraMachine, infraMachineFound := controlPlane.InfraResources[machineName]
// Only update the InfraMachine if it is already found, otherwise just skip it.
// This could happen e.g. if the cache is not up-to-date yet.
if infraMachineFound {
// Drop managedFields for manager:Update and capi-kubeadmcontrolplane:Apply for all objects created with CAPI <= v1.11.
// Starting with CAPI v1.12 we have a new managedField structure where capi-kubeadmcontrolplane-metadata will own
// labels and annotations and capi-kubeadmcontrolplane everything else.
// Note: We have to call ssa.MigrateManagedFields for every Machine created with CAPI <= v1.11 once.
// Given that this was introduced in CAPI v1.12 and our n-3 upgrade policy this can
// be removed with CAPI v1.15.
if err := ssa.MigrateManagedFields(ctx, r.Client, infraMachine, kcpManagerName, kcpMetadataManagerName); err != nil {
return errors.Wrapf(err, "failed to clean up managedFields of InfrastructureMachine %s", klog.KObj(infraMachine))
managedFieldIssueMitigated, err = ssa.MitigateManagedFieldsIssue(ctx, r.Client, infraMachine, kcpMetadataManagerName)
if err != nil {
return true, err
}
// Update in-place mutating fields on InfrastructureMachine.
if err := r.updateLabelsAndAnnotations(ctx, infraMachine, infraMachine.GroupVersionKind(), controlPlane.KCP, controlPlane.Cluster); err != nil {
return errors.Wrapf(err, "failed to update InfrastructureMachine %s", klog.KObj(infraMachine))
anyManagedFieldIssueMitigated = anyManagedFieldIssueMitigated || managedFieldIssueMitigated
if !anyManagedFieldIssueMitigated {
// Drop managedFields for manager:Update and capi-kubeadmcontrolplane:Apply for all objects created with CAPI <= v1.11.
// Starting with CAPI v1.12 we have a new managedField structure where capi-kubeadmcontrolplane-metadata will own
// labels and annotations and capi-kubeadmcontrolplane everything else.
// Note: We have to call ssa.MigrateManagedFields for every Machine created with CAPI <= v1.11 once.
// Given that this was introduced in CAPI v1.12 and our n-3 upgrade policy this can
// be removed with CAPI v1.15.
if err := ssa.MigrateManagedFields(ctx, r.Client, infraMachine, kcpManagerName, kcpMetadataManagerName); err != nil {
return true, errors.Wrapf(err, "failed to clean up managedFields of InfrastructureMachine %s", klog.KObj(infraMachine))
}
// Update in-place mutating fields on InfrastructureMachine.
if err := r.updateLabelsAndAnnotations(ctx, infraMachine, infraMachine.GroupVersionKind(), controlPlane.KCP, controlPlane.Cluster); err != nil {
return true, errors.Wrapf(err, "failed to update InfrastructureMachine %s", klog.KObj(infraMachine))
}
}
}

kubeadmConfig, kubeadmConfigFound := controlPlane.KubeadmConfigs[machineName]
// Only update the KubeadmConfig if it is already found, otherwise just skip it.
// This could happen e.g. if the cache is not up-to-date yet.
if kubeadmConfigFound {
// Drop managedFields for manager:Update and capi-kubeadmcontrolplane:Apply for all objects created with CAPI <= v1.11.
// Starting with CAPI v1.12 we have a new managedField structure where capi-kubeadmcontrolplane-metadata will own
// labels and annotations and capi-kubeadmcontrolplane everything else.
// Note: We have to call ssa.MigrateManagedFields for every Machine created with CAPI <= v1.11 once.
// Given that this was introduced in CAPI v1.12 and our n-3 upgrade policy this can
// be removed with CAPI v1.15.
if err := ssa.MigrateManagedFields(ctx, r.Client, kubeadmConfig, kcpManagerName, kcpMetadataManagerName); err != nil {
return errors.Wrapf(err, "failed to clean up managedFields of KubeadmConfig %s", klog.KObj(kubeadmConfig))
managedFieldIssueMitigated, err = ssa.MitigateManagedFieldsIssue(ctx, r.Client, kubeadmConfig, kcpMetadataManagerName)
if err != nil {
return true, err
}
// Update in-place mutating fields on BootstrapConfig.
if err := r.updateLabelsAndAnnotations(ctx, kubeadmConfig, bootstrapv1.GroupVersion.WithKind("KubeadmConfig"), controlPlane.KCP, controlPlane.Cluster); err != nil {
return errors.Wrapf(err, "failed to update KubeadmConfig %s", klog.KObj(kubeadmConfig))
anyManagedFieldIssueMitigated = anyManagedFieldIssueMitigated || managedFieldIssueMitigated
if !anyManagedFieldIssueMitigated {
// Drop managedFields for manager:Update and capi-kubeadmcontrolplane:Apply for all objects created with CAPI <= v1.11.
// Starting with CAPI v1.12 we have a new managedField structure where capi-kubeadmcontrolplane-metadata will own
// labels and annotations and capi-kubeadmcontrolplane everything else.
// Note: We have to call ssa.MigrateManagedFields for every Machine created with CAPI <= v1.11 once.
// Given that this was introduced in CAPI v1.12 and our n-3 upgrade policy this can
// be removed with CAPI v1.15.
if err := ssa.MigrateManagedFields(ctx, r.Client, kubeadmConfig, kcpManagerName, kcpMetadataManagerName); err != nil {
return true, errors.Wrapf(err, "failed to clean up managedFields of KubeadmConfig %s", klog.KObj(kubeadmConfig))
}
// Update in-place mutating fields on BootstrapConfig.
if err := r.updateLabelsAndAnnotations(ctx, kubeadmConfig, bootstrapv1.GroupVersion.WithKind("KubeadmConfig"), controlPlane.KCP, controlPlane.Cluster); err != nil {
return true, errors.Wrapf(err, "failed to update KubeadmConfig %s", klog.KObj(kubeadmConfig))
}
}
}
}
// Update the patch helpers.
controlPlane.SetPatchHelpers(patchHelpers)
return nil
return anyManagedFieldIssueMitigated, nil
}

// reconcileControlPlaneAndMachinesConditions is responsible for reconciling conditions reporting the status of static pods and
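Taken together, the controller.go changes make syncMachines first call ssa.MitigateManagedFieldsIssue on each object; once any mitigation happened, the SSA-based updates are skipped for the rest of the pass and stopReconcile=true is reported so reconcile returns early with a short requeue. A minimal caller-side sketch of that pattern follows; it is illustrative only and not code from this PR, with only ctrl.Result, apierrors, and time being real APIs.

package sketch

import (
	"context"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	ctrl "sigs.k8s.io/controller-runtime"
)

// reconcileWithMitigation mirrors the flow above: sync may mitigate managedFields
// and then asks the caller to stop and retry shortly instead of continuing the
// pass against objects whose managedFields were just rewritten.
func reconcileWithMitigation(ctx context.Context, sync func(context.Context) (bool, error)) (ctrl.Result, error) {
	stopReconcile, err := sync(ctx)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// A Machine disappeared mid-reconcile; the watch triggers another pass.
			return ctrl.Result{}, nil
		}
		return ctrl.Result{}, err
	}
	if stopReconcile {
		// Explicitly requeue, since not all of the patched objects are watched.
		return ctrl.Result{RequeueAfter: 1 * time.Second}, nil
	}
	return ctrl.Result{}, nil
}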
8 changes: 6 additions & 2 deletions controlplane/kubeadm/internal/controllers/controller_test.go
@@ -2069,7 +2069,9 @@ func TestKubeadmControlPlaneReconciler_syncMachines(t *testing.T) {
SecretCachingClient: secretCachingClient,
ssaCache: ssa.NewCache("test-controller"),
}
g.Expect(reconciler.syncMachines(ctx, controlPlane)).To(Succeed())
stopReconcile, err := reconciler.syncMachines(ctx, controlPlane)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(stopReconcile).To(BeFalse())

updatedInPlaceMutatingMachine := inPlaceMutatingMachine.DeepCopy()
g.Eventually(func(g Gomega) {
@@ -2145,7 +2147,9 @@ func TestKubeadmControlPlaneReconciler_syncMachines(t *testing.T) {
kcp.Spec.MachineTemplate.Spec.Deletion.NodeDeletionTimeoutSeconds = duration10s
kcp.Spec.MachineTemplate.Spec.Deletion.NodeVolumeDetachTimeoutSeconds = duration10s
controlPlane.KCP = kcp
g.Expect(reconciler.syncMachines(ctx, controlPlane)).To(Succeed())
stopReconcile, err = reconciler.syncMachines(ctx, controlPlane)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(stopReconcile).To(BeFalse())

// Verify in-place mutable fields are updated on the Machine.
updatedInPlaceMutatingMachine = inPlaceMutatingMachine.DeepCopy()
52 changes: 51 additions & 1 deletion controlplane/kubeadm/internal/controllers/helpers_test.go
@@ -18,6 +18,7 @@ package controllers

import (
"context"
"encoding/json"
"strings"
"testing"

@@ -28,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -457,7 +459,9 @@ func TestCloneConfigsAndGenerateMachineAndSyncMachines(t *testing.T) {

controlPlane, err := internal.NewControlPlane(ctx, r.managementCluster, r.Client, cluster, kcp, collections.FromMachines(&m))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(r.syncMachines(ctx, controlPlane)).To(Succeed())
stopReconcile, err := r.syncMachines(ctx, controlPlane)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(stopReconcile).To(BeFalse())

// Verify managedFields again.
infraObj, err = external.GetObjectFromContractVersionedRef(ctx, env.GetAPIReader(), m.Spec.InfrastructureRef, m.Namespace)
@@ -529,6 +533,52 @@ func TestCloneConfigsAndGenerateMachineAndSyncMachines(t *testing.T) {
}
}}`,
}})))

// Purge managedFields from objects.
jsonPatch := []map[string]interface{}{
{
"op": "replace",
"path": "/metadata/managedFields",
"value": []metav1.ManagedFieldsEntry{{}},
},
}
patch, err := json.Marshal(jsonPatch)
g.Expect(err).ToNot(HaveOccurred())
for _, object := range []client.Object{&m, infraObj, kubeadmConfig} {
g.Expect(env.Client.Patch(ctx, object, client.RawPatch(types.JSONPatchType, patch))).To(Succeed())
g.Expect(object.GetManagedFields()).To(BeEmpty())
}

// Call syncMachines again to run the mitigation code.
controlPlane.Machines[m.Name] = &m
controlPlane.InfraResources[infraObj.GetName()] = infraObj
controlPlane.KubeadmConfigs[kubeadmConfig.Name] = kubeadmConfig
stopReconcile, err = r.syncMachines(ctx, controlPlane)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(stopReconcile).To(BeTrue())

// Verify the mitigation worked.
g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(&m), &m)).To(Succeed())
g.Expect(cleanupTime(m.GetManagedFields())).To(ConsistOf(toManagedFields([]managedFieldEntry{{
APIVersion: clusterv1.GroupVersion.String(),
Manager: kcpManagerName, // matches manager of next Apply.
Operation: metav1.ManagedFieldsOperationApply,
FieldsV1: `{"f:metadata":{"f:name":{}}}`,
}})))
g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(infraObj), infraObj)).To(Succeed())
g.Expect(cleanupTime(infraObj.GetManagedFields())).To(ConsistOf(toManagedFields([]managedFieldEntry{{
APIVersion: infraObj.GetAPIVersion(),
Manager: kcpMetadataManagerName, // matches manager of next Apply.
Operation: metav1.ManagedFieldsOperationApply,
FieldsV1: `{"f:metadata":{"f:name":{}}}`,
}})))
g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(kubeadmConfig), kubeadmConfig)).To(Succeed())
g.Expect(cleanupTime(kubeadmConfig.GetManagedFields())).To(ConsistOf(toManagedFields([]managedFieldEntry{{
APIVersion: bootstrapv1.GroupVersion.String(),
Manager: kcpMetadataManagerName, // matches manager of next Apply.
Operation: metav1.ManagedFieldsOperationApply,
FieldsV1: `{"f:metadata":{"f:name":{}}}`,
}})))
}

func TestCloneConfigsAndGenerateMachineFailInfraMachineCreation(t *testing.T) {
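The test above clears managedFields by JSON-patching them to a list containing a single empty entry; per the Kubernetes server-side-apply documentation, a patch that sets managedFields to an empty list is ignored, while a single empty entry resets them. The following is a small hypothetical helper packaging the same trick (the helper name is illustrative and not part of this PR); a caller would then pass the returned patch to Client.Patch exactly as the test does inline.

package sketch

import (
	"encoding/json"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// clearManagedFieldsPatch builds a JSON Patch that resets metadata.managedFields.
// A single empty ManagedFieldsEntry is required; an empty list would be ignored
// by the API server.
func clearManagedFieldsPatch() (client.Patch, error) {
	jsonPatch := []map[string]interface{}{{
		"op":    "replace",
		"path":  "/metadata/managedFields",
		"value": []metav1.ManagedFieldsEntry{{}},
	}}
	raw, err := json.Marshal(jsonPatch)
	if err != nil {
		return nil, err
	}
	return client.RawPatch(types.JSONPatchType, raw), nil
}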
4 changes: 2 additions & 2 deletions controlplane/kubeadm/internal/controllers/scale.go
@@ -177,13 +177,13 @@ func (r *KubeadmControlPlaneReconciler) preflightChecks(ctx context.Context, con
// Block when we expect an upgrade to be propagated for topology clusters.
// NOTE: in case the cluster is performing an upgrade, allow creation of machines for the intermediate step.
hasSameVersionOfCurrentUpgradeStep := false
if version, ok := controlPlane.Cluster.GetAnnotations()[clusterv1.ClusterTopologyUpgradeStepAnnotation]; ok {
if version, ok := controlPlane.Cluster.GetAnnotations()[clusterv1.ClusterTopologyUpgradeStepAnnotation]; ok && version != "" {
hasSameVersionOfCurrentUpgradeStep = version == controlPlane.KCP.Spec.Version
}

if controlPlane.Cluster.Spec.Topology.IsDefined() && controlPlane.Cluster.Spec.Topology.Version != controlPlane.KCP.Spec.Version && !hasSameVersionOfCurrentUpgradeStep {
v := controlPlane.Cluster.Spec.Topology.Version
if version, ok := controlPlane.Cluster.GetAnnotations()[clusterv1.ClusterTopologyUpgradeStepAnnotation]; ok {
if version, ok := controlPlane.Cluster.GetAnnotations()[clusterv1.ClusterTopologyUpgradeStepAnnotation]; ok && version != "" {
v = version
}
log.Info(fmt.Sprintf("Waiting for a version upgrade to %s to be propagated", v))
2 changes: 1 addition & 1 deletion controlplane/kubeadm/internal/controllers/status.go
@@ -852,7 +852,7 @@ func getPreflightMessages(cluster *clusterv1.Cluster, preflightChecks internal.P
additionalMessages := []string{}
if preflightChecks.TopologyVersionMismatch {
v := cluster.Spec.Topology.Version
if version, ok := cluster.GetAnnotations()[clusterv1.ClusterTopologyUpgradeStepAnnotation]; ok {
if version, ok := cluster.GetAnnotations()[clusterv1.ClusterTopologyUpgradeStepAnnotation]; ok && version != "" {
v = version
}
additionalMessages = append(additionalMessages, fmt.Sprintf("* waiting for a version upgrade to %s to be propagated", v))
11 changes: 8 additions & 3 deletions exp/topology/desiredstate/desired_state.go
@@ -51,7 +51,6 @@ import (
"sigs.k8s.io/cluster-api/internal/topology/selectors"
"sigs.k8s.io/cluster-api/internal/webhooks"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/cache"
"sigs.k8s.io/cluster-api/util/conversion"
)
@@ -734,15 +733,21 @@ func computeCluster(_ context.Context, s *scope.Scope, infrastructureCluster, co
// NOTE, it is required to surface intermediate steps of the upgrade plan to allow creation of machines in KCP/MS.
// TODO: consider if we want to surface the upgrade plan (or the list of desired versions) in cluster status;
// TBD if the semantic of the new field can replace this annotation.
if cluster.Annotations == nil {
cluster.Annotations = map[string]string{}
}
if hooks.IsPending(runtimehooksv1.AfterClusterUpgrade, s.Current.Cluster) {
// NOTE: to detect if we are at the beginning of an upgrade, we check if the intent to call the AfterClusterUpgrade is already tracked.
controlPlaneVersion, err := contract.ControlPlane().Version().Get(controlPlane)
if err != nil {
return nil, errors.Wrap(err, "error getting control plane version")
}
annotations.AddAnnotations(cluster, map[string]string{clusterv1.ClusterTopologyUpgradeStepAnnotation: *controlPlaneVersion})
cluster.Annotations[clusterv1.ClusterTopologyUpgradeStepAnnotation] = *controlPlaneVersion
} else {
delete(cluster.Annotations, clusterv1.ClusterTopologyUpgradeStepAnnotation)
// Note: Setting the annotation to "" instead of deleting it because we cannot be sure
// that we are able to remove the annotation from the Cluster with SSA if we lost ownership of
// the annotation in managedFields e.g. because of: https://github.com/kubernetes/kubernetes/issues/136919.
cluster.Annotations[clusterv1.ClusterTopologyUpgradeStepAnnotation] = ""
}

return cluster, nil
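The desired_state.go change, together with the version != "" guards added in scale.go and status.go, switches the ClusterTopologyUpgradeStep annotation from "present or absent" to "non-empty or empty": the writer tombstones the value with "" because removing the key via SSA is not guaranteed once ownership of it was lost (the upstream issue referenced in the comment above), so every reader has to treat "" as unset. A hedged reader-side sketch follows; the helper name and parameters are illustrative and not part of this PR.

package sketch

// upgradeStepVersion treats an empty annotation value as "not set", matching the
// guards added in preflightChecks and getPreflightMessages. `annotations` is the
// Cluster's metadata.annotations map and `key` the ClusterTopologyUpgradeStep
// annotation key.
func upgradeStepVersion(annotations map[string]string, key string) (string, bool) {
	version, ok := annotations[key]
	if !ok || version == "" {
		return "", false
	}
	return version, true
}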