Skip to content

Commit c5d69e2

Browse files
committed
CR 5 - fail only if all goroutines fail; log each failure along the way
1 parent 6c5b832 commit c5d69e2

File tree

1 file changed

+41
-31
lines changed

1 file changed

+41
-31
lines changed

pkg/autoscaler/metricsclient/prometheus.go

Lines changed: 41 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -115,7 +115,8 @@ func (pc *PrometheusMetricsClient) GetResourceMetrics(resources []scalertypes.Re
115115
defer cancel()
116116
metricToWindowSizes := pc.buildMetricLookup(resources)
117117

118-
// fetch metrics in a goroutine to enable timeout handling via context.
118+
// run getResourceMetrics in a goroutine so we can race it against the context timeout.
119+
// the select below returns whichever completes first: the metrics result or the timeout.
119120
type result struct {
120121
metrics map[string]map[string]int
121122
err error
@@ -140,28 +141,33 @@ func (pc *PrometheusMetricsClient) getResourceMetrics(ctx context.Context, metri
140141
metricsByResource := make(map[string]map[string]int)
141142

142143
resultChan := make(chan *metricResult)
143-
errChan := make(chan error)
144144
wg := sync.WaitGroup{}
145145

146146
for metricName, queryTemplate := range pc.queryTemplates {
147147
windowSizeToResources := metricToWindowSizes[metricName]
148148

149149
for windowSize, resourcesInWindowSize := range windowSizeToResources {
150150
wg.Add(1)
151-
go func(resourcesInWindowSize map[string]struct{}, metricName, windowSize string, resultChan chan<- *metricResult, errChan chan<- error) {
151+
go func(resourcesInWindowSize map[string]struct{}, metricName, windowSize string, resultChan chan<- *metricResult) {
152152
defer wg.Done()
153153
// create resource name regex for Prometheus query based on the resources in this window size
154154
resourceNameRegex := pc.createResourceNameRegex(resourcesInWindowSize)
155155
fullMetricName := scalertypes.GetKubernetesMetricName(metricName, windowSize)
156156
query, err := pc.renderQuery(queryTemplate, windowSize, resourceNameRegex)
157157
if err != nil {
158-
errChan <- errors.Wrapf(err, "failed to render query: metricName=%s, windowSize=%s", metricName, windowSize)
158+
pc.logger.WarnWith("Failed to render query, skipping",
159+
"metricName", metricName,
160+
"windowSize", windowSize,
161+
"error", err.Error())
159162
return
160163
}
161164

162165
rawResult, warnings, err := pc.apiClient.Query(ctx, query, time.Now())
163166
if err != nil {
164-
errChan <- errors.Wrapf(err, "failed to execute Prometheus query: metricName=%s, windowSize=%s", metricName, windowSize)
167+
pc.logger.WarnWith("Failed to execute Prometheus query, skipping",
168+
"metricName", metricName,
169+
"windowSize", windowSize,
170+
"error", err.Error())
165171
return
166172
}
167173

@@ -174,15 +180,20 @@ func (pc *PrometheusMetricsClient) getResourceMetrics(ctx context.Context, metri
174180

175181
metricSamples, ok := rawResult.(model.Vector)
176182
if !ok {
177-
errChan <- errors.Errorf("unexpected Prometheus result type for metricName=%s, windowSize=%s", metricName, windowSize)
183+
pc.logger.WarnWith("Unexpected Prometheus result type, skipping",
184+
"metricName", metricName,
185+
"windowSize", windowSize)
178186
return
179187
}
180188

181189
for _, metricSample := range metricSamples {
182190
resourceName, err := pc.extractResourceName(metricSample.Metric)
183191
if err != nil {
184-
errChan <- errors.Wrapf(err, "failed to extract resource name from the prometheus metric's labels. metricName=%s, windowSize=%s", metricName, windowSize)
185-
return
192+
pc.logger.WarnWith("Failed to extract resource name from prometheus metric labels, skipping",
193+
"metricName", metricName,
194+
"windowSize", windowSize,
195+
"error", err.Error())
196+
continue
186197
}
187198

188199
if _, exists := resourcesInWindowSize[resourceName]; !exists {
@@ -210,53 +221,52 @@ func (pc *PrometheusMetricsClient) getResourceMetrics(ctx context.Context, metri
210221
metricValue: metricValue,
211222
}
212223
}
213-
}(resourcesInWindowSize, metricName, windowSize, resultChan, errChan)
224+
}(resourcesInWindowSize, metricName, windowSize, resultChan)
214225
}
215226
}
216227

217-
var collectedErrors []error
228+
var collectorErr error
218229
collectorDone := make(chan struct{})
219-
// Collect results and errors
220-
go func(resultChan <-chan *metricResult, errChan <-chan error) {
230+
// Collect results
231+
go func(resultChan chan *metricResult) {
221232
defer close(collectorDone)
222-
for resultChan != nil || errChan != nil {
233+
for {
223234
select {
224235
case result, ok := <-resultChan:
225236
if !ok {
226-
resultChan = nil
227-
continue
237+
return
228238
}
229239
if _, exists := metricsByResource[result.resourceName]; !exists {
230240
metricsByResource[result.resourceName] = make(map[string]int)
231241
}
232-
if _, exists := metricsByResource[result.resourceName][result.fullMetricName]; exists {
233-
collectedErrors = append(collectedErrors, errors.Errorf("duplicate metric value for resource: resourceName=%s, metricName=%s", result.resourceName, result.fullMetricName))
234-
continue
242+
if existingValue, exists := metricsByResource[result.resourceName][result.fullMetricName]; exists {
243+
if existingValue == result.metricValue {
244+
continue
245+
}
246+
collectorErr = errors.Errorf("conflicting metric values for resource: resourceName=%s, metricName=%s, existingValue=%d, newValue=%d",
247+
result.resourceName, result.fullMetricName, existingValue, result.metricValue)
248+
return
235249
}
236250
metricsByResource[result.resourceName][result.fullMetricName] = result.metricValue
237251

238-
case err, ok := <-errChan:
239-
if !ok {
240-
errChan = nil
241-
continue
242-
}
243-
if err == nil {
244-
continue
245-
}
246-
collectedErrors = append(collectedErrors, err)
252+
case <-ctx.Done():
253+
return
247254
}
248255
}
249-
}(resultChan, errChan)
256+
}(resultChan)
250257

251258
// wait for all queries to complete
252259
wg.Wait()
253260
close(resultChan)
254-
close(errChan)
255261
// wait for collector to finish processing results
256262
<-collectorDone
257263

258-
if len(collectedErrors) > 0 {
259-
return nil, errors.Errorf("failed to get resource metrics: %v", collectedErrors)
264+
if collectorErr != nil {
265+
return nil, collectorErr
266+
}
267+
268+
if len(metricsByResource) == 0 {
269+
return nil, errors.New("no metrics retrieved for any resource")
260270
}
261271

262272
return metricsByResource, nil

0 commit comments

Comments
 (0)