4
4
"context"
5
5
"errors"
6
6
"fmt"
7
- "strconv"
8
7
"strings"
9
8
"time"
10
9
@@ -18,7 +17,6 @@ import (
18
17
)
19
18
20
19
const (
21
- defaultTimezone = "UTC"
22
20
multiMetricsQueryPrefix = "query."
23
21
)
24
22
@@ -30,23 +28,23 @@ type sumologicScaler struct {
30
28
}
31
29
32
30
type sumologicMetadata struct {
33
- accessID string `keda:"name=access_id , order=authParams"`
34
- accessKey string `keda:"name=access_key , order=authParams"`
35
- host string `keda:"name=host, order=triggerMetadata"`
36
- unsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, optional"`
37
- queryType string `keda:"name=queryType, order=triggerMetadata"`
38
- query string `keda:"name=query, order=triggerMetadata"`
39
- queries map [string ]string `keda:"name=query.*, order=triggerMetadata"` // Only for metrics queries
40
- resultQueryRowID string `keda:"name=resultQueryRowID, order=triggerMetadata"` // Only for metrics queries
41
- quantization time.Duration `keda:"name=quantization, order=triggerMetadata"` // Only for metrics queries
42
- rollup string `keda:"name=rollup, order=triggerMetadata, optional"` // Only for metrics queries
43
- resultField string `keda:"name=resultField, order=triggerMetadata"` // Only for logs queries
44
- timerange time.Duration `keda:"name=timerange, order=triggerMetadata"`
45
- timezone string `keda:"name=timezone, order=triggerMetadata, optional "`
46
- activationThreshold float64 `keda:"name=activationThreshold, order=triggerMetadata, default=0"`
47
- queryAggregator string `keda:"name=queryAggregator, order=triggerMetadata, optional "`
48
- threshold float64 `keda:"name=threshold, order=triggerMetadata"`
49
- triggerIndex int
31
+ AccessID string `keda:"name=accessID , order=authParams" mapstructure:"accessID" validate:"required "`
32
+ AccessKey string `keda:"name=accessKey , order=authParams" mapstructure:"accessKey" validate:"required "`
33
+ Host string `keda:"name=host, order=triggerMetadata" mapstructure:"host" validate:"required "`
34
+ UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, optional" mapstructure:"unsafeSsl "`
35
+ QueryType string `keda:"name=queryType, order=triggerMetadata, enum=logs;metrics" mapstructure:"queryType" validate:"required,oneof=logs metrics "`
36
+ Query string `keda:"name=query, order=triggerMetadata, optional" mapstructure:"query "`
37
+ Queries map [string ]string `keda:"name=query.*, order=triggerMetadata, optional" mapstructure:"queries"` // Only for metrics queries
38
+ ResultQueryRowID string `keda:"name=resultQueryRowID, order=triggerMetadata, optional" mapstructure:"resultQueryRowID"` // Only for metrics queries
39
+ Quantization time.Duration `keda:"name=quantization, order=triggerMetadata, optional" mapstructure:"quantization"` // Only for metrics queries
40
+ Rollup string `keda:"name=rollup, order=triggerMetadata, enum=Avg;Sum;Count;Min;Max, default=Avg" mapstructure:"rollup"` // Only for metrics queries
41
+ ResultField string `keda:"name=resultField, order=triggerMetadata, optional" mapstructure:"resultField"` // Only for logs queries
42
+ Timerange time.Duration `keda:"name=timerange, order=triggerMetadata" mapstructure:"timerange" validate:"required "`
43
+ Timezone string `keda:"name=timezone, order=triggerMetadata, default=UTC" mapstructure:"timezone "`
44
+ ActivationThreshold float64 `keda:"name=activationThreshold, order=triggerMetadata, default=0" mapstructure:"activationThreshold "`
45
+ QueryAggregator string `keda:"name=queryAggregator, order=triggerMetadata, enum=Latest;Avg;Sum;Count;Min;Max, default=Avg" mapstructure:"queryAggregator "`
46
+ Threshold float64 `keda:"name=threshold, order=triggerMetadata" mapstructure:"threshold" validate:"required "`
47
+ TriggerIndex int
50
48
}
51
49
52
50
func NewSumologicScaler (config * scalersconfig.ScalerConfig ) (Scaler , error ) {
@@ -57,10 +55,10 @@ func NewSumologicScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
57
55
}
58
56
59
57
client , err := sumologic .NewClient (& sumologic.Config {
60
- Host : meta .host ,
61
- AccessID : meta .accessID ,
62
- AccessKey : meta .accessKey ,
63
- UnsafeSsl : meta .unsafeSsl ,
58
+ Host : meta .Host ,
59
+ AccessID : meta .AccessID ,
60
+ AccessKey : meta .AccessKey ,
61
+ UnsafeSsl : meta .UnsafeSsl ,
64
62
}, config )
65
63
if err != nil {
66
64
return nil , fmt .Errorf ("failed to create sumologic client: %w" , err )
@@ -81,143 +79,53 @@ func NewSumologicScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
81
79
82
80
func parseSumoMetadata (config * scalersconfig.ScalerConfig ) (* sumologicMetadata , error ) {
83
81
meta := sumologicMetadata {}
84
- meta .triggerIndex = config .TriggerIndex
85
-
86
- if err := validateAuthParams (config .AuthParams ); err != nil {
87
- return nil , err
88
- }
89
- meta .accessID = config .AuthParams ["accessID" ]
90
- meta .accessKey = config .AuthParams ["accessKey" ]
91
-
92
- if err := validateMetadata (config .TriggerMetadata ); err != nil {
93
- return nil , err
82
+ if err := config .TypedConfig (& meta ); err != nil {
83
+ return nil , fmt .Errorf ("error decoding metadata: %w" , err )
94
84
}
95
- meta .host = config .TriggerMetadata ["host" ]
96
- meta .queryType = config .TriggerMetadata ["queryType" ]
85
+ meta .TriggerIndex = config .TriggerIndex
97
86
98
- query := config .TriggerMetadata ["query" ]
99
87
queries , err := parseMultiMetricsQueries (config .TriggerMetadata )
100
88
if err != nil {
101
89
return nil , err
102
90
}
91
+ meta .Queries = queries
103
92
104
- if meta .queryType == "logs" {
105
- if query == "" {
93
+ if meta .QueryType == "logs" {
94
+ if meta . Query == "" {
106
95
return nil , errors .New ("missing required metadata: query" )
107
96
}
108
- if len (queries ) != 0 {
97
+ if len (meta . Queries ) != 0 {
109
98
return nil , errors .New ("invalid metadata, query.<RowId> not supported for logs queryType" )
110
99
}
111
- meta .query = query
112
-
113
- if resultField , exists := config .TriggerMetadata ["resultField" ]; ! exists || resultField == "" {
114
- return nil , fmt .Errorf ("resultField is required when queryType is 'logs'" )
100
+ if meta .ResultField == "" {
101
+ return nil , errors .New ("missing required metadata: resultField (required for logs queryType)" )
115
102
}
116
- meta .resultField = config .TriggerMetadata ["resultField" ]
117
103
}
118
104
119
- if meta .queryType == "metrics" {
120
- switch {
121
- case query == "" && len (queries ) == 0 :
122
- return nil , errors .New ("missing metadata, please define either of query or query.<RowId> for metrics queryType" )
123
- case query != "" && len (queries ) != 0 :
124
- return nil , errors .New ("incorrect metadata, please only define either query or query.<RowId> for metrics queryType, not both" )
125
- case query != "" :
126
- meta .query = query
127
- default :
128
- meta .queries = queries
129
- if config .TriggerMetadata ["resultQueryRowID" ] == "" {
130
- return nil , errors .New ("missing required metadata: resultQueryRowID" )
131
- }
132
- meta .resultQueryRowID = config .TriggerMetadata ["resultQueryRowID" ]
105
+ if meta .QueryType == "metrics" {
106
+ if meta .Query == "" && len (meta .Queries ) == 0 {
107
+ return nil , errors .New ("missing metadata: either of query or query.<RowId> must be defined for metrics queryType" )
133
108
}
134
-
135
- if config .TriggerMetadata ["quantization" ] == "" {
136
- return nil , errors .New ("missing required metadata: quantization for metrics queryType" )
109
+ if meta .Query != "" && len (meta .Queries ) != 0 {
110
+ return nil , errors .New ("invalid metadata: only one of query or query.<RowId> must be defined for metrics queryType" )
137
111
}
138
- quantization , err := strconv .Atoi (config .TriggerMetadata ["quantization" ])
139
- if err != nil {
140
- return nil , fmt .Errorf ("invalid metadata, quantization: %w" , err )
141
- }
142
- meta .quantization = time .Duration (quantization ) * time .Second
143
-
144
- if rollup , exists := config .TriggerMetadata ["rollup" ]; exists {
145
- if err := sumologic .IsValidRollupType (rollup ); err != nil {
146
- return nil , err
112
+ if len (meta .Queries ) > 0 {
113
+ if meta .ResultQueryRowID == "" {
114
+ return nil , errors .New ("missing required metadata: resultQueryRowID for multi-metrics query" )
115
+ }
116
+ if _ , ok := meta .Queries [meta .ResultQueryRowID ]; ! ok {
117
+ return nil , fmt .Errorf ("resultQueryRowID '%s' not found in queries" , meta .ResultQueryRowID )
147
118
}
148
- meta .rollup = rollup
149
- } else {
150
- meta .rollup = sumologic .DefaultRollup
151
- }
152
- }
153
-
154
- if config .TriggerMetadata ["timerange" ] == "" {
155
- return nil , errors .New ("missing required metadata: timerange" )
156
- }
157
- timerange , err := strconv .Atoi (config .TriggerMetadata ["timerange" ])
158
- if err != nil {
159
- return nil , fmt .Errorf ("invalid timerange: %w" , err )
160
- }
161
- meta .timerange = time .Duration (timerange )
162
-
163
- if config .TriggerMetadata ["timezone" ] == "" {
164
- meta .timezone = defaultTimezone // Default to UTC if not provided
165
- } else {
166
- meta .timezone = config .TriggerMetadata ["timezone" ]
167
- }
168
-
169
- meta .activationThreshold = 0
170
- if val , ok := config .TriggerMetadata ["activationThreshold" ]; ok {
171
- activationThreshold , err := strconv .ParseFloat (val , 64 )
172
- if err != nil {
173
- return nil , fmt .Errorf ("activationThreshold parsing error: %w" , err )
174
- }
175
- meta .activationThreshold = activationThreshold
176
- }
177
-
178
- if queryAggregator , ok := config .TriggerMetadata ["queryAggregator" ]; ok && queryAggregator != "" {
179
- if err := sumologic .IsValidQueryAggregation (queryAggregator ); err != nil {
180
- return nil , err
181
119
}
182
- meta .queryAggregator = queryAggregator
183
- } else {
184
- meta .queryAggregator = sumologic .DefaultQueryAggregator
185
- }
186
120
187
- if val , ok := config .TriggerMetadata ["threshold" ]; ok {
188
- threshold , err := strconv .ParseFloat (val , 64 )
189
- if err != nil {
190
- return nil , fmt .Errorf ("threshold parsing error: %w" , err )
121
+ if meta .Quantization == 0 {
122
+ return nil , errors .New ("missing required metadata: quantization for metrics queryType" )
191
123
}
192
- meta .threshold = threshold
193
124
}
194
125
195
126
return & meta , nil
196
127
}
197
128
198
- func validateAuthParams (authParams map [string ]string ) error {
199
- if authParams ["accessID" ] == "" {
200
- return errors .New ("missing required auth params: accessID" )
201
- }
202
- if authParams ["accessKey" ] == "" {
203
- return errors .New ("missing required auth params: accessKey" )
204
- }
205
- return nil
206
- }
207
-
208
- func validateMetadata (metadata map [string ]string ) error {
209
- if metadata ["host" ] == "" {
210
- return errors .New ("missing required metadata: host" )
211
- }
212
- if metadata ["queryType" ] == "" {
213
- return errors .New ("missing required metadata: queryType" )
214
- }
215
- if metadata ["queryType" ] != "logs" && metadata ["queryType" ] != "metrics" {
216
- return fmt .Errorf ("invalid queryType: %s, must be 'logs' or 'metrics'" , metadata ["queryType" ])
217
- }
218
- return nil
219
- }
220
-
221
129
func parseMultiMetricsQueries (triggerMetadata map [string ]string ) (map [string ]string , error ) {
222
130
queries := make (map [string ]string )
223
131
for key , value := range triggerMetadata {
@@ -236,12 +144,12 @@ func parseMultiMetricsQueries(triggerMetadata map[string]string) (map[string]str
236
144
}
237
145
238
146
func (s * sumologicScaler ) GetMetricSpecForScaling (_ context.Context ) []v2.MetricSpec {
239
- metricName := kedautil .NormalizeString (fmt .Sprintf ("sumologic-%s" , s .metadata .queryType ))
147
+ metricName := kedautil .NormalizeString (fmt .Sprintf ("sumologic-%s" , s .metadata .QueryType ))
240
148
externalMetric := & v2.ExternalMetricSource {
241
149
Metric : v2.MetricIdentifier {
242
- Name : GenerateMetricNameWithIndex (s .metadata .triggerIndex , metricName ),
150
+ Name : GenerateMetricNameWithIndex (s .metadata .TriggerIndex , metricName ),
243
151
},
244
- Target : GetMetricTargetMili (s .metricType , s .metadata .threshold ),
152
+ Target : GetMetricTargetMili (s .metricType , s .metadata .Threshold ),
245
153
}
246
154
return []v2.MetricSpec {{
247
155
External : externalMetric ,
@@ -258,29 +166,29 @@ func (s *sumologicScaler) Close(_ context.Context) error {
258
166
}
259
167
260
168
func (s * sumologicScaler ) GetMetricsAndActivity (_ context.Context , metricName string ) ([]external_metrics.ExternalMetricValue , bool , error ) {
261
- var metric external_metrics.ExternalMetricValue
262
- var num float64
263
- var err error
264
-
265
- num , err = s .client .GetQueryResult (
266
- s .metadata .queryType ,
267
- s .metadata .query ,
268
- s .metadata .queries ,
269
- s .metadata .resultQueryRowID ,
270
- s .metadata .quantization ,
271
- s .metadata .rollup ,
272
- s .metadata .resultField ,
273
- s .metadata .timerange ,
274
- s .metadata .timezone ,
275
- s .metadata .queryAggregator ,
276
- )
169
+ num , err := s .executeQuery ()
277
170
if err != nil {
278
171
s .logger .Error (err , "error getting metrics from sumologic" )
279
172
return []external_metrics.ExternalMetricValue {}, false , fmt .Errorf ("error getting metrics from sumologic: %w" , err )
280
173
}
281
174
282
- metric = GenerateMetricInMili (metricName , num )
283
- isActive := num > s .metadata .activationThreshold
175
+ metric : = GenerateMetricInMili (metricName , num )
176
+ isActive := num > s .metadata .ActivationThreshold
284
177
285
178
return []external_metrics.ExternalMetricValue {metric }, isActive , nil
286
179
}
180
+
181
+ func (s * sumologicScaler ) executeQuery () (float64 , error ) {
182
+ return s .client .GetQueryResult (
183
+ s .metadata .QueryType ,
184
+ s .metadata .Query ,
185
+ s .metadata .Queries ,
186
+ s .metadata .ResultQueryRowID ,
187
+ s .metadata .Quantization ,
188
+ s .metadata .Rollup ,
189
+ s .metadata .ResultField ,
190
+ s .metadata .Timerange ,
191
+ s .metadata .Timezone ,
192
+ s .metadata .QueryAggregator ,
193
+ )
194
+ }
0 commit comments