Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add validation code to check replicas for quorum loss #102

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
49 changes: 49 additions & 0 deletions src/go/k8s/internal/controller/redpanda/redpanda_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"context"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"k8s.io/utils/pointer"
alejandroEsc marked this conversation as resolved.
Show resolved Hide resolved
"maps"
"reflect"
"time"
Expand Down Expand Up @@ -610,6 +612,13 @@ func (r *RedpandaReconciler) reconcileHelmRelease(ctx context.Context, rp *v1alp
return rp, hr, fmt.Errorf("failed to get HelmRelease '%s/%s': %w", rp.Namespace, rp.Status.HelmRelease, err)
}

// At this point we hold an existing HelmRelease: if it had not existed, it would have been created above.
// That makes this a good place to validate the HelmRelease before updating it.
errValidating := validateHelmRelease(rp, hr)
if errValidating != nil {
return rp, hr, fmt.Errorf("validating HelmRelease error: '%s/%s': %w", rp.Namespace, rp.Status.HelmRelease, errValidating)
}

// Check if we need to update here
hrTemplate, errTemplated := r.createHelmReleaseFromTemplate(ctx, rp)
if errTemplated != nil {
Expand Down Expand Up @@ -886,3 +895,43 @@ func disableConsoleReconciliation(console *vectorzied_v1alpha1.Console) {
}
console.Annotations[managedAnnotationKey] = NotManaged
}

// validateHelmRelease runs every HelmRelease validation against the given
// Redpanda resource and returns the collected failures joined into a single
// error. It returns nil when no validation reports a problem.
func validateHelmRelease(rp *v1alpha1.Redpanda, hr *helmv2beta2.HelmRelease) error {
	var problems []error

	// Replica-count check: rejects a requested replica count that is too low
	// relative to the replica count recorded in the HelmRelease values.
	if err := validateHelmReleaseReplicaCount(rp, hr); err != nil {
		problems = append(problems, err)
	}

	// errors.Join yields nil when there is nothing to report.
	return errors.Join(problems...)
}

func validateHelmReleaseReplicaCount(rp *v1alpha1.Redpanda, hr *helmv2beta2.HelmRelease) error {
// First validate if we are scaling down too fast
clusterSpec := &v1alpha1.RedpandaClusterSpec{}
err := json.Unmarshal(hr.Spec.Values.Raw, clusterSpec)
alejandroEsc marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("could not unmarshal values data to validate helmrelease")
}

currentReplicas := pointer.IntDeref(clusterSpec.Statefulset.Replicas, 0)
if currentReplicas == 0 {
// current replicas is 0, no longer validating.
return nil
}

// Calculate min number of nodes to (floored) to keep quorum
// Note slowly successful decommissioning will change this value,
// so as long as we do not lose quorum we should be able to scale
// in a controlled manner
minForQuorum := (currentReplicas + 1) / 2

requestedReplicas := pointer.IntDeref(rp.Spec.ClusterSpec.Statefulset.Replicas, 0)

if requestedReplicas < minForQuorum {
return fmt.Errorf("requested replicas of %d is less than %d neeed to maintain quorum", requestedReplicas, minForQuorum)
}

return nil
}
Loading