diff --git a/charts/kubeflow-trainer/README.md b/charts/kubeflow-trainer/README.md index 36d40c19af..218160b587 100644 --- a/charts/kubeflow-trainer/README.md +++ b/charts/kubeflow-trainer/README.md @@ -136,7 +136,9 @@ manager: | manager.volumeMounts | list | `[]` | Volume mounts for manager containers. | | manager.resources | object | `{}` | Pod resource requests and limits for manager containers. | | manager.securityContext | object | `{"allowPrivilegeEscalation":false,"capabilities":{"drop":["ALL"]},"runAsNonRoot":true,"seccompProfile":{"type":"RuntimeDefault"}}` | Security context for manager containers. | -| manager.config | object | `{"certManagement":{"enable":true,"webhookSecretName":"","webhookServiceName":""},"controller":{"groupKindConcurrency":{"clusterTrainingRuntime":1,"trainJob":5,"trainingRuntime":1}},"featureGates":{},"health":{"healthProbeBindAddress":":8081","livenessEndpointName":"healthz","readinessEndpointName":"readyz"},"leaderElection":{"leaderElect":true,"leaseDuration":"15s","renewDeadline":"10s","resourceName":"trainer.kubeflow.org","resourceNamespace":"","retryPeriod":"2s"},"metrics":{"bindAddress":":8443","secureServing":true},"statusServer":{"burst":10,"port":10443,"qps":5},"webhook":{"host":"","port":9443}}` | Controller manager configuration. This configuration is used to generate the ConfigMap for the controller manager. | +| manager.config | object | `{"certManagement":{"enable":true,"webhookSecretName":"","webhookServiceName":""},"clientConnection":{"burst":100,"qps":50},"controller":{"groupKindConcurrency":{"clusterTrainingRuntime":1,"trainJob":5,"trainingRuntime":1}},"featureGates":{},"health":{"healthProbeBindAddress":":8081","livenessEndpointName":"healthz","readinessEndpointName":"readyz"},"leaderElection":{"leaderElect":true,"leaseDuration":"15s","renewDeadline":"10s","resourceName":"trainer.kubeflow.org","resourceNamespace":"","retryPeriod":"2s"},"metrics":{"bindAddress":":8443","secureServing":true},"statusServer":{"burst":10,"port":10443,"qps":5},"webhook":{"host":"","port":9443}}` | Controller manager configuration. This configuration is used to generate the ConfigMap for the controller manager. | +| manager.config.clientConnection.qps | int | `50` | QPS rate limit for the manager's Kubernetes API client | +| manager.config.clientConnection.burst | int | `100` | Burst rate limit for the manager's Kubernetes API client | | manager.config.statusServer.port | int | `10443` | Port that the TrainJob status server serves on. | | manager.config.statusServer.qps | int | `5` | QPS rate limit for the TrainJob Status Server api client | | manager.config.statusServer.burst | int | `10` | Burst rate limit for the TrainJob Status Server api client | diff --git a/charts/kubeflow-trainer/templates/manager/configmap.yaml b/charts/kubeflow-trainer/templates/manager/configmap.yaml index a9f76c1ff1..5f706c4fc2 100644 --- a/charts/kubeflow-trainer/templates/manager/configmap.yaml +++ b/charts/kubeflow-trainer/templates/manager/configmap.yaml @@ -63,6 +63,11 @@ data: webhookServiceName: {{ if .Values.manager.config.certManagement.webhookServiceName }}{{ .Values.manager.config.certManagement.webhookServiceName }}{{ else }}{{ include "trainer.webhook.service.name" . }}{{ end }} webhookSecretName: {{ if .Values.manager.config.certManagement.webhookSecretName }}{{ .Values.manager.config.certManagement.webhookSecretName }}{{ else }}{{ include "trainer.webhook.secret.name" . }}{{ end }} + # Client connection configuration + clientConnection: + qps: {{ .Values.manager.config.clientConnection.qps }} + burst: {{ .Values.manager.config.clientConnection.burst }} + statusServer: port: {{ .Values.manager.config.statusServer.port }} qps: {{ .Values.manager.config.statusServer.qps }} diff --git a/charts/kubeflow-trainer/values.yaml b/charts/kubeflow-trainer/values.yaml index 6d32e47214..384adce351 100644 --- a/charts/kubeflow-trainer/values.yaml +++ b/charts/kubeflow-trainer/values.yaml @@ -138,6 +138,11 @@ manager: # webhookServiceName and webhookSecretName are auto-generated if not specified webhookServiceName: "" webhookSecretName: "" + clientConnection: + # -- QPS rate limit for the manager's Kubernetes API client + qps: 50 + # -- Burst rate limit for the manager's Kubernetes API client + burst: 100 statusServer: # -- Port that the TrainJob status server serves on. port: 10443 diff --git a/cmd/trainer-controller-manager/main.go b/cmd/trainer-controller-manager/main.go index 3871dc84d2..4bb46890a8 100644 --- a/cmd/trainer-controller-manager/main.go +++ b/cmd/trainer-controller-manager/main.go @@ -118,8 +118,11 @@ func main() { } } - setupLog.Info("Creating manager") - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), options) + restCfg := ctrl.GetConfigOrDie() + config.ApplyClientConnection(restCfg, &cfg) + + setupLog.Info("Creating manager", "qps", restCfg.QPS, "burst", restCfg.Burst) + mgr, err := ctrl.NewManager(restCfg, options) if err != nil { setupLog.Error(err, "unable to start manager") os.Exit(1) diff --git a/pkg/config/config.go b/pkg/config/config.go index 70e08dd579..8f65f03d1a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -23,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/controller-runtime/pkg/webhook" @@ -133,6 +134,19 @@ func Load(scheme *runtime.Scheme, configFile string, enableHTTP2 bool) (ctrl.Opt return options, cfg, nil } +// ApplyClientConnection sets QPS and burst on the given rest.Config +// from the Configuration's clientConnection settings. +func ApplyClientConnection(restCfg *rest.Config, cfg *configapi.Configuration) { + if cfg.ClientConnection != nil { + if cfg.ClientConnection.QPS != nil { + restCfg.QPS = *cfg.ClientConnection.QPS + } + if cfg.ClientConnection.Burst != nil { + restCfg.Burst = int(*cfg.ClientConnection.Burst) + } + } +} + // IsCertManagementEnabled returns true if certificate management is enabled. // Returns true by default if not explicitly disabled. func IsCertManagementEnabled(cfg *configapi.Configuration) bool { diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index b043230ae2..8efca1ca67 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -27,6 +27,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" componentconfigv1alpha1 "k8s.io/component-base/config/v1alpha1" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" @@ -798,6 +799,125 @@ func TestIsCertManagementEnabled(t *testing.T) { } } +func TestApplyClientConnection(t *testing.T) { + testcases := map[string]struct { + cfg configapi.Configuration + initialQPS float32 + initialBurst int + wantQPS float32 + wantBurst int + }{ + "nil ClientConnection keeps rest.Config defaults": { + cfg: configapi.Configuration{}, + initialQPS: 5, + initialBurst: 10, + wantQPS: 5, + wantBurst: 10, + }, + "default values override client-go defaults": { + cfg: configapi.Configuration{ + ClientConnection: &configapi.ClientConnection{ + QPS: ptr.To[float32](50), + Burst: ptr.To[int32](100), + }, + }, + initialQPS: 5, + initialBurst: 10, + wantQPS: 50, + wantBurst: 100, + }, + "custom values applied": { + cfg: configapi.Configuration{ + ClientConnection: &configapi.ClientConnection{ + QPS: ptr.To[float32](200), + Burst: ptr.To[int32](400), + }, + }, + wantQPS: 200, + wantBurst: 400, + }, + "only QPS set preserves existing burst": { + cfg: configapi.Configuration{ + ClientConnection: &configapi.ClientConnection{ + QPS: ptr.To[float32](75), + }, + }, + initialBurst: 10, + wantQPS: 75, + wantBurst: 10, + }, + "only burst set preserves existing QPS": { + cfg: configapi.Configuration{ + ClientConnection: &configapi.ClientConnection{ + Burst: ptr.To[int32](150), + }, + }, + initialQPS: 5, + wantQPS: 5, + wantBurst: 150, + }, + "zero QPS is valid and applied": { + cfg: configapi.Configuration{ + ClientConnection: &configapi.ClientConnection{ + QPS: ptr.To[float32](0), + Burst: ptr.To[int32](0), + }, + }, + initialQPS: 5, + initialBurst: 10, + wantQPS: 0, + wantBurst: 0, + }, + "empty ClientConnection struct preserves existing values": { + cfg: configapi.Configuration{ + ClientConnection: &configapi.ClientConnection{}, + }, + initialQPS: 5, + initialBurst: 10, + wantQPS: 5, + wantBurst: 10, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + restCfg := &rest.Config{ + QPS: tc.initialQPS, + Burst: tc.initialBurst, + } + ApplyClientConnection(restCfg, &tc.cfg) + if restCfg.QPS != tc.wantQPS { + t.Errorf("QPS = %v, want %v", restCfg.QPS, tc.wantQPS) + } + if restCfg.Burst != tc.wantBurst { + t.Errorf("Burst = %v, want %v", restCfg.Burst, tc.wantBurst) + } + }) + } +} + +func TestLoadAndApplyClientConnection(t *testing.T) { + testScheme := runtime.NewScheme() + if err := configapi.AddToScheme(testScheme); err != nil { + t.Fatal(err) + } + + _, cfg, err := Load(testScheme, "", false) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + restCfg := &rest.Config{QPS: 5, Burst: 10} + ApplyClientConnection(restCfg, &cfg) + + if restCfg.QPS != 50 { + t.Errorf("QPS = %v, want 50", restCfg.QPS) + } + if restCfg.Burst != 100 { + t.Errorf("Burst = %v, want 100", restCfg.Burst) + } +} + func TestLoadHTTP2(t *testing.T) { testScheme := runtime.NewScheme() if err := configapi.AddToScheme(testScheme); err != nil {