[AutoScaler] implement prometheus metrics client #84
TomerShor merged 17 commits into v3io:development from
Conversation
Force-pushed from cf7e195 to 7f0f482
Force-pushed from 7f0f482 to 6482f50
Force-pushed from bd6ad0a to 3944b48
Force-pushed from 2f52fc9 to 1b9b74b
| "github.com/prometheus/common/model" | ||
| ) | ||
|
|
||
| type PrometheusClient struct { |
to align with K8sCustomMetricsClient:
- type PrometheusClient struct {
+ type PrometheusMetricsClient struct {
Fixed here - CR 1 - namings, consts and handler functions
		continue
	}

	resourceNameRegex := pc.buildResourceNameRegex(resources)
no need to run it on every iteration
Fixed here - CR 1 - namings, consts and handler functions
| "error", err) | ||
| continue | ||
| } | ||
|
|
This is not very good in terms of O complexity.
`resourceNameRegex := pc.buildResourceNameRegex(resources)` iterates over resources on each iteration of queryTemplates for no reason.
Then, in the windowSizes loop, it repeats what extractWindowSizesForMetric already does, but with even worse complexity, and on every iteration we iterate over everything. Iterate over this list once and put the data into a map where the key is the metric name; this way we can avoid unnecessary iterations.
Fixed here - CR 2 - one iteration over resources
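For reference, a minimal sketch of the single-pass grouping suggested above; the `Resource`/`ScaleResource` field names here are illustrative assumptions, not the actual scalertypes definitions:

```go
package metricsclient

// Minimal illustrative types; the real ones live in scalertypes and may differ.
type ScaleResource struct {
	MetricName string
	WindowSize string
}

type Resource struct {
	Name           string
	ScaleResources []ScaleResource
}

// groupResourcesByMetric builds metricName -> windowSize -> resources in a single pass,
// so the query loop can do O(1) lookups instead of re-scanning all resources for
// every query template.
func groupResourcesByMetric(resources []Resource) map[string]map[string][]Resource {
	lookup := map[string]map[string][]Resource{}
	for _, resource := range resources {
		for _, scaleResource := range resource.ScaleResources {
			byWindow, ok := lookup[scaleResource.MetricName]
			if !ok {
				byWindow = map[string][]Resource{}
				lookup[scaleResource.MetricName] = byWindow
			}
			byWindow[scaleResource.WindowSize] = append(byWindow[scaleResource.WindowSize], resource)
		}
	}
	return lookup
}
```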
func NewPrometheusClient(parentLogger logger.Logger, prometheusURL, namespace string, templates []scalertypes.QueryTemplate) (*PrometheusClient, error) {
	if len(templates) == 0 {
		return nil, errors.New("Failed to created Prometheus client: query template cannot be empty")
and in all the places below
| return nil, errors.New("Failed to created Prometheus client: query template cannot be empty") | |
| return nil, errors.New("Failed to create Prometheus client: query template cannot be empty") |
Fixed here - CR 1 - namings, consts and handler functions
labelNames := []model.LabelName{
	"function",     // For nuclio functions (nuclio_processor_handled_events_total) - maps to nucliofunction resource
	"service_name", // For deployments (num_of_requests, jupyter_kernel_busyness) - maps to deployment resource
	"pod",          // For pod-based metrics (DCGM_FI_DEV_GPU_UTIL) - maps to pod resource
Fixed here - CR 1 - namings, consts and handler functions
		return nil, errors.Wrapf(err, "Failed to render query for metricName=%s, windowSize=%s", metricName, windowSize)
	}

	rawResult, warnings, err := pc.apiClient.Query(context.Background(), query, time.Now())
Retry if failed?
Also, I'm not sure about recreating context.Background() every time, especially since it has no timeout (so no actual purpose).
I suggest creating one ctx object for the whole func and using a semaphore to send requests to Prometheus.
I fixed the context per iteration here - CR 3 - parallel querying and use single context per iteration
Regarding retries, I don’t think they’re needed at this level. If a request fails, it will be tried again in the next iteration anyway. The existing Kubernetes custom-metrics client also doesn’t use retries, so adding them here would make this implementation behave differently. WDYT?
@weilerN what do you mean by "adding them here would make this implementation behave differently"?
How often do we usually run? I think this flow is relatively critical, as it scales things that might use GPUs, which affects the final cost of used resources. So if something is temporarily wrong with the network, I think it's better to retry now than later, isn't it?
It runs by default every 1m (link).
I agree that adding a retry mechanism makes sense. That said, I think it’s out of scope for this CR, so I suggest we track it separately by opening a tech-debt ticket to address retries in a more comprehensive way.
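For reference, a minimal sketch of the "single context per iteration" approach referenced above; the function name and the decision to log-and-skip failures are illustrative assumptions, not the actual CR 3 code:

```go
package metricsclient

import (
	"context"
	"time"

	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
)

// querySequentially sketches the idea discussed above: one context with a timeout
// covers every Prometheus query issued in a scrape iteration, instead of creating
// a fresh, deadline-less context.Background() per query.
// Retries are intentionally left out, per the tech-debt decision above.
func querySequentially(apiClient v1.API, queries []string, interval time.Duration) []error {
	ctx, cancel := context.WithTimeout(context.Background(), interval)
	defer cancel()

	var errs []error
	for _, query := range queries {
		// all queries share the same deadline
		if _, _, err := apiClient.Query(ctx, query, time.Now()); err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}
```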
Force-pushed from ac8f30b to 549c27e
for resultChan != nil || errChan != nil {
	select {
	case result, ok := <-resultChan:
		if !ok {
			resultChan = nil
			continue
		}
Isn't it the same as:
- for resultChan != nil || errChan != nil {
- 	select {
- 	case result, ok := <-resultChan:
- 		if !ok {
- 			resultChan = nil
- 			continue
- 		}
+ for {
+ 	select {
+ 	case result, ok := <-resultChan:
+ 		if !ok {
+ 			return
+ 		}
It's not the same, because in your case the goroutine will exit on the first channel that is closed, regardless of the second channel, and I want it to exit only after both channels are closed.
I agree that `for {` is better than `for resultChan != nil || errChan != nil {`, so I will change it and add comments.
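For reference, a minimal, self-contained sketch of the drain-both-channels pattern being discussed (the channel names match the diff; the payload types are placeholders):

```go
package metricsclient

// drainBoth keeps receiving until *both* channels are closed. Setting a closed
// channel to nil makes its case block forever, so the select keeps serving the
// remaining open channel instead of exiting early.
func drainBoth(resultChan <-chan int, errChan <-chan error) (results []int, errs []error) {
	for resultChan != nil || errChan != nil {
		select {
		case result, ok := <-resultChan:
			if !ok {
				resultChan = nil // stop selecting on the closed channel
				continue
			}
			results = append(results, result)
		case err, ok := <-errChan:
			if !ok {
				errChan = nil
				continue
			}
			errs = append(errs, err)
		}
	}
	return results, errs
}
```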
case err, ok := <-errChan:
	if !ok {
		errChan = nil
		continue
	}
Pull request overview
This PR introduces a Prometheus-based metrics client implementation to support scaling decisions based on Prometheus metrics, refactoring the metrics collection architecture to be more flexible.
Changes:
- Added a new `PrometheusMetricsClient` that implements the `MetricsClient` interface with support for query templates, window size filtering, and automatic rounding of fractional values
- Refactored the `MetricsClient` interface to accept `Resource` objects instead of pre-computed metric names, moving metric name extraction logic into individual client implementations
- Exported helper functions (`ShortDurationString`, `GetKubernetesMetricName`) to support both K8s and Prometheus clients
Reviewed changes
Copilot reviewed 7 out of 8 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/scalertypes/types.go | Added QueryTemplate struct and methods, updated MetricsClient interface signature, exported utility functions for metric name generation |
| pkg/autoscaler/metricsclient/prometheus.go | New Prometheus client implementation with query templating, concurrent metric fetching, and value rounding logic |
| pkg/autoscaler/metricsclient/prometheus_test.go | Comprehensive unit tests for Prometheus client using mock HTTP server |
| pkg/autoscaler/metricsclient/k8s.go | Updated K8s custom metrics client to accept resources and extract metric names internally |
| pkg/autoscaler/metricsclient/factory.go | Added factory support for creating Prometheus client |
| pkg/autoscaler/autoscaler.go | Simplified by removing getMetricNames function and passing resources directly to metrics client |
| go.mod, go.sum | Added Prometheus client dependencies and updated transitive dependencies |
type windowSizeLookup map[string]map[string]struct{}

// metricLookup maps metricName → windowSizeLookup
type metricLookup map[string]windowSizeLookup
what is the point of adding these types instead of returning the native types directly?
I think it improves the readability of the nested structure.
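For illustration, the readability trade-off being debated; the function names here are hypothetical:

```go
package metricsclient

// Named helper types from the diff above:
type windowSizeLookup map[string]map[string]struct{}
type metricLookup map[string]windowSizeLookup

// With the named types, the signature reads at a glance:
func buildLookup() metricLookup { return metricLookup{} }

// The equivalent native signature spells out the full nesting every time:
func buildLookupNative() map[string]map[string]map[string]struct{} {
	return map[string]map[string]map[string]struct{}{}
}
```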
// GetResourceMetrics retrieves metrics for multiple resources
func (pc *PrometheusMetricsClient) GetResourceMetrics(resources []scalertypes.Resource) (map[string]map[string]int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), pc.interval)
So if it takes longer than an interval and the order in which we execute requests is always the same, are we at risk of failing every time with the same functions?
If a single function has a consistent error, it won't be STZ.
I updated the code so a single function failure won't fail all the other resources' STZ here - CR 5 - failure only if all goroutines are failing, log failure on the…
resultCh := make(chan result, 1)
defer close(resultCh)

go func() {
	metrics, err := pc.getResourceMetrics(ctx, metricToWindowSizes)
	resultCh <- result{metrics: metrics, err: err}
}()

select {
case <-ctx.Done():
	return nil, errors.Wrap(ctx.Err(), "timeout waiting for resource metrics")
case res := <-resultCh:
	return res.metrics, res.err
The need for this select is basically to limit the execution to the interval.
I added some comments in the code here - CR 5 - failure only if all goroutines are failing, log failure on the…
wg := sync.WaitGroup{}

for metricName, queryTemplate := range pc.queryTemplates {
	windowSizeToResources := metricToWindowSizes[metricName]

	for windowSize, resourcesInWindowSize := range windowSizeToResources {
		wg.Add(1)
Use a semaphore with a limited number of goroutines; there is no need to run them all at once. The Go scheduler will spend more time switching if the number of functions is large.
The number of goroutines is limited to the number of window sizes.
The default is 6 (link), so I would prefer to keep the WaitGroup use as is.
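For reference, a minimal sketch of the semaphore idea raised above, using a buffered channel to cap concurrency; maxConcurrentQueries and the surrounding names are illustrative assumptions:

```go
package metricsclient

import "sync"

// runWithSemaphore caps concurrency with a buffered channel acting as a counting
// semaphore: at most maxConcurrentQueries goroutines run at once, while the
// WaitGroup still waits for all of them to finish.
func runWithSemaphore(queries []string, maxConcurrentQueries int, queryFn func(string)) {
	semaphore := make(chan struct{}, maxConcurrentQueries)
	var wg sync.WaitGroup

	for _, query := range queries {
		wg.Add(1)
		semaphore <- struct{}{} // blocks while maxConcurrentQueries goroutines are in flight
		go func(query string) {
			defer wg.Done()
			defer func() { <-semaphore }() // release the slot
			queryFn(query)
		}(query)
	}

	wg.Wait()
}
```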
}

var collectedErrors []error
collectorDone := make(chan struct{})
If we only need one error to fail, why collect all the errors? I suggest having one atomic var (firstErr or whatever) and using CompareAndSwap to set it. If at least one error occurred, fail. Why do we store each error?
Hey all, I've summarized the feedback into the following decision: if there is a partial failure during execution (e.g., failing to query a specific window size, failing to parse a response, etc.), the error will be logged and the overall routine will continue. The metrics client will return an error only if all metrics fail, or if there is a conflict between two metrics for the same resource and it cannot determine which one to use. We can further discuss how to handle the second case if needed.
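For reference, a minimal sketch of the agreed policy (log partial failures, return an error only when every query fails); the names and the standard-library `errors`/`log` usage are assumptions, not the repo's actual error/logging packages:

```go
package metricsclient

import (
	"errors"
	"log"
)

// collectResults applies the agreed policy: partial failures are logged and
// skipped, and an error is returned only when every single query failed.
func collectResults(results []int, errs []error) ([]int, error) {
	for _, err := range errs {
		log.Printf("metric query failed, continuing: %v", err)
	}
	if len(results) == 0 && len(errs) > 0 {
		return nil, errors.New("all metric queries failed")
	}
	return results, nil
}
```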
Force-pushed from 4b3d22f to e89e0b1
### 📝 Description
These code changes support the metricsClient changes in the scaler repository. This PR introduces the configuration additions and initializes the scaler with the metrics client.

### 🛠️ Changes Made
- Add `MetricsClient` configuration to the platform config
- Update the `createAutoScaler` to use the metricsClient

### ✅ Checklist
- [ ] I updated the documentation (if applicable)
- [x] I have tested the changes in this PR

### 🧪 Testing
- Dev test
  - Backward compatibility: nothing is changed with the new scaler and no configuration changes
  - Prometheus client: added the relevant configurations and validated that functions STZ as expected

### 🔗 References
- Ticket link: https://iguazio.atlassian.net/browse/NUC-713
- Design docs links: https://iguazio.atlassian.net/wiki/spaces/PLAT/pages/627572831/Nuclio+STZ+using+Prometheus+client+-+HLD#Support-Prometheus-client-during-scaler-init
- Merge blocked by: v3io/scaler#84

### 🚨 Breaking Changes?
- [ ] Yes (explain below)
- [x] No

### 🔍️ Additional Notes
### 📝 Description
This PR introduces a Prometheus-based metrics client to the autoscaler and refactors the metrics collection flow to support it.

### 🛠️ Changes Made
- Added a `Prometheus client` that implements the `MetricsClient` interface; key functionalities include query templates, window-size filtering, and rounding of fractional values
- The metrics client now receives the resources nominated for STZ instead of precomputed metric names

### ✅ Checklist

### 🧪 Testing

### 🔗 References

### 🚨 Breaking Changes?

### 🔍️ Additional Notes