Skip to content

Commit 3b5e265

Browse files
Add abstraction for GetMetricsData processing (#1325)
Co-authored-by: Cristian Greco <[email protected]>
1 parent 6d60544 commit 3b5e265

File tree

8 files changed

+669
-534
lines changed

8 files changed

+669
-534
lines changed

pkg/job/custom.go

Lines changed: 11 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package job
33
import (
44
"context"
55
"fmt"
6-
"math"
76
"math/rand"
87
"sync"
98

@@ -17,71 +16,23 @@ func runCustomNamespaceJob(
1716
logger logging.Logger,
1817
job model.CustomNamespaceJob,
1918
clientCloudwatch cloudwatch.Client,
20-
metricsPerQuery int,
19+
gmdProcessor getMetricDataProcessor,
2120
) []*model.CloudwatchData {
22-
cw := []*model.CloudwatchData{}
23-
24-
mux := &sync.Mutex{}
25-
var wg sync.WaitGroup
26-
27-
getMetricDatas := getMetricDataForQueriesForCustomNamespace(ctx, job, clientCloudwatch, logger)
28-
metricDataLength := len(getMetricDatas)
29-
if metricDataLength == 0 {
21+
cloudwatchDatas := getMetricDataForQueriesForCustomNamespace(ctx, job, clientCloudwatch, logger)
22+
if len(cloudwatchDatas) == 0 {
3023
logger.Debug("No metrics data found")
31-
return cw
24+
return nil
3225
}
3326

34-
maxMetricCount := metricsPerQuery
35-
length := getMetricDataInputLength(job.Metrics)
36-
partition := int(math.Ceil(float64(metricDataLength) / float64(maxMetricCount)))
37-
logger.Debug("GetMetricData partitions", "total", partition)
38-
39-
wg.Add(partition)
40-
41-
for i := 0; i < metricDataLength; i += maxMetricCount {
42-
go func(i int) {
43-
defer wg.Done()
44-
45-
end := i + maxMetricCount
46-
if end > metricDataLength {
47-
end = metricDataLength
48-
}
49-
input := getMetricDatas[i:end]
50-
data := clientCloudwatch.GetMetricData(ctx, logger, input, job.Namespace, length, job.Delay, job.RoundingPeriod)
51-
52-
if data != nil {
53-
output := make([]*model.CloudwatchData, 0)
54-
for _, result := range data {
55-
getMetricData, err := findGetMetricDataByIDForCustomNamespace(input, result.ID)
56-
if err == nil {
57-
getMetricData.GetMetricDataResult = &model.GetMetricDataResult{
58-
Statistic: getMetricData.GetMetricDataProcessingParams.Statistic,
59-
Datapoint: result.Datapoint,
60-
Timestamp: result.Timestamp,
61-
}
62-
// All done processing we can drop the processing params
63-
getMetricData.GetMetricDataProcessingParams = nil
64-
output = append(output, getMetricData)
65-
}
66-
}
67-
mux.Lock()
68-
cw = append(cw, output...)
69-
mux.Unlock()
70-
}
71-
}(i)
27+
jobLength := getLargestLengthForMetrics(job.Metrics)
28+
var err error
29+
cloudwatchDatas, err = gmdProcessor.Run(ctx, logger, job.Namespace, jobLength, job.Delay, job.RoundingPeriod, cloudwatchDatas)
30+
if err != nil {
31+
logger.Error(err, "Failed to get metric data")
32+
return nil
7233
}
7334

74-
wg.Wait()
75-
return cw
76-
}
77-
78-
func findGetMetricDataByIDForCustomNamespace(getMetricDatas []*model.CloudwatchData, value string) (*model.CloudwatchData, error) {
79-
for _, getMetricData := range getMetricDatas {
80-
if getMetricData.GetMetricDataResult == nil && getMetricData.GetMetricDataProcessingParams.QueryID == value {
81-
return getMetricData, nil
82-
}
83-
}
84-
return nil, fmt.Errorf("metric with id %s not found", value)
35+
return cloudwatchDatas
8536
}
8637

8738
func getMetricDataForQueriesForCustomNamespace(

pkg/job/discovery.go

Lines changed: 13 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,10 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"math"
87
"math/rand"
98
"strings"
109
"sync"
1110

12-
"golang.org/x/sync/errgroup"
13-
1411
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
1512
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/tagging"
1613
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
@@ -23,28 +20,29 @@ type resourceAssociator interface {
2320
AssociateMetricToResource(cwMetric *model.Metric) (*model.TaggedResource, bool)
2421
}
2522

23+
type getMetricDataProcessor interface {
24+
Run(ctx context.Context, logger logging.Logger, namespace string, jobMetricLength int64, jobMetricDelay int64, jobRoundingPeriod *int64, requests []*model.CloudwatchData) ([]*model.CloudwatchData, error)
25+
}
26+
2627
func runDiscoveryJob(
2728
ctx context.Context,
2829
logger logging.Logger,
2930
job model.DiscoveryJob,
3031
region string,
3132
clientTag tagging.Client,
3233
clientCloudwatch cloudwatch.Client,
33-
metricsPerQuery int,
34-
cloudwatchConcurrency cloudwatch.ConcurrencyConfig,
34+
gmdProcessor getMetricDataProcessor,
3535
) ([]*model.TaggedResource, []*model.CloudwatchData) {
3636
logger.Debug("Get tagged resources")
3737

38-
cw := []*model.CloudwatchData{}
39-
4038
resources, err := clientTag.GetResources(ctx, job, region)
4139
if err != nil {
4240
if errors.Is(err, tagging.ErrExpectedToFindResources) {
4341
logger.Error(err, "No tagged resources made it through filtering")
4442
} else {
4543
logger.Error(err, "Couldn't describe resources")
4644
}
47-
return resources, cw
45+
return nil, nil
4846
}
4947

5048
if len(resources) == 0 {
@@ -53,114 +51,22 @@ func runDiscoveryJob(
5351

5452
svc := config.SupportedServices.GetService(job.Type)
5553
getMetricDatas := getMetricDataForQueries(ctx, logger, job, svc, clientCloudwatch, resources)
56-
metricDataLength := len(getMetricDatas)
57-
if metricDataLength == 0 {
54+
if len(getMetricDatas) == 0 {
5855
logger.Info("No metrics data found")
59-
return resources, cw
60-
}
61-
62-
maxMetricCount := metricsPerQuery
63-
length := getMetricDataInputLength(job.Metrics)
64-
partitionSize := int(math.Ceil(float64(metricDataLength) / float64(maxMetricCount)))
65-
logger.Debug("GetMetricData partitions", "size", partitionSize)
66-
67-
g, gCtx := errgroup.WithContext(ctx)
68-
// TODO: don't know if this is ok, double check, and should work for either per-api and single cl
69-
g.SetLimit(cloudwatchConcurrency.GetMetricData)
70-
71-
mu := sync.Mutex{}
72-
getMetricDataOutput := make([][]cloudwatch.MetricDataResult, 0, partitionSize)
73-
74-
count := 0
75-
76-
for i := 0; i < metricDataLength; i += maxMetricCount {
77-
start := i
78-
end := i + maxMetricCount
79-
if end > metricDataLength {
80-
end = metricDataLength
81-
}
82-
partitionNum := count
83-
count++
84-
85-
g.Go(func() error {
86-
logger.Debug("GetMetricData partition", "start", start, "end", end, "partitionNum", partitionNum)
87-
88-
input := getMetricDatas[start:end]
89-
data := clientCloudwatch.GetMetricData(gCtx, logger, input, svc.Namespace, length, job.Delay, job.RoundingPeriod)
90-
if data != nil {
91-
mu.Lock()
92-
getMetricDataOutput = append(getMetricDataOutput, data)
93-
mu.Unlock()
94-
} else {
95-
logger.Warn("GetMetricData partition empty result", "start", start, "end", end, "partitionNum", partitionNum)
96-
}
97-
98-
return nil
99-
})
56+
return resources, nil
10057
}
10158

102-
if err = g.Wait(); err != nil {
103-
logger.Error(err, "GetMetricData work group error")
59+
jobLength := getLargestLengthForMetrics(job.Metrics)
60+
getMetricDatas, err = gmdProcessor.Run(ctx, logger, svc.Namespace, jobLength, job.Delay, job.RoundingPeriod, getMetricDatas)
61+
if err != nil {
62+
logger.Error(err, "Failed to get metric data")
10463
return nil, nil
10564
}
10665

107-
mapResultsToMetricDatas(getMetricDataOutput, getMetricDatas, logger)
108-
109-
// Remove unprocessed/unknown elements in place, if any. Since getMetricDatas
110-
// is a slice of pointers, the compaction can be easily done in-place.
111-
getMetricDatas = compact(getMetricDatas, func(m *model.CloudwatchData) bool {
112-
return m.GetMetricDataResult != nil
113-
})
11466
return resources, getMetricDatas
11567
}
11668

117-
// mapResultsToMetricDatas walks over all CW GetMetricData results, and map each one with the corresponding model.CloudwatchData.
118-
//
119-
// This has been extracted into a separate function to make benchmarking easier.
120-
func mapResultsToMetricDatas(output [][]cloudwatch.MetricDataResult, datas []*model.CloudwatchData, logger logging.Logger) {
121-
// queryIDToData is a support structure used to easily find via a QueryID, the corresponding
122-
// model.CloudatchData.
123-
queryIDToData := make(map[string]*model.CloudwatchData, len(datas))
124-
125-
// load the index
126-
for _, data := range datas {
127-
queryIDToData[data.GetMetricDataProcessingParams.QueryID] = data
128-
}
129-
130-
// Update getMetricDatas slice with values and timestamps from API response.
131-
// We iterate through the response MetricDataResults and match the result ID
132-
// with what was sent in the API request.
133-
// In the event that the API response contains any ID we don't know about
134-
// (shouldn't really happen) we log a warning and move on. On the other hand,
135-
// in case the API response does not contain results for all the IDs we've
136-
// requested, unprocessed elements will be removed later on.
137-
for _, data := range output {
138-
if data == nil {
139-
continue
140-
}
141-
for _, metricDataResult := range data {
142-
// find into index
143-
metricData, ok := queryIDToData[metricDataResult.ID]
144-
if !ok {
145-
logger.Warn("GetMetricData returned unknown metric ID", "metric_id", metricDataResult.ID)
146-
continue
147-
}
148-
// skip elements that have been already mapped but still exist in queryIDToData
149-
if metricData.GetMetricDataResult != nil {
150-
continue
151-
}
152-
metricData.GetMetricDataResult = &model.GetMetricDataResult{
153-
Statistic: metricData.GetMetricDataProcessingParams.Statistic,
154-
Datapoint: metricDataResult.Datapoint,
155-
Timestamp: metricDataResult.Timestamp,
156-
}
157-
// All GetMetricData processing is done clear the params
158-
metricData.GetMetricDataProcessingParams = nil
159-
}
160-
}
161-
}
162-
163-
func getMetricDataInputLength(metrics []*model.MetricConfig) int64 {
69+
func getLargestLengthForMetrics(metrics []*model.MetricConfig) int64 {
16470
var length int64
16571
for _, metric := range metrics {
16672
if metric.Length > length {

0 commit comments

Comments
 (0)