Skip to content
Merged
9 changes: 9 additions & 0 deletions exporter/awsemfexporter/grouped_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ func addToGroupedMetric(
// them together into one EMF log event, so don't set batchIndex when it's a summary metric
metadata.batchIndex = i
}

// Handle metric types with container insights
if metadata.receiver == containerInsightsReceiver {
// For container insights, treat gauge metrics as sum to keep metrics in the same EMF record
if metadata.metricDataType == pmetric.MetricTypeGauge {
metadata.metricDataType = pmetric.MetricTypeSum
}
}

groupKey := aws.NewKey(metadata.groupedMetricMetadata, labels)
if _, ok := groupedMetrics[groupKey]; ok {
// if MetricName already exists in metrics map, print warning log
Expand Down
18 changes: 15 additions & 3 deletions exporter/awsemfexporter/grouped_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package awsemfexporter
import (
"encoding/json"
"fmt"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -430,13 +431,24 @@ func TestAddToGroupedMetric(t *testing.T) {
}
assert.Len(t, groupedMetrics, 2)
expectedLabels := map[string]string{"label1": "value1"}
idx := 0

// Sort metadata list to prevent race condition
var metadataList []cWMetricMetadata
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a flaky test. The github runner runs the unit tests with -race. Had to fix it by sorting the metadata list

for _, v := range groupedMetrics {
metadataList = append(metadataList, v.metadata)
}
sort.Slice(metadataList, func(i, j int) bool {
return metadataList[i].batchIndex < metadataList[j].batchIndex
})

for i, metadata := range metadataList {
expectedMetadata := generateTestMetricMetadata(namespace, timestamp, logGroup, logStreamName, instrumentationLibName, metrics.At(0).Type(), i)
assert.Equal(t, expectedMetadata, metadata)
}
for _, v := range groupedMetrics {
assert.Len(t, v.metrics, 1)
assert.Len(t, v.labels, 1)
assert.Equal(t, generateTestMetricMetadata(namespace, timestamp, logGroup, logStreamName, instrumentationLibName, metrics.At(0).Type(), idx), v.metadata)
assert.Equal(t, expectedLabels, v.labels)
idx++
}
})

Expand Down
3 changes: 2 additions & 1 deletion exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri
if strings.HasPrefix(serviceName.Str(), "containerInsightsKubeAPIServerScraper") ||
strings.HasPrefix(serviceName.Str(), "containerInsightsDCGMExporterScraper") ||
strings.HasPrefix(serviceName.Str(), "containerInsightsNeuronMonitorScraper") ||
strings.HasPrefix(serviceName.Str(), "containerInsightsKueueMetricsScraper") {
strings.HasPrefix(serviceName.Str(), "containerInsightsKueueMetricsScraper") ||
strings.HasPrefix(serviceName.Str(), "containerInsightsNVMeExporterScraper") {
// the prometheus metrics that come from the container insight receiver need to be clearly tagged as coming from container insights
metricReceiver = containerInsightsReceiver
}
Expand Down
34 changes: 34 additions & 0 deletions exporter/awsemfexporter/metric_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ func TestTranslateOtToGroupedMetric(t *testing.T) {
neuronMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsNeuronMonitorScraper")
kueueMetric := createTestResourceMetricsHelper(defaultNumberOfTestMetrics + 1)
kueueMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsKueueMetricsScraper")
nvmeMetric := createTestResourceMetricsHelper(defaultNumberOfTestMetrics + 1)
nvmeMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsNVMeExporterScraper")

counterSumMetrics := map[string]*metricInfo{
"spanCounter": {
Expand Down Expand Up @@ -395,6 +397,19 @@ func TestTranslateOtToGroupedMetric(t *testing.T) {
"myServiceNS/containerInsightsKueueMetricsScraper",
containerInsightsReceiver,
},
{
"nvme receiver",
nvmeMetric,
map[string]string{
"isItAnError": "false",
"spanName": "testSpan",
},
map[string]string{
"spanName": "testSpan",
},
"myServiceNS/containerInsightsNVMeExporterScraper",
containerInsightsReceiver,
},
}

for _, tc := range testCases {
Expand All @@ -416,6 +431,25 @@ func TestTranslateOtToGroupedMetric(t *testing.T) {
}
}

// container insights has special handling such as converting gauges to sum
if tc.expectedReceiver == containerInsightsReceiver {
switch v.metadata.metricDataType {
case pmetric.MetricTypeSum:
assert.Len(t, v.metrics, 2)
assert.Equal(t, tc.counterLabels, v.labels)
assert.True(t, reflect.DeepEqual(counterSumMetrics, v.metrics) ||
reflect.DeepEqual(counterGaugeMetrics, v.metrics),
)
case pmetric.MetricTypeHistogram:
assert.Len(t, v.metrics, 1)
assert.Equal(t, tc.timerLabels, v.labels)
assert.Equal(t, timerMetrics, v.metrics)
default:
assert.Fail(t, fmt.Sprintf("Unhandled metric type %s not expected", v.metadata.metricDataType))
}
continue
}

switch v.metadata.metricDataType {
case pmetric.MetricTypeSum:
assert.Len(t, v.metrics, 2)
Expand Down
Loading