diff --git a/pkg/job/custom.go b/pkg/job/custom.go index 10f61d60f..4fb0a0465 100644 --- a/pkg/job/custom.go +++ b/pkg/job/custom.go @@ -3,7 +3,6 @@ package job import ( "context" "fmt" - "math" "math/rand" "sync" @@ -17,71 +16,23 @@ func runCustomNamespaceJob( logger logging.Logger, job model.CustomNamespaceJob, clientCloudwatch cloudwatch.Client, - metricsPerQuery int, + gmdProcessor getMetricDataProcessor, ) []*model.CloudwatchData { - cw := []*model.CloudwatchData{} - - mux := &sync.Mutex{} - var wg sync.WaitGroup - - getMetricDatas := getMetricDataForQueriesForCustomNamespace(ctx, job, clientCloudwatch, logger) - metricDataLength := len(getMetricDatas) - if metricDataLength == 0 { + cloudwatchDatas := getMetricDataForQueriesForCustomNamespace(ctx, job, clientCloudwatch, logger) + if len(cloudwatchDatas) == 0 { logger.Debug("No metrics data found") - return cw + return nil } - maxMetricCount := metricsPerQuery - length := getMetricDataInputLength(job.Metrics) - partition := int(math.Ceil(float64(metricDataLength) / float64(maxMetricCount))) - logger.Debug("GetMetricData partitions", "total", partition) - - wg.Add(partition) - - for i := 0; i < metricDataLength; i += maxMetricCount { - go func(i int) { - defer wg.Done() - - end := i + maxMetricCount - if end > metricDataLength { - end = metricDataLength - } - input := getMetricDatas[i:end] - data := clientCloudwatch.GetMetricData(ctx, logger, input, job.Namespace, length, job.Delay, job.RoundingPeriod) - - if data != nil { - output := make([]*model.CloudwatchData, 0) - for _, result := range data { - getMetricData, err := findGetMetricDataByIDForCustomNamespace(input, result.ID) - if err == nil { - getMetricData.GetMetricDataResult = &model.GetMetricDataResult{ - Statistic: getMetricData.GetMetricDataProcessingParams.Statistic, - Datapoint: result.Datapoint, - Timestamp: result.Timestamp, - } - // All done processing we can drop the processing params - getMetricData.GetMetricDataProcessingParams = nil - output = append(output, getMetricData) - } - } - mux.Lock() - cw = append(cw, output...) - mux.Unlock() - } - }(i) + jobLength := getLargestLengthForMetrics(job.Metrics) + var err error + cloudwatchDatas, err = gmdProcessor.Run(ctx, logger, job.Namespace, jobLength, job.Delay, job.RoundingPeriod, cloudwatchDatas) + if err != nil { + logger.Error(err, "Failed to get metric data") + return nil } - wg.Wait() - return cw -} - -func findGetMetricDataByIDForCustomNamespace(getMetricDatas []*model.CloudwatchData, value string) (*model.CloudwatchData, error) { - for _, getMetricData := range getMetricDatas { - if getMetricData.GetMetricDataResult == nil && getMetricData.GetMetricDataProcessingParams.QueryID == value { - return getMetricData, nil - } - } - return nil, fmt.Errorf("metric with id %s not found", value) + return cloudwatchDatas } func getMetricDataForQueriesForCustomNamespace( diff --git a/pkg/job/discovery.go b/pkg/job/discovery.go index 7f0e67c5c..38d3f9bfe 100644 --- a/pkg/job/discovery.go +++ b/pkg/job/discovery.go @@ -4,13 +4,10 @@ import ( "context" "errors" "fmt" - "math" "math/rand" "strings" "sync" - "golang.org/x/sync/errgroup" - "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/tagging" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config" @@ -23,6 +20,10 @@ type resourceAssociator interface { AssociateMetricToResource(cwMetric *model.Metric) (*model.TaggedResource, bool) } +type getMetricDataProcessor interface { + Run(ctx context.Context, logger logging.Logger, namespace string, jobMetricLength int64, jobMetricDelay int64, jobRoundingPeriod *int64, requests []*model.CloudwatchData) ([]*model.CloudwatchData, error) +} + func runDiscoveryJob( ctx context.Context, logger logging.Logger, @@ -30,13 +31,10 @@ func runDiscoveryJob( region string, clientTag tagging.Client, clientCloudwatch cloudwatch.Client, - metricsPerQuery int, - cloudwatchConcurrency cloudwatch.ConcurrencyConfig, + gmdProcessor getMetricDataProcessor, ) ([]*model.TaggedResource, []*model.CloudwatchData) { logger.Debug("Get tagged resources") - cw := []*model.CloudwatchData{} - resources, err := clientTag.GetResources(ctx, job, region) if err != nil { if errors.Is(err, tagging.ErrExpectedToFindResources) { @@ -44,7 +42,7 @@ func runDiscoveryJob( } else { logger.Error(err, "Couldn't describe resources") } - return resources, cw + return nil, nil } if len(resources) == 0 { @@ -53,114 +51,22 @@ func runDiscoveryJob( svc := config.SupportedServices.GetService(job.Type) getMetricDatas := getMetricDataForQueries(ctx, logger, job, svc, clientCloudwatch, resources) - metricDataLength := len(getMetricDatas) - if metricDataLength == 0 { + if len(getMetricDatas) == 0 { logger.Info("No metrics data found") - return resources, cw - } - - maxMetricCount := metricsPerQuery - length := getMetricDataInputLength(job.Metrics) - partitionSize := int(math.Ceil(float64(metricDataLength) / float64(maxMetricCount))) - logger.Debug("GetMetricData partitions", "size", partitionSize) - - g, gCtx := errgroup.WithContext(ctx) - // TODO: don't know if this is ok, double check, and should work for either per-api and single cl - g.SetLimit(cloudwatchConcurrency.GetMetricData) - - mu := sync.Mutex{} - getMetricDataOutput := make([][]cloudwatch.MetricDataResult, 0, partitionSize) - - count := 0 - - for i := 0; i < metricDataLength; i += maxMetricCount { - start := i - end := i + maxMetricCount - if end > metricDataLength { - end = metricDataLength - } - partitionNum := count - count++ - - g.Go(func() error { - logger.Debug("GetMetricData partition", "start", start, "end", end, "partitionNum", partitionNum) - - input := getMetricDatas[start:end] - data := clientCloudwatch.GetMetricData(gCtx, logger, input, svc.Namespace, length, job.Delay, job.RoundingPeriod) - if data != nil { - mu.Lock() - getMetricDataOutput = append(getMetricDataOutput, data) - mu.Unlock() - } else { - logger.Warn("GetMetricData partition empty result", "start", start, "end", end, "partitionNum", partitionNum) - } - - return nil - }) + return resources, nil } - if err = g.Wait(); err != nil { - logger.Error(err, "GetMetricData work group error") + jobLength := getLargestLengthForMetrics(job.Metrics) + getMetricDatas, err = gmdProcessor.Run(ctx, logger, svc.Namespace, jobLength, job.Delay, job.RoundingPeriod, getMetricDatas) + if err != nil { + logger.Error(err, "Failed to get metric data") return nil, nil } - mapResultsToMetricDatas(getMetricDataOutput, getMetricDatas, logger) - - // Remove unprocessed/unknown elements in place, if any. Since getMetricDatas - // is a slice of pointers, the compaction can be easily done in-place. - getMetricDatas = compact(getMetricDatas, func(m *model.CloudwatchData) bool { - return m.GetMetricDataResult != nil - }) return resources, getMetricDatas } -// mapResultsToMetricDatas walks over all CW GetMetricData results, and map each one with the corresponding model.CloudwatchData. -// -// This has been extracted into a separate function to make benchmarking easier. -func mapResultsToMetricDatas(output [][]cloudwatch.MetricDataResult, datas []*model.CloudwatchData, logger logging.Logger) { - // queryIDToData is a support structure used to easily find via a QueryID, the corresponding - // model.CloudatchData. - queryIDToData := make(map[string]*model.CloudwatchData, len(datas)) - - // load the index - for _, data := range datas { - queryIDToData[data.GetMetricDataProcessingParams.QueryID] = data - } - - // Update getMetricDatas slice with values and timestamps from API response. - // We iterate through the response MetricDataResults and match the result ID - // with what was sent in the API request. - // In the event that the API response contains any ID we don't know about - // (shouldn't really happen) we log a warning and move on. On the other hand, - // in case the API response does not contain results for all the IDs we've - // requested, unprocessed elements will be removed later on. - for _, data := range output { - if data == nil { - continue - } - for _, metricDataResult := range data { - // find into index - metricData, ok := queryIDToData[metricDataResult.ID] - if !ok { - logger.Warn("GetMetricData returned unknown metric ID", "metric_id", metricDataResult.ID) - continue - } - // skip elements that have been already mapped but still exist in queryIDToData - if metricData.GetMetricDataResult != nil { - continue - } - metricData.GetMetricDataResult = &model.GetMetricDataResult{ - Statistic: metricData.GetMetricDataProcessingParams.Statistic, - Datapoint: metricDataResult.Datapoint, - Timestamp: metricDataResult.Timestamp, - } - // All GetMetricData processing is done clear the params - metricData.GetMetricDataProcessingParams = nil - } - } -} - -func getMetricDataInputLength(metrics []*model.MetricConfig) int64 { +func getLargestLengthForMetrics(metrics []*model.MetricConfig) int64 { var length int64 for _, metric := range metrics { if metric.Length > length { diff --git a/pkg/job/discovery_test.go b/pkg/job/discovery_test.go index b6581c67e..383710f3b 100644 --- a/pkg/job/discovery_test.go +++ b/pkg/job/discovery_test.go @@ -1,15 +1,10 @@ package job import ( - "fmt" "testing" - "time" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/maxdimassociator" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging" @@ -442,361 +437,3 @@ func Test_getFilteredMetricDatas(t *testing.T) { }) } } - -func Test_mapResultsToMetricDatas(t *testing.T) { - type args struct { - metricDataResults [][]cloudwatch.MetricDataResult - cloudwatchDatas []*model.CloudwatchData - } - tests := []struct { - name string - args args - wantCloudwatchDatas []*model.CloudwatchData - }{ - { - "all datapoints present", - args{ - metricDataResults: [][]cloudwatch.MetricDataResult{ - { - {ID: "metric-3", Datapoint: aws.Float64(15), Timestamp: time.Date(2023, time.June, 7, 3, 9, 8, 0, time.UTC)}, - {ID: "metric-1", Datapoint: aws.Float64(5), Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC)}, - }, - { - {ID: "metric-4", Datapoint: aws.Float64(20), Timestamp: time.Date(2023, time.June, 7, 4, 9, 8, 0, time.UTC)}, - }, - { - {ID: "metric-2", Datapoint: aws.Float64(12), Timestamp: time.Date(2023, time.June, 7, 2, 9, 8, 0, time.UTC)}, - }, - }, - cloudwatchDatas: []*model.CloudwatchData{ - {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-1", Statistic: "Min"}, MetricName: "MetricOne", Namespace: "svc"}, - {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-2", Statistic: "Max"}, MetricName: "MetricTwo", Namespace: "svc"}, - {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-3", Statistic: "Sum"}, MetricName: "MetricThree", Namespace: "svc"}, - {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-4", Statistic: "Count"}, MetricName: "MetricFour", Namespace: "svc"}, - }, - }, - []*model.CloudwatchData{ - { - MetricName: "MetricOne", - Namespace: "svc", - GetMetricDataResult: &model.GetMetricDataResult{ - Statistic: "Min", - Datapoint: aws.Float64(5), - Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC), - }, - }, - { - MetricName: "MetricTwo", - Namespace: "svc", - GetMetricDataResult: &model.GetMetricDataResult{ - Statistic: "Max", - Datapoint: aws.Float64(12), - Timestamp: time.Date(2023, time.June, 7, 2, 9, 8, 0, time.UTC), - }, - }, - { - MetricName: "MetricThree", - Namespace: "svc", - GetMetricDataResult: &model.GetMetricDataResult{ - Statistic: "Sum", - Datapoint: aws.Float64(15), - Timestamp: time.Date(2023, time.June, 7, 3, 9, 8, 0, time.UTC), - }, - }, - { - MetricName: "MetricFour", - Namespace: "svc", - GetMetricDataResult: &model.GetMetricDataResult{ - Statistic: "Count", - Datapoint: aws.Float64(20), - Timestamp: time.Date(2023, time.June, 7, 4, 9, 8, 0, time.UTC), - }, - }, - }, - }, - { - "duplicate results", - args{ - metricDataResults: [][]cloudwatch.MetricDataResult{ - { - {ID: "metric-1", Datapoint: aws.Float64(5), Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC)}, - {ID: "metric-1", Datapoint: aws.Float64(15), Timestamp: time.Date(2023, time.June, 7, 2, 9, 8, 0, time.UTC)}, - }, - }, - cloudwatchDatas: []*model.CloudwatchData{ - {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-1", Statistic: "Min"}, MetricName: "MetricOne", Namespace: "svc"}, - }, - }, - []*model.CloudwatchData{ - { - MetricName: "MetricOne", - Namespace: "svc", - GetMetricDataResult: &model.GetMetricDataResult{ - Statistic: "Min", - Datapoint: aws.Float64(5), - Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC), - }, - }, - }, - }, - { - "unexpected result ID", - args{ - metricDataResults: [][]cloudwatch.MetricDataResult{ - { - {ID: "metric-1", Datapoint: aws.Float64(5), Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC)}, - {ID: "metric-2", Datapoint: aws.Float64(15), Timestamp: time.Date(2023, time.June, 7, 2, 9, 8, 0, time.UTC)}, - }, - }, - cloudwatchDatas: []*model.CloudwatchData{ - {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-1", Statistic: "Min"}, MetricName: "MetricOne", Namespace: "svc"}, - }, - }, - []*model.CloudwatchData{ - { - MetricName: "MetricOne", - Namespace: "svc", - GetMetricDataResult: &model.GetMetricDataResult{ - Statistic: "Min", - Datapoint: aws.Float64(5), - Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC), - }, - }, - }, - }, - { - "nil metric data result", - args{ - metricDataResults: [][]cloudwatch.MetricDataResult{ - { - {ID: "metric-1", Datapoint: aws.Float64(5), Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC)}, - }, - nil, - { - {ID: "metric-2", Datapoint: aws.Float64(12), Timestamp: time.Date(2023, time.June, 7, 2, 9, 8, 0, time.UTC)}, - }, - }, - cloudwatchDatas: []*model.CloudwatchData{ - {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-1", Statistic: "Min"}, MetricName: "MetricOne", Namespace: "svc"}, - {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-2", Statistic: "Max"}, MetricName: "MetricTwo", Namespace: "svc"}, - }, - }, - []*model.CloudwatchData{ - { - MetricName: "MetricOne", - Namespace: "svc", - GetMetricDataResult: &model.GetMetricDataResult{ - Statistic: "Min", - Datapoint: aws.Float64(5), - Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC), - }, - }, - { - MetricName: "MetricTwo", - Namespace: "svc", - GetMetricDataResult: &model.GetMetricDataResult{ - Statistic: "Max", - Datapoint: aws.Float64(12), - Timestamp: time.Date(2023, time.June, 7, 2, 9, 8, 0, time.UTC), - }, - }, - }, - }, - { - "missing metric data result", - args{ - metricDataResults: [][]cloudwatch.MetricDataResult{ - { - {ID: "metric-1", Datapoint: aws.Float64(5), Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC)}, - }, - }, - cloudwatchDatas: []*model.CloudwatchData{ - {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-1", Statistic: "Min"}, MetricName: "MetricOne", Namespace: "svc"}, - {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-2", Statistic: "Max"}, MetricName: "MetricTwo", Namespace: "svc"}, - }, - }, - []*model.CloudwatchData{ - { - MetricName: "MetricOne", - Namespace: "svc", - GetMetricDataResult: &model.GetMetricDataResult{ - Statistic: "Min", - Datapoint: aws.Float64(5), - Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC), - }, - }, - { - MetricName: "MetricTwo", - Namespace: "svc", - GetMetricDataResult: nil, - }, - }, - }, - { - "nil metric datapoint", - args{ - metricDataResults: [][]cloudwatch.MetricDataResult{ - { - {ID: "metric-1", Datapoint: aws.Float64(5), Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC)}, - {ID: "metric-2"}, - }, - }, - cloudwatchDatas: []*model.CloudwatchData{ - {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-1", Statistic: "Min"}, MetricName: "MetricOne", Namespace: "svc"}, - {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-2", Statistic: "Max"}, MetricName: "MetricTwo", Namespace: "svc"}, - }, - }, - []*model.CloudwatchData{ - { - MetricName: "MetricOne", - Namespace: "svc", - GetMetricDataResult: &model.GetMetricDataResult{ - Statistic: "Min", - Datapoint: aws.Float64(5), - Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC), - }, - }, - { - MetricName: "MetricTwo", - Namespace: "svc", - GetMetricDataResult: &model.GetMetricDataResult{ - Statistic: "Max", - Datapoint: nil, - Timestamp: time.Time{}, - }, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - mapResultsToMetricDatas(tt.args.metricDataResults, tt.args.cloudwatchDatas, logging.NewNopLogger()) - // mapResultsToMetricDatas() modifies its []*model.CloudwatchData parameter in-place, assert that it was updated - - // Ensure processing params were nil'ed when expected to be - for _, data := range tt.args.cloudwatchDatas { - if data.GetMetricDataResult != nil { - require.Nil(t, data.GetMetricDataProcessingParams, "GetMetricDataResult is not nil GetMetricDataProcessingParams should have been nil") - } else { - require.NotNil(t, data.GetMetricDataProcessingParams, "GetMetricDataResult is nil GetMetricDataProcessingParams should not have been nil") - } - - // Drop processing params to simplify further asserts - data.GetMetricDataProcessingParams = nil - } - require.ElementsMatch(t, tt.wantCloudwatchDatas, tt.args.cloudwatchDatas) - }) - } -} - -func getSampleMetricDatas(id string) *model.CloudwatchData { - return &model.CloudwatchData{ - MetricName: "StorageBytes", - Dimensions: []model.Dimension{ - { - Name: "FileSystemId", - Value: "fs-abc123", - }, - { - Name: "StorageClass", - Value: "Standard", - }, - }, - ResourceName: id, - Namespace: "efs", - Tags: []model.Tag{ - { - Key: "Value1", - Value: "", - }, - { - Key: "Value2", - Value: "", - }, - }, - MetricMigrationParams: model.MetricMigrationParams{ - NilToZero: false, - AddCloudwatchTimestamp: false, - }, - GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{ - QueryID: id, - Period: 60, - Length: 60, - Delay: 0, - Statistic: "Average", - }, - } -} - -func BenchmarkMapResultsToMetricDatas(b *testing.B) { - type testcase struct { - metricsPerQuery int - testResourcesCount int - metricsPerResource int - } - - for name, tc := range map[string]testcase{ - "small case": { - metricsPerQuery: 500, - testResourcesCount: 10, - metricsPerResource: 10, - }, - "medium case": { - metricsPerQuery: 500, - testResourcesCount: 1000, - metricsPerResource: 50, - }, - "big case": { - metricsPerQuery: 500, - testResourcesCount: 2000, - metricsPerResource: 50, - }, - } { - b.Run(name, func(b *testing.B) { - doBench(b, tc.metricsPerQuery, tc.testResourcesCount, tc.metricsPerResource) - }) - } -} - -func doBench(b *testing.B, metricsPerQuery, testResourcesCount, metricsPerResource int) { - outputs := [][]cloudwatch.MetricDataResult{} - now := time.Now() - testResourceIDs := make([]string, testResourcesCount) - - for i := 0; i < testResourcesCount; i++ { - testResourceIDs[i] = fmt.Sprintf("test-resource-%d", i) - } - - totalMetricsDatapoints := metricsPerResource * testResourcesCount - batchesCount := totalMetricsDatapoints / metricsPerQuery - - if batchesCount == 0 { - batchesCount = 1 - } - - for batch := 0; batch < batchesCount; batch++ { - newBatchOutputs := make([]cloudwatch.MetricDataResult, 0) - for i := 0; i < metricsPerQuery; i++ { - id := testResourceIDs[(batch*metricsPerQuery+i)%testResourcesCount] - newBatchOutputs = append(newBatchOutputs, cloudwatch.MetricDataResult{ - ID: id, - Datapoint: aws.Float64(1.4 * float64(batch)), - Timestamp: now, - }) - } - outputs = append(outputs, newBatchOutputs) - } - - for i := 0; i < b.N; i++ { - // stop timer to not affect benchmark run - // this has to do in every run, since mapResultsToMetricDatas mutates the metric datas slice - b.StopTimer() - datas := []*model.CloudwatchData{} - for i := 0; i < testResourcesCount; i++ { - datas = append(datas, getSampleMetricDatas(testResourceIDs[i])) - } - // re-start timer - b.StartTimer() - mapResultsToMetricDatas(outputs, datas, logging.NewNopLogger()) - } -} diff --git a/pkg/job/compact.go b/pkg/job/getmetricdata/compact.go similarity index 96% rename from pkg/job/compact.go rename to pkg/job/getmetricdata/compact.go index ef2a9cbac..5fe1f88fe 100644 --- a/pkg/job/compact.go +++ b/pkg/job/getmetricdata/compact.go @@ -1,4 +1,4 @@ -package job +package getmetricdata // compact iterates over a slice of pointers and deletes // unwanted elements as per the keep function return value. diff --git a/pkg/job/compact_test.go b/pkg/job/getmetricdata/compact_test.go similarity index 98% rename from pkg/job/compact_test.go rename to pkg/job/getmetricdata/compact_test.go index 9b4e625d8..09a32d118 100644 --- a/pkg/job/compact_test.go +++ b/pkg/job/getmetricdata/compact_test.go @@ -1,4 +1,4 @@ -package job +package getmetricdata import ( "testing" diff --git a/pkg/job/getmetricdata/processor.go b/pkg/job/getmetricdata/processor.go new file mode 100644 index 000000000..58d56a85e --- /dev/null +++ b/pkg/job/getmetricdata/processor.go @@ -0,0 +1,124 @@ +package getmetricdata + +import ( + "context" + "fmt" + "math" + "sync" + + "golang.org/x/sync/errgroup" + + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch" + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging" + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model" +) + +type Client interface { + GetMetricData(ctx context.Context, logger logging.Logger, getMetricData []*model.CloudwatchData, namespace string, length int64, delay int64, configuredRoundingPeriod *int64) []cloudwatch.MetricDataResult +} + +type Processor struct { + metricsPerQuery int + client Client + concurrency int +} + +func NewProcessor(client Client, metricsPerQuery int, concurrency int) Processor { + return Processor{ + metricsPerQuery: metricsPerQuery, + client: client, + concurrency: concurrency, + } +} + +func (p Processor) Run(ctx context.Context, logger logging.Logger, namespace string, jobMetricLength int64, jobMetricDelay int64, jobRoundingPeriod *int64, requests []*model.CloudwatchData) ([]*model.CloudwatchData, error) { + metricDataLength := len(requests) + partitionSize := int(math.Ceil(float64(metricDataLength) / float64(p.metricsPerQuery))) + logger.Debug("GetMetricData partitions", "size", partitionSize) + getMetricDataOutput := make([][]cloudwatch.MetricDataResult, 0, partitionSize) + + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(p.concurrency) + mu := sync.Mutex{} + count := 0 + for i := 0; i < metricDataLength; i += p.metricsPerQuery { + start := i + end := i + p.metricsPerQuery + if end > metricDataLength { + end = metricDataLength + } + partitionNum := count + count++ + + g.Go(func() error { + input := requests[start:end] + data := p.client.GetMetricData(gCtx, logger, input, namespace, jobMetricLength, jobMetricDelay, jobRoundingPeriod) + if data != nil { + mu.Lock() + getMetricDataOutput = append(getMetricDataOutput, data) + mu.Unlock() + } else { + logger.Warn("GetMetricData partition empty result", "start", start, "end", end, "partitionNum", partitionNum) + } + + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, fmt.Errorf("GetMetricData work group error: %w", err) + } + + mapResultsToMetricDatas(getMetricDataOutput, requests, logger) + + // Remove unprocessed/unknown elements in place, if any. Since getMetricDatas + // is a slice of pointers, the compaction can be easily done in-place. + requests = compact(requests, func(m *model.CloudwatchData) bool { + return m.GetMetricDataResult != nil + }) + + return requests, nil +} + +func mapResultsToMetricDatas(output [][]cloudwatch.MetricDataResult, datas []*model.CloudwatchData, logger logging.Logger) { + // queryIDToData is a support structure used to easily find via a QueryID, the corresponding + // model.CloudatchData. + queryIDToData := make(map[string]*model.CloudwatchData, len(datas)) + + // load the index + for _, data := range datas { + queryIDToData[data.GetMetricDataProcessingParams.QueryID] = data + } + + // Update getMetricDatas slice with values and timestamps from API response. + // We iterate through the response MetricDataResults and match the result ID + // with what was sent in the API request. + // In the event that the API response contains any ID we don't know about + // (shouldn't really happen) we log a warning and move on. On the other hand, + // in case the API response does not contain results for all the IDs we've + // requested, unprocessed elements will be removed later on. + for _, data := range output { + if data == nil { + continue + } + for _, metricDataResult := range data { + // find into index + metricData, ok := queryIDToData[metricDataResult.ID] + if !ok { + logger.Warn("GetMetricData returned unknown metric ID", "metric_id", metricDataResult.ID) + continue + } + // skip elements that have been already mapped but still exist in queryIDToData + if metricData.GetMetricDataResult != nil { + continue + } + metricData.GetMetricDataResult = &model.GetMetricDataResult{ + Statistic: metricData.GetMetricDataProcessingParams.Statistic, + Datapoint: metricDataResult.Datapoint, + Timestamp: metricDataResult.Timestamp, + } + // All GetMetricData processing is done clear the params + metricData.GetMetricDataProcessingParams = nil + } + } +} diff --git a/pkg/job/getmetricdata/processor_test.go b/pkg/job/getmetricdata/processor_test.go new file mode 100644 index 000000000..2abfb6510 --- /dev/null +++ b/pkg/job/getmetricdata/processor_test.go @@ -0,0 +1,512 @@ +package getmetricdata + +import ( + "context" + "fmt" + "strconv" + "sync/atomic" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch" + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging" + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model" +) + +func Test_mapResultsToMetricDatas(t *testing.T) { + type args struct { + metricDataResults [][]cloudwatch.MetricDataResult + cloudwatchDatas []*model.CloudwatchData + } + tests := []struct { + name string + args args + wantCloudwatchDatas []*model.CloudwatchData + }{ + { + "all datapoints present", + args{ + metricDataResults: [][]cloudwatch.MetricDataResult{ + { + {ID: "metric-3", Datapoint: aws.Float64(15), Timestamp: time.Date(2023, time.June, 7, 3, 9, 8, 0, time.UTC)}, + {ID: "metric-1", Datapoint: aws.Float64(5), Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC)}, + }, + { + {ID: "metric-4", Datapoint: aws.Float64(20), Timestamp: time.Date(2023, time.June, 7, 4, 9, 8, 0, time.UTC)}, + }, + { + {ID: "metric-2", Datapoint: aws.Float64(12), Timestamp: time.Date(2023, time.June, 7, 2, 9, 8, 0, time.UTC)}, + }, + }, + cloudwatchDatas: []*model.CloudwatchData{ + {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-1", Statistic: "Min"}, MetricName: "MetricOne", Namespace: "svc"}, + {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-2", Statistic: "Max"}, MetricName: "MetricTwo", Namespace: "svc"}, + {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-3", Statistic: "Sum"}, MetricName: "MetricThree", Namespace: "svc"}, + {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-4", Statistic: "Count"}, MetricName: "MetricFour", Namespace: "svc"}, + }, + }, + []*model.CloudwatchData{ + { + MetricName: "MetricOne", + Namespace: "svc", + GetMetricDataResult: &model.GetMetricDataResult{ + Statistic: "Min", + Datapoint: aws.Float64(5), + Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC), + }, + }, + { + MetricName: "MetricTwo", + Namespace: "svc", + GetMetricDataResult: &model.GetMetricDataResult{ + Statistic: "Max", + Datapoint: aws.Float64(12), + Timestamp: time.Date(2023, time.June, 7, 2, 9, 8, 0, time.UTC), + }, + }, + { + MetricName: "MetricThree", + Namespace: "svc", + GetMetricDataResult: &model.GetMetricDataResult{ + Statistic: "Sum", + Datapoint: aws.Float64(15), + Timestamp: time.Date(2023, time.June, 7, 3, 9, 8, 0, time.UTC), + }, + }, + { + MetricName: "MetricFour", + Namespace: "svc", + GetMetricDataResult: &model.GetMetricDataResult{ + Statistic: "Count", + Datapoint: aws.Float64(20), + Timestamp: time.Date(2023, time.June, 7, 4, 9, 8, 0, time.UTC), + }, + }, + }, + }, + { + "duplicate results", + args{ + metricDataResults: [][]cloudwatch.MetricDataResult{ + { + {ID: "metric-1", Datapoint: aws.Float64(5), Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC)}, + {ID: "metric-1", Datapoint: aws.Float64(15), Timestamp: time.Date(2023, time.June, 7, 2, 9, 8, 0, time.UTC)}, + }, + }, + cloudwatchDatas: []*model.CloudwatchData{ + {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-1", Statistic: "Min"}, MetricName: "MetricOne", Namespace: "svc"}, + }, + }, + []*model.CloudwatchData{ + { + MetricName: "MetricOne", + Namespace: "svc", + GetMetricDataResult: &model.GetMetricDataResult{ + Statistic: "Min", + Datapoint: aws.Float64(5), + Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC), + }, + }, + }, + }, + { + "unexpected result ID", + args{ + metricDataResults: [][]cloudwatch.MetricDataResult{ + { + {ID: "metric-1", Datapoint: aws.Float64(5), Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC)}, + {ID: "metric-2", Datapoint: aws.Float64(15), Timestamp: time.Date(2023, time.June, 7, 2, 9, 8, 0, time.UTC)}, + }, + }, + cloudwatchDatas: []*model.CloudwatchData{ + {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-1", Statistic: "Min"}, MetricName: "MetricOne", Namespace: "svc"}, + }, + }, + []*model.CloudwatchData{ + { + MetricName: "MetricOne", + Namespace: "svc", + GetMetricDataResult: &model.GetMetricDataResult{ + Statistic: "Min", + Datapoint: aws.Float64(5), + Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC), + }, + }, + }, + }, + { + "nil metric data result", + args{ + metricDataResults: [][]cloudwatch.MetricDataResult{ + { + {ID: "metric-1", Datapoint: aws.Float64(5), Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC)}, + }, + nil, + { + {ID: "metric-2", Datapoint: aws.Float64(12), Timestamp: time.Date(2023, time.June, 7, 2, 9, 8, 0, time.UTC)}, + }, + }, + cloudwatchDatas: []*model.CloudwatchData{ + {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-1", Statistic: "Min"}, MetricName: "MetricOne", Namespace: "svc"}, + {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-2", Statistic: "Max"}, MetricName: "MetricTwo", Namespace: "svc"}, + }, + }, + []*model.CloudwatchData{ + { + MetricName: "MetricOne", + Namespace: "svc", + GetMetricDataResult: &model.GetMetricDataResult{ + Statistic: "Min", + Datapoint: aws.Float64(5), + Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC), + }, + }, + { + MetricName: "MetricTwo", + Namespace: "svc", + GetMetricDataResult: &model.GetMetricDataResult{ + Statistic: "Max", + Datapoint: aws.Float64(12), + Timestamp: time.Date(2023, time.June, 7, 2, 9, 8, 0, time.UTC), + }, + }, + }, + }, + { + "missing metric data result", + args{ + metricDataResults: [][]cloudwatch.MetricDataResult{ + { + {ID: "metric-1", Datapoint: aws.Float64(5), Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC)}, + }, + }, + cloudwatchDatas: []*model.CloudwatchData{ + {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-1", Statistic: "Min"}, MetricName: "MetricOne", Namespace: "svc"}, + {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-2", Statistic: "Max"}, MetricName: "MetricTwo", Namespace: "svc"}, + }, + }, + []*model.CloudwatchData{ + { + MetricName: "MetricOne", + Namespace: "svc", + GetMetricDataResult: &model.GetMetricDataResult{ + Statistic: "Min", + Datapoint: aws.Float64(5), + Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC), + }, + }, + { + MetricName: "MetricTwo", + Namespace: "svc", + GetMetricDataResult: nil, + }, + }, + }, + { + "nil metric datapoint", + args{ + metricDataResults: [][]cloudwatch.MetricDataResult{ + { + {ID: "metric-1", Datapoint: aws.Float64(5), Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC)}, + {ID: "metric-2"}, + }, + }, + cloudwatchDatas: []*model.CloudwatchData{ + {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-1", Statistic: "Min"}, MetricName: "MetricOne", Namespace: "svc"}, + {GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{QueryID: "metric-2", Statistic: "Max"}, MetricName: "MetricTwo", Namespace: "svc"}, + }, + }, + []*model.CloudwatchData{ + { + MetricName: "MetricOne", + Namespace: "svc", + GetMetricDataResult: &model.GetMetricDataResult{ + Statistic: "Min", + Datapoint: aws.Float64(5), + Timestamp: time.Date(2023, time.June, 7, 1, 9, 8, 0, time.UTC), + }, + }, + { + MetricName: "MetricTwo", + Namespace: "svc", + GetMetricDataResult: &model.GetMetricDataResult{ + Statistic: "Max", + Datapoint: nil, + Timestamp: time.Time{}, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mapResultsToMetricDatas(tt.args.metricDataResults, tt.args.cloudwatchDatas, logging.NewNopLogger()) + // mapResultsToMetricDatas() modifies its []*model.CloudwatchData parameter in-place, assert that it was updated + + // Ensure processing params were nil'ed when expected to be + for _, data := range tt.args.cloudwatchDatas { + if data.GetMetricDataResult != nil { + require.Nil(t, data.GetMetricDataProcessingParams, "GetMetricDataResult is not nil GetMetricDataProcessingParams should have been nil") + } else { + require.NotNil(t, data.GetMetricDataProcessingParams, "GetMetricDataResult is nil GetMetricDataProcessingParams should not have been nil") + } + + // Drop processing params to simplify further asserts + data.GetMetricDataProcessingParams = nil + } + require.ElementsMatch(t, tt.wantCloudwatchDatas, tt.args.cloudwatchDatas) + }) + } +} + +func TestProcessor_Run(t *testing.T) { + now := time.Now() + tests := []struct { + name string + requests []*model.GetMetricDataProcessingParams + getMetricDataResponse []cloudwatch.MetricDataResult + want []*model.GetMetricDataResult + }{ + { + name: "successfully maps input to output when GetMetricData returns data", + requests: []*model.GetMetricDataProcessingParams{{ + QueryID: "1234", + Statistic: "Average", + }}, + getMetricDataResponse: []cloudwatch.MetricDataResult{{ID: "1234", Datapoint: aws.Float64(1000), Timestamp: now}}, + want: []*model.GetMetricDataResult{{ + Statistic: "Average", + Datapoint: aws.Float64(1000), + Timestamp: now, + }}, + }, + { + name: "does not return a request when QueryID is not in MetricDataResult", + requests: []*model.GetMetricDataProcessingParams{{ + QueryID: "1234", + Statistic: "Average", + }, { + QueryID: "make-me-disappear", + Statistic: "Average", + }}, + getMetricDataResponse: []cloudwatch.MetricDataResult{{ID: "1234", Datapoint: aws.Float64(1000), Timestamp: now}}, + want: []*model.GetMetricDataResult{{ + Statistic: "Average", + Datapoint: aws.Float64(1000), + Timestamp: now, + }}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := Processor{ + metricsPerQuery: 500, + client: testClient{GetMetricDataResponse: tt.getMetricDataResponse}, + concurrency: 1, + } + cloudwatchData, err := r.Run(context.Background(), logging.NewNopLogger(), "anything_is_fine", 1, 1, aws.Int64(1), getMetricDataProcessingParamsToCloudwatchData(tt.requests)) + require.NoError(t, err) + require.Len(t, cloudwatchData, len(tt.want)) + got := make([]*model.GetMetricDataResult, 0, len(cloudwatchData)) + for _, data := range cloudwatchData { + assert.Nil(t, data.GetMetricStatisticsResult) + assert.Nil(t, data.GetMetricDataProcessingParams) + assert.NotNil(t, data.GetMetricDataResult) + got = append(got, data.GetMetricDataResult) + } + + assert.ElementsMatch(t, tt.want, got) + }) + } +} + +func TestProcessor_Run_BatchesByMetricsPerQuery(t *testing.T) { + now := time.Now() + tests := []struct { + name string + metricsPerQuery int + numberOfRequests int + expectedNumberOfCallsToGetMetricData int32 + }{ + {name: "1 per batch", metricsPerQuery: 1, numberOfRequests: 10, expectedNumberOfCallsToGetMetricData: 10}, + {name: "divisible batches and requests", metricsPerQuery: 5, numberOfRequests: 100, expectedNumberOfCallsToGetMetricData: 20}, + {name: "indivisible batches and requests", metricsPerQuery: 5, numberOfRequests: 94, expectedNumberOfCallsToGetMetricData: 19}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var callCounter atomic.Int32 + getMetricDataFunc := func(_ context.Context, _ logging.Logger, requests []*model.CloudwatchData, _ string, _ int64, _ int64, _ *int64) []cloudwatch.MetricDataResult { + callCounter.Add(1) + response := make([]cloudwatch.MetricDataResult, 0, len(requests)) + for _, gmd := range requests { + response = append(response, cloudwatch.MetricDataResult{ + ID: gmd.GetMetricDataProcessingParams.QueryID, + Datapoint: aws.Float64(1000), + Timestamp: now, + }) + } + return response + } + + requests := make([]*model.CloudwatchData, 0, tt.numberOfRequests) + for i := 0; i < tt.numberOfRequests; i++ { + requests = append(requests, getSampleMetricDatas(strconv.Itoa(i))) + } + r := Processor{ + metricsPerQuery: tt.metricsPerQuery, + client: testClient{GetMetricDataFunc: getMetricDataFunc}, + concurrency: 1, + } + cloudwatchData, err := r.Run(context.Background(), logging.NewNopLogger(), "anything_is_fine", 1, 1, aws.Int64(1), requests) + require.NoError(t, err) + assert.Len(t, cloudwatchData, tt.numberOfRequests) + assert.Equal(t, tt.expectedNumberOfCallsToGetMetricData, callCounter.Load()) + }) + } +} + +func getMetricDataProcessingParamsToCloudwatchData(params []*model.GetMetricDataProcessingParams) []*model.CloudwatchData { + output := make([]*model.CloudwatchData, 0, len(params)) + for _, param := range params { + cloudwatchData := &model.CloudwatchData{ + MetricName: "test-metric", + ResourceName: "test", + Namespace: "test", + Tags: []model.Tag{{Key: "tag", Value: "value"}}, + Dimensions: []model.Dimension{{Name: "dimension", Value: "value"}}, + GetMetricDataProcessingParams: param, + GetMetricDataResult: nil, + GetMetricStatisticsResult: nil, + } + output = append(output, cloudwatchData) + } + return output +} + +type testClient struct { + GetMetricDataFunc func(ctx context.Context, logger logging.Logger, getMetricData []*model.CloudwatchData, namespace string, length int64, delay int64, configuredRoundingPeriod *int64) []cloudwatch.MetricDataResult + GetMetricDataResponse []cloudwatch.MetricDataResult +} + +func (t testClient) GetMetricData(ctx context.Context, logger logging.Logger, getMetricData []*model.CloudwatchData, namespace string, length int64, delay int64, configuredRoundingPeriod *int64) []cloudwatch.MetricDataResult { + if t.GetMetricDataResponse != nil { + return t.GetMetricDataResponse + } + return t.GetMetricDataFunc(ctx, logger, getMetricData, namespace, length, delay, configuredRoundingPeriod) +} + +func getSampleMetricDatas(id string) *model.CloudwatchData { + return &model.CloudwatchData{ + MetricName: "StorageBytes", + Dimensions: []model.Dimension{ + { + Name: "FileSystemId", + Value: "fs-abc123", + }, + { + Name: "StorageClass", + Value: "Standard", + }, + }, + ResourceName: id, + Namespace: "efs", + Tags: []model.Tag{ + { + Key: "Value1", + Value: "", + }, + { + Key: "Value2", + Value: "", + }, + }, + MetricMigrationParams: model.MetricMigrationParams{ + NilToZero: false, + AddCloudwatchTimestamp: false, + }, + GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{ + QueryID: id, + Period: 60, + Length: 60, + Delay: 0, + Statistic: "Average", + }, + } +} + +func BenchmarkMapResultsToMetricDatas(b *testing.B) { + type testcase struct { + metricsPerQuery int + testResourcesCount int + metricsPerResource int + } + + for name, tc := range map[string]testcase{ + "small case": { + metricsPerQuery: 500, + testResourcesCount: 10, + metricsPerResource: 10, + }, + "medium case": { + metricsPerQuery: 500, + testResourcesCount: 1000, + metricsPerResource: 50, + }, + "big case": { + metricsPerQuery: 500, + testResourcesCount: 2000, + metricsPerResource: 50, + }, + } { + b.Run(name, func(b *testing.B) { + doBench(b, tc.metricsPerQuery, tc.testResourcesCount, tc.metricsPerResource) + }) + } +} + +func doBench(b *testing.B, metricsPerQuery, testResourcesCount, metricsPerResource int) { + outputs := [][]cloudwatch.MetricDataResult{} + now := time.Now() + testResourceIDs := make([]string, testResourcesCount) + + for i := 0; i < testResourcesCount; i++ { + testResourceIDs[i] = fmt.Sprintf("test-resource-%d", i) + } + + totalMetricsDatapoints := metricsPerResource * testResourcesCount + batchesCount := totalMetricsDatapoints / metricsPerQuery + + if batchesCount == 0 { + batchesCount = 1 + } + + for batch := 0; batch < batchesCount; batch++ { + newBatchOutputs := make([]cloudwatch.MetricDataResult, 0) + for i := 0; i < metricsPerQuery; i++ { + id := testResourceIDs[(batch*metricsPerQuery+i)%testResourcesCount] + newBatchOutputs = append(newBatchOutputs, cloudwatch.MetricDataResult{ + ID: id, + Datapoint: aws.Float64(1.4 * float64(batch)), + Timestamp: now, + }) + } + outputs = append(outputs, newBatchOutputs) + } + + for i := 0; i < b.N; i++ { + // stop timer to not affect benchmark run + // this has to do in every run, since mapResultsToMetricDatas mutates the metric datas slice + b.StopTimer() + datas := []*model.CloudwatchData{} + for i := 0; i < testResourcesCount; i++ { + datas = append(datas, getSampleMetricDatas(testResourceIDs[i])) + } + // re-start timer + b.StartTimer() + mapResultsToMetricDatas(outputs, datas, logging.NewNopLogger()) + } +} diff --git a/pkg/job/scrape.go b/pkg/job/scrape.go index 0e32759d7..8384983ef 100644 --- a/pkg/job/scrape.go +++ b/pkg/job/scrape.go @@ -7,6 +7,7 @@ import ( "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config" + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/getmetricdata" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model" ) @@ -39,7 +40,9 @@ func ScrapeAwsData( } jobLogger = jobLogger.With("account", accountID) - resources, metrics := runDiscoveryJob(ctx, jobLogger, discoveryJob, region, factory.GetTaggingClient(region, role, taggingAPIConcurrency), factory.GetCloudwatchClient(region, role, cloudwatchConcurrency), metricsPerQuery, cloudwatchConcurrency) + cloudwatchClient := factory.GetCloudwatchClient(region, role, cloudwatchConcurrency) + gmdProcessor := getmetricdata.NewProcessor(cloudwatchClient, metricsPerQuery, cloudwatchConcurrency.GetMetricData) + resources, metrics := runDiscoveryJob(ctx, jobLogger, discoveryJob, region, factory.GetTaggingClient(region, role, taggingAPIConcurrency), cloudwatchClient, gmdProcessor) addDataToOutput := len(metrics) != 0 if config.FlagsFromCtx(ctx).IsFeatureEnabled(config.AlwaysReturnInfoMetrics) { addDataToOutput = addDataToOutput || len(resources) != 0 @@ -116,7 +119,9 @@ func ScrapeAwsData( } jobLogger = jobLogger.With("account", accountID) - metrics := runCustomNamespaceJob(ctx, jobLogger, customNamespaceJob, factory.GetCloudwatchClient(region, role, cloudwatchConcurrency), metricsPerQuery) + cloudwatchClient := factory.GetCloudwatchClient(region, role, cloudwatchConcurrency) + gmdProcessor := getmetricdata.NewProcessor(cloudwatchClient, metricsPerQuery, cloudwatchConcurrency.GetMetricData) + metrics := runCustomNamespaceJob(ctx, jobLogger, customNamespaceJob, cloudwatchClient, gmdProcessor) metricResult := model.CloudwatchMetricResult{ Context: &model.ScrapeContext{ Region: region,