Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 18ff986

Browse files
authoredNov 11, 2024··
feat: improve kube api usage and reduce unnecessary reconciliations (#155)
* feat: improve kube api usage and reduce unnecessary reconciliations Signed-off-by: Soumya Ghosh Dastidar <gdsoumya@gmail.com> --------- Signed-off-by: Soumya Ghosh Dastidar <gdsoumya@gmail.com>
1 parent cf2a376 commit 18ff986

14 files changed

+214
-97
lines changed
 

‎cmd/exporter/main.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ package main
1818

1919
import (
2020
"net/http"
21+
"time"
2122

23+
"k8s.io/apimachinery/pkg/util/runtime"
2224
_ "k8s.io/client-go/plugin/pkg/client/auth"
25+
"k8s.io/client-go/util/workqueue"
2326

2427
"github.com/prometheus/client_golang/prometheus/promhttp"
2528
"github.com/sirupsen/logrus"
@@ -39,15 +42,21 @@ func init() {
3942
}
4043

4144
func main() {
42-
clients := clients.ClientSets{}
45+
stop := make(chan struct{})
46+
defer close(stop)
47+
defer runtime.HandleCrash()
48+
49+
wq := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
50+
defer wq.ShutDown()
4351

4452
//Getting kubeConfig and Generate ClientSets
45-
if err := clients.GenerateClientSetFromKubeConfig(); err != nil {
53+
clientset, err := clients.NewClientSet(stop, 5*time.Minute, wq)
54+
if err != nil {
4655
log.Fatalf("Unable to Get the kubeconfig, err: %v", err)
4756
}
4857

4958
// Trigger the chaos metrics collection
50-
go controller.Exporter(clients)
59+
go controller.Exporter(clientset, wq)
5160

5261
//This section will start the HTTP server and expose metrics on the /metrics endpoint.
5362
http.Handle("/metrics", promhttp.Handler())

‎controller/collect-data.go

+13-14
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package controller
22

33
import (
4-
"context"
54
"math"
65
"strconv"
76
"strings"
@@ -12,15 +11,15 @@ import (
1211
litmuschaosv1alpha1 "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
1312
corev1 "k8s.io/api/core/v1"
1413
k8serrors "k8s.io/apimachinery/pkg/api/errors"
15-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14+
"k8s.io/apimachinery/pkg/labels"
1615
clientTypes "k8s.io/apimachinery/pkg/types"
1716
)
1817

1918
//go:generate mockgen -destination=mocks/mock_collect-data.go -package=mocks github.com/litmuschaos/chaos-exporter/controller ResultCollector
2019

2120
// ResultCollector interface for the both functions GetResultList and getExperimentMetricsFromResult
2221
type ResultCollector interface {
23-
GetResultList(clients clients.ClientSets, chaosNamespace string, monitoringEnabled *MonitoringEnabled) (litmuschaosv1alpha1.ChaosResultList, error)
22+
GetResultList(clients clients.ClientSets, chaosNamespace string, monitoringEnabled *MonitoringEnabled) ([]*v1alpha1.ChaosResult, error)
2423
GetExperimentMetricsFromResult(chaosResult *litmuschaosv1alpha1.ChaosResult, clients clients.ClientSets) (bool, error)
2524
SetResultDetails()
2625
GetResultDetails() ChaosResultDetails
@@ -30,28 +29,28 @@ type ResultDetails struct {
3029
}
3130

3231
// GetResultList return the result list correspond to the monitoring enabled chaosengine
33-
func (r *ResultDetails) GetResultList(clients clients.ClientSets, chaosNamespace string, monitoringEnabled *MonitoringEnabled) (litmuschaosv1alpha1.ChaosResultList, error) {
32+
func (r *ResultDetails) GetResultList(clients clients.ClientSets, chaosNamespace string, monitoringEnabled *MonitoringEnabled) ([]*v1alpha1.ChaosResult, error) {
3433

35-
chaosResultList, err := clients.LitmusClient.LitmuschaosV1alpha1().ChaosResults(chaosNamespace).List(context.Background(), metav1.ListOptions{})
34+
chaosResultList, err := clients.ResultInformer.ChaosResults(chaosNamespace).List(labels.Everything())
3635
if err != nil {
37-
return litmuschaosv1alpha1.ChaosResultList{}, err
36+
return nil, err
3837
}
3938
// waiting until any chaosresult found
40-
if len(chaosResultList.Items) == 0 {
39+
if len(chaosResultList) == 0 {
4140
if monitoringEnabled.IsChaosResultsAvailable {
4241
monitoringEnabled.IsChaosResultsAvailable = false
4342
log.Warnf("No chaosresult found!")
4443
log.Info("[Wait]: Waiting for the chaosresult ... ")
4544
}
46-
return litmuschaosv1alpha1.ChaosResultList{}, nil
45+
return nil, nil
4746
}
4847

4948
if !monitoringEnabled.IsChaosResultsAvailable {
5049
log.Info("[Wait]: Cheers! Wait is over, found desired chaosresult")
5150
monitoringEnabled.IsChaosResultsAvailable = true
5251
}
5352

54-
return *chaosResultList, nil
53+
return chaosResultList, nil
5554
}
5655

5756
// GetExperimentMetricsFromResult derive all the metrics data from the chaosresult and set into resultDetails struct
@@ -61,7 +60,7 @@ func (r *ResultDetails) GetExperimentMetricsFromResult(chaosResult *litmuschaosv
6160
if err != nil {
6261
return false, err
6362
}
64-
engine, err := clients.LitmusClient.LitmuschaosV1alpha1().ChaosEngines(chaosResult.Namespace).Get(context.Background(), chaosResult.Spec.EngineName, metav1.GetOptions{})
63+
engine, err := clients.EngineInformer.ChaosEngines(chaosResult.Namespace).Get(chaosResult.Spec.EngineName)
6564
if err != nil {
6665
// k8serrors.IsNotFound(err) checking k8s resource is found or not,
6766
// It will skip this result if k8s resource is not found.
@@ -267,14 +266,14 @@ func getProbeSuccessPercentage(chaosResult *litmuschaosv1alpha1.ChaosResult) (fl
267266
// getEventsForSpecificInvolvedResource derive all the events correspond to the specific resource
268267
func getEventsForSpecificInvolvedResource(clients clients.ClientSets, resourceUID clientTypes.UID, chaosNamespace string) (corev1.EventList, error) {
269268
finalEventList := corev1.EventList{}
270-
eventsList, err := clients.KubeClient.CoreV1().Events(chaosNamespace).List(context.Background(), metav1.ListOptions{})
269+
eventsList, err := clients.EventsInformer.Events(chaosNamespace).List(labels.Everything())
271270
if err != nil {
272271
return corev1.EventList{}, err
273272
}
274273

275-
for _, event := range eventsList.Items {
276-
if event.InvolvedObject.UID == resourceUID {
277-
finalEventList.Items = append(finalEventList.Items, event)
274+
for _, event := range eventsList {
275+
if event != nil && event.InvolvedObject.UID == resourceUID {
276+
finalEventList.Items = append(finalEventList.Items, *event)
278277
}
279278
}
280279
return finalEventList, nil

‎controller/collect-data_test.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package controller_test
22

33
import (
44
"context"
5+
"testing"
6+
57
"github.com/litmuschaos/chaos-exporter/controller"
68
"github.com/litmuschaos/chaos-exporter/pkg/clients"
79
"github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
@@ -12,7 +14,7 @@ import (
1214
"k8s.io/apimachinery/pkg/runtime"
1315
"k8s.io/client-go/kubernetes/fake"
1416
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
15-
"testing"
17+
"k8s.io/client-go/util/workqueue"
1618
)
1719

1820
func TestGetResultList(t *testing.T) {
@@ -183,5 +185,8 @@ func CreateFakeClient(t *testing.T) clients.ClientSets {
183185
cs := clients.ClientSets{}
184186
cs.KubeClient = fake.NewSimpleClientset([]runtime.Object{}...)
185187
cs.LitmusClient = litmusFakeClientSet.NewSimpleClientset([]runtime.Object{}...)
188+
stopCh := make(chan struct{})
189+
err := cs.SetupInformers(stopCh, cs.KubeClient, cs.LitmusClient, 0, workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()))
190+
require.NoError(t, err)
186191
return cs
187192
}

‎controller/controller.go

+13-7
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,15 @@ limitations under the License.
1717
package controller
1818

1919
import (
20-
"time"
21-
2220
"github.com/litmuschaos/chaos-exporter/pkg/clients"
2321
"github.com/litmuschaos/chaos-exporter/pkg/log"
2422
litmuschaosv1alpha1 "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
2523
"github.com/prometheus/client_golang/prometheus"
24+
"k8s.io/client-go/util/workqueue"
2625
)
2726

2827
// Exporter continuously collects the chaos metrics for a given chaosengine
29-
func Exporter(clients clients.ClientSets) {
28+
func Exporter(clientSet clients.ClientSets, wq workqueue.RateLimitingInterface) {
3029
log.Info("Started creating Metrics")
3130
// Register the fixed (count) chaos metrics
3231
log.Info("Registering Fixed Metrics")
@@ -35,7 +34,7 @@ func Exporter(clients clients.ClientSets) {
3534
ResultCollector: &ResultDetails{},
3635
}
3736
//gaugeMetrics := GaugeMetrics{}
38-
overallChaosResults := litmuschaosv1alpha1.ChaosResultList{}
37+
overallChaosResults := []*litmuschaosv1alpha1.ChaosResult{}
3938

4039
r.GaugeMetrics.InitializeGaugeMetrics().
4140
RegisterFixedMetrics()
@@ -45,11 +44,18 @@ func Exporter(clients clients.ClientSets) {
4544
IsChaosEnginesAvailable: true,
4645
}
4746

48-
for {
49-
if err := r.GetLitmusChaosMetrics(clients, &overallChaosResults, &monitoringEnabled); err != nil {
47+
// refresh metrics whenever there's a change in chaosengine or chaosresult
48+
// or every informer resync duration, whichever is earlier
49+
for _, done := wq.Get(); !done; _, done = wq.Get() {
50+
needRequeue, err := r.GetLitmusChaosMetrics(clientSet, &overallChaosResults, &monitoringEnabled)
51+
if err != nil {
5052
log.Errorf("err: %v", err)
5153
}
52-
time.Sleep(1000 * time.Millisecond)
54+
wq.Done(clients.ProcessKey)
55+
// Add after
56+
if needRequeue != nil {
57+
wq.AddAfter(clients.ProcessKey, *needRequeue)
58+
}
5359
}
5460
}
5561

‎controller/handle-result-deletion.go

+22-15
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,16 @@ import (
44
"fmt"
55
"os"
66
"strconv"
7+
"time"
78

89
litmuschaosv1alpha1 "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
910
)
1011

1112
// unsetDeletedChaosResults unset the metrics correspond to deleted chaosresults
12-
func (gaugeMetrics *GaugeMetrics) unsetDeletedChaosResults(oldChaosResults, newChaosResults *litmuschaosv1alpha1.ChaosResultList) {
13-
for _, oldResult := range oldChaosResults.Items {
13+
func (gaugeMetrics *GaugeMetrics) unsetDeletedChaosResults(oldChaosResults, newChaosResults []*litmuschaosv1alpha1.ChaosResult) {
14+
for _, oldResult := range oldChaosResults {
1415
found := false
15-
for _, newResult := range newChaosResults.Items {
16+
for _, newResult := range newChaosResults {
1617
if oldResult.UID == newResult.UID {
1718
found = true
1819
break
@@ -22,7 +23,7 @@ func (gaugeMetrics *GaugeMetrics) unsetDeletedChaosResults(oldChaosResults, newC
2223
if !found {
2324
for _, value := range resultStore[string(oldResult.UID)] {
2425

25-
probeSuccesPercentage, _ := getProbeSuccessPercentage(&oldResult)
26+
probeSuccesPercentage, _ := getProbeSuccessPercentage(oldResult)
2627
resultDetails := initialiseResult().
2728
setName(oldResult.Name).
2829
setNamespace(oldResult.Namespace).
@@ -46,10 +47,13 @@ func (gaugeMetrics *GaugeMetrics) unsetDeletedChaosResults(oldChaosResults, newC
4647

4748
// unsetOutdatedMetrics unset the metrics when chaosresult verdict changes
4849
// if same chaosresult is continuously repeated more than scrape interval then it sets the metrics value to 0
49-
func (gaugeMetrics *GaugeMetrics) unsetOutdatedMetrics(resultDetails ChaosResultDetails) float64 {
50+
func (gaugeMetrics *GaugeMetrics) unsetOutdatedMetrics(resultDetails ChaosResultDetails) (float64, *time.Duration) {
5051
scrapeTime, _ := strconv.Atoi(getEnv("TSDB_SCRAPE_INTERVAL", "10"))
5152
result, ok := matchVerdict[string(resultDetails.UID)]
5253
reset := false
54+
var needRequeue *time.Duration
55+
56+
scrapeDuration := time.Duration(scrapeTime) * time.Second
5357

5458
switch ok {
5559
case true:
@@ -58,20 +62,23 @@ func (gaugeMetrics *GaugeMetrics) unsetOutdatedMetrics(resultDetails ChaosResult
5862
case result.Verdict != resultDetails.Verdict:
5963
gaugeMetrics.ResultVerdict.DeleteLabelValues(resultDetails.Namespace, resultDetails.Name, resultDetails.ChaosEngineName,
6064
resultDetails.ChaosEngineContext, result.Verdict, fmt.Sprintf("%f", result.ProbeSuccessPercentage), resultDetails.AppLabel,
61-
resultDetails.AppNs, resultDetails.AppKind, resultDetails.WorkflowName, result.FaultName)
62-
result.Count = 1
65+
resultDetails.AppNs, resultDetails.AppKind, resultDetails.WorkflowName, resultDetails.FaultName)
66+
result.Timer = time.Now()
67+
needRequeue = &scrapeDuration
6368
default:
6469
// if time passed scrape time then reset the value to 0
65-
if result.Count >= scrapeTime {
70+
if time.Since(result.Timer) >= scrapeDuration {
6671
reset = true
6772
} else {
68-
result.Count++
73+
scrapeDuration = scrapeDuration - time.Since(result.Timer)
74+
needRequeue = &scrapeDuration
6975
}
7076
}
7177
default:
7278
result = initialiseResultData().
73-
setCount(1).
79+
setTimer(time.Now()).
7480
setVerdictReset(false)
81+
needRequeue = &scrapeDuration
7582
}
7683

7784
// update the values inside matchVerdict
@@ -80,9 +87,9 @@ func (gaugeMetrics *GaugeMetrics) unsetOutdatedMetrics(resultDetails ChaosResult
8087
setVerdictReset(reset)
8188

8289
if reset {
83-
return float64(0)
90+
return float64(0), needRequeue
8491
}
85-
return float64(1)
92+
return float64(1), needRequeue
8693
}
8794

8895
// getEnv derived the ENVs and sets the default value if env contains empty value
@@ -105,7 +112,7 @@ func (resultDetails *ChaosResultDetails) setResultData() {
105112
setAppLabel(resultDetails.AppLabel).
106113
setVerdict(resultDetails.Verdict).
107114
setFaultName(resultDetails.FaultName).
108-
setCount(0).
115+
setTimer(time.Now()).
109116
setVerdictReset(false).
110117
setProbeSuccesPercentage(resultDetails.ProbeSuccessPercentage)
111118

@@ -164,8 +171,8 @@ func (resultData *ResultData) setFaultName(fault string) *ResultData {
164171
}
165172

166173
// setCount sets the count inside resultData struct
167-
func (resultData *ResultData) setCount(count int) *ResultData {
168-
resultData.Count = count
174+
func (resultData *ResultData) setTimer(timer time.Time) *ResultData {
175+
resultData.Timer = timer
169176
return resultData
170177
}
171178

‎controller/handle-result-deletion_test.go

+12-15
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package controller
22

33
import (
44
"errors"
5+
"testing"
6+
57
"github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
68
"github.com/stretchr/testify/require"
79
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
8-
"testing"
910
)
1011

1112
func Test_unsetDeletedChaosResults(t *testing.T) {
@@ -15,8 +16,8 @@ func Test_unsetDeletedChaosResults(t *testing.T) {
1516
execFunc func(details *ChaosResultDetails)
1617
isErr bool
1718
resultDetails *ChaosResultDetails
18-
oldChaosResult *v1alpha1.ChaosResultList
19-
newChaosResult *v1alpha1.ChaosResultList
19+
oldChaosResult []*v1alpha1.ChaosResult
20+
newChaosResult []*v1alpha1.ChaosResult
2021
}{
2122
{
2223
name: "success: deleted chaosResult",
@@ -26,21 +27,17 @@ func Test_unsetDeletedChaosResults(t *testing.T) {
2627
resultDetails: &ChaosResultDetails{
2728
UID: "FAKE-UID-OLD",
2829
},
29-
oldChaosResult: &v1alpha1.ChaosResultList{
30-
Items: []v1alpha1.ChaosResult{
31-
{
32-
ObjectMeta: metav1.ObjectMeta{
33-
UID: "FAKE-UID-OLD",
34-
},
30+
oldChaosResult: []*v1alpha1.ChaosResult{
31+
{
32+
ObjectMeta: metav1.ObjectMeta{
33+
UID: "FAKE-UID-OLD",
3534
},
3635
},
3736
},
38-
newChaosResult: &v1alpha1.ChaosResultList{
39-
Items: []v1alpha1.ChaosResult{
40-
{
41-
ObjectMeta: metav1.ObjectMeta{
42-
UID: "FAKE-UID-NEW",
43-
},
37+
newChaosResult: []*v1alpha1.ChaosResult{
38+
{
39+
ObjectMeta: metav1.ObjectMeta{
40+
UID: "FAKE-UID-NEW",
4441
},
4542
},
4643
},

‎controller/mocks/mock_collect-data.go

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
There was a problem loading the remainder of the diff.

0 commit comments

Comments
 (0)
Please sign in to comment.