22 changes: 16 additions & 6 deletions bootstrap/controllers/certificates_controller.go
@@ -20,6 +20,7 @@ import (

bootstrapv1 "github.com/canonical/cluster-api-k8s/bootstrap/api/v1beta2"
"github.com/canonical/cluster-api-k8s/pkg/ck8s"
ck8serrors "github.com/canonical/cluster-api-k8s/pkg/errors"
utiltime "github.com/canonical/cluster-api-k8s/pkg/time"
"github.com/canonical/cluster-api-k8s/pkg/token"
)
@@ -127,19 +128,28 @@ func (r *CertificatesReconciler) Reconcile(ctx context.Context, req ctrl.Request

if !hasExpiryDateAnnotation {
if err := r.updateExpiryDateAnnotation(ctx, scope); err != nil {
return ctrl.Result{}, err
log.Error(err, "Encountered error during updateExpiryDateAnnotation")
return ck8serrors.RequeueOnK8sdProxyError(err)
}
}

if refreshCertificates {
if err := r.refreshCertificates(ctx, scope); err != nil {
// On error, we requeue the request to retry.
mAnnotations[bootstrapv1.CertificatesRefreshStatusAnnotation] = bootstrapv1.CertificatesRefreshFailedStatus
m.SetAnnotations(mAnnotations)
if err := r.Client.Update(ctx, m); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to clear status annotation after error: %w", err)
log.Error(err, "Encountered error during refreshCertificates")

// Only update the machine if this annotation isn't already set to the same value.
// Updating it will re-trigger this Reconciler, in which case we'd probably hit the same error.
// The request is going to be requeued anyway, since we'll be returning an error or a non-zero Result.
if annotation, ok := mAnnotations[bootstrapv1.CertificatesRefreshStatusAnnotation]; !ok || annotation != bootstrapv1.CertificatesRefreshFailedStatus {
mAnnotations[bootstrapv1.CertificatesRefreshStatusAnnotation] = bootstrapv1.CertificatesRefreshFailedStatus
m.SetAnnotations(mAnnotations)
if err := r.Client.Update(ctx, m); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to clear status annotation after error: %w", err)
}
}
return ctrl.Result{}, err

return ck8serrors.RequeueOnK8sdProxyError(err)
}
}

13 changes: 11 additions & 2 deletions bootstrap/controllers/ck8sconfig_controller.go
@@ -45,6 +45,7 @@ import (
bootstrapv1 "github.com/canonical/cluster-api-k8s/bootstrap/api/v1beta2"
"github.com/canonical/cluster-api-k8s/pkg/ck8s"
"github.com/canonical/cluster-api-k8s/pkg/cloudinit"
ck8serrors "github.com/canonical/cluster-api-k8s/pkg/errors"
"github.com/canonical/cluster-api-k8s/pkg/locking"
"github.com/canonical/cluster-api-k8s/pkg/secret"
"github.com/canonical/cluster-api-k8s/pkg/token"
@@ -205,11 +206,19 @@ func (r *CK8sConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request)

// it's a control plane join
if configOwner.IsControlPlaneMachine() {
return reconcile.Result{}, r.joinControlplane(ctx, scope)
if err := r.joinControlplane(ctx, scope); err != nil {
log.Error(err, "Encountered error during joinControlplane")
return ck8serrors.RequeueOnK8sdProxyError(err)
}
return reconcile.Result{}, nil
}

// It's a worker join
return reconcile.Result{}, r.joinWorker(ctx, scope)
if err := r.joinWorker(ctx, scope); err != nil {
log.Error(err, "Encountered error during joinWorker")
return ck8serrors.RequeueOnK8sdProxyError(err)
}
return reconcile.Result{}, nil
}

func (r *CK8sConfigReconciler) joinControlplane(ctx context.Context, scope *Scope) error {
31 changes: 20 additions & 11 deletions controlplane/controllers/ck8scontrolplane_controller.go
@@ -142,7 +142,10 @@ func (r *CK8sControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Req
}

// Always attempt to update status.
if updateErr := r.updateStatus(ctx, kcp, cluster); updateErr != nil {
if requeue, updateErr := r.updateStatus(ctx, kcp, cluster); updateErr != nil {
if requeue && res.IsZero() {
res.RequeueAfter = 10 * time.Second
}
var connFailure *ck8s.RemoteClusterConnectionError
if errors.As(updateErr, &connFailure) {
logger.Info("Could not connect to workload cluster to fetch status", "updateErr", updateErr.Error())
@@ -167,7 +170,12 @@ func (r *CK8sControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Req
}
}

return res, err
if res.IsZero() {
return res, err
}

// If the result is non-zero and we're requeuing the request, we shouldn't return an error.
return res, nil
}

// reconcileDelete handles CK8sControlPlane deletion.
@@ -325,22 +333,22 @@ func (r *CK8sControlPlaneReconciler) ClusterToCK8sControlPlane(_ context.Context

// updateStatus is called after every reconciliation loop in a defer statement to always make sure we have the
// resource status subresources up-to-date.
func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *controlplanev1.CK8sControlPlane, cluster *clusterv1.Cluster) error {
func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *controlplanev1.CK8sControlPlane, cluster *clusterv1.Cluster) (bool, error) {
selector := collections.ControlPlaneSelectorForCluster(cluster.Name)
// Copy label selector to its status counterpart in string format.
// This is necessary for CRDs including scale subresources.
kcp.Status.Selector = selector.String()

ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), collections.OwnedMachines(kcp))
if err != nil {
return fmt.Errorf("failed to get list of owned machines: %w", err)
return false, fmt.Errorf("failed to get list of owned machines: %w", err)
}

logger := r.Log.WithValues("namespace", kcp.Namespace, "CK8sControlPlane", kcp.Name, "cluster", cluster.Name)
controlPlane, err := ck8s.NewControlPlane(ctx, r.Client, cluster, kcp, ownedMachines)
if err != nil {
logger.Error(err, "failed to initialize control plane")
return err
return false, err
}
kcp.Status.UpdatedReplicas = int32(len(controlPlane.UpToDateMachines()))

@@ -360,7 +368,7 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
// Return early if the deletion timestamp is set, because we don't want to try to connect to the workload cluster
// and we don't want to report resize condition (because it is set to deleting into reconcile delete).
if !kcp.DeletionTimestamp.IsZero() {
return nil
return false, nil
}

switch {
Expand All @@ -383,11 +391,12 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
microclusterPort := kcp.Spec.CK8sConfigSpec.ControlPlaneConfig.GetMicroclusterPort()
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster), microclusterPort)
if err != nil {
return fmt.Errorf("failed to create remote cluster client: %w", err)
return false, fmt.Errorf("failed to create remote cluster client: %w", err)
}
status, err := workloadCluster.ClusterStatus(ctx)
if err != nil {
return err
// we will requeue if HasK8sdConfigMap hasn't been set yet.
return !status.HasK8sdConfigMap, err
}

logger.Info("ClusterStatus", "workload", status)
@@ -413,15 +422,15 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
if v, ok := controlPlane.KCP.Annotations[controlplanev1.RemediationInProgressAnnotation]; ok {
remediationData, err := RemediationDataFromAnnotation(v)
if err != nil {
return err
return false, err
}
lastRemediation = remediationData
} else {
for _, m := range controlPlane.Machines.UnsortedList() {
if v, ok := m.Annotations[controlplanev1.RemediationForAnnotation]; ok {
remediationData, err := RemediationDataFromAnnotation(v)
if err != nil {
return err
return false, err
}
if lastRemediation == nil || lastRemediation.Timestamp.Time.Before(remediationData.Timestamp.Time) {
lastRemediation = remediationData
@@ -434,7 +443,7 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
controlPlane.KCP.Status.LastRemediation = lastRemediation.ToStatus()
}

return nil
return false, nil
}

// reconcile handles CK8sControlPlane reconciliation.
39 changes: 23 additions & 16 deletions pkg/ck8s/workload_cluster.go
@@ -25,6 +25,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"

controlplanev1 "github.com/canonical/cluster-api-k8s/controlplane/api/v1beta2"
ck8serrors "github.com/canonical/cluster-api-k8s/pkg/errors"
)

const (
@@ -83,6 +84,23 @@ func (w *Workload) getControlPlaneNodes(ctx context.Context) (*corev1.NodeList,
func (w *Workload) ClusterStatus(ctx context.Context) (ClusterStatus, error) {
status := ClusterStatus{}

// NOTE(neoaggelos): Check that the k8sd-config on the kube-system configmap exists.
key := ctrlclient.ObjectKey{
Name: k8sdConfigSecretName,
Namespace: metav1.NamespaceSystem,
}

err := w.Client.Get(ctx, key, &corev1.ConfigMap{})
// In case of error we do assume the control plane is not initialized yet.
if err != nil {
logger := log.FromContext(ctx)
logger.Info("Control Plane does not seem to be initialized yet.", "reason", err.Error())
status.HasK8sdConfigMap = false
return status, err
}

status.HasK8sdConfigMap = true

// count the control plane nodes
nodes, err := w.getControlPlaneNodes(ctx)
if err != nil {
@@ -97,21 +115,6 @@ func (w *Workload) ClusterStatus(ctx context.Context) (ClusterStatus, error) {
}
}

// NOTE(neoaggelos): Check that the k8sd-config on the kube-system configmap exists.
key := ctrlclient.ObjectKey{
Name: k8sdConfigSecretName,
Namespace: metav1.NamespaceSystem,
}

err = w.Client.Get(ctx, key, &corev1.ConfigMap{})
// In case of error we do assume the control plane is not initialized yet.
if err != nil {
logger := log.FromContext(ctx)
logger.Info("Control Plane does not seem to be initialized yet.", "reason", err.Error())
}

status.HasK8sdConfigMap = err == nil

return status, nil
}

@@ -165,6 +168,10 @@ func (w *Workload) GetK8sdProxyForControlPlane(ctx context.Context, options k8sd
return nil, fmt.Errorf("failed to get proxy pods: %w", err)
}

if len(podmap) == 0 {
return nil, &ck8serrors.K8sdProxyNotFound{}
}

var allErrors []error
for _, node := range cplaneNodes.Items {
if _, ok := options.IgnoreNodes[node.Name]; ok {
@@ -179,7 +186,7 @@

if !podv1.IsPodReady(&pod) {
// if the Pod is not Ready, it won't be able to accept any k8sd API calls.
allErrors = append(allErrors, fmt.Errorf("pod '%s' is not Ready", pod.Name))
allErrors = append(allErrors, &ck8serrors.K8sdProxyNotReady{PodName: pod.Name})
continue
}

14 changes: 8 additions & 6 deletions pkg/ck8s/workload_cluster_k8sd.go
@@ -4,7 +4,6 @@ import (
"context"
"crypto/tls"
_ "embed"
"errors"
"fmt"
"net/http"
"time"
@@ -13,7 +12,9 @@
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
podv1 "k8s.io/kubernetes/pkg/api/v1/pod"

ck8serrors "github.com/canonical/cluster-api-k8s/pkg/errors"
"github.com/canonical/cluster-api-k8s/pkg/proxy"
)

@@ -49,7 +50,12 @@ func (g *k8sdClientGenerator) forNode(ctx context.Context, node *corev1.Node) (*

pod, ok := podmap[node.Name]
if !ok {
return nil, fmt.Errorf("missing k8sd proxy pod for node %s", node.Name)
return nil, &ck8serrors.K8sdProxyNotFound{NodeName: node.Name}
}

if !podv1.IsPodReady(&pod) {
// if the Pod is not Ready, it won't be able to accept any k8sd API calls.
return nil, &ck8serrors.K8sdProxyNotReady{PodName: pod.Name}
}

return g.forNodePod(ctx, node, pod.Name)
@@ -78,10 +84,6 @@ func (g *k8sdClientGenerator) getProxyPods(ctx context.Context) (map[string]core
return nil, fmt.Errorf("unable to list k8sd-proxy pods in target cluster: %w", err)
}

if len(pods.Items) == 0 {
return nil, errors.New("there isn't any k8sd-proxy pods in target cluster")
}

podmap := make(map[string]corev1.Pod, len(pods.Items))
for _, pod := range pods.Items {
podmap[pod.Spec.NodeName] = pod
17 changes: 12 additions & 5 deletions pkg/ck8s/workload_cluster_test.go
@@ -54,12 +54,13 @@ func TestClusterStatus(t *testing.T) {
}{
{
name: "returns cluster status",
objs: []client.Object{node1, node2},
objs: []client.Object{},
expectHasSecret: false,
expectErr: true,
},
{
name: "returns cluster status with k8sd-config configmap",
objs: []client.Object{node1, node2, servingSecret},
objs: []client.Object{servingSecret, node1, node2},
expectHasSecret: true,
},
}
@@ -72,9 +73,15 @@
Client: fakeClient,
}
status, err := w.ClusterStatus(context.TODO())
g.Expect(err).ToNot(HaveOccurred())
g.Expect(status.Nodes).To(BeEquivalentTo(2))
g.Expect(status.ReadyNodes).To(BeEquivalentTo(1))
if tt.expectErr {
g.Expect(err).To(HaveOccurred())
g.Expect(status.Nodes).To(BeEquivalentTo(0))
g.Expect(status.ReadyNodes).To(BeEquivalentTo(0))
} else {
g.Expect(err).ToNot(HaveOccurred())
g.Expect(status.Nodes).To(BeEquivalentTo(2))
g.Expect(status.ReadyNodes).To(BeEquivalentTo(1))
}
g.Expect(status.HasK8sdConfigMap).To(Equal(tt.expectHasSecret))
})
}
40 changes: 40 additions & 0 deletions pkg/errors/errors.go
@@ -1,5 +1,13 @@
package errors

import (
"errors"
"fmt"
"time"

ctrl "sigs.k8s.io/controller-runtime"
)

type CK8sControlPlaneStatusError string

const (
@@ -24,3 +32,35 @@ const (
// when trying to delete the CK8s control plane.
DeleteCK8sControlPlaneError CK8sControlPlaneStatusError = "DeleteError"
)

type K8sdProxyNotFound struct {
NodeName string
}

func (e *K8sdProxyNotFound) Error() string {
if e.NodeName == "" {
return "missing k8sd proxy pod(s)"
}
return fmt.Sprintf("missing k8sd proxy pod for node %s", e.NodeName)
}

type K8sdProxyNotReady struct {
PodName string
}

func (e *K8sdProxyNotReady) Error() string {
return fmt.Sprintf("pod '%s' is not Ready", e.PodName)
}

func RequeueOnK8sdProxyError(err error) (ctrl.Result, error) {
var (
notFoundErr *K8sdProxyNotFound
notReadyErr *K8sdProxyNotReady
)
if errors.As(err, &notFoundErr) || errors.As(err, &notReadyErr) {
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
Review comment (Contributor):
Why choose 30 seconds? It seems quite long. How long does the service usually take to become ready?

Reply (Contributor Author, @claudiubelu, May 9, 2025):
To be fair, the value is a bit arbitrary. We could lower it to something like 15 seconds.

The k8sd-proxy pods can take quite a while to spawn. Based on a test run that did not include this PR, we can see in the Certificates Controller Reconciler logs how many k8sd-proxy related Reconciler errors occur, and how long it takes until they are no longer an issue (from: https://github.com/canonical/cluster-api-k8s/actions/runs/14885411990/job/41834498342):

2025-05-07T22:45:51Z	ERROR	Reconciler error	{"controller": "ck8sconfig", "controllerGroup": "bootstrap.cluster.x-k8s.io", "controllerKind": "CK8sConfig", "CK8sConfig": {"name":"worker-md-0-cld5g-7dvb9","namespace":"workload-cluster-certificate-refresh-u270u9"}, "namespace": "workload-cluster-certificate-refresh-u270u9", "name": "worker-md-0-cld5g-7dvb9", "reconcileID": "59ed7295-ad38-44ee-9119-1268fc2955b4", "error": "failed to request join token: failed to create k8sd proxy: failed to get proxy pods: there isn't any k8sd-proxy pods in target cluster"}
...
2025-05-07T22:46:02Z	ERROR	Reconciler error	{"controller": "ck8sconfig", "controllerGroup": "bootstrap.cluster.x-k8s.io", "controllerKind": "CK8sConfig", "CK8sConfig": {"name":"worker-md-0-cld5g-7dvb9","namespace":"workload-cluster-certificate-refresh-u270u9"}, "namespace": "workload-cluster-certificate-refresh-u270u9", "name": "worker-md-0-cld5g-7dvb9", "reconcileID": "1e4eade8-44be-41bd-967d-b57e581e9ea1", "error": "failed to request join token: failed to create k8sd proxy: failed to get k8sd proxy for control plane, previous errors: pod 'k8sd-proxy-w626x' is not Ready"}
...
2025-05-07T22:46:34Z	ERROR	Reconciler error	{"controller": "machine", "controllerGroup": "cluster.x-k8s.io", "controllerKind": "Machine", "Machine": {"name":"worker-md-0-cld5g-7dvb9","namespace":"workload-cluster-certificate-refresh-u270u9"}, "namespace": "workload-cluster-certificate-refresh-u270u9", "name": "worker-md-0-cld5g-7dvb9", "reconcileID": "a78fbd92-b84c-400a-b3c2-d25e5ebd4207", "error": "failed to get certificates expiry date: failed to create k8sd proxy: missing k8sd proxy pod for node capick8s-certificate-refresh-opv64c-worker-md-0-cld5g-7dvb9"}
...
2025-05-07T22:47:37Z	ERROR	Reconciler error	{"controller": "machine", "controllerGroup": "cluster.x-k8s.io", "controllerKind": "Machine", "Machine": {"name":"worker-md-0-cld5g-7dvb9","namespace":"workload-cluster-certificate-refresh-u270u9"}, "namespace": "workload-cluster-certificate-refresh-u270u9", "name": "worker-md-0-cld5g-7dvb9", "reconcileID": "ddb63da9-22b4-4183-a7e3-c20a47fc0f05", "error": "failed to get certificates expiry date: failed to create k8sd proxy: missing k8sd proxy pod for node capick8s-certificate-refresh-opv64c-worker-md-0-cld5g-7dvb9"}
...
2025-05-07T22:47:51Z	DEBUG	events	Certificates refresh in progress. TTL: 1y	{"type": "Normal", "object": {"kind":"Machine","namespace":"workload-cluster-certificate-refresh-u270u9","name":"worker-md-0-cld5g-7dvb9","uid":"b96ba028-7986-41da-9f52-9b9df1b6f7cc","apiVersion":"cluster.x-k8s.io/v1beta1","resourceVersion":"1879"}, "reason": "CertificatesRefreshInProgress"}
2025-05-07T22:47:51Z	INFO	controllers.Certificates	Certificates refreshed	{"namespace": "workload-cluster-certificate-refresh-u270u9", "machine": "worker-md-0-cld5g-7dvb9", "cluster": "capick8s-certificate-refresh-opv64c", "machine": "worker-md-0-cld5g-7dvb9", "expiry": "2026-05-07T22:47:51Z"}
...

So, 2 minutes, more or less.

}

// Not a k8sd-proxy related error.
return ctrl.Result{}, err
}
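
For illustration only, and not part of this PR: a minimal test sketch of how the new helper behaves, assuming the types and function exactly as added in pkg/errors/errors.go above (the test name, the sample pod name, and the expectations are illustrative). Since errors.As unwraps %w chains, a wrapped error such as "failed to create k8sd proxy: pod 'k8sd-proxy-w626x' is not Ready" should still be turned into a 30-second requeue with no error, while unrelated errors pass through unchanged with an empty Result.

package errors_test

import (
	"errors"
	"fmt"
	"testing"
	"time"

	ck8serrors "github.com/canonical/cluster-api-k8s/pkg/errors"
)

// Sketch: k8sd-proxy errors (even when wrapped with %w) become a 30s requeue
// with no error; any other error is passed through unchanged with a zero Result.
func TestRequeueOnK8sdProxyError(t *testing.T) {
	wrapped := fmt.Errorf("failed to create k8sd proxy: %w",
		&ck8serrors.K8sdProxyNotReady{PodName: "k8sd-proxy-w626x"})

	res, err := ck8serrors.RequeueOnK8sdProxyError(wrapped)
	if err != nil || res.RequeueAfter != 30*time.Second {
		t.Fatalf("expected 30s requeue and nil error, got %v, %v", res, err)
	}

	other := errors.New("unrelated failure")
	res, err = ck8serrors.RequeueOnK8sdProxyError(other)
	if !errors.Is(err, other) || !res.IsZero() {
		t.Fatalf("expected error passthrough with zero Result, got %v, %v", res, err)
	}
}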