Skip to content

Commit e773c0e

Browse files
author
Daniecki, Jozef
committed
add conccurrency flags
1 parent d4c6e0b commit e773c0e

3 files changed

Lines changed: 16 additions & 4 deletions

File tree

cmd/connect-controller/main.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ func main() {
6161
var connectionProbeTimeout time.Duration
6262
var profilerAddress string
6363
var enableContentionProfiling bool
64+
var concurrency int
65+
var kubeApiQPS float64
66+
var kubeApiBurst int
6467
flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
6568
"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
6669
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
@@ -81,6 +84,10 @@ func main() {
8184
flag.DurationVar(&connectionProbeTimeout, "connection-probe-timeout", 5*time.Minute, "The timeout duration for connection probes.")
8285
flag.StringVar(&profilerAddress, "profiler-address", "", "Bind address to expose the pprof profiler (e.g. localhost:6060)")
8386
flag.BoolVar(&enableContentionProfiling, "contention-profiling", false, "Enable block profiling")
87+
flag.IntVar(&concurrency, "concurrency", 1, "Maximum number of concurrent workers processing ClusterConnect resources")
88+
flag.Float64Var(&kubeApiQPS, "kube-api-qps", 20, "Maximum queries per second from the controller client to the Kubernetes API server.")
89+
flag.IntVar(&kubeApiBurst, "kube-api-burst", 30, "Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")
90+
8491
opts := zap.Options{
8592
Development: true,
8693
}
@@ -181,8 +188,11 @@ func main() {
181188
config.GetCertificate = metricsCertWatcher.GetCertificate
182189
})
183190
}
191+
restConfig := ctrl.GetConfigOrDie()
192+
restConfig.QPS = float32(kubeApiQPS)
193+
restConfig.Burst = kubeApiBurst
184194

185-
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
195+
mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
186196
Scheme: scheme,
187197
Metrics: metricsServerOptions,
188198
WebhookServer: webhookServer,
@@ -220,7 +230,7 @@ func main() {
220230
if err = (&controller.ClusterConnectReconciler{
221231
Client: mgr.GetClient(),
222232
Scheme: mgr.GetScheme(),
223-
}).SetupWithManager(ctx, mgr, connectionProbeTimeout); err != nil {
233+
}).SetupWithManager(ctx, mgr, connectionProbeTimeout, concurrency); err != nil {
224234
setupLog.Error(err, "unable to create controller", "controller", "ClusterConnect")
225235
os.Exit(1)
226236
}

internal/controller/clusterconnect_controller.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"sigs.k8s.io/cluster-api/util/patch"
2626
ctrl "sigs.k8s.io/controller-runtime"
2727
"sigs.k8s.io/controller-runtime/pkg/client"
28+
"sigs.k8s.io/controller-runtime/pkg/controller"
2829
cutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
2930
"sigs.k8s.io/controller-runtime/pkg/event"
3031
"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -126,7 +127,7 @@ func clusterRefIdxFunc(o client.Object) []string {
126127
}
127128

128129
// SetupWithManager sets up the controller with the Manager.
129-
func (r *ClusterConnectReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, connectionTimeout time.Duration) error {
130+
func (r *ClusterConnectReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, connectionTimeout time.Duration, concurrency int) error {
130131
if r.Client == nil {
131132
return errors.New("Client must not be nil")
132133
}
@@ -136,6 +137,7 @@ func (r *ClusterConnectReconciler) SetupWithManager(ctx context.Context, mgr ctr
136137
c, err := ctrl.NewControllerManagedBy(mgr).
137138
For(&v1alpha1.ClusterConnect{}).
138139
Named("cluster/clusterconnect").
140+
WithOptions(controller.Options{MaxConcurrentReconciles: concurrency}).
139141
Build(r)
140142

141143
if err != nil {

internal/controller/suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ var _ = BeforeSuite(func() {
105105
err = (&ClusterConnectReconciler{
106106
Client: k8sManager.GetClient(),
107107
Scheme: k8sManager.GetScheme(),
108-
}).SetupWithManager(ctx, k8sManager, 1*time.Second)
108+
}).SetupWithManager(ctx, k8sManager, 1*time.Second, 1)
109109
Expect(err).ToNot(HaveOccurred())
110110

111111
go func() {

0 commit comments

Comments
 (0)