23 changes: 4 additions & 19 deletions controllers/clustercache/cluster_accessor_client.go
@@ -80,25 +80,10 @@ func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnect

// If the controller runs on the workload cluster, access the apiserver directly by using the
// CA and Host from the in-cluster configuration.
if runningOnCluster {
log.V(6).Info("Controller is running on the cluster, updating REST config with in-cluster config")

inClusterConfig, err := ctrl.GetConfig()
if err != nil {
return nil, errors.Wrapf(err, "error getting in-cluster REST config")
}

// Use CA and Host from in-cluster config.
restConfig.CAData = nil
restConfig.CAFile = inClusterConfig.CAFile
restConfig.Host = inClusterConfig.Host

log.V(6).Info(fmt.Sprintf("Creating HTTP client and mapper with updated REST config with host %q", restConfig.Host))
httpClient, mapper, restClient, err = createHTTPClientAndMapper(ctx, ca.config.HealthProbe, restConfig)
if err != nil {
return nil, errors.Wrapf(err, "error creating HTTP client and mapper (using in-cluster config)")
}
}
// NOTE: In-cluster optimization is DISABLED because it causes authentication issues with managed
// Kubernetes services (EKS, GKE, AKS) that use short-lived tokens.
// TODO: Re-enable with proper detection of cluster type if performance becomes critical.
_ = runningOnCluster // Intentionally unused while the in-cluster optimization is disabled (see NOTE above)

log.V(6).Info("Creating cached client and cache")
cachedClient, cache, err := createCachedClient(ctx, ca.config, restConfig, httpClient, mapper)
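A note on the TODO above: one possible shape for "proper detection of cluster type" is to treat the controller as in-cluster only when the workload cluster's kubeconfig already points at the same API server as the in-cluster config. The sketch below is an assumption, not part of this change; `isSameAPIServer` is a hypothetical helper and host equality is only a conservative heuristic.

```go
package clustercache

import (
	"k8s.io/client-go/rest"
	ctrl "sigs.k8s.io/controller-runtime"
)

// isSameAPIServer is a hypothetical helper sketching the TODO above: re-enable
// the in-cluster optimization only when the remote kubeconfig already targets
// the same API server host as the in-cluster config.
func isSameAPIServer(remote *rest.Config) (bool, error) {
	inCluster, err := ctrl.GetConfig()
	if err != nil {
		return false, err
	}
	// Managed services (EKS, GKE, AKS) front the API server behind their own
	// endpoints and short-lived tokens, so anything other than an exact host
	// match keeps the optimization disabled.
	return inCluster.Host == remote.Host, nil
}
```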
84 changes: 83 additions & 1 deletion exp/internal/controllers/machinepool_controller_noderef.go
@@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"
"strconv"
"time"

"github.com/pkg/errors"
@@ -94,6 +95,27 @@ func (r *MachinePoolReconciler) reconcileNodeRefs(ctx context.Context, s *scope)
if err != nil {
if err == errNoAvailableNodes {
log.Info("Cannot assign NodeRefs to MachinePool, no matching Nodes")

// Check if we've failed too many times and need to trigger ProviderID correction retry
failureCount := r.getNodeRefFailureCount(mp)
log.Info("NodeRef assignment failed", "failureCount", failureCount, "maxRetries", 15)

if failureCount >= 15 {
log.Info("Too many NodeRef assignment failures, triggering ProviderID correction retry", "failureCount", failureCount)
// Clear the failure count and requeue to trigger ProviderID correction logic
if err := r.clearNodeRefFailureCount(ctx, mp); err != nil {
log.Error(err, "Failed to clear NodeRef failure count")
}
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
} else {
// Increment failure count
if err := r.incrementNodeRefFailureCount(ctx, mp); err != nil {
log.Error(err, "Failed to increment NodeRef failure count")
} else {
log.Info("Incremented NodeRef failure count", "newCount", failureCount+1)
}
}

// No need to requeue here. Nodes emit an event that triggers reconciliation.
return ctrl.Result{}, nil
}
@@ -106,6 +128,13 @@ func (r *MachinePoolReconciler) reconcileNodeRefs(ctx context.Context, s *scope)
mp.Status.UnavailableReplicas = mp.Status.Replicas - mp.Status.AvailableReplicas
mp.Status.NodeRefs = nodeRefsResult.references

log.V(4).Info("Updated MachinePool status", "readyReplicas", mp.Status.ReadyReplicas, "availableReplicas", mp.Status.AvailableReplicas, "nodeRefs", len(mp.Status.NodeRefs), "mpObjectID", fmt.Sprintf("%p", mp))

// Clear failure count on successful NodeRef assignment
if err := r.clearNodeRefFailureCount(ctx, mp); err != nil {
log.Error(err, "Failed to clear NodeRef failure count on success")
}

log.Info("Set MachinePool's NodeRefs", "nodeRefs", mp.Status.NodeRefs)
r.recorder.Event(mp, corev1.EventTypeNormal, "SuccessfulSetNodeRefs", fmt.Sprintf("%+v", mp.Status.NodeRefs))

@@ -115,6 +144,7 @@ func (r *MachinePoolReconciler) reconcileNodeRefs(ctx context.Context, s *scope)
return ctrl.Result{}, err
}

log.V(4).Info("Final condition check", "replicas", mp.Status.Replicas, "readyReplicas", mp.Status.ReadyReplicas, "nodeRefs", len(nodeRefsResult.references), "mpObjectID", fmt.Sprintf("%p", mp))
if mp.Status.Replicas != mp.Status.ReadyReplicas || len(nodeRefsResult.references) != int(mp.Status.ReadyReplicas) {
log.Info("Not enough ready replicas or node references", "nodeRefs", len(nodeRefsResult.references), "readyReplicas", mp.Status.ReadyReplicas, "replicas", mp.Status.Replicas)
conditions.MarkFalse(mp, expv1.ReplicasReadyCondition, expv1.WaitingForReplicasReadyReason, clusterv1.ConditionSeverityInfo, "")
@@ -173,10 +203,24 @@ func (r *MachinePoolReconciler) getNodeReferences(ctx context.Context, providerI
continue
}
if node, ok := nodeRefsMap[providerID]; ok {
if noderefutil.IsNodeReady(node) {
// Debug: Log the node's ready condition
readyCondition := noderefutil.GetReadyCondition(&node.Status)
if readyCondition != nil {
log.Info("Node ready condition", "nodeName", node.Name, "status", readyCondition.Status, "message", readyCondition.Message)
} else {
log.Info("Node has no ready condition", "nodeName", node.Name)
}

// Use the node from the map (which should have the latest status from getNodeRefMap)
isReady := noderefutil.IsNodeReady(node)
log.Info("Node ready check result", "nodeName", node.Name, "isReady", isReady)

if isReady {
ready++
log.Info("Incremented ready counter", "ready", ready)
if noderefutil.IsNodeAvailable(node, *minReadySeconds, metav1.Now()) {
available++
log.Info("Incremented available counter", "available", available)
}
}
nodeRefs = append(nodeRefs, corev1.ObjectReference{
@@ -191,6 +235,8 @@ func (r *MachinePoolReconciler) getNodeReferences(ctx context.Context, providerI
if len(nodeRefs) == 0 && len(providerIDList) != 0 {
return getNodeReferencesResult{}, errNoAvailableNodes
}

log.Info("Final getNodeReferences result", "nodeRefs", len(nodeRefs), "ready", ready, "available", available)
return getNodeReferencesResult{nodeRefs, available, ready}, nil
}

@@ -226,3 +272,39 @@ func (r *MachinePoolReconciler) patchNodes(ctx context.Context, c client.Client,
}
return nil
}

// getNodeRefFailureCount returns the number of consecutive NodeRef assignment failures
func (r *MachinePoolReconciler) getNodeRefFailureCount(mp *expv1.MachinePool) int {
if mp.Annotations == nil {
return 0
}
if countStr, exists := mp.Annotations["cluster.x-k8s.io/node-ref-failure-count"]; exists {
if count, err := strconv.Atoi(countStr); err == nil {
return count
}
}
return 0
}

// incrementNodeRefFailureCount increments the NodeRef assignment failure count
func (r *MachinePoolReconciler) incrementNodeRefFailureCount(ctx context.Context, mp *expv1.MachinePool) error {
if mp.Annotations == nil {
mp.Annotations = make(map[string]string)
}
currentCount := r.getNodeRefFailureCount(mp)
mp.Annotations["cluster.x-k8s.io/node-ref-failure-count"] = strconv.Itoa(currentCount + 1)
// Do not persist here; a direct Update may drop Status on the in-flight object.
// The outer deferred patch in the main reconcile will persist this change safely.
return nil
}

// clearNodeRefFailureCount clears the NodeRef assignment failure count
func (r *MachinePoolReconciler) clearNodeRefFailureCount(ctx context.Context, mp *expv1.MachinePool) error {
if mp.Annotations == nil {
return nil
}
delete(mp.Annotations, "cluster.x-k8s.io/node-ref-failure-count")
// Do not persist here; a direct Update may drop Status on the in-flight object.
// The outer deferred patch in the main reconcile will persist this change safely.
return nil
}
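Since the retry counter is just an annotation holding a stringified integer, its get/increment/clear semantics can be shown in isolation. A minimal standalone sketch (plain map instead of a MachinePool object; the key mirrors the annotation used above):

```go
package main

import (
	"fmt"
	"strconv"
)

// nodeRefFailureCountKey mirrors the annotation key used by the helpers above.
const nodeRefFailureCountKey = "cluster.x-k8s.io/node-ref-failure-count"

// getCount returns 0 for a missing or unparsable value, like getNodeRefFailureCount.
func getCount(annotations map[string]string) int {
	if v, ok := annotations[nodeRefFailureCountKey]; ok {
		if n, err := strconv.Atoi(v); err == nil {
			return n
		}
	}
	return 0
}

func main() {
	annotations := map[string]string{}

	// Each failed NodeRef assignment bumps the counter.
	for i := 0; i < 3; i++ {
		annotations[nodeRefFailureCountKey] = strconv.Itoa(getCount(annotations) + 1)
	}
	fmt.Println(getCount(annotations)) // 3

	// A successful assignment (or hitting the retry threshold) clears it.
	delete(annotations, nodeRefFailureCountKey)
	fmt.Println(getCount(annotations)) // 0
}
```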
108 changes: 102 additions & 6 deletions exp/internal/controllers/machinepool_controller_phases.go
@@ -19,10 +19,13 @@ package controllers
import (
"context"
"fmt"
"os"
"reflect"
"strings"
"time"

"github.com/pkg/errors"
"gopkg.in/ini.v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -277,8 +280,9 @@ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, s *
}

var getNodeRefsErr error
var originalRegion string
// Get the nodeRefsMap from the cluster.
s.nodeRefMap, getNodeRefsErr = r.getNodeRefMap(ctx, clusterClient)
s.nodeRefMap, originalRegion, getNodeRefsErr = r.getNodeRefMap(ctx, r.Client, clusterClient, cluster.Namespace)

err = r.reconcileMachines(ctx, s, infraConfig)

@@ -297,6 +301,35 @@ func (r *MachinePoolReconciler) reconcileInfrastructure(ctx context.Context, s *
return ctrl.Result{}, errors.Wrapf(err, "failed to retrieve data from infrastructure provider for MachinePool %q in namespace %q", mp.Name, mp.Namespace)
}

// Correct AWS ProviderIDs in the list to use the original region
log.Info("Processing ProviderIDList", "count", len(providerIDList), "originalRegion", originalRegion)
if originalRegion != "" {
for i, providerID := range providerIDList {
log.Info("Processing ProviderID", "index", i, "providerID", providerID)
if strings.HasPrefix(providerID, "aws:///") {
parts := strings.Split(providerID, "/")
log.Info("ProviderID parts", "parts", parts, "count", len(parts))
if len(parts) >= 5 {
// Extract the zone from the original ProviderID and replace the region part
originalZone := parts[3] // e.g., "us-east-1a" (index 3, not 2!)
// Replace the region part of the zone with the correct region
// e.g., "us-east-1a" -> "us-isob-east-1a"
zoneParts := strings.Split(originalZone, "-")
log.Info("Zone parts", "originalZone", originalZone, "zoneParts", zoneParts, "count", len(zoneParts))
if len(zoneParts) >= 3 {
// Keep only the trailing zone letter from the original zone and append it to the correct region
// e.g., "us-east-1a" -> "us-isob-east-1a"
correctedZone := originalRegion + strings.TrimLeft(zoneParts[len(zoneParts)-1], "0123456789") // e.g., "us-isob-east-1a"
correctedProviderID := fmt.Sprintf("aws:///%s/%s", correctedZone, parts[4])
if correctedProviderID != providerID {
log.Info("Corrected AWS ProviderID in list", "original", providerID, "corrected", correctedProviderID)
providerIDList[i] = correctedProviderID
}
}
}
}
}
}

// Get and set Status.Replicas from the infrastructure provider.
err = util.UnstructuredUnmarshalField(infraConfig, &mp.Status.Replicas, "status", "replicas")
if err != nil {
@@ -555,28 +588,91 @@ func (r *MachinePoolReconciler) waitForMachineCreation(ctx context.Context, mach
return nil
}

func (r *MachinePoolReconciler) getNodeRefMap(ctx context.Context, c client.Client) (map[string]*corev1.Node, error) {
func (r *MachinePoolReconciler) getNodeRefMap(ctx context.Context, mgmtClient client.Client, clusterClient client.Client, clusterNamespace string) (map[string]*corev1.Node, string, error) {
log := ctrl.LoggerFrom(ctx)
nodeRefsMap := make(map[string]*corev1.Node)
nodeList := corev1.NodeList{}

// Look for the AWS credentials secret in the Cluster's namespace on the management cluster
secret := &corev1.Secret{}
log.Info("Attempting to get AWS credentials secret", "namespace", clusterNamespace, "secret", "capa-manager-bootstrap-credentials")

// Also resolve the controller's own namespace; it is currently only logged, but could serve as a fallback lookup location
controllerNamespace := os.Getenv("POD_NAMESPACE")
if controllerNamespace == "" {
controllerNamespace = "capi-system" // default fallback
}
log.Info("Controller namespace info", "controllerNamespace", controllerNamespace, "clusterNamespace", clusterNamespace)
if err := mgmtClient.Get(ctx, client.ObjectKey{Namespace: clusterNamespace, Name: "capa-manager-bootstrap-credentials"}, secret); err != nil {
if apierrors.IsNotFound(err) {
log.Info("AWS credentials secret not found, proceeding without region correction", "namespace", clusterNamespace, "secret", "capa-manager-bootstrap-credentials")
} else {
log.Info("Error getting AWS credentials secret", "namespace", clusterNamespace, "secret", "capa-manager-bootstrap-credentials", "error", err)
return nil, "", errors.Wrapf(err, "failed to get AWS credentials secret in namespace %q", clusterNamespace)
}
} else {
log.Info("AWS credentials secret found successfully", "namespace", clusterNamespace, "secret", "capa-manager-bootstrap-credentials")
}

var originalRegion string
if secret.Data != nil && secret.Data["config"] != nil {
cfg, err := ini.Load(secret.Data["config"])
if err != nil {
log.Info("Failed to parse AWS config, proceeding without region correction", "error", err)
} else {
originalRegion = cfg.Section("default").Key("region").String()
log.Info("Extracted AWS region from config", "region", originalRegion)
}
}

for {
if err := c.List(ctx, &nodeList, client.Continue(nodeList.Continue)); err != nil {
return nil, err
if err := clusterClient.List(ctx, &nodeList, client.Continue(nodeList.Continue)); err != nil {
return nil, "", err
}

log.Info("Processing nodes", "count", len(nodeList.Items), "originalRegion", originalRegion)
for _, node := range nodeList.Items {
if node.Spec.ProviderID == "" {
log.V(2).Info("No ProviderID detected, skipping", "providerID", node.Spec.ProviderID)
continue
}

nodeRefsMap[node.Spec.ProviderID] = &node
log.Info("Processing node", "nodeName", node.Name, "providerID", node.Spec.ProviderID)

// Create a copy of the node to avoid modifying the original
nodeCopy := node.DeepCopy()
correctedProviderID := node.Spec.ProviderID

// If we have the original region and the providerID is AWS format, correct the zone
if originalRegion != "" && strings.HasPrefix(node.Spec.ProviderID, "aws:///") {
parts := strings.Split(node.Spec.ProviderID, "/")
log.Info("Node ProviderID parts", "parts", parts, "count", len(parts))
if len(parts) >= 5 {
// Extract the zone from the original ProviderID and replace the region part
originalZone := parts[3] // e.g., "us-east-1a" (index 3, not 2!)
// Replace the region part of the zone with the correct region
// e.g., "us-east-1a" -> "us-isob-east-1a"
zoneParts := strings.Split(originalZone, "-")
log.Info("Node zone parts", "originalZone", originalZone, "zoneParts", zoneParts, "count", len(zoneParts))
if len(zoneParts) >= 3 {
// Keep only the trailing zone letter from the original zone and append it to the correct region
// e.g., "us-east-1a" -> "us-isob-east-1a"
correctedZone := originalRegion + strings.TrimLeft(zoneParts[len(zoneParts)-1], "0123456789") // e.g., "us-isob-east-1a"
correctedProviderID = fmt.Sprintf("aws:///%s/%s", correctedZone, parts[4])
// Update the node copy's ProviderID to match the corrected one
nodeCopy.Spec.ProviderID = correctedProviderID
log.Info("Corrected AWS ProviderID zone", "original", node.Spec.ProviderID, "corrected", correctedProviderID)
}
}
}

// Store the node with the corrected ProviderID as the key
nodeRefsMap[correctedProviderID] = nodeCopy
}

if nodeList.Continue == "" {
break
}
}

return nodeRefsMap, nil
return nodeRefsMap, originalRegion, nil
}
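The region extraction and zone correction above can be exercised on their own. A standalone sketch of the transform (the sample INI content and instance ID are made up; `correctProviderID` is an illustrative helper, not part of this change):

```go
package main

import (
	"fmt"
	"strings"

	"gopkg.in/ini.v1"
)

// correctProviderID rewrites the zone in an "aws:///<zone>/<instance-id>" ProviderID
// so it carries the region from the credentials config, e.g.
// "aws:///us-east-1a/i-0123456789abcdef0" -> "aws:///us-isob-east-1a/i-0123456789abcdef0".
func correctProviderID(providerID, region string) string {
	if region == "" || !strings.HasPrefix(providerID, "aws:///") {
		return providerID
	}
	parts := strings.Split(providerID, "/") // ["aws:", "", "", "<zone>", "<instance-id>"]
	if len(parts) < 5 {
		return providerID
	}
	zoneParts := strings.Split(parts[3], "-")
	if len(zoneParts) < 3 {
		return providerID
	}
	// Keep only the trailing zone letter ("1a" -> "a") and append it to the region.
	suffix := strings.TrimLeft(zoneParts[len(zoneParts)-1], "0123456789")
	return fmt.Sprintf("aws:///%s%s/%s", region, suffix, parts[4])
}

func main() {
	// Same shape as the "config" key of the capa-manager-bootstrap-credentials secret.
	cfg, err := ini.Load([]byte("[default]\nregion = us-isob-east-1\n"))
	if err != nil {
		panic(err)
	}
	region := cfg.Section("default").Key("region").String()

	fmt.Println(correctProviderID("aws:///us-east-1a/i-0123456789abcdef0", region))
	// Output: aws:///us-isob-east-1a/i-0123456789abcdef0
}
```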