Skip to content

Commit 2370dbb

Browse files
committed
Wrap main in a controller-runtime Manager
Start refactoring cluster autoscaler's main to use Manager from controller-runtime. For now, that's needed to use the Client that's used by the CapacityQuota provider. In the future, main should be refactored so that the manager handles leader election, metrics, healthcheck and pprof servers, and graceful shutdown. Additionally, this change allows us to write controller-runtime-style controllers.
1 parent 293d88c commit 2370dbb

File tree

3 files changed

+78
-18
lines changed

3 files changed

+78
-18
lines changed

cluster-autoscaler/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ require (
223223
golang.org/x/text v0.29.0 // indirect
224224
golang.org/x/time v0.9.0 // indirect
225225
golang.org/x/tools v0.36.0 // indirect
226+
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
226227
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect
227228
google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a // indirect
228229
gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect

cluster-autoscaler/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
195195
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
196196
github.com/euank/go-kmsg-parser v2.0.0+incompatible h1:cHD53+PLQuuQyLZeriD1V/esuG4MuU0Pjs5y6iknohY=
197197
github.com/euank/go-kmsg-parser v2.0.0+incompatible/go.mod h1:MhmAMZ8V4CYH4ybgdRwPr2TU5ThnS43puaKEMpja1uw=
198+
github.com/evanphx/json-patch v5.9.0+incompatible h1:fBXyNpNMuTTDdquAq/uisOr2lShz4oaXpDTX2bLe7ls=
199+
github.com/evanphx/json-patch v5.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
198200
github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU=
199201
github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM=
200202
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=

cluster-autoscaler/main.go

Lines changed: 75 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ import (
2626
"time"
2727

2828
"github.com/spf13/pflag"
29-
29+
"k8s.io/apimachinery/pkg/runtime"
30+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
31+
cqv1alpha1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacityquota/autoscaling.x-k8s.io/v1alpha1"
3032
capacityclient "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/client"
3133
"k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/common"
3234
"k8s.io/autoscaler/cluster-autoscaler/config/flags"
@@ -41,7 +43,12 @@ import (
4143
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
4244
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
4345
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
46+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
4447
"k8s.io/kubernetes/pkg/features"
48+
ctrl "sigs.k8s.io/controller-runtime"
49+
"sigs.k8s.io/controller-runtime/pkg/cache"
50+
"sigs.k8s.io/controller-runtime/pkg/manager"
51+
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
4552

4653
"k8s.io/apimachinery/pkg/api/meta"
4754
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -85,6 +92,16 @@ import (
8592
"k8s.io/klog/v2"
8693
)
8794

95+
var (
96+
scheme = runtime.NewScheme()
97+
)
98+
99+
func init() {
100+
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
101+
utilruntime.Must(cqv1alpha1.AddToScheme(scheme))
102+
// TODO: add other CRDs
103+
}
104+
88105
func registerSignalHandlers(autoscaler core.Autoscaler) {
89106
sigs := make(chan os.Signal, 1)
90107
signal.Notify(sigs, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGQUIT)
@@ -100,7 +117,7 @@ func registerSignalHandlers(autoscaler core.Autoscaler) {
100117
}()
101118
}
102119

103-
func buildAutoscaler(ctx context.Context, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, *loop.LoopTrigger, error) {
120+
func buildAutoscaler(ctx context.Context, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter, mgr manager.Manager) (core.Autoscaler, *loop.LoopTrigger, error) {
104121
// Get AutoscalingOptions from flags.
105122
autoscalingOptions := flags.AutoscalingOptions()
106123

@@ -133,6 +150,8 @@ func buildAutoscaler(ctx context.Context, debuggingSnapshotter debuggingsnapshot
133150
DeleteOptions: deleteOptions,
134151
DrainabilityRules: drainabilityRules,
135152
ScaleUpOrchestrator: orchestrator.New(),
153+
KubeClientNew: mgr.GetClient(),
154+
KubeCache: mgr.GetCache(),
136155
}
137156

138157
opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions)
@@ -304,12 +323,29 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho
304323
ctx, cancel := context.WithCancel(context.Background())
305324
defer cancel()
306325

307-
autoscaler, trigger, err := buildAutoscaler(ctx, debuggingSnapshotter)
326+
restConfig := kube_util.GetKubeConfig(autoscalingOpts.KubeClientOpts)
327+
mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
328+
Scheme: scheme,
329+
Cache: cache.Options{
330+
DefaultTransform: cache.TransformStripManagedFields(),
331+
},
332+
// TODO: migrate leader election, metrics, healthcheck, pprof servers to Manager
333+
LeaderElection: false,
334+
Metrics: metricsserver.Options{BindAddress: "0"},
335+
HealthProbeBindAddress: "0",
336+
PprofBindAddress: "0",
337+
})
338+
if err != nil {
339+
klog.Fatalf("Failed to create manager: %v", err)
340+
}
341+
342+
autoscaler, trigger, err := buildAutoscaler(ctx, debuggingSnapshotter, mgr)
308343
if err != nil {
309344
klog.Fatalf("Failed to create autoscaler: %v", err)
310345
}
311346

312347
// Register signal handlers for graceful shutdown.
348+
// TODO: replace with ctrl.SetupSignalHandlers() and handle graceful shutdown with context
313349
registerSignalHandlers(autoscaler)
314350

315351
// Start updating health check endpoint.
@@ -320,22 +356,42 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho
320356
klog.Fatalf("Failed to autoscaler background components: %v", err)
321357
}
322358

323-
// Autoscale ad infinitum.
324-
if autoscalingOpts.FrequentLoopsEnabled {
325-
// We need to have two timestamps because the scaleUp activity alternates between processing ProvisioningRequests,
326-
// so we need to pass the older timestamp (previousRun) to trigger.Wait to run immediately if only one of the activities is productive.
327-
lastRun := time.Now()
328-
previousRun := time.Now()
329-
for {
330-
trigger.Wait(previousRun)
331-
previousRun, lastRun = lastRun, time.Now()
332-
loop.RunAutoscalerOnce(autoscaler, healthCheck, lastRun)
333-
}
334-
} else {
335-
for {
336-
time.Sleep(autoscalingOpts.ScanInterval)
337-
loop.RunAutoscalerOnce(autoscaler, healthCheck, time.Now())
359+
err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
360+
// Autoscale ad infinitum.
361+
if autoscalingOpts.FrequentLoopsEnabled {
362+
// We need to have two timestamps because the scaleUp activity alternates between processing ProvisioningRequests,
363+
// so we need to pass the older timestamp (previousRun) to trigger.Wait to run immediately if only one of the activities is productive.
364+
lastRun := time.Now()
365+
previousRun := time.Now()
366+
for {
367+
select {
368+
case <-ctx.Done():
369+
// TODO: handle graceful shutdown with context
370+
return nil
371+
default:
372+
trigger.Wait(previousRun)
373+
previousRun, lastRun = lastRun, time.Now()
374+
loop.RunAutoscalerOnce(autoscaler, healthCheck, lastRun)
375+
}
376+
}
377+
} else {
378+
for {
379+
select {
380+
case <-ctx.Done():
381+
// TODO: handle graceful shutdown with context
382+
return nil
383+
case <-time.After(autoscalingOpts.ScanInterval):
384+
loop.RunAutoscalerOnce(autoscaler, healthCheck, time.Now())
385+
}
386+
}
338387
}
388+
}))
389+
if err != nil {
390+
klog.Fatalf("Failed to add runnable to manager: %v", err)
391+
}
392+
393+
if err := mgr.Start(ctx); err != nil {
394+
klog.Fatalf("Manager exited with error: %v", err)
339395
}
340396
}
341397

@@ -373,6 +429,7 @@ func main() {
373429
if err := logsapi.ValidateAndApply(loggingConfig, featureGate); err != nil {
374430
klog.Fatalf("Failed to validate and apply logging configuration: %v", err)
375431
}
432+
ctrl.SetLogger(klog.NewKlogr())
376433

377434
healthCheck := metrics.NewHealthCheck(autoscalingOpts.MaxInactivityTime, autoscalingOpts.MaxFailingTime)
378435

0 commit comments

Comments
 (0)