Skip to content

Commit 48087c1

Browse files
committed
Updating changes for elastic to extract results
1 parent f33d9bf commit 48087c1

File tree

1 file changed

+80
-59
lines changed
  • metrics-operator/controllers/common/providers/elastic

1 file changed

+80
-59
lines changed

metrics-operator/controllers/common/providers/elastic/elastic.go

Lines changed: 80 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717

1818
const (
1919
warningLogStringElastic = "%s API returned warnings: %s"
20-
defaultTimeRange = 30 * time.Minute
2120
)
2221

2322
type KeptnElasticProvider struct {
@@ -26,14 +25,7 @@ type KeptnElasticProvider struct {
2625
Elastic *elastic.Client
2726
}
2827

29-
type ElasticsearchResponse struct {
30-
Hits struct {
31-
Total struct {
32-
Value int `json:"value"`
33-
} `json:"total"`
34-
} `json:"hits"`
35-
}
36-
28+
// GetElasticClient will create a new elastic client
3729
func GetElasticClient(provider metricsapi.KeptnMetricsProvider) (*elastic.Client, error) {
3830
es, err := elastic.NewClient(elastic.Config{
3931
Addresses: []string{provider.Spec.TargetServer},
@@ -50,31 +42,68 @@ func GetElasticClient(provider metricsapi.KeptnMetricsProvider) (*elastic.Client
5042
return es, nil
5143
}
5244

45+
// FetchAnalysisValue will fetch analysis value depends on query and the metrics provided as input
5346
func (r *KeptnElasticProvider) FetchAnalysisValue(ctx context.Context, query string, analysis metricsapi.Analysis, provider *metricsapi.KeptnMetricsProvider) (string, error) {
47+
// Retrieve the AnalysisDefinition referenced in Analysis
48+
var analysisDef metricsapi.AnalysisDefinition
49+
err := r.K8sClient.Get(ctx, client.ObjectKey{
50+
Name: analysis.Spec.AnalysisDefinition.Name,
51+
Namespace: analysis.Namespace,
52+
}, &analysisDef)
53+
54+
if err != nil {
55+
r.Log.Error(err, "Failed to retrieve AnalysisDefinition")
56+
return "", fmt.Errorf("failed to get AnalysisDefinition: %w", err)
57+
}
58+
59+
// Extract the referenced AnalysisValueTemplate name
60+
if len(analysisDef.Spec.Objectives) == 0 {
61+
return "", fmt.Errorf("no objectives defined in AnalysisDefinition")
62+
}
63+
64+
templateName := analysisDef.Spec.Objectives[0].AnalysisValueTemplateRef.Name
65+
r.Log.Info("Found referenced AnalysisValueTemplate", "templateName", templateName)
66+
// Retrieve the AnalysisValueTemplate using the extracted name
67+
var template metricsapi.AnalysisValueTemplate
68+
err = r.K8sClient.Get(ctx, client.ObjectKey{
69+
Name: templateName,
70+
Namespace: analysis.Namespace,
71+
}, &template)
72+
73+
if err != nil {
74+
r.Log.Error(err, "Failed to retrieve AnalysisValueTemplate")
75+
return "", fmt.Errorf("failed to get AnalysisValueTemplate: %w", err)
76+
}
77+
78+
// Extract metricPath from args
79+
metricPathStr, exists := analysis.Spec.Args["metricPath"]
80+
if !exists || metricPathStr == "" {
81+
return "", fmt.Errorf("metric path is missing in AnalysisValueTemplate annotations")
82+
}
83+
5484
ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
5585
defer cancel()
5686

57-
result, err := r.runElasticQuery(ctx, query, analysis.GetFrom(), analysis.GetTo())
87+
result, err := r.runElasticQuery(ctx, query)
5888
if err != nil {
5989
return "", err
6090
}
6191

62-
r.Log.Info(fmt.Sprintf("Elasticsearch query result: %v", result))
63-
return r.extractMetric(result)
92+
r.Log.Info("Elasticsearch query result", "result", result)
93+
return r.extractMetric(result, metricPathStr)
6494
}
6595

96+
// EvaluateQuery takes query as a input but doesn't return anything
6697
func (r *KeptnElasticProvider) EvaluateQuery(ctx context.Context, metric metricsapi.KeptnMetric, provider metricsapi.KeptnMetricsProvider) (string, []byte, error) {
6798
ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
6899
defer cancel()
69100

70-
timeRange := getTimeRangeFromSpec(metric.Spec.Range)
71-
72-
result, err := r.runElasticQuery(ctx, metric.Spec.Query, time.Now().Add(-timeRange), time.Now())
101+
result, err := r.runElasticQuery(ctx, metric.Spec.Query)
73102
if err != nil {
74103
return "", nil, err
75104
}
76105

77-
metricValue, err := r.extractMetric(result)
106+
metricValue, err := r.extractMetric(result, "")
78107
if err != nil {
79108
return "", nil, err
80109
}
@@ -87,42 +116,12 @@ func (r *KeptnElasticProvider) EvaluateQueryForStep(ctx context.Context, metric
87116
return nil, nil, nil
88117
}
89118

90-
func getTimeRangeFromSpec(rangeSpec *metricsapi.RangeSpec) time.Duration {
91-
if rangeSpec == nil || rangeSpec.Interval == "" {
92-
return defaultTimeRange
93-
}
94-
95-
duration, err := time.ParseDuration(rangeSpec.Interval)
96-
if err != nil {
97-
return defaultTimeRange
98-
}
99-
100-
return duration
101-
}
102-
103-
func (r *KeptnElasticProvider) runElasticQuery(ctx context.Context, query string, from, to time.Time) (map[string]interface{}, error) {
104-
queryBody := fmt.Sprintf(`
105-
{
106-
"query": {
107-
"bool": {
108-
"must": [
109-
%s,
110-
{
111-
"range": {
112-
"@timestamp": {
113-
"gte": "%s",
114-
"lte": "%s"
115-
}
116-
}
117-
}
118-
]
119-
}
120-
}
121-
}`, query, from.Format(time.RFC3339), to.Format(time.RFC3339))
119+
// runElasticQuery runs query on elastic search to get output from elasticsearch
120+
func (r *KeptnElasticProvider) runElasticQuery(ctx context.Context, query string) (map[string]interface{}, error) {
122121

123122
res, err := r.Elastic.Search(
124123
r.Elastic.Search.WithContext(ctx),
125-
r.Elastic.Search.WithBody(strings.NewReader(queryBody)),
124+
r.Elastic.Search.WithBody(strings.NewReader(query)),
126125
)
127126
if err != nil {
128127
return nil, fmt.Errorf("failed to execute Elasticsearch query: %w", err)
@@ -141,17 +140,39 @@ func (r *KeptnElasticProvider) runElasticQuery(ctx context.Context, query string
141140
return result, nil
142141
}
143142

144-
func (r *KeptnElasticProvider) extractMetric(result map[string]interface{}) (string, error) {
145-
var response ElasticsearchResponse
146-
jsonData, err := json.Marshal(result)
147-
if err != nil {
148-
return "", fmt.Errorf("failed to marshal result: %w", err)
143+
// extractMetric will parse the result and return the metrics which we input to the function
144+
func (r *KeptnElasticProvider) extractMetric(result map[string]interface{}, metricPathStr string) (string, error) {
145+
convertedResult := convertResultTOMap(result)
146+
for k, v := range convertedResult {
147+
if strings.Contains(k, metricPathStr) {
148+
return fmt.Sprintf("%f", v), nil
149+
}
149150
}
151+
return "", nil
152+
}
150153

151-
if err := json.Unmarshal(jsonData, &response); err != nil {
152-
return "", fmt.Errorf("failed to unmarshal result into struct: %w", err)
154+
// convertResultTOMap recursively converts map[string]interface{} to map[string]float64
155+
func convertResultTOMap(input map[string]interface{}) map[string]float64 {
156+
output := make(map[string]float64)
157+
for key, value := range input {
158+
switch v := value.(type) {
159+
case float64:
160+
output[key] = v
161+
case float32:
162+
output[key] = float64(v)
163+
case int:
164+
output[key] = float64(v)
165+
case int32:
166+
output[key] = float64(v)
167+
case int64:
168+
output[key] = float64(v)
169+
case map[string]interface{}:
170+
nestedMap := convertResultTOMap(v)
171+
for nestedKey, nestedValue := range nestedMap {
172+
output[key+"."+nestedKey] = nestedValue
173+
}
174+
default:
175+
}
153176
}
154-
155-
value := fmt.Sprintf("%d", response.Hits.Total.Value)
156-
return value, nil
177+
return output
157178
}

0 commit comments

Comments
 (0)