Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.

Commit 66176e5

Browse files
stoaderbaluchicken
authored andcommitted
Fix: upscale/downscale action executed only even though continously receiving triggering alert
1 parent c08f805 commit 66176e5

File tree

3 files changed

+180
-7
lines changed

3 files changed

+180
-7
lines changed

internal/alertmanager/currentalert/process.go

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,14 @@
1515
package currentalert
1616

1717
import (
18+
"strconv"
19+
1820
"github.com/banzaicloud/kafka-operator/api/v1beta1"
1921
"github.com/banzaicloud/kafka-operator/pkg/k8sutil"
22+
"github.com/banzaicloud/kafka-operator/pkg/resources/kafka"
2023
"github.com/banzaicloud/kafka-operator/pkg/scale"
2124
"github.com/banzaicloud/kafka-operator/pkg/util"
25+
"github.com/go-logr/logr"
2226
"github.com/prometheus/common/model"
2327
corev1 "k8s.io/api/core/v1"
2428
"k8s.io/apimachinery/pkg/api/resource"
@@ -93,22 +97,26 @@ func (e *examiner) processAlert(ds disableScaling) (bool, error) {
9397
}
9498
case "downScale":
9599
if ds.Down {
96-
e.Log.Info("downscaling is skipped due to downscale limit")
100+
e.Log.Info("downscale is skipped due to downscale limit")
97101
return true, nil
98102
}
99-
err := downScale(e.Alert.Labels, e.Client)
103+
err := downScale(e.Log, e.Alert.Labels, e.Client)
100104
if err != nil {
101105
return false, err
102106
}
107+
108+
return true, nil
103109
case "upScale":
104110
if ds.Up {
105-
e.Log.Info("upscaling is skipped due to upscale limit")
111+
e.Log.Info("upscale is skipped due to upscale limit")
106112
return true, nil
107113
}
108-
err := upScale(e.Alert.Labels, e.Alert.Annotations, e.Client)
114+
err := upScale(e.Log, e.Alert.Labels, e.Alert.Annotations, e.Client)
109115
if err != nil {
110116
return false, err
111117
}
118+
119+
return true, nil
112120
}
113121
return false, nil
114122
}
@@ -118,13 +126,25 @@ func addPVC(labels model.LabelSet, annotations model.LabelSet, client client.Cli
118126
return nil
119127
}
120128

121-
func downScale(labels model.LabelSet, client client.Client) error {
129+
func downScale(log logr.Logger, labels model.LabelSet, client client.Client) error {
122130

123131
cr, err := k8sutil.GetCr(string(labels["kafka_cr"]), string(labels["namespace"]), client)
124132
if err != nil {
125133
return err
126134
}
127135

136+
if ids := kafka.GetBrokersWithPendingOrRunningCCTask(cr); len(ids) > 0 {
137+
var keyVals []interface{}
138+
for _, id := range ids {
139+
brokerId := strconv.Itoa(int(id))
140+
keyVals = append(keyVals, brokerId, cr.Status.BrokersState[brokerId].GracefulActionState.CruiseControlState)
141+
}
142+
143+
log.Info("downscale is skipped as there are brokers which have tasks pending or running in CC", keyVals)
144+
145+
return nil
146+
}
147+
128148
brokerId, err := scale.GetBrokerIDWithLeastPartition(string(labels["namespace"]), cr.Spec.CruiseControlConfig.CruiseControlEndpoint, cr.Name)
129149
if err != nil {
130150
return err
@@ -136,13 +156,25 @@ func downScale(labels model.LabelSet, client client.Client) error {
136156
return nil
137157
}
138158

139-
func upScale(labels model.LabelSet, annotations model.LabelSet, client client.Client) error {
159+
func upScale(log logr.Logger, labels model.LabelSet, annotations model.LabelSet, client client.Client) error {
140160

141161
cr, err := k8sutil.GetCr(string(labels["kafka_cr"]), string(labels["namespace"]), client)
142162
if err != nil {
143163
return err
144164
}
145165

166+
if ids := kafka.GetBrokersWithPendingOrRunningCCTask(cr); len(ids) > 0 {
167+
var keyVals []interface{}
168+
for _, id := range ids {
169+
brokerId := strconv.Itoa(int(id))
170+
keyVals = append(keyVals, brokerId, cr.Status.BrokersState[brokerId].GracefulActionState.CruiseControlState)
171+
}
172+
173+
log.Info("upscale is skipped as there are brokers which have tasks pending or running in CC", keyVals)
174+
175+
return nil
176+
}
177+
146178
biggestId := int32(0)
147179
for _, broker := range cr.Spec.Brokers {
148180
if broker.Id > biggestId {
@@ -160,6 +192,10 @@ func upScale(labels model.LabelSet, annotations model.LabelSet, client client.Cl
160192
broker.Id = biggestId + 1
161193

162194
} else {
195+
var storageClassName *string
196+
if annotations["storageClass"] != "" {
197+
storageClassName = util.StringPointer(string(annotations["storageClass"]))
198+
}
163199

164200
broker = v1beta1.Broker{
165201
Id: biggestId + 1,
@@ -172,7 +208,7 @@ func upScale(labels model.LabelSet, annotations model.LabelSet, client client.Cl
172208
AccessModes: []corev1.PersistentVolumeAccessMode{
173209
corev1.ReadWriteOnce,
174210
},
175-
StorageClassName: util.StringPointer(string(annotations["storageClass"])),
211+
StorageClassName: storageClassName,
176212
Resources: corev1.ResourceRequirements{
177213
Requests: corev1.ResourceList{
178214
"storage": resource.MustParse(string(annotations["diskSize"])),

pkg/resources/kafka/kafka.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -767,3 +767,17 @@ func (r *Reconciler) reconcileKafkaPVC(log logr.Logger, desiredPVC *corev1.Persi
767767
}
768768
return nil
769769
}
770+
771+
// GetBrokersWithPendingOrRunningCCTask returns list of brokers that are either waiting for CC
772+
// to start executing a broker task (add broker, remove broker, etc) or CC already running a task for it.
773+
func GetBrokersWithPendingOrRunningCCTask(kafkaCluster *v1beta1.KafkaCluster) []int32 {
774+
var brokerIDs []int32
775+
for i := range kafkaCluster.Spec.Brokers {
776+
if state, ok := kafkaCluster.Status.BrokersState[strconv.Itoa(int(kafkaCluster.Spec.Brokers[i].Id))]; ok {
777+
if state.GracefulActionState.CruiseControlState == v1beta1.GracefulUpdateRequired || (state.GracefulActionState.CruiseControlTaskId != "" && state.GracefulActionState.CruiseControlState == v1beta1.GracefulUpdateRunning) {
778+
brokerIDs = append(brokerIDs, kafkaCluster.Spec.Brokers[i].Id)
779+
}
780+
}
781+
}
782+
return brokerIDs
783+
}

pkg/resources/kafka/kafka_test.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright © 2020 Banzai Cloud
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package kafka
16+
17+
import (
18+
"reflect"
19+
"testing"
20+
21+
"github.com/banzaicloud/kafka-operator/api/v1beta1"
22+
)
23+
24+
func TestGetBrokersWithPendingOrRunningCCTask(t *testing.T) {
25+
testCases := []struct {
26+
testName string
27+
kafkaCluster v1beta1.KafkaCluster
28+
expectedIDs []int32
29+
}{
30+
{
31+
testName: "no pending or running CC tasks",
32+
kafkaCluster: v1beta1.KafkaCluster{
33+
Spec: v1beta1.KafkaClusterSpec{
34+
Brokers: []v1beta1.Broker{
35+
{
36+
Id: 0,
37+
},
38+
{
39+
Id: 1,
40+
},
41+
{
42+
Id: 2,
43+
},
44+
},
45+
},
46+
Status: v1beta1.KafkaClusterStatus{
47+
BrokersState: map[string]v1beta1.BrokerState{
48+
"0": {
49+
GracefulActionState: v1beta1.GracefulActionState{CruiseControlState: v1beta1.GracefulUpdateSucceeded},
50+
},
51+
"1": {
52+
GracefulActionState: v1beta1.GracefulActionState{CruiseControlState: v1beta1.GracefulUpdateFailed},
53+
},
54+
"2": {
55+
GracefulActionState: v1beta1.GracefulActionState{CruiseControlState: v1beta1.GracefulUpdateNotRequired},
56+
},
57+
"3": {
58+
GracefulActionState: v1beta1.GracefulActionState{CruiseControlState: v1beta1.GracefulUpdateRequired},
59+
},
60+
},
61+
},
62+
},
63+
},
64+
{
65+
testName: "pending and running CC tasks",
66+
kafkaCluster: v1beta1.KafkaCluster{
67+
Spec: v1beta1.KafkaClusterSpec{
68+
Brokers: []v1beta1.Broker{
69+
{
70+
Id: 0,
71+
},
72+
{
73+
Id: 1,
74+
},
75+
{
76+
Id: 2,
77+
},
78+
{
79+
Id: 4,
80+
},
81+
},
82+
},
83+
Status: v1beta1.KafkaClusterStatus{
84+
BrokersState: map[string]v1beta1.BrokerState{
85+
"0": {
86+
GracefulActionState: v1beta1.GracefulActionState{
87+
CruiseControlTaskId: "cc-task-id-1",
88+
CruiseControlState: v1beta1.GracefulUpdateRunning,
89+
},
90+
},
91+
"1": {
92+
GracefulActionState: v1beta1.GracefulActionState{CruiseControlState: v1beta1.GracefulUpdateFailed},
93+
},
94+
"2": {
95+
GracefulActionState: v1beta1.GracefulActionState{CruiseControlState: v1beta1.GracefulUpdateRequired},
96+
},
97+
"3": {
98+
GracefulActionState: v1beta1.GracefulActionState{CruiseControlState: v1beta1.GracefulUpdateRequired},
99+
},
100+
"4": {
101+
GracefulActionState: v1beta1.GracefulActionState{CruiseControlState: v1beta1.GracefulUpdateRunning},
102+
},
103+
},
104+
},
105+
},
106+
expectedIDs: []int32{0, 2},
107+
},
108+
}
109+
110+
t.Parallel()
111+
112+
for _, test := range testCases {
113+
test := test
114+
115+
t.Run(test.testName, func(t *testing.T) {
116+
actual := GetBrokersWithPendingOrRunningCCTask(&test.kafkaCluster)
117+
118+
if !reflect.DeepEqual(actual, test.expectedIDs) {
119+
t.Error("Expected:", test.expectedIDs, ", got:", actual)
120+
}
121+
})
122+
}
123+
}

0 commit comments

Comments
 (0)