Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion charts/kubeflow-trainer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ manager:
| manager.volumeMounts | list | `[]` | Volume mounts for manager containers. |
| manager.resources | object | `{}` | Pod resource requests and limits for manager containers. |
| manager.securityContext | object | `{"allowPrivilegeEscalation":false,"capabilities":{"drop":["ALL"]},"runAsNonRoot":true,"seccompProfile":{"type":"RuntimeDefault"}}` | Security context for manager containers. |
| manager.config | object | `{"certManagement":{"enable":true,"webhookSecretName":"","webhookServiceName":""},"controller":{"groupKindConcurrency":{"clusterTrainingRuntime":1,"trainJob":5,"trainingRuntime":1}},"featureGates":{},"health":{"healthProbeBindAddress":":8081","livenessEndpointName":"healthz","readinessEndpointName":"readyz"},"leaderElection":{"leaderElect":true,"leaseDuration":"15s","renewDeadline":"10s","resourceName":"trainer.kubeflow.org","resourceNamespace":"","retryPeriod":"2s"},"metrics":{"bindAddress":":8443","secureServing":true},"statusServer":{"burst":10,"port":10443,"qps":5},"webhook":{"host":"","port":9443}}` | Controller manager configuration. This configuration is used to generate the ConfigMap for the controller manager. |
| manager.config | object | `{"certManagement":{"enable":true,"webhookSecretName":"","webhookServiceName":""},"clientConnection":{"burst":100,"qps":50},"controller":{"groupKindConcurrency":{"clusterTrainingRuntime":1,"trainJob":5,"trainingRuntime":1}},"featureGates":{},"health":{"healthProbeBindAddress":":8081","livenessEndpointName":"healthz","readinessEndpointName":"readyz"},"leaderElection":{"leaderElect":true,"leaseDuration":"15s","renewDeadline":"10s","resourceName":"trainer.kubeflow.org","resourceNamespace":"","retryPeriod":"2s"},"metrics":{"bindAddress":":8443","secureServing":true},"statusServer":{"burst":10,"port":10443,"qps":5},"webhook":{"host":"","port":9443}}` | Controller manager configuration. This configuration is used to generate the ConfigMap for the controller manager. |
| manager.config.clientConnection.qps | int | `50` | QPS rate limit for the manager's Kubernetes API client |
| manager.config.clientConnection.burst | int | `100` | Burst rate limit for the manager's Kubernetes API client |
| manager.config.statusServer.port | int | `10443` | Port that the TrainJob status server serves on. |
| manager.config.statusServer.qps | int | `5` | QPS rate limit for the TrainJob Status Server API client |
| manager.config.statusServer.burst | int | `10` | Burst rate limit for the TrainJob Status Server API client |
Expand Down
5 changes: 5 additions & 0 deletions charts/kubeflow-trainer/templates/manager/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ data:
webhookServiceName: {{ if .Values.manager.config.certManagement.webhookServiceName }}{{ .Values.manager.config.certManagement.webhookServiceName }}{{ else }}{{ include "trainer.webhook.service.name" . }}{{ end }}
webhookSecretName: {{ if .Values.manager.config.certManagement.webhookSecretName }}{{ .Values.manager.config.certManagement.webhookSecretName }}{{ else }}{{ include "trainer.webhook.secret.name" . }}{{ end }}

# Client connection configuration
clientConnection:
qps: {{ .Values.manager.config.clientConnection.qps }}
burst: {{ .Values.manager.config.clientConnection.burst }}

statusServer:
port: {{ .Values.manager.config.statusServer.port }}
qps: {{ .Values.manager.config.statusServer.qps }}
Expand Down
5 changes: 5 additions & 0 deletions charts/kubeflow-trainer/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ manager:
# webhookServiceName and webhookSecretName are auto-generated if not specified
webhookServiceName: ""
webhookSecretName: ""
clientConnection:
# -- QPS rate limit for the manager's Kubernetes API client
qps: 50
# -- Burst rate limit for the manager's Kubernetes API client
burst: 100
statusServer:
# -- Port that the TrainJob status server serves on.
port: 10443
Expand Down
7 changes: 5 additions & 2 deletions cmd/trainer-controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,11 @@ func main() {
}
}

setupLog.Info("Creating manager")
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), options)
restCfg := ctrl.GetConfigOrDie()
config.ApplyClientConnection(restCfg, &cfg)

setupLog.Info("Creating manager", "qps", restCfg.QPS, "burst", restCfg.Burst)
mgr, err := ctrl.NewManager(restCfg, options)
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
Expand Down
14 changes: 14 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
Expand Down Expand Up @@ -133,6 +134,19 @@ func Load(scheme *runtime.Scheme, configFile string, enableHTTP2 bool) (ctrl.Opt
return options, cfg, nil
}

// ApplyClientConnection copies the client-side rate-limit settings from
// cfg.ClientConnection onto restCfg.
//
// restCfg.QPS is overwritten only when cfg.ClientConnection.QPS is non-nil, and
// restCfg.Burst only when cfg.ClientConnection.Burst is non-nil; a field whose
// source pointer is nil keeps whatever value restCfg already held. An explicit
// zero is still written, because only the pointer is checked, not the value.
//
// Neither restCfg nor cfg is nil-checked here, so callers must pass non-nil
// pointers.
func ApplyClientConnection(restCfg *rest.Config, cfg *configapi.Configuration) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the rationale for having a separate function for this rather than doing this as part of Load?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah - re-reading this I see the rest config is created in main.go directly. A separate function makes sense.

// A nil ClientConnection means there is nothing to apply; restCfg is left untouched.
if cfg.ClientConnection != nil {
// QPS and Burst are handled independently so that setting one does not reset the other.
if cfg.ClientConnection.QPS != nil {
restCfg.QPS = *cfg.ClientConnection.QPS
}
if cfg.ClientConnection.Burst != nil {
// The configured value is dereferenced and converted to int before being stored in restCfg.Burst.
restCfg.Burst = int(*cfg.ClientConnection.Burst)
}
}
}

// IsCertManagementEnabled returns true if certificate management is enabled.
// Returns true by default if not explicitly disabled.
func IsCertManagementEnabled(cfg *configapi.Configuration) bool {
Expand Down
120 changes: 120 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
componentconfigv1alpha1 "k8s.io/component-base/config/v1alpha1"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -798,6 +799,125 @@ func TestIsCertManagementEnabled(t *testing.T) {
}
}

// TestApplyClientConnection verifies that ApplyClientConnection writes only the
// explicitly set QPS/Burst values onto a rest.Config and keeps the pre-existing
// value for every field whose source pointer is nil.
func TestApplyClientConnection(t *testing.T) {
	// testCase describes the rest.Config state before the call (start*) and the
	// state expected after it (want*).
	type testCase struct {
		cfg        configapi.Configuration
		startQPS   float32
		startBurst int
		wantQPS    float32
		wantBurst  int
	}

	cases := map[string]testCase{
		"nil ClientConnection keeps rest.Config defaults": {
			cfg:        configapi.Configuration{},
			startQPS:   5,
			startBurst: 10,
			wantQPS:    5,
			wantBurst:  10,
		},
		"default values override client-go defaults": {
			cfg: configapi.Configuration{
				ClientConnection: &configapi.ClientConnection{
					QPS:   ptr.To(float32(50)),
					Burst: ptr.To(int32(100)),
				},
			},
			startQPS:   5,
			startBurst: 10,
			wantQPS:    50,
			wantBurst:  100,
		},
		"custom values applied": {
			cfg: configapi.Configuration{
				ClientConnection: &configapi.ClientConnection{
					QPS:   ptr.To(float32(200)),
					Burst: ptr.To(int32(400)),
				},
			},
			wantQPS:   200,
			wantBurst: 400,
		},
		"only QPS set preserves existing burst": {
			cfg: configapi.Configuration{
				ClientConnection: &configapi.ClientConnection{
					QPS: ptr.To(float32(75)),
				},
			},
			startBurst: 10,
			wantQPS:    75,
			wantBurst:  10,
		},
		"only burst set preserves existing QPS": {
			cfg: configapi.Configuration{
				ClientConnection: &configapi.ClientConnection{
					Burst: ptr.To(int32(150)),
				},
			},
			startQPS:  5,
			wantQPS:   5,
			wantBurst: 150,
		},
		"zero QPS is valid and applied": {
			cfg: configapi.Configuration{
				ClientConnection: &configapi.ClientConnection{
					QPS:   ptr.To(float32(0)),
					Burst: ptr.To(int32(0)),
				},
			},
			startQPS:   5,
			startBurst: 10,
			wantQPS:    0,
			wantBurst:  0,
		},
		"empty ClientConnection struct preserves existing values": {
			cfg: configapi.Configuration{
				ClientConnection: &configapi.ClientConnection{},
			},
			startQPS:   5,
			startBurst: 10,
			wantQPS:    5,
			wantBurst:  10,
		},
	}

	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			// Seed the rest.Config with the "before" values, then apply.
			got := rest.Config{QPS: tc.startQPS, Burst: tc.startBurst}
			ApplyClientConnection(&got, &tc.cfg)

			// Both fields are checked with Errorf so one failure does not hide the other.
			if got.QPS != tc.wantQPS {
				t.Errorf("QPS = %v, want %v", got.QPS, tc.wantQPS)
			}
			if got.Burst != tc.wantBurst {
				t.Errorf("Burst = %v, want %v", got.Burst, tc.wantBurst)
			}
		})
	}
}

// TestLoadAndApplyClientConnection chains Load and ApplyClientConnection: it
// loads a Configuration with an empty config file path, applies it to a
// rest.Config seeded with QPS 5 / Burst 10, and expects QPS 50 / Burst 100
// afterwards.
func TestLoadAndApplyClientConnection(t *testing.T) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this effectively re-testing logic we already cover in TestApplyClientConnection and TestLoad?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review @robert-bell ! Addressed all your feedback ✅

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please review !

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @abhijeet-dhumal. I was more wondering whether this test is needed?

// Register the configuration API types on a fresh scheme for Load to use.
testScheme := runtime.NewScheme()
if err := configapi.AddToScheme(testScheme); err != nil {
t.Fatal(err)
}

// Empty config file path and enableHTTP2=false.
// NOTE(review): the expected 50/100 below presumably come from Load's defaulting — confirm against Load.
_, cfg, err := Load(testScheme, "", false)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Start from values different from the expected ones so an unapplied setting is detectable.
restCfg := &rest.Config{QPS: 5, Burst: 10}
ApplyClientConnection(restCfg, &cfg)

if restCfg.QPS != 50 {
t.Errorf("QPS = %v, want 50", restCfg.QPS)
}
if restCfg.Burst != 100 {
t.Errorf("Burst = %v, want 100", restCfg.Burst)
}
}

func TestLoadHTTP2(t *testing.T) {
testScheme := runtime.NewScheme()
if err := configapi.AddToScheme(testScheme); err != nil {
Expand Down
Loading