diff --git a/Makefile b/Makefile index 36cb17ab..c65ad4dc 100644 --- a/Makefile +++ b/Makefile @@ -48,9 +48,5 @@ generate-e2e-client: go generate ./e2e/client .PHONY: generate-e2e-client -generate-api-client: - go generate internal/castai/api/generate.go -.PHONY: generate-api-client - deploy-loadtest: release IMAGE_REPOSITORY=$(DOCKER_REPOSITORY) IMAGE_TAG=$(VERSION) ./hack/loadtest/deploy.sh diff --git a/internal/castai/client.go b/internal/castai/client.go index 25ef907e..28cb30ae 100644 --- a/internal/castai/client.go +++ b/internal/castai/client.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "math" "net" "net/http" "time" @@ -198,41 +199,39 @@ func convertPrometheusMetricFamilies(gatherTime time.Time, podName string, metri timeseries := []PrometheusTimeseries{} for _, family := range metricFamilies { for _, metric := range family.Metric { - // Right now we support only export of counter metrics. - if metric.Counter == nil { - continue - } - - timeserie := PrometheusTimeseries{ - Labels: []PrometheusLabel{ - { - Name: "__name__", - Value: family.GetName(), - }, - { - Name: "pod_name", - Value: podName, - }, + commonLabels := []PrometheusLabel{ + { + Name: "pod_name", + Value: podName, }, } + for _, label := range metric.Label { if label.Name == nil { continue } - timeserie.Labels = append(timeserie.Labels, PrometheusLabel{ + commonLabels = append(commonLabels, PrometheusLabel{ Name: *label.Name, Value: lo.FromPtr(label.Value), }) } - timeserie.Samples = []PrometheusSample{} - timeserie.Samples = append(timeserie.Samples, PrometheusSample{ - Timestamp: timestamp, - Value: metric.Counter.GetValue(), - }) + if metric.Counter != nil { + timeseries = append(timeseries, + convertPrometheusCounterMetric(commonLabels, family, metric, timestamp)..., + ) + } - timeseries = append(timeseries, timeserie) + if metric.Histogram != nil { + timeseries = append(timeseries, + convertPrometheusHistogramMetric(commonLabels, family, metric, timestamp)...) 
+ } + + if metric.Summary != nil { + timeseries = append(timeseries, + convertPrometheusSummaryMetric(commonLabels, family, metric, timestamp)...) + } } } @@ -240,3 +239,141 @@ func convertPrometheusMetricFamilies(gatherTime time.Time, podName string, metri Timeseries: timeseries, } } + +func convertPrometheusCounterMetric( + commonLabels []PrometheusLabel, + family *dto.MetricFamily, + metric *dto.Metric, + timestamp int64, +) []PrometheusTimeseries { + return []PrometheusTimeseries{ + { + Labels: copyLabelsWithName(commonLabels, family.GetName()), + Samples: []PrometheusSample{ + { + Timestamp: timestamp, + Value: metric.Counter.GetValue(), + }, + }, + }, + } +} + +func convertPrometheusHistogramMetric( + commonLabels []PrometheusLabel, + family *dto.MetricFamily, + metric *dto.Metric, + timestamp int64, +) []PrometheusTimeseries { + timeseries := make([]PrometheusTimeseries, 0) + h := metric.Histogram + + for _, b := range h.Bucket { + timeseries = append(timeseries, PrometheusTimeseries{ + Labels: copyLabelsWithName(commonLabels, family.GetName()+"_bucket", PrometheusLabel{ + Name: "le", Value: fmt.Sprintf("%f", b.GetUpperBound()), + }), + Samples: []PrometheusSample{ + { + Timestamp: timestamp, + Value: float64(b.GetCumulativeCount()), + }, + }, + }) + } + // We need this +Inf bucket for histogram_quantile query. 
+ timeseries = append(timeseries, PrometheusTimeseries{ + Labels: copyLabelsWithName(commonLabels, family.GetName()+"_bucket", PrometheusLabel{ + Name: "le", Value: "+Inf", + }), + Samples: []PrometheusSample{ + { + Timestamp: timestamp, + Value: float64(h.GetSampleCount()), + }, + }, + }) + + timeseries = append(timeseries, PrometheusTimeseries{ + Labels: copyLabelsWithName(commonLabels, family.GetName()+"_sum"), + Samples: []PrometheusSample{ + { + Timestamp: timestamp, + Value: h.GetSampleSum(), + }, + }, + }) + + timeseries = append(timeseries, PrometheusTimeseries{ + Labels: copyLabelsWithName(commonLabels, family.GetName()+"_count"), + Samples: []PrometheusSample{ + { + Timestamp: timestamp, + Value: float64(h.GetSampleCount()), + }, + }, + }) + + return timeseries +} + +func convertPrometheusSummaryMetric( + commonLabels []PrometheusLabel, + family *dto.MetricFamily, + metric *dto.Metric, + timestamp int64, +) []PrometheusTimeseries { + timeseries := make([]PrometheusTimeseries, 0) + s := metric.Summary + + for _, quantile := range s.Quantile { + if math.IsNaN(quantile.GetValue()) { + continue + } + + timeseries = append(timeseries, PrometheusTimeseries{ + Labels: copyLabelsWithName(commonLabels, family.GetName()+"_quantile", PrometheusLabel{ + Name: "quantile", + Value: fmt.Sprintf("%f", quantile.GetQuantile()), + }), + Samples: []PrometheusSample{ + { + Timestamp: timestamp, + Value: quantile.GetValue(), + }, + }, + }) + } + + timeseries = append(timeseries, PrometheusTimeseries{ + Labels: copyLabelsWithName(commonLabels, family.GetName()+"_sum"), + Samples: []PrometheusSample{ + { + Timestamp: timestamp, + Value: s.GetSampleSum(), + }, + }, + }) + + timeseries = append(timeseries, PrometheusTimeseries{ + Labels: copyLabelsWithName(commonLabels, family.GetName()+"_count"), + Samples: []PrometheusSample{ + { + Timestamp: timestamp, + Value: float64(s.GetSampleCount()), + }, + }, + }) + return timeseries +} + +func copyLabelsWithName(baseLabels 
[]PrometheusLabel, name string, additionalLabels ...PrometheusLabel) []PrometheusLabel { + labels := make([]PrometheusLabel, 0, len(baseLabels)+1) + labels = append(labels, baseLabels...) + labels = append(labels, additionalLabels...) + labels = append(labels, PrometheusLabel{ + Name: "__name__", + Value: name, + }) + return labels +} diff --git a/internal/castai/client_test.go b/internal/castai/client_test.go index 1500d660..524068f1 100644 --- a/internal/castai/client_test.go +++ b/internal/castai/client_test.go @@ -5,6 +5,8 @@ import ( "time" dto "github.com/prometheus/client_model/go" + "github.com/samber/lo" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -58,202 +60,481 @@ q276VYI/vYmMLRI/iE7Qjn9uGEeR1LWpVngE9jSzSdzByvzw3DwO4sL5B+rv7O1T }) } -func TestConvertPrometheusMetricFamilies(t *testing.T) { +func TestConvertPrometheusMetricFamilies_EmptyInput(t *testing.T) { gatherTime := time.Date(2023, 9, 13, 10, 30, 0, 0, time.UTC) - expectedTimestamp := gatherTime.UnixMilli() - - t.Run("empty input", func(t *testing.T) { - r := require.New(t) + r := require.New(t) - result := convertPrometheusMetricFamilies(gatherTime, "ctrl_pod", []*dto.MetricFamily{}) - - r.NotNil(result) - r.Empty(result.Timeseries) - }) + result := convertPrometheusMetricFamilies(gatherTime, "ctrl_pod", []*dto.MetricFamily{}) - t.Run("single counter with labels", func(t *testing.T) { - r := require.New(t) + r.NotNil(result) + r.Empty(result.Timeseries) +} - metricName := "test_counter" - counterValue := 42.5 - labelName := "label1" - labelValue := "value1" - - family := &dto.MetricFamily{ - Name: &metricName, - Metric: []*dto.Metric{ - { - Label: []*dto.LabelPair{ - { - Name: &labelName, - Value: &labelValue, - }, - }, - Counter: &dto.Counter{ - Value: &counterValue, +func TestConvertPrometheusMetricFamilies_SingleCounter(t *testing.T) { + gatherTime := time.Date(2023, 9, 13, 10, 30, 0, 0, time.UTC) + expectedTimestamp := gatherTime.UnixMilli() + r := 
require.New(t) + + metricName := "test_counter" + counterValue := 42.5 + labelName := "label1" + labelValue := "value1" + + family := &dto.MetricFamily{ + Name: &metricName, + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{ + { + Name: &labelName, + Value: &labelValue, }, }, + Counter: &dto.Counter{ + Value: &counterValue, + }, }, - } + }, + } - result := convertPrometheusMetricFamilies(gatherTime, "ctrl_pod", []*dto.MetricFamily{family}) + result := convertPrometheusMetricFamilies(gatherTime, "ctrl_pod", []*dto.MetricFamily{family}) - r.Len(result.Timeseries, 1) - ts := result.Timeseries[0] + r.Len(result.Timeseries, 1) + ts := result.Timeseries[0] - // Verify __name__ and pod_name label is first - r.Len(ts.Labels, 3) - r.Equal("__name__", ts.Labels[0].Name) - r.Equal(metricName, ts.Labels[0].Value) - r.Equal("pod_name", ts.Labels[1].Name) - r.Equal("ctrl_pod", ts.Labels[1].Value) + r.Len(ts.Labels, 3) + assertLabelPresent(t, ts.Labels, "__name__", metricName) + assertLabelPresent(t, ts.Labels, "pod_name", "ctrl_pod") + assertLabelPresent(t, ts.Labels, labelName, labelValue) - // Verify custom label - r.Equal(labelName, ts.Labels[2].Name) - r.Equal(labelValue, ts.Labels[2].Value) + // Verify sample + r.Len(ts.Samples, 1) + r.Equal(expectedTimestamp, ts.Samples[0].Timestamp) + r.Equal(counterValue, ts.Samples[0].Value) +} - // Verify sample - r.Len(ts.Samples, 1) - r.Equal(expectedTimestamp, ts.Samples[0].Timestamp) - r.Equal(counterValue, ts.Samples[0].Value) - }) +func TestConvertPrometheusMetricFamilies_MultipleCounters(t *testing.T) { + gatherTime := time.Date(2023, 9, 13, 10, 30, 0, 0, time.UTC) + r := require.New(t) - t.Run("multiple counters in one family", func(t *testing.T) { - r := require.New(t) + metricName := "test_counter" + counter1Value := 10.0 + counter2Value := 20.0 - metricName := "test_counter" - counter1Value := 10.0 - counter2Value := 20.0 + family := &dto.MetricFamily{ + Name: &metricName, + Metric: []*dto.Metric{ + { + Counter: 
&dto.Counter{Value: &counter1Value}, + }, + { + Counter: &dto.Counter{Value: &counter2Value}, + }, + }, + } - family := &dto.MetricFamily{ - Name: &metricName, - Metric: []*dto.Metric{ - { - Counter: &dto.Counter{Value: &counter1Value}, - }, - { - Counter: &dto.Counter{Value: &counter2Value}, + result := convertPrometheusMetricFamilies(gatherTime, "ctrl_pod", []*dto.MetricFamily{family}) + + r.Len(result.Timeseries, 2) + + assertLabelPresent(t, result.Timeseries[0].Labels, "__name__", metricName) + assertLabelPresent(t, result.Timeseries[1].Labels, "__name__", metricName) + + r.Equal(counter1Value, result.Timeseries[0].Samples[0].Value) + r.Equal(counter2Value, result.Timeseries[1].Samples[0].Value) +} + +func TestConvertPrometheusMetricFamilies_MultipleFamilies(t *testing.T) { + gatherTime := time.Date(2023, 9, 13, 10, 30, 0, 0, time.UTC) + r := require.New(t) + + metric1Name := "counter1" + metric2Name := "counter2" + value1 := 1.0 + value2 := 2.0 + + family1 := &dto.MetricFamily{ + Name: &metric1Name, + Metric: []*dto.Metric{ + {Counter: &dto.Counter{Value: &value1}}, + }, + } + + family2 := &dto.MetricFamily{ + Name: &metric2Name, + Metric: []*dto.Metric{ + {Counter: &dto.Counter{Value: &value2}}, + }, + } + + result := convertPrometheusMetricFamilies(gatherTime, "ctrl_pod", []*dto.MetricFamily{family1, family2}) + + r.Len(result.Timeseries, 2) + + // Verify different metric names + assertLabelPresent(t, result.Timeseries[0].Labels, "__name__", metric1Name) + assertLabelPresent(t, result.Timeseries[1].Labels, "__name__", metric2Name) + + // Verify values + r.Equal(value1, result.Timeseries[0].Samples[0].Value) + r.Equal(value2, result.Timeseries[1].Samples[0].Value) +} + +func TestConvertPrometheusMetricFamilies_LabelEdgeCases(t *testing.T) { + gatherTime := time.Date(2023, 9, 13, 10, 30, 0, 0, time.UTC) + r := require.New(t) + + metricName := "test_counter" + counterValue := 5.0 + validLabelName := "valid_label" + validLabelValue := "valid_value" + 
emptyLabelName := "empty_label" + emptyLabelValue := "" + + family := &dto.MetricFamily{ + Name: &metricName, + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{ + { + Name: nil, // Should be skipped + Value: &validLabelValue, + }, + { + Name: &validLabelName, + Value: nil, // Should use empty string + }, + { + Name: &emptyLabelName, + Value: &emptyLabelValue, // Should preserve empty string + }, }, + Counter: &dto.Counter{Value: &counterValue}, }, - } + }, + } - result := convertPrometheusMetricFamilies(gatherTime, "ctrl_pod", []*dto.MetricFamily{family}) + result := convertPrometheusMetricFamilies(gatherTime, "ctrl_pod", []*dto.MetricFamily{family}) - r.Len(result.Timeseries, 2) + r.Len(result.Timeseries, 1) + ts := result.Timeseries[0] - // Both should have same __name__ label - r.Equal("__name__", result.Timeseries[0].Labels[0].Name) - r.Equal(metricName, result.Timeseries[0].Labels[0].Value) - r.Equal("__name__", result.Timeseries[1].Labels[0].Name) - r.Equal(metricName, result.Timeseries[1].Labels[0].Value) + // Should have __name__, pod_name + 2 valid labels (nil name skipped, nil value converted to empty) + r.Len(ts.Labels, 4) + assertLabelPresent(t, ts.Labels, "__name__", metricName) + assertLabelPresent(t, ts.Labels, "pod_name", "ctrl_pod") + assertLabelPresent(t, ts.Labels, validLabelName, "") + assertLabelPresent(t, ts.Labels, emptyLabelName, emptyLabelValue) +} - // Different counter values - r.Equal(counter1Value, result.Timeseries[0].Samples[0].Value) - r.Equal(counter2Value, result.Timeseries[1].Samples[0].Value) - }) +func TestConvertPrometheusMetricFamilies_CounterEdgeCases(t *testing.T) { + gatherTime := time.Date(2023, 9, 13, 10, 30, 0, 0, time.UTC) + r := require.New(t) - t.Run("multiple metric families", func(t *testing.T) { - r := require.New(t) + metricName := "test_counter" + + family := &dto.MetricFamily{ + Name: &metricName, + Metric: []*dto.Metric{ + { + Counter: nil, // Should not produce samples + }, + }, + } - metric1Name := 
"counter1" - metric2Name := "counter2" - value1 := 1.0 - value2 := 2.0 + result := convertPrometheusMetricFamilies(gatherTime, "ctrl_pod", []*dto.MetricFamily{family}) + + r.Len(result.Timeseries, 0) +} - family1 := &dto.MetricFamily{ - Name: &metric1Name, - Metric: []*dto.Metric{ - {Counter: &dto.Counter{Value: &value1}}, +func TestConvertPrometheusMetricFamilies_Histogram(t *testing.T) { + gatherTime := time.Date(2023, 9, 13, 10, 30, 0, 0, time.UTC) + expectedTimestamp := gatherTime.UnixMilli() + r := require.New(t) + + metricName := "test_histogram" + sampleCount := uint64(100) + sampleSum := 250.5 + + // Create histogram buckets + buckets := []*dto.Bucket{ + { + CumulativeCount: lo.ToPtr(uint64(10)), + UpperBound: lo.ToPtr(1.0), + }, + { + CumulativeCount: lo.ToPtr(uint64(30)), + UpperBound: lo.ToPtr(5.0), + }, + { + CumulativeCount: lo.ToPtr(uint64(80)), + UpperBound: lo.ToPtr(10.0), + }, + { + CumulativeCount: lo.ToPtr(uint64(100)), + UpperBound: lo.ToPtr(25.0), + }, + } + + labelName := "method" + labelValue := "GET" + + family := &dto.MetricFamily{ + Name: &metricName, + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{ + { + Name: &labelName, + Value: &labelValue, + }, + }, + Histogram: &dto.Histogram{ + SampleCount: &sampleCount, + SampleSum: &sampleSum, + Bucket: buckets, + }, }, + }, + } + + result := convertPrometheusMetricFamilies(gatherTime, "ctrl_pod", []*dto.MetricFamily{family}) + + // Should have 4 bucket timeseries + 1 +Inf bucket + 1 _sum + 1 _count = 7 total + r.Len(result.Timeseries, 7) + + // Separate timeseries by type + bucketTimeseries := []PrometheusTimeseries{} + var sumTimeseries, countTimeseries *PrometheusTimeseries + + for _, ts := range result.Timeseries { + // Check which type this timeseries is by looking at __name__ label + var metricNameValue string + for _, label := range ts.Labels { + if label.Name == "__name__" { + metricNameValue = label.Value + break + } } - family2 := &dto.MetricFamily{ - Name: &metric2Name, - Metric: 
[]*dto.Metric{ - {Counter: &dto.Counter{Value: &value2}}, - }, + switch metricNameValue { + case metricName + "_bucket": + bucketTimeseries = append(bucketTimeseries, ts) + case metricName + "_sum": + sumTimeseries = &ts + case metricName + "_count": + countTimeseries = &ts + default: + t.Fatalf("Unexpected metric name: %s", metricNameValue) } - result := convertPrometheusMetricFamilies(gatherTime, "ctrl_pod", []*dto.MetricFamily{family1, family2}) + // Verify common labels are present on all timeseries + assertLabelPresent(t, ts.Labels, "pod_name", "ctrl_pod") + assertLabelPresent(t, ts.Labels, labelName, labelValue) - r.Len(result.Timeseries, 2) + // Verify timestamp and sample structure + r.Len(ts.Samples, 1) + r.Equal(expectedTimestamp, ts.Samples[0].Timestamp) + } + + // Verify bucket timeseries (4 explicit buckets + 1 +Inf bucket) + r.Len(bucketTimeseries, 5) + + // Verify each expected bucket exists + expectedBuckets := map[string]float64{ + "1.000000": 10, + "5.000000": 30, + "10.000000": 80, + "25.000000": 100, + "+Inf": 100, + } + + foundBuckets := make(map[string]bool) + for _, ts := range bucketTimeseries { + assertLabelPresent(t, ts.Labels, "__name__", metricName+"_bucket") + + // Find the 'le' label value + var leValue string + for _, label := range ts.Labels { + if label.Name == "le" { + leValue = label.Value + break + } + } - // Verify different metric names - r.Equal(metric1Name, result.Timeseries[0].Labels[0].Value) - r.Equal(metric2Name, result.Timeseries[1].Labels[0].Value) + r.NotEmpty(leValue, "Bucket timeseries should have 'le' label") - // Verify values - r.Equal(value1, result.Timeseries[0].Samples[0].Value) - r.Equal(value2, result.Timeseries[1].Samples[0].Value) - }) + expectedCount, exists := expectedBuckets[leValue] + r.True(exists, "Unexpected bucket with le=%s", leValue) + r.Equal(expectedCount, ts.Samples[0].Value) + foundBuckets[leValue] = true + } - t.Run("label edge cases", func(t *testing.T) { - r := require.New(t) + // Ensure 
all expected buckets were found + r.Len(foundBuckets, len(expectedBuckets)) + + // Verify _sum timeseries + r.NotNil(sumTimeseries) + assertLabelPresent(t, sumTimeseries.Labels, "__name__", metricName+"_sum") + r.Equal(sampleSum, sumTimeseries.Samples[0].Value) + + // Verify _sum doesn't have 'le' label + for _, label := range sumTimeseries.Labels { + r.NotEqual("le", label.Name, "_sum should not have 'le' label") + } + + // Verify _count timeseries + r.NotNil(countTimeseries) + assertLabelPresent(t, countTimeseries.Labels, "__name__", metricName+"_count") + r.Equal(float64(sampleCount), countTimeseries.Samples[0].Value) - metricName := "test_counter" - counterValue := 5.0 - validLabelName := "valid_label" - validLabelValue := "valid_value" - emptyLabelValue := "" - - family := &dto.MetricFamily{ - Name: &metricName, - Metric: []*dto.Metric{ - { - Label: []*dto.LabelPair{ - { - Name: nil, // Should be skipped - Value: &validLabelValue, - }, - { - Name: &validLabelName, - Value: nil, // Should use empty string - }, - { - Name: &validLabelName, - Value: &emptyLabelValue, // Should preserve empty string - }, + // Verify _count doesn't have 'le' label + for _, label := range countTimeseries.Labels { + r.NotEqual("le", label.Name, "_count should not have 'le' label") + } +} + +func TestConvertPrometheusMetricFamilies_Summary(t *testing.T) { + gatherTime := time.Date(2023, 9, 13, 10, 30, 0, 0, time.UTC) + expectedTimestamp := gatherTime.UnixMilli() + r := require.New(t) + + metricName := "test_summary" + sampleCount := uint64(50) + sampleSum := 125.75 + + // Create summary quantiles + quantiles := []*dto.Quantile{ + { + Quantile: lo.ToPtr(0.5), + Value: lo.ToPtr(2.5), + }, + { + Quantile: lo.ToPtr(0.9), + Value: lo.ToPtr(8.1), + }, + { + Quantile: lo.ToPtr(0.99), + Value: lo.ToPtr(15.3), + }, + } + + labelName := "handler" + labelValue := "api" + + family := &dto.MetricFamily{ + Name: &metricName, + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{ + { + Name: 
&labelName, + Value: &labelValue, }, - Counter: &dto.Counter{Value: &counterValue}, + }, + Summary: &dto.Summary{ + SampleCount: &sampleCount, + SampleSum: &sampleSum, + Quantile: quantiles, }, }, + }, + } + + result := convertPrometheusMetricFamilies(gatherTime, "ctrl_pod", []*dto.MetricFamily{family}) + + // Should have 3 quantile timeseries + 1 _sum + 1 _count = 5 total + r.Len(result.Timeseries, 5) + + // Separate timeseries by type + quantileTimeseries := []PrometheusTimeseries{} + var sumTimeseries, countTimeseries *PrometheusTimeseries + + for _, ts := range result.Timeseries { + // Check which type this timeseries is by looking at __name__ label + var metricNameValue string + for _, label := range ts.Labels { + if label.Name == "__name__" { + metricNameValue = label.Value + break + } } - result := convertPrometheusMetricFamilies(gatherTime, "ctrl_pod", []*dto.MetricFamily{family}) + switch metricNameValue { + case metricName + "_quantile": + quantileTimeseries = append(quantileTimeseries, ts) + case metricName + "_sum": + sumTimeseries = &ts + case metricName + "_count": + countTimeseries = &ts + default: + t.Fatalf("Unexpected metric name: %s", metricNameValue) + } + + // Verify common labels are present on all timeseries + assertLabelPresent(t, ts.Labels, "pod_name", "ctrl_pod") + assertLabelPresent(t, ts.Labels, labelName, labelValue) + + // Verify timestamp and sample structure + r.Len(ts.Samples, 1) + r.Equal(expectedTimestamp, ts.Samples[0].Timestamp) + } + + // Verify quantile timeseries (3 quantiles) + r.Len(quantileTimeseries, 3) + + // Verify each expected quantile exists + expectedQuantiles := map[string]float64{ + "0.500000": 2.5, + "0.900000": 8.1, + "0.990000": 15.3, + } + + foundQuantiles := make(map[string]bool) + for _, ts := range quantileTimeseries { + assertLabelPresent(t, ts.Labels, "__name__", metricName+"_quantile") + + // Find the 'quantile' label value + var quantileValue string + for _, label := range ts.Labels { + if label.Name == 
"quantile" { + quantileValue = label.Value + break + } + } - r.Len(result.Timeseries, 1) - ts := result.Timeseries[0] + r.NotEmpty(quantileValue, "Quantile timeseries should have 'quantile' label") - // Should have __name__, pod_name + 2 valid labels (nil name skipped, nil value converted to empty) - r.Len(ts.Labels, 4) - r.Equal("__name__", ts.Labels[0].Name) - r.Equal(metricName, ts.Labels[0].Value) - r.Equal("pod_name", ts.Labels[1].Name) - r.Equal("ctrl_pod", ts.Labels[1].Value) + expectedValue, exists := expectedQuantiles[quantileValue] + r.True(exists, "Unexpected quantile with quantile=%s", quantileValue) + r.Equal(expectedValue, ts.Samples[0].Value) + foundQuantiles[quantileValue] = true + } - // Verify remaining labels handle nil values correctly - r.Equal(validLabelName, ts.Labels[2].Name) - r.Equal("", ts.Labels[2].Value) // nil value becomes empty string - r.Equal(validLabelName, ts.Labels[3].Name) - r.Equal("", ts.Labels[3].Value) // empty string preserved - }) + // Ensure all expected quantiles were found + r.Len(foundQuantiles, len(expectedQuantiles)) - t.Run("counter edge cases", func(t *testing.T) { - r := require.New(t) + // Verify _sum timeseries + r.NotNil(sumTimeseries) + assertLabelPresent(t, sumTimeseries.Labels, "__name__", metricName+"_sum") + r.Equal(sampleSum, sumTimeseries.Samples[0].Value) - metricName := "test_counter" + // Verify _sum doesn't have 'quantile' label + for _, label := range sumTimeseries.Labels { + r.NotEqual("quantile", label.Name, "_sum should not have 'quantile' label") + } - family := &dto.MetricFamily{ - Name: &metricName, - Metric: []*dto.Metric{ - { - Counter: nil, // Should not produce samples - }, - }, - } + // Verify _count timeseries + r.NotNil(countTimeseries) + assertLabelPresent(t, countTimeseries.Labels, "__name__", metricName+"_count") + r.Equal(float64(sampleCount), countTimeseries.Samples[0].Value) - result := convertPrometheusMetricFamilies(gatherTime, "ctrl_pod", []*dto.MetricFamily{family}) + // 
Verify _count doesn't have 'quantile' label + for _, label := range countTimeseries.Labels { + r.NotEqual("quantile", label.Name, "_count should not have 'quantile' label") + } +} - r.Len(result.Timeseries, 0) - }) +func assertLabelPresent(t *testing.T, labels []PrometheusLabel, name, value string) { + assert.Contains(t, labels, PrometheusLabel{Name: name, Value: value}) } diff --git a/internal/castai/types.go b/internal/castai/types.go index 44a28be3..c9bc612d 100644 --- a/internal/castai/types.go +++ b/internal/castai/types.go @@ -260,6 +260,9 @@ func (c *ChartSource) Validate() error { return nil } +// PrometheusWriteRequest represents a request to write timeseries data +// to the Components API IngestMetrics endpoint. +// https://api.cast.ai/v1/spec/#/ComponentsAPI/ComponentsAPI_IngestMetrics type PrometheusWriteRequest struct { Timeseries []PrometheusTimeseries `json:"timeseries"` } diff --git a/internal/config/config.go b/internal/config/config.go index 8fe73a95..b6b5a844 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -56,8 +56,10 @@ type TLS struct { } type Metrics struct { - Port int - ExportEnabled bool + Port int + // ExportEnabled enables exporting metrics to Cast AI SaaS platform. + ExportEnabled bool + // ExportInterval is the interval at which metrics are exported to Cast AI SaaS platform. ExportInterval time.Duration } diff --git a/internal/controller/controller.go b/internal/controller/controller.go index d282e1e6..5906a508 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -156,12 +156,17 @@ func (s *Controller) handleActions(ctx context.Context, clusterActions []*castai var err error + metrics.ActionStarted(action.GetType()) + startTime := time.Now() + handleErr := s.handleAction(ctx, action) if errors.Is(handleErr, context.Canceled) { // Action should be handled again on context canceled errors.
return } - ackErr := s.ackAction(ctx, action, handleErr) + + handleDuration := time.Since(startTime) + ackErr := s.ackAction(ctx, action, handleErr, handleDuration) if handleErr != nil { err = handleErr } @@ -235,7 +240,7 @@ func (s *Controller) handleAction(ctx context.Context, action *castai.ClusterAct return nil } -func (s *Controller) ackAction(ctx context.Context, action *castai.ClusterAction, handleErr error) error { +func (s *Controller) ackAction(ctx context.Context, action *castai.ClusterAction, handleErr error, handleDuration time.Duration) error { actionType := action.GetType() actionError := getHandlerError(handleErr) s.log.WithFields(logrus.Fields{ @@ -244,7 +249,7 @@ func (s *Controller) ackAction(ctx context.Context, action *castai.ClusterAction "successful": actionError == nil, }).Info("ack action") - metrics.ActionFinished(actionType, actionError == nil) + metrics.ActionFinished(actionType, actionError == nil, handleDuration) boff := waitext.NewConstantBackoff(s.cfg.AckRetryWait) diff --git a/internal/controller/metricexporter/metricexporter.go b/internal/controller/metricexporter/metricexporter.go index 54817518..8292ba14 100644 --- a/internal/controller/metricexporter/metricexporter.go +++ b/internal/controller/metricexporter/metricexporter.go @@ -11,31 +11,31 @@ import ( "github.com/castai/cluster-controller/internal/metrics" ) -type MetricSender interface { +type Sender interface { SendMetrics(ctx context.Context, gatherTime time.Time, metricFamilies []*dto.MetricFamily) error } -type MetricGatherer func() ([]*dto.MetricFamily, time.Time, error) +type Gatherer func() ([]*dto.MetricFamily, time.Time, error) func DefaultMetricGatherer() ([]*dto.MetricFamily, time.Time, error) { families, err := metrics.Gather() return families, time.Now(), err } -type MetricsExporter struct { +type Exporter struct { log *logrus.Entry - sender MetricSender - gatherer MetricGatherer + sender Sender + gatherer Gatherer exportInterval time.Duration } func New( log 
*logrus.Entry, - sender MetricSender, + sender Sender, exportInterval time.Duration, - opts ...func(*MetricsExporter), -) *MetricsExporter { - exp := &MetricsExporter{ + opts ...func(*Exporter), +) *Exporter { + exp := &Exporter{ log: log.WithField("component", "metrics_exporter"), sender: sender, gatherer: DefaultMetricGatherer, @@ -47,8 +47,8 @@ func New( return exp } -func WithMetricGatherer(g MetricGatherer) func(*MetricsExporter) { - return func(me *MetricsExporter) { +func WithMetricGatherer(g Gatherer) func(*Exporter) { + return func(me *Exporter) { if g == nil { return } @@ -56,11 +56,12 @@ func WithMetricGatherer(g MetricGatherer) func(*MetricsExporter) { } } -func (me *MetricsExporter) Run(ctx context.Context) { +func (me *Exporter) Run(ctx context.Context) { t := time.NewTicker(me.exportInterval) defer t.Stop() defer me.log.Info("metrics exporter stopped") me.log.Infof("starting metrics exporter with interval %v", me.exportInterval) + for { select { case <-ctx.Done(): @@ -76,7 +77,7 @@ func (me *MetricsExporter) Run(ctx context.Context) { } } -func (me *MetricsExporter) exportMetrics(ctx context.Context) error { +func (me *Exporter) exportMetrics(ctx context.Context) error { families, gatherTime, err := me.gatherer() if err != nil { return fmt.Errorf("failed to gather metrics: %w", err) diff --git a/internal/metrics/custom_metrics.go b/internal/metrics/custom_metrics.go index d6d6d578..9c041d25 100644 --- a/internal/metrics/custom_metrics.go +++ b/internal/metrics/custom_metrics.go @@ -2,12 +2,22 @@ package metrics import ( "strconv" + "time" "github.com/prometheus/client_golang/prometheus" ) -// actionCounter tracks actions executed by the cluster controller. -var actionCounter = prometheus.NewCounterVec( +// actionStartedCounter tracks actions started by the cluster controller. 
+var actionStartedCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "action_started_total", + Help: "Count of actions started by type.", + }, + []string{"type"}, +) + +// actionExecutedCounter tracks actions executed by the cluster controller. +var actionExecutedCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "action_executed_total", Help: "Count of successful and unsuccessful actions executed by type.", @@ -15,6 +25,29 @@ var actionCounter = prometheus.NewCounterVec( []string{"success", "type"}, ) -func ActionFinished(actionType string, success bool) { - actionCounter.With(prometheus.Labels{"success": strconv.FormatBool(success), "type": actionType}).Inc() +// actionExecutedDuration tracks the duration of actions executed by the cluster controller. +// Summary is used instead of histogram to use fewer series. +// We cannot run aggregations on Prometheus side with Summaries, +// but there is only a single active cluster controller pod, +// so this might not be a problem.
+var actionExecutedDuration = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Name: "action_executed_duration_seconds", + Help: "Duration of actions executed by type.", + Objectives: map[float64]float64{ + 0.5: 0.05, + 0.9: 0.01, + 0.99: 0.001, + }, + }, + []string{"type"}, +) + +func ActionStarted(actionType string) { + actionStartedCounter.With(prometheus.Labels{"type": actionType}).Inc() +} + +func ActionFinished(actionType string, success bool, duration time.Duration) { + actionExecutedCounter.With(prometheus.Labels{"success": strconv.FormatBool(success), "type": actionType}).Inc() + actionExecutedDuration.With(prometheus.Labels{"type": actionType}).Observe(duration.Seconds()) } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 463cb8c2..a0fd6334 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -26,6 +26,7 @@ func NewMetricsMux() *http.ServeMux { return metricsMux } +// Gather gets all current metric series registered at the Registry. func Gather() ([]*dto.MetricFamily, error) { return registry.Gather() } diff --git a/internal/metrics/register.go b/internal/metrics/register.go index 525d3192..047d0dca 100644 --- a/internal/metrics/register.go +++ b/internal/metrics/register.go @@ -5,5 +5,9 @@ import ( ) func RegisterCustomMetrics() { - registry.MustRegister(actionCounter) + registry.MustRegister( + actionStartedCounter, + actionExecutedCounter, + actionExecutedDuration, + ) }