Add abstraction for GetMetricsData processing #1325

Merged
71 changes: 11 additions & 60 deletions pkg/job/custom.go
@@ -3,7 +3,6 @@ package job
 import (
 	"context"
 	"fmt"
-	"math"
 	"math/rand"
 	"sync"
 
@@ -17,71 +16,23 @@ func runCustomNamespaceJob(
 	logger logging.Logger,
 	job model.CustomNamespaceJob,
 	clientCloudwatch cloudwatch.Client,
-	metricsPerQuery int,
+	gmdProcessor getMetricDataProcessor,
 ) []*model.CloudwatchData {
-	cw := []*model.CloudwatchData{}
-
-	mux := &sync.Mutex{}
-	var wg sync.WaitGroup
-
-	getMetricDatas := getMetricDataForQueriesForCustomNamespace(ctx, job, clientCloudwatch, logger)
-	metricDataLength := len(getMetricDatas)
-	if metricDataLength == 0 {
+	cloudwatchDatas := getMetricDataForQueriesForCustomNamespace(ctx, job, clientCloudwatch, logger)
+	if len(cloudwatchDatas) == 0 {
 		logger.Debug("No metrics data found")
-		return cw
+		return nil
 	}
 
-	maxMetricCount := metricsPerQuery
-	length := getMetricDataInputLength(job.Metrics)
-	partition := int(math.Ceil(float64(metricDataLength) / float64(maxMetricCount)))
-	logger.Debug("GetMetricData partitions", "total", partition)
-
-	wg.Add(partition)
-
-	for i := 0; i < metricDataLength; i += maxMetricCount {
-		go func(i int) {
-			defer wg.Done()
-
-			end := i + maxMetricCount
-			if end > metricDataLength {
-				end = metricDataLength
-			}
-			input := getMetricDatas[i:end]
-			data := clientCloudwatch.GetMetricData(ctx, logger, input, job.Namespace, length, job.Delay, job.RoundingPeriod)
-
-			if data != nil {
-				output := make([]*model.CloudwatchData, 0)
-				for _, result := range data {
-					getMetricData, err := findGetMetricDataByIDForCustomNamespace(input, result.ID)
-					if err == nil {
-						getMetricData.GetMetricDataResult = &model.GetMetricDataResult{
-							Statistic: getMetricData.GetMetricDataProcessingParams.Statistic,
-							Datapoint: result.Datapoint,
-							Timestamp: result.Timestamp,
-						}
-						// All done processing we can drop the processing params
-						getMetricData.GetMetricDataProcessingParams = nil
-						output = append(output, getMetricData)
-					}
-				}
-				mux.Lock()
-				cw = append(cw, output...)
-				mux.Unlock()
-			}
-		}(i)
+	jobLength := getLargestLengthForMetrics(job.Metrics)
+	var err error
+	cloudwatchDatas, err = gmdProcessor.Run(ctx, logger, job.Namespace, jobLength, job.Delay, job.RoundingPeriod, cloudwatchDatas)
+	if err != nil {
+		logger.Error(err, "Failed to get metric data")
+		return nil
 	}
 
-	wg.Wait()
-	return cw
-}
-
-func findGetMetricDataByIDForCustomNamespace(getMetricDatas []*model.CloudwatchData, value string) (*model.CloudwatchData, error) {
-	for _, getMetricData := range getMetricDatas {
-		if getMetricData.GetMetricDataResult == nil && getMetricData.GetMetricDataProcessingParams.QueryID == value {
-			return getMetricData, nil
-		}
-	}
-	return nil, fmt.Errorf("metric with id %s not found", value)
+	return cloudwatchDatas
 }
 
 func getMetricDataForQueriesForCustomNamespace(
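With this change, runCustomNamespaceJob no longer partitions and fans out GetMetricData requests itself; it delegates the whole step to gmdProcessor.Run. The concrete processor added by this PR is not part of this excerpt, so the following is only a minimal sketch of a type that could satisfy the getMetricDataProcessor interface (defined in pkg/job/discovery.go below). The type and field names (inlineGmdProcessor, batchSize) are hypothetical, and the import path for the logging package is inferred from the other paths visible in this diff; the batching and query-ID matching mirror the inline code removed above.

```go
package job

import (
	"context"

	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
)

// inlineGmdProcessor is a hypothetical implementation of getMetricDataProcessor.
// It reproduces, sequentially, the batching logic this PR removes from the job runners.
type inlineGmdProcessor struct {
	client    cloudwatch.Client
	batchSize int // corresponds to the old metricsPerQuery parameter; must be > 0
}

func (p inlineGmdProcessor) Run(ctx context.Context, logger logging.Logger, namespace string, jobMetricLength int64, jobMetricDelay int64, jobRoundingPeriod *int64, requests []*model.CloudwatchData) ([]*model.CloudwatchData, error) {
	output := make([]*model.CloudwatchData, 0, len(requests))
	for start := 0; start < len(requests); start += p.batchSize {
		end := start + p.batchSize
		if end > len(requests) {
			end = len(requests)
		}
		batch := requests[start:end]

		// Index the batch by query ID so API results can be matched back to their requests.
		byQueryID := make(map[string]*model.CloudwatchData, len(batch))
		for _, req := range batch {
			byQueryID[req.GetMetricDataProcessingParams.QueryID] = req
		}

		for _, result := range p.client.GetMetricData(ctx, logger, batch, namespace, jobMetricLength, jobMetricDelay, jobRoundingPeriod) {
			req, ok := byQueryID[result.ID]
			if !ok {
				logger.Warn("GetMetricData returned unknown metric ID", "metric_id", result.ID)
				continue
			}
			req.GetMetricDataResult = &model.GetMetricDataResult{
				Statistic: req.GetMetricDataProcessingParams.Statistic,
				Datapoint: result.Datapoint,
				Timestamp: result.Timestamp,
			}
			// Processing is done, so drop the processing params as the old inline code did.
			req.GetMetricDataProcessingParams = nil
			output = append(output, req)
		}
	}
	return output, nil
}
```

Pushing this behind an interface means the partitioning and concurrency policy (the errgroup fan-out visible in the removed discovery.go code) can live in one place and be tested independently of the discovery and custom-namespace job runners.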
120 changes: 13 additions & 107 deletions pkg/job/discovery.go
@@ -4,13 +4,10 @@ import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"strings"
"sync"

"golang.org/x/sync/errgroup"

"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/tagging"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
@@ -23,28 +20,29 @@ type resourceAssociator interface {
 	AssociateMetricToResource(cwMetric *model.Metric) (*model.TaggedResource, bool)
 }
 
+type getMetricDataProcessor interface {
+	Run(ctx context.Context, logger logging.Logger, namespace string, jobMetricLength int64, jobMetricDelay int64, jobRoundingPeriod *int64, requests []*model.CloudwatchData) ([]*model.CloudwatchData, error)
+}
+
 func runDiscoveryJob(
 	ctx context.Context,
 	logger logging.Logger,
 	job model.DiscoveryJob,
 	region string,
 	clientTag tagging.Client,
 	clientCloudwatch cloudwatch.Client,
-	metricsPerQuery int,
-	cloudwatchConcurrency cloudwatch.ConcurrencyConfig,
+	gmdProcessor getMetricDataProcessor,
 ) ([]*model.TaggedResource, []*model.CloudwatchData) {
 	logger.Debug("Get tagged resources")
 
-	cw := []*model.CloudwatchData{}
-
 	resources, err := clientTag.GetResources(ctx, job, region)
 	if err != nil {
 		if errors.Is(err, tagging.ErrExpectedToFindResources) {
 			logger.Error(err, "No tagged resources made it through filtering")
 		} else {
 			logger.Error(err, "Couldn't describe resources")
 		}
-		return resources, cw
+		return nil, nil
 	}
 
 	if len(resources) == 0 {
@@ -53,114 +51,22 @@ func runDiscoveryJob(

 	svc := config.SupportedServices.GetService(job.Type)
 	getMetricDatas := getMetricDataForQueries(ctx, logger, job, svc, clientCloudwatch, resources)
-	metricDataLength := len(getMetricDatas)
-	if metricDataLength == 0 {
+	if len(getMetricDatas) == 0 {
 		logger.Info("No metrics data found")
-		return resources, cw
-	}
-
-	maxMetricCount := metricsPerQuery
-	length := getMetricDataInputLength(job.Metrics)
-	partitionSize := int(math.Ceil(float64(metricDataLength) / float64(maxMetricCount)))
-	logger.Debug("GetMetricData partitions", "size", partitionSize)
-
-	g, gCtx := errgroup.WithContext(ctx)
-	// TODO: don't know if this is ok, double check, and should work for either per-api and single cl
-	g.SetLimit(cloudwatchConcurrency.GetMetricData)
-
-	mu := sync.Mutex{}
-	getMetricDataOutput := make([][]cloudwatch.MetricDataResult, 0, partitionSize)
-
-	count := 0
-
-	for i := 0; i < metricDataLength; i += maxMetricCount {
-		start := i
-		end := i + maxMetricCount
-		if end > metricDataLength {
-			end = metricDataLength
-		}
-		partitionNum := count
-		count++
-
-		g.Go(func() error {
-			logger.Debug("GetMetricData partition", "start", start, "end", end, "partitionNum", partitionNum)
-
-			input := getMetricDatas[start:end]
-			data := clientCloudwatch.GetMetricData(gCtx, logger, input, svc.Namespace, length, job.Delay, job.RoundingPeriod)
-			if data != nil {
-				mu.Lock()
-				getMetricDataOutput = append(getMetricDataOutput, data)
-				mu.Unlock()
-			} else {
-				logger.Warn("GetMetricData partition empty result", "start", start, "end", end, "partitionNum", partitionNum)
-			}
-
-			return nil
-		})
+		return resources, nil
 	}
 
-	if err = g.Wait(); err != nil {
-		logger.Error(err, "GetMetricData work group error")
+	jobLength := getLargestLengthForMetrics(job.Metrics)
+	getMetricDatas, err = gmdProcessor.Run(ctx, logger, svc.Namespace, jobLength, job.Delay, job.RoundingPeriod, getMetricDatas)
+	if err != nil {
+		logger.Error(err, "Failed to get metric data")
 		return nil, nil
 	}
 
-	mapResultsToMetricDatas(getMetricDataOutput, getMetricDatas, logger)
-
-	// Remove unprocessed/unknown elements in place, if any. Since getMetricDatas
-	// is a slice of pointers, the compaction can be easily done in-place.
-	getMetricDatas = compact(getMetricDatas, func(m *model.CloudwatchData) bool {
-		return m.GetMetricDataResult != nil
-	})
 	return resources, getMetricDatas
 }
 
-// mapResultsToMetricDatas walks over all CW GetMetricData results, and map each one with the corresponding model.CloudwatchData.
-//
-// This has been extracted into a separate function to make benchmarking easier.
-func mapResultsToMetricDatas(output [][]cloudwatch.MetricDataResult, datas []*model.CloudwatchData, logger logging.Logger) {
-	// queryIDToData is a support structure used to easily find via a QueryID, the corresponding
-	// model.CloudatchData.
-	queryIDToData := make(map[string]*model.CloudwatchData, len(datas))
-
-	// load the index
-	for _, data := range datas {
-		queryIDToData[data.GetMetricDataProcessingParams.QueryID] = data
-	}
-
-	// Update getMetricDatas slice with values and timestamps from API response.
-	// We iterate through the response MetricDataResults and match the result ID
-	// with what was sent in the API request.
-	// In the event that the API response contains any ID we don't know about
-	// (shouldn't really happen) we log a warning and move on. On the other hand,
-	// in case the API response does not contain results for all the IDs we've
-	// requested, unprocessed elements will be removed later on.
-	for _, data := range output {
-		if data == nil {
-			continue
-		}
-		for _, metricDataResult := range data {
-			// find into index
-			metricData, ok := queryIDToData[metricDataResult.ID]
-			if !ok {
-				logger.Warn("GetMetricData returned unknown metric ID", "metric_id", metricDataResult.ID)
-				continue
-			}
-			// skip elements that have been already mapped but still exist in queryIDToData
-			if metricData.GetMetricDataResult != nil {
-				continue
-			}
-			metricData.GetMetricDataResult = &model.GetMetricDataResult{
-				Statistic: metricData.GetMetricDataProcessingParams.Statistic,
-				Datapoint: metricDataResult.Datapoint,
-				Timestamp: metricDataResult.Timestamp,
-			}
-			// All GetMetricData processing is done clear the params
-			metricData.GetMetricDataProcessingParams = nil
-		}
-	}
-}
-
-func getMetricDataInputLength(metrics []*model.MetricConfig) int64 {
+func getLargestLengthForMetrics(metrics []*model.MetricConfig) int64 {
 	var length int64
 	for _, metric := range metrics {
 		if metric.Length > length {
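The final hunk renames getMetricDataInputLength to getLargestLengthForMetrics; the diff view cuts the body off after the comparison. Below is a sketch of the complete helper, assuming the body is unchanged apart from the rename: it returns the largest configured Length across the job's metrics, which both runners now pass to gmdProcessor.Run as the jobMetricLength argument.

```go
// Sketch of the renamed helper. Only its first lines appear in the diff above;
// the tail (assigning and returning length) is an assumption based on the name
// and on how the value is used by the callers.
func getLargestLengthForMetrics(metrics []*model.MetricConfig) int64 {
	var length int64
	for _, metric := range metrics {
		if metric.Length > length {
			length = metric.Length
		}
	}
	return length
}
```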