Skip to content

Commit 2322b61

Browse files
committed
CR 3 - parallel querying and use single context per iteration
1 parent 6568731 commit 2322b61

File tree

3 files changed

+141
-48
lines changed

3 files changed

+141
-48
lines changed

pkg/autoscaler/metricsclient/factory.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ func NewMetricsClient(logger logger.Logger,
4343
logger,
4444
autoScalerConf.MetricsClientOptions.URL,
4545
autoScalerConf.Namespace,
46-
autoScalerConf.MetricsClientOptions.QueryTemplates)
46+
autoScalerConf.MetricsClientOptions.QueryTemplates,
47+
autoScalerConf.ScaleInterval.Duration)
4748
if err != nil {
4849
return nil, errors.Wrap(err, "Failed to create Prometheus metric client")
4950
}

pkg/autoscaler/metricsclient/prometheus.go

Lines changed: 138 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"fmt"
2727
"math"
2828
"strings"
29+
"sync"
2930
"text/template"
3031
"time"
3132

@@ -50,14 +51,22 @@ type windowSizeLookup map[string]map[string]struct{}
5051
// metricLookup maps metricName → windowSizeLookup
5152
type metricLookup map[string]windowSizeLookup
5253

54+
// metricResult is the unit of work a query goroutine hands to the collector:
// one metric value that was read for one resource.
type metricResult struct {
	// resourceName is the resource the sample belongs to.
	resourceName string
	// fullMetricName is the metric name used as the key in the per-resource map.
	fullMetricName string
	// metricValue is the sample value as an integer.
	metricValue int
}
60+
5361
type PrometheusMetricsClient struct {
5462
logger logger.Logger
5563
apiClient prometheusv1.API
5664
namespace string
5765
queryTemplates map[string]*template.Template
66+
interval time.Duration
5867
}
5968

60-
func NewPrometheusClient(parentLogger logger.Logger, prometheusURL, namespace string, templates []scalertypes.QueryTemplate) (*PrometheusMetricsClient, error) {
69+
func NewPrometheusClient(parentLogger logger.Logger, prometheusURL, namespace string, templates []scalertypes.QueryTemplate, interval time.Duration) (*PrometheusMetricsClient, error) {
6170
if len(templates) == 0 {
6271
return nil, errors.New("query template cannot be empty")
6372
}
@@ -94,78 +103,160 @@ func NewPrometheusClient(parentLogger logger.Logger, prometheusURL, namespace st
94103
apiClient: prometheusv1.NewAPI(client),
95104
namespace: namespace,
96105
queryTemplates: queryTemplates,
106+
interval: interval,
97107
}, nil
98108
}
99109

100110
// GetResourceMetrics retrieves metrics for multiple resources
101111
func (pc *PrometheusMetricsClient) GetResourceMetrics(resources []scalertypes.Resource) (map[string]map[string]int, error) {
102-
metricsByResource := make(map[string]map[string]int)
112+
ctx, cancel := context.WithTimeout(context.Background(), pc.interval)
113+
defer cancel()
103114
metricToWindowSizes := pc.buildMetricLookup(resources)
104115

105-
for metricName, queryTemplate := range pc.queryTemplates {
106-
windowSizeToResources := metricToWindowSizes[metricName]
116+
// fetch metrics in a goroutine to enable timeout handling via context.
117+
type result struct {
118+
metrics map[string]map[string]int
119+
err error
120+
}
121+
resultCh := make(chan result, 1)
122+
defer close(resultCh)
123+
124+
go func() {
125+
metrics, err := pc.getResourceMetrics(ctx, metricToWindowSizes)
126+
resultCh <- result{metrics: metrics, err: err}
127+
}()
128+
129+
select {
130+
case <-ctx.Done():
131+
return nil, errors.Wrap(ctx.Err(), "timeout waiting for resource metrics")
132+
case res := <-resultCh:
133+
return res.metrics, res.err
134+
}
135+
}
107136

108-
for windowSize, resourcesInWindowSize := range windowSizeToResources {
109-
// create resource name regex for Prometheus query based on the resources in this window size
110-
resourceNameRegex := pc.createResourceNameRegex(resourcesInWindowSize)
111-
fullMetricName := scalertypes.GetKubernetesMetricName(metricName, windowSize)
112-
query, err := pc.renderQuery(queryTemplate, windowSize, resourceNameRegex)
113-
if err != nil {
114-
return nil, errors.Wrapf(err, "failed to render query for metricName=%s, windowSize=%s", metricName, windowSize)
115-
}
137+
func (pc *PrometheusMetricsClient) getResourceMetrics(ctx context.Context, metricToWindowSizes metricLookup) (map[string]map[string]int, error) {
138+
metricsByResource := make(map[string]map[string]int)
116139

117-
rawResult, warnings, err := pc.apiClient.Query(context.Background(), query, time.Now())
118-
if err != nil {
119-
return nil, errors.Wrapf(err, "failed to execute Prometheus query for metricName=%s, windowSize=%s", metricName, windowSize)
120-
}
140+
resultChan := make(chan *metricResult)
141+
errChan := make(chan error)
142+
wg := sync.WaitGroup{}
121143

122-
if len(warnings) > 0 {
123-
pc.logger.WarnWith("Prometheus query returned warnings",
124-
"metricName", metricName,
125-
"windowSize", windowSize,
126-
"warnings", warnings)
127-
}
144+
for metricName, queryTemplate := range pc.queryTemplates {
145+
windowSizeToResources := metricToWindowSizes[metricName]
128146

129-
metricSamples, ok := rawResult.(model.Vector)
130-
if !ok {
131-
return nil, errors.Wrapf(err, "unexpected Prometheus result type for metricName=%s, windowSize=%s", metricName, windowSize)
132-
}
147+
for windowSize, resourcesInWindowSize := range windowSizeToResources {
148+
wg.Add(1)
149+
go func(resourcesInWindowSize map[string]struct{}, metricName, windowSize string, resultChan chan<- *metricResult, errChan chan<- error) {
150+
defer wg.Done()
151+
// create resource name regex for Prometheus query based on the resources in this window size
152+
resourceNameRegex := pc.createResourceNameRegex(resourcesInWindowSize)
153+
fullMetricName := scalertypes.GetKubernetesMetricName(metricName, windowSize)
154+
query, err := pc.renderQuery(queryTemplate, windowSize, resourceNameRegex)
155+
if err != nil {
156+
errChan <- errors.Wrapf(err, "failed to render query: metricName=%s, windowSize=%s", metricName, windowSize)
157+
return
158+
}
133159

134-
for _, metricSample := range metricSamples {
135-
resourceName, err := pc.extractResourceName(metricSample.Metric)
160+
rawResult, warnings, err := pc.apiClient.Query(ctx, query, time.Now())
136161
if err != nil {
137-
return nil, errors.Wrapf(err, "failed to extract resource name from the prometheus metric's labels. metricName=%s, windowSize=%s", metricName, windowSize)
162+
errChan <- errors.Wrapf(err, "failed to execute Prometheus query: metricName=%s, windowSize=%s", metricName, windowSize)
163+
return
138164
}
139165

140-
if _, exists := resourcesInWindowSize[resourceName]; !exists {
141-
pc.logger.DebugWith("Received metric for unconfigured resource, skipping",
142-
"resourceName", resourceName,
166+
if len(warnings) > 0 {
167+
pc.logger.WarnWith("Prometheus query returned warnings",
143168
"metricName", metricName,
144-
"windowSize", windowSize)
145-
continue
169+
"windowSize", windowSize,
170+
"warnings", warnings)
146171
}
147172

148-
// Round up values to ensure any fractional value > 0 becomes at least 1
149-
// This prevents incorrect scale-to-zero decisions for resources with low activity
150-
metricValue := int(math.Ceil(float64(metricSample.Value)))
173+
metricSamples, ok := rawResult.(model.Vector)
174+
if !ok {
175+
errChan <- errors.Errorf("unexpected Prometheus result type for metricName=%s, windowSize=%s", metricName, windowSize)
176+
return
177+
}
151178

152-
if _, exists := metricsByResource[resourceName]; !exists {
153-
metricsByResource[resourceName] = make(map[string]int)
179+
for _, metricSample := range metricSamples {
180+
resourceName, err := pc.extractResourceName(metricSample.Metric)
181+
if err != nil {
182+
errChan <- errors.Wrapf(err, "failed to extract resource name from the prometheus metric's labels. metricName=%s, windowSize=%s", metricName, windowSize)
183+
return
184+
}
185+
186+
if _, exists := resourcesInWindowSize[resourceName]; !exists {
187+
pc.logger.DebugWith("Received metric for unconfigured resource, skipping",
188+
"resourceName", resourceName,
189+
"metricName", metricName,
190+
"windowSize", windowSize)
191+
continue
192+
}
193+
194+
// Round up values to ensure any fractional value > 0 becomes at least 1
195+
// This prevents incorrect scale-to-zero decisions for resources with low activity
196+
metricValue := int(math.Ceil(float64(metricSample.Value)))
197+
198+
pc.logger.DebugWith("Retrieved metric",
199+
"resourceName", resourceName,
200+
"metricName", fullMetricName,
201+
"windowSize", windowSize,
202+
"value", metricValue)
203+
204+
// finished processing this metric sample, send the result
205+
resultChan <- &metricResult{
206+
resourceName: resourceName,
207+
fullMetricName: fullMetricName,
208+
metricValue: metricValue,
209+
}
154210
}
211+
}(resourcesInWindowSize, metricName, windowSize, resultChan, errChan)
212+
}
213+
}
155214

156-
pc.logger.DebugWith("Retrieved metric",
157-
"resourceName", resourceName,
158-
"metricName", fullMetricName,
159-
"windowSize", windowSize,
160-
"value", metricValue)
215+
var collectedErrors []error
216+
collectorDone := make(chan struct{})
217+
// Collect results and errors
218+
go func(resultChan <-chan *metricResult, errChan <-chan error) {
219+
defer close(collectorDone)
220+
for resultChan != nil || errChan != nil {
221+
select {
222+
case result, ok := <-resultChan:
223+
if !ok {
224+
resultChan = nil
225+
continue
226+
}
227+
if _, exists := metricsByResource[result.resourceName]; !exists {
228+
metricsByResource[result.resourceName] = make(map[string]int)
229+
}
230+
if _, exists := metricsByResource[result.resourceName][result.fullMetricName]; exists {
231+
collectedErrors = append(collectedErrors, errors.Errorf("duplicate metric value for resource: resourceName=%s, metricName=%s", result.resourceName, result.fullMetricName))
232+
continue
233+
}
234+
metricsByResource[result.resourceName][result.fullMetricName] = result.metricValue
161235

162-
if _, exists := metricsByResource[resourceName][fullMetricName]; exists {
163-
return nil, errors.Errorf("Cannot have more than one metricSample value per resource: resource=%s, metricSample=%s", resourceName, fullMetricName)
236+
case err, ok := <-errChan:
237+
if !ok {
238+
errChan = nil
239+
continue
164240
}
165-
metricsByResource[resourceName][fullMetricName] = metricValue
241+
if err == nil {
242+
continue
243+
}
244+
collectedErrors = append(collectedErrors, err)
166245
}
167246
}
247+
}(resultChan, errChan)
248+
249+
// wait for all queries to complete
250+
wg.Wait()
251+
close(resultChan)
252+
close(errChan)
253+
// wait for collector to finish processing results
254+
<-collectorDone
255+
256+
if len(collectedErrors) > 0 {
257+
return nil, errors.Errorf("failed to get resource metrics: %v", collectedErrors)
168258
}
259+
169260
return metricsByResource, nil
170261
}
171262

pkg/autoscaler/metricsclient/prometheus_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ func (suite *PrometheusClientTestSuite) TestGetResourceMetrics() {
241241
Template: testQueryTemplateWithResources,
242242
},
243243
},
244+
10*time.Second,
244245
)
245246
suite.Require().NoError(err)
246247

0 commit comments

Comments
 (0)