Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.

Commit 3138130

Browse files
authored
Merge pull request #61 from banzaicloud/remove_least_partition
In case of Downgrade remove broker with the least partition
2 parents 7b6eb3b + b275693 commit 3138130

File tree

4 files changed

+58
-3
lines changed

4 files changed

+58
-3
lines changed

charts/kafka-operator/values.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ replicaCount: 1
77
operator:
88
image:
99
repository: banzaicloud/kafka-operator
10-
tag: 0.3.0
10+
tag: 0.3.1
1111
pullPolicy: IfNotPresent
1212
resources:
1313
limits:

internal/alertmanager/currentalert/process.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package currentalert
1717
import (
1818
banzaicloudv1alpha1 "github.com/banzaicloud/kafka-operator/pkg/apis/banzaicloud/v1alpha1"
1919
"github.com/banzaicloud/kafka-operator/pkg/k8sutil"
20+
"github.com/banzaicloud/kafka-operator/pkg/scale"
2021
"github.com/banzaicloud/kafka-operator/pkg/util"
2122
"github.com/prometheus/common/model"
2223
corev1 "k8s.io/api/core/v1"
@@ -52,7 +53,11 @@ func addPVC(labels model.LabelSet, annotations model.LabelSet, client client.Cli
5253
}
5354

5455
func downScale(labels model.LabelSet, client client.Client) error {
55-
err := k8sutil.RemoveBrokerFromCr(string(labels["brokerId"]), string(labels["kafka_cr"]), string(labels["kubernetes_namespace"]), client)
56+
brokerId, err := scale.GetBrokerIDWithLeastPartition(string(labels["kubernetes_namespace"]))
57+
if err != nil {
58+
return err
59+
}
60+
err = k8sutil.RemoveBrokerFromCr(brokerId, string(labels["kafka_cr"]), string(labels["kubernetes_namespace"]), client)
5661
if err != nil {
5762
return err
5863
}

internal/alertmanager/receiver/http_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (a *HTTPController) reciveAlert(w http.ResponseWriter, r *http.Request) {
5858
}
5959
err = alertReciever(a.Logger, alert, a.Client)
6060
if err != nil {
61-
http.Error(w, "alert reciever error", http.StatusBadRequest)
61+
http.Error(w, "alert receiver error", http.StatusBadRequest)
6262
return
6363
}
6464
w.WriteHeader(http.StatusAccepted)

pkg/scale/scale.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,56 @@ func isKafkaBrokerReady(brokerId, namespace string) (bool, error) {
161161
return running, nil
162162
}
163163

164+
// GetBrokerIDWithLeastPartition returns
165+
func GetBrokerIDWithLeastPartition(namespace string) (string, error) {
166+
167+
brokerWithLeastPartition := ""
168+
169+
err := getCruiseControlStatus(namespace)
170+
if err != nil {
171+
return brokerWithLeastPartition, err
172+
}
173+
174+
options := map[string]string{
175+
"json": "true",
176+
}
177+
178+
rsp, err := getCruiseControl(kafkaClusterStateAction, namespace, options)
179+
if err != nil {
180+
log.Error(err, "can't work with cruise-control because it is not ready")
181+
return brokerWithLeastPartition, err
182+
}
183+
184+
body, err := ioutil.ReadAll(rsp.Body)
185+
if err != nil {
186+
return brokerWithLeastPartition, err
187+
}
188+
189+
err = rsp.Body.Close()
190+
if err != nil {
191+
return brokerWithLeastPartition, err
192+
}
193+
194+
var response map[string]interface{}
195+
196+
err = json.Unmarshal(body, &response)
197+
if err != nil {
198+
return brokerWithLeastPartition, err
199+
}
200+
201+
replicaCountByBroker := response["KafkaBrokerState"].(map[string]interface{})["ReplicaCountByBrokerId"].(map[string]interface{})
202+
replicaCount := float64(99999)
203+
204+
for brokerID, replica := range replicaCountByBroker {
205+
if replicaCount > replica.(float64) {
206+
replicaCount = replica.(float64)
207+
brokerWithLeastPartition = brokerID
208+
}
209+
}
210+
return brokerWithLeastPartition, nil
211+
212+
}
213+
164214
// UpScaleCluster upscales Kafka cluster
165215
func UpScaleCluster(brokerId, namespace string) error {
166216

0 commit comments

Comments
 (0)