Commit 9f4a222

Wire up context, add stop channels, etc
1 parent 72fdebb

10 files changed: +74 -35 lines changed

vertical-pod-autoscaler/pkg/admission-controller/main.go

Lines changed: 5 additions & 2 deletions
@@ -17,6 +17,7 @@ limitations under the License.
 package main
 
 import (
+	"context"
 	"flag"
 	"fmt"
 	"net/http"
@@ -99,12 +100,14 @@ func main() {
 
 	config := common.CreateKubeConfigOrDie(commonFlags.KubeConfig, float32(commonFlags.KubeApiQps), int(commonFlags.KubeApiBurst))
 
+	ctx := context.Background()
+
 	vpaClient := vpa_clientset.NewForConfigOrDie(config)
 	vpaLister := vpa_api_util.NewVpasLister(vpaClient, make(chan struct{}), commonFlags.VpaObjectNamespace)
 	kubeClient := kube_client.NewForConfigOrDie(config)
 	factory := informers.NewSharedInformerFactory(kubeClient, defaultResyncPeriod)
-	targetSelectorFetcher := target.NewVpaTargetSelectorFetcher(config, kubeClient, factory)
-	controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
+	targetSelectorFetcher := target.NewVpaTargetSelectorFetcher(ctx, config, kubeClient, factory)
+	controllerFetcher := controllerfetcher.NewControllerFetcher(ctx, config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
 	podPreprocessor := pod.NewDefaultPreProcessor()
 	vpaPreprocessor := vpa.NewDefaultPreProcessor()
 	var limitRangeCalculator limitrange.LimitRangeCalculator
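
The ctx introduced here is context.Background(), which is never canceled; the point of the change is to thread a context-aware signature through the fetchers. Purely as an illustration of where a cancelable root context usually comes from (not something this commit does), os/signal.NotifyContext can tie it to process signals:

package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	// Illustrative sketch, not part of this commit: a root context canceled on
	// SIGINT/SIGTERM, so context-aware loops started from it stop on shutdown.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	<-ctx.Done() // blocks until a signal arrives
}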

vertical-pod-autoscaler/pkg/recommender/app/app.go

Lines changed: 6 additions & 10 deletions
@@ -101,10 +101,10 @@ func (app *RecommenderApp) Run(ctx context.Context, leaderElection componentbase
 	metrics_recommender.Register()
 	metrics_quality.Register()
 	metrics_resources.Register()
-	server.Initialize(&app.config.CommonFlags.EnableProfiling, healthCheck, &app.config.Address)
+	server.InitializeWithContext(ctx, &app.config.CommonFlags.EnableProfiling, healthCheck, &app.config.Address)
 
 	if !leaderElection.LeaderElect {
-		return app.run(ctx, healthCheck)
+		return app.run(ctx, stopCh, healthCheck)
 	} else {
 		id, err := os.Hostname()
 		if err != nil {
@@ -138,7 +138,7 @@ func (app *RecommenderApp) Run(ctx context.Context, leaderElection componentbase
 			ReleaseOnCancel: true,
 			Callbacks: leaderelection.LeaderCallbacks{
 				OnStartedLeading: func(_ context.Context) {
-					if err := app.run(ctx, healthCheck); err != nil {
+					if err := app.run(ctx, stopCh, healthCheck); err != nil {
 						klog.Fatalf("Error running recommender: %v", err)
 					}
 				},
@@ -152,16 +152,12 @@ func (app *RecommenderApp) Run(ctx context.Context, leaderElection componentbase
 	return nil
 }
 
-func (app *RecommenderApp) run(ctx context.Context, healthCheck *metrics.HealthCheck) error {
-	// Create a stop channel that will be used to signal shutdown
-	stopCh := make(chan struct{})
-	defer close(stopCh)
-
+func (app *RecommenderApp) run(ctx context.Context, stopCh chan struct{}, healthCheck *metrics.HealthCheck) error {
 	config := common.CreateKubeConfigOrDie(app.config.CommonFlags.KubeConfig, float32(app.config.CommonFlags.KubeApiQps), int(app.config.CommonFlags.KubeApiBurst))
 	kubeClient := kube_client.NewForConfigOrDie(config)
 	clusterState := model.NewClusterState(aggregateContainerStateGCInterval)
 	factory := informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResyncPeriod, informers.WithNamespace(app.config.CommonFlags.VpaObjectNamespace))
-	controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
+	controllerFetcher := controllerfetcher.NewControllerFetcher(ctx, config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
 	podLister, oomObserver := input.NewPodListerAndOOMObserver(ctx, kubeClient, app.config.CommonFlags.VpaObjectNamespace, stopCh)
 
 	factory.Start(stopCh)
@@ -223,7 +219,7 @@ func (app *RecommenderApp) run(ctx context.Context, healthCheck *metrics.HealthC
 		VpaLister:           vpa_api_util.NewVpasLister(vpa_clientset.NewForConfigOrDie(config), stopCh, app.config.CommonFlags.VpaObjectNamespace),
 		VpaCheckpointLister: vpa_api_util.NewVpaCheckpointLister(vpa_clientset.NewForConfigOrDie(config), stopCh, app.config.CommonFlags.VpaObjectNamespace),
 		ClusterState:        clusterState,
-		SelectorFetcher:     target.NewVpaTargetSelectorFetcher(config, kubeClient, factory),
+		SelectorFetcher:     target.NewVpaTargetSelectorFetcher(ctx, config, kubeClient, factory),
 		MemorySaveMode:      app.config.MemorySaver,
 		ControllerFetcher:   controllerFetcher,
 		RecommenderName:     app.config.RecommenderName,
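
run() now receives the stop channel from its caller instead of creating and closing one per invocation; the hunk does not show where Run builds it. A hedged sketch of one way a stop channel can be tied to a context (the contextStopCh helper below is illustrative, not code from this commit):

package main

import (
	"context"
	"time"
)

// contextStopCh is an illustrative helper (not from this commit): it returns a
// stop channel that closes when ctx is canceled, bridging context-aware code
// and the stop-channel APIs (informer factories, listers) used inside run().
func contextStopCh(ctx context.Context) chan struct{} {
	stopCh := make(chan struct{})
	go func() {
		<-ctx.Done()
		close(stopCh)
	}()
	return stopCh
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	stopCh := contextStopCh(ctx)
	<-stopCh // closes roughly when ctx expires
}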

vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go

Lines changed: 6 additions & 1 deletion
@@ -138,7 +138,12 @@ func WatchEvictionEventsWithRetries(ctx context.Context, kubeClient kube_client.
 				// Wait between attempts, retrying too often breaks API server.
 				waitTime := wait.Jitter(evictionWatchRetryWait, evictionWatchJitterFactor)
 				klog.V(1).InfoS("An attempt to watch eviction events finished", "waitTime", waitTime)
-				time.Sleep(waitTime)
+				// Use a timer that can be interrupted by context cancellation
+				select {
+				case <-ctx.Done():
+					return
+				case <-time.After(waitTime):
+				}
 			}
 		}
 	}()
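
The replacement for time.Sleep is the standard interruptible-wait idiom: block on whichever of ctx.Done() and a timer fires first. The same pattern in a self-contained sketch (retryLoop and its arguments are illustrative names, not from this file):

package main

import (
	"context"
	"fmt"
	"time"
)

// retryLoop repeats doWork, waiting waitTime between attempts, but returns
// promptly once ctx is canceled instead of sleeping through the whole delay.
func retryLoop(ctx context.Context, waitTime time.Duration, doWork func()) {
	for {
		doWork()
		select {
		case <-ctx.Done():
			return // shutdown requested: stop retrying
		case <-time.After(waitTime):
			// delay elapsed: try again
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	retryLoop(ctx, 100*time.Millisecond, func() { fmt.Println("attempt") })
}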

vertical-pod-autoscaler/pkg/target/controller_fetcher/controller_fetcher.go

Lines changed: 3 additions & 3 deletions
@@ -112,7 +112,7 @@ func (f *controllerFetcher) Start(ctx context.Context, loopPeriod time.Duration)
 }
 
 // NewControllerFetcher returns a new instance of controllerFetcher
-func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory, betweenRefreshes, lifeTime time.Duration, jitterFactor float64) *controllerFetcher {
+func NewControllerFetcher(ctx context.Context, config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory, betweenRefreshes, lifeTime time.Duration, jitterFactor float64) *controllerFetcher {
 	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
 	if err != nil {
 		klog.ErrorS(err, "Could not create discoveryClient")
@@ -122,9 +122,9 @@ func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface,
 	restClient := kubeClient.CoreV1().RESTClient()
 	cachedDiscoveryClient := cacheddiscovery.NewMemCacheClient(discoveryClient)
 	mapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscoveryClient)
-	go wait.Until(func() {
+	go wait.UntilWithContext(ctx, func(ctx context.Context) {
 		mapper.Reset()
-	}, discoveryResetPeriod, make(chan struct{}))
+	}, discoveryResetPeriod)
 
 	informersMap := map[wellKnownController]cache.SharedIndexInformer{
 		daemonSet: factory.Apps().V1().DaemonSets().Informer(),
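
wait.UntilWithContext (k8s.io/apimachinery/pkg/util/wait) replaces the stop-channel form of wait.Until, so the periodic mapper.Reset() loop stops when the passed context is canceled instead of running against a channel that was never closed. A minimal usage sketch, assuming the apimachinery module this repo already depends on:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Runs the function immediately and then every 200ms until ctx is done;
	// with the old wait.Until form this loop needed a separate stop channel.
	wait.UntilWithContext(ctx, func(ctx context.Context) {
		fmt.Println("resetting REST mapper cache (stand-in for mapper.Reset())")
	}, 200*time.Millisecond)
}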

vertical-pod-autoscaler/pkg/target/fetcher.go

Lines changed: 3 additions & 3 deletions
@@ -67,7 +67,7 @@ const (
 )
 
 // NewVpaTargetSelectorFetcher returns new instance of VpaTargetSelectorFetcher
-func NewVpaTargetSelectorFetcher(config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory) VpaTargetSelectorFetcher {
+func NewVpaTargetSelectorFetcher(ctx context.Context, config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory) VpaTargetSelectorFetcher {
 	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
 	if err != nil {
 		klog.ErrorS(err, "Could not create discoveryClient")
@@ -77,9 +77,9 @@ func NewVpaTargetSelectorFetcher(config *rest.Config, kubeClient kube_client.Int
 	restClient := kubeClient.CoreV1().RESTClient()
 	cachedDiscoveryClient := cacheddiscovery.NewMemCacheClient(discoveryClient)
 	mapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscoveryClient)
-	go wait.Until(func() {
+	go wait.UntilWithContext(ctx, func(ctx context.Context) {
 		mapper.Reset()
-	}, discoveryResetPeriod, make(chan struct{}))
+	}, discoveryResetPeriod)
 
 	informersMap := map[wellKnownController]cache.SharedIndexInformer{
 		daemonSet: factory.Apps().V1().DaemonSets().Informer(),

vertical-pod-autoscaler/pkg/updater/main.go

Lines changed: 3 additions & 2 deletions
@@ -175,12 +175,13 @@ func defaultLeaderElectionConfiguration() componentbaseconfig.LeaderElectionConf
 func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) {
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+	ctx := context.Background()
 	config := common.CreateKubeConfigOrDie(commonFlag.KubeConfig, float32(commonFlag.KubeApiQps), int(commonFlag.KubeApiBurst))
 	kubeClient := kube_client.NewForConfigOrDie(config)
 	vpaClient := vpa_clientset.NewForConfigOrDie(config)
 	factory := informers.NewSharedInformerFactory(kubeClient, defaultResyncPeriod)
-	targetSelectorFetcher := target.NewVpaTargetSelectorFetcher(config, kubeClient, factory)
-	controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
+	targetSelectorFetcher := target.NewVpaTargetSelectorFetcher(ctx, config, kubeClient, factory)
+	controllerFetcher := controllerfetcher.NewControllerFetcher(ctx, config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
 	var limitRangeCalculator limitrange.LimitRangeCalculator
 	limitRangeCalculator, err := limitrange.NewLimitsRangeCalculator(factory)
 	if err != nil {

vertical-pod-autoscaler/pkg/utils/metrics/quality/quality.go

Lines changed: 9 additions & 9 deletions
@@ -120,15 +120,15 @@ var (
 
 // Register initializes all VPA quality metrics
 func Register() {
-	prometheus.MustRegister(usageRecommendationRelativeDiff)
-	prometheus.MustRegister(usageMissingRecommendationCounter)
-	prometheus.MustRegister(cpuRecommendationOverUsageDiff)
-	prometheus.MustRegister(memoryRecommendationOverUsageDiff)
-	prometheus.MustRegister(cpuRecommendationLowerOrEqualUsageDiff)
-	prometheus.MustRegister(memoryRecommendationLowerOrEqualUsageDiff)
-	prometheus.MustRegister(cpuRecommendations)
-	prometheus.MustRegister(memoryRecommendations)
-	prometheus.MustRegister(relativeRecommendationChange)
+	_ = prometheus.Register(usageRecommendationRelativeDiff)
+	_ = prometheus.Register(usageMissingRecommendationCounter)
+	_ = prometheus.Register(cpuRecommendationOverUsageDiff)
+	_ = prometheus.Register(memoryRecommendationOverUsageDiff)
+	_ = prometheus.Register(cpuRecommendationLowerOrEqualUsageDiff)
+	_ = prometheus.Register(memoryRecommendationLowerOrEqualUsageDiff)
+	_ = prometheus.Register(cpuRecommendations)
+	_ = prometheus.Register(memoryRecommendations)
+	_ = prometheus.Register(relativeRecommendationChange)
 }
 
 // observeUsageRecommendationRelativeDiff records relative diff between usage and
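
Switching from prometheus.MustRegister to prometheus.Register turns duplicate registration from a panic into a returned error (prometheus.AlreadyRegisteredError), which these Register() helpers now discard so they can safely be called more than once per process. A small standalone sketch of the difference, using a throwaway counter:

package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	c := prometheus.NewCounter(prometheus.CounterOpts{Name: "demo_total", Help: "demo"})

	// First registration succeeds.
	fmt.Println(prometheus.Register(c)) // <nil>

	// A second registration returns AlreadyRegisteredError instead of panicking;
	// that is the error `_ = prometheus.Register(...)` discards above.
	err := prometheus.Register(c)
	var are prometheus.AlreadyRegisteredError
	fmt.Println(errors.As(err, &are)) // true

	// prometheus.MustRegister(c) at this point would panic with the same error.
}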

vertical-pod-autoscaler/pkg/utils/metrics/recommender/recommender.go

Lines changed: 13 additions & 1 deletion
@@ -115,7 +115,19 @@ type ObjectCounter struct {
 
 // Register initializes all metrics for VPA Recommender
 func Register() {
-	prometheus.MustRegister(vpaObjectCount, recommendationLatency, functionLatency, aggregateContainerStatesCount, metricServerResponses, prometheusClientRequestsCount, prometheusClientRequestsDuration)
+	collectors := []prometheus.Collector{
+		vpaObjectCount,
+		recommendationLatency,
+		functionLatency,
+		aggregateContainerStatesCount,
+		metricServerResponses,
+		prometheusClientRequestsCount,
+		prometheusClientRequestsDuration,
+	}
+	for _, c := range collectors {
+		// Ignore AlreadyRegisteredError
+		_ = prometheus.Register(c)
+	}
 }
 
 // NewExecutionTimer provides a timer for Recommender's RunOnce execution

vertical-pod-autoscaler/pkg/utils/metrics/resources/resources.go

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ var (
 
 // Register initializes all metrics for VPA resources
 func Register() {
-	prometheus.MustRegister(getResourcesCount)
+	_ = prometheus.Register(getResourcesCount)
 }
 
 // RecordGetResourcesCount records how many times VPA requested the resources (

vertical-pod-autoscaler/pkg/utils/server/server.go

Lines changed: 25 additions & 3 deletions
@@ -18,6 +18,7 @@ limitations under the License.
 package server
 
 import (
+	"context"
 	"net/http"
 	"net/http/pprof"
 
@@ -29,6 +30,12 @@ import (
 
 // Initialize sets up Prometheus to expose metrics & (optionally) health-check and profiling on the given address
 func Initialize(enableProfiling *bool, healthCheck *metrics.HealthCheck, address *string) {
+	InitializeWithContext(context.Background(), enableProfiling, healthCheck, address)
+}
+
+// InitializeWithContext sets up Prometheus to expose metrics & (optionally) health-check and profiling on the given address.
+// The server will shut down gracefully when the context is canceled.
+func InitializeWithContext(ctx context.Context, enableProfiling *bool, healthCheck *metrics.HealthCheck, address *string) {
 	go func() {
 		mux := http.NewServeMux()
 
@@ -45,8 +52,23 @@ func Initialize(enableProfiling *bool, healthCheck *metrics.HealthCheck, address
 			mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
 		}
 
-		err := http.ListenAndServe(*address, mux)
-		klog.ErrorS(err, "Failed to start metrics")
-		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
+		server := &http.Server{
+			Addr:    *address,
+			Handler: mux,
+		}
+
+		// Start server shutdown when context is canceled
+		go func() {
+			<-ctx.Done()
+			if err := server.Shutdown(context.Background()); err != nil {
+				klog.ErrorS(err, "Failed to shutdown metrics server")
+			}
+		}()
+
+		err := server.ListenAndServe()
+		if err != nil && err != http.ErrServerClosed {
+			klog.ErrorS(err, "Failed to start metrics")
+			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
+		}
 	}()
 }
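
The metrics endpoint now runs on an explicit http.Server so it can be shut down when the context is canceled, and http.ErrServerClosed is treated as a clean exit rather than a startup failure. The same pattern in a self-contained sketch (the address and timeout are placeholders):

package main

import (
	"context"
	"errors"
	"log"
	"net/http"
	"time"
)

func main() {
	// Placeholder context and address for the sketch; the real code uses the
	// caller's ctx and the configured metrics address.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	server := &http.Server{Addr: "127.0.0.1:8942", Handler: http.NewServeMux()}

	// Shut the server down once the context is canceled.
	go func() {
		<-ctx.Done()
		if err := server.Shutdown(context.Background()); err != nil {
			log.Printf("shutdown: %v", err)
		}
	}()

	// ListenAndServe returns http.ErrServerClosed after Shutdown; anything else
	// is a real startup or serve failure.
	if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		log.Fatalf("serve: %v", err)
	}
}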
