Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions charts/kubeflow-trainer/templates/manager/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ data:
webhookServiceName: {{ if .Values.manager.config.certManagement.webhookServiceName }}{{ .Values.manager.config.certManagement.webhookServiceName }}{{ else }}{{ include "trainer.webhook.service.name" . }}{{ end }}
webhookSecretName: {{ if .Values.manager.config.certManagement.webhookSecretName }}{{ .Values.manager.config.certManagement.webhookSecretName }}{{ else }}{{ include "trainer.webhook.secret.name" . }}{{ end }}

# Client connection configuration
clientConnection:
qps: {{ .Values.manager.config.clientConnection.qps | default 50 }}
burst: {{ .Values.manager.config.clientConnection.burst | default 100 }}
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using Helm's default here will treat explicit 0 as empty, so users cannot set clientConnection.qps/burst to 0 even though the API allows it; render the values directly (relying on values.yaml defaults) or gate on key existence (hasKey) instead of default so explicit 0 is preserved.

Suggested change
qps: {{ .Values.manager.config.clientConnection.qps | default 50 }}
burst: {{ .Values.manager.config.clientConnection.burst | default 100 }}
qps: {{ dig "manager" "config" "clientConnection" "qps" 50 .Values }}
burst: {{ dig "manager" "config" "clientConnection" "burst" 100 .Values }}

Copilot uses AI. Check for mistakes.
Comment thread
abhijeet-dhumal marked this conversation as resolved.
Outdated

statusServer:
port: {{ .Values.manager.config.statusServer.port }}
qps: {{ .Values.manager.config.statusServer.qps }}
Expand Down
5 changes: 5 additions & 0 deletions charts/kubeflow-trainer/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ manager:
# webhookServiceName and webhookSecretName are auto-generated if not specified
webhookServiceName: ""
webhookSecretName: ""
clientConnection:
# -- QPS rate limit for the manager's Kubernetes API client
qps: 50
# -- Burst rate limit for the manager's Kubernetes API client
burst: 100
statusServer:
# -- Port that the TrainJob status server serves on.
port: 10443
Expand Down
7 changes: 5 additions & 2 deletions cmd/trainer-controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,11 @@ func main() {
}
}

setupLog.Info("Creating manager")
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), options)
restCfg := ctrl.GetConfigOrDie()
config.ApplyClientConnection(restCfg, &cfg)

setupLog.Info("Creating manager", "qps", restCfg.QPS, "burst", restCfg.Burst)
mgr, err := ctrl.NewManager(restCfg, options)
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
Expand Down
14 changes: 14 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
Expand Down Expand Up @@ -133,6 +134,19 @@ func Load(scheme *runtime.Scheme, configFile string, enableHTTP2 bool) (ctrl.Opt
return options, cfg, nil
}

// ApplyClientConnection sets QPS and burst on the given rest.Config
// from the Configuration's clientConnection settings.
func ApplyClientConnection(restCfg *rest.Config, cfg *configapi.Configuration) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the rationale for having a separate function for this rather than doing this as part of Load?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah - re-reading this I see the rest config is created in main.go directly. A separate function makes sense.

if cfg.ClientConnection != nil {
if cfg.ClientConnection.QPS != nil {
restCfg.QPS = *cfg.ClientConnection.QPS
}
if cfg.ClientConnection.Burst != nil {
restCfg.Burst = int(*cfg.ClientConnection.Burst)
}
}
}

// IsCertManagementEnabled returns true if certificate management is enabled.
// Returns true by default if not explicitly disabled.
func IsCertManagementEnabled(cfg *configapi.Configuration) bool {
Expand Down
165 changes: 165 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
componentconfigv1alpha1 "k8s.io/component-base/config/v1alpha1"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -798,6 +799,170 @@ func TestIsCertManagementEnabled(t *testing.T) {
}
}

func TestApplyClientConnection(t *testing.T) {
testcases := []struct {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the convention is to use map-based test cases rather than list - could you udpate pls?

Define test cases as maps instead of slices to avoid dependencies on the running order. Map key should be equal to the test case name.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @robert-bell , great catch 🙌

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done ✅ - Converted both TestApplyClientConnection and TestLoadAndApplyClientConnection to use map-based test cases.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

name string
cfg configapi.Configuration
initialQPS float32
initialBurst int
wantQPS float32
wantBurst int
}{
{
name: "nil ClientConnection keeps rest.Config defaults",
cfg: configapi.Configuration{},
initialQPS: 5,
initialBurst: 10,
wantQPS: 5,
wantBurst: 10,
},
{
name: "default values override client-go defaults",
cfg: configapi.Configuration{
ClientConnection: &configapi.ClientConnection{
QPS: ptr.To[float32](50),
Burst: ptr.To[int32](100),
},
},
initialQPS: 5,
initialBurst: 10,
wantQPS: 50,
wantBurst: 100,
},
{
name: "custom values applied",
cfg: configapi.Configuration{
ClientConnection: &configapi.ClientConnection{
QPS: ptr.To[float32](200),
Burst: ptr.To[int32](400),
},
},
wantQPS: 200,
wantBurst: 400,
},
{
name: "only QPS set preserves existing burst",
cfg: configapi.Configuration{
ClientConnection: &configapi.ClientConnection{
QPS: ptr.To[float32](75),
},
},
initialBurst: 10,
wantQPS: 75,
wantBurst: 10,
},
{
name: "only burst set preserves existing QPS",
cfg: configapi.Configuration{
ClientConnection: &configapi.ClientConnection{
Burst: ptr.To[int32](150),
},
},
initialQPS: 5,
wantQPS: 5,
wantBurst: 150,
},
{
name: "zero QPS is valid and applied",
cfg: configapi.Configuration{
ClientConnection: &configapi.ClientConnection{
QPS: ptr.To[float32](0),
Burst: ptr.To[int32](0),
},
},
initialQPS: 5,
initialBurst: 10,
wantQPS: 0,
wantBurst: 0,
},
{
name: "empty ClientConnection struct preserves existing values",
cfg: configapi.Configuration{
ClientConnection: &configapi.ClientConnection{},
},
initialQPS: 5,
initialBurst: 10,
wantQPS: 5,
wantBurst: 10,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
restCfg := &rest.Config{
QPS: tc.initialQPS,
Burst: tc.initialBurst,
}
ApplyClientConnection(restCfg, &tc.cfg)
if restCfg.QPS != tc.wantQPS {
t.Errorf("QPS = %v, want %v", restCfg.QPS, tc.wantQPS)
}
if restCfg.Burst != tc.wantBurst {
t.Errorf("Burst = %v, want %v", restCfg.Burst, tc.wantBurst)
}
})
}
}

func TestLoadAndApplyClientConnection(t *testing.T) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this effectively re-testing logic we already cover in TestApplyClientConnection and TestLoad?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review @robert-bell ! Addressed all your feedback ✅

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please review !

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @abhijeet-dhumal. I was more wondering whether this test is needed?

testScheme := runtime.NewScheme()
if err := configapi.AddToScheme(testScheme); err != nil {
t.Fatal(err)
}

tmpDir := t.TempDir()

customQPSConfig := filepath.Join(tmpDir, "custom-qps.yaml")
if err := os.WriteFile(customQPSConfig, []byte(`
apiVersion: config.trainer.kubeflow.org/v1alpha1
kind: Configuration
clientConnection:
qps: 100
burst: 200
`), os.FileMode(0600)); err != nil {
t.Fatal(err)
}

testcases := []struct {
name string
configFile string
wantQPS float32
wantBurst int
}{
{
name: "default config applies default QPS/burst",
configFile: "",
wantQPS: 50,
wantBurst: 100,
},
{
name: "custom config applies custom QPS/burst",
configFile: customQPSConfig,
wantQPS: 100,
wantBurst: 200,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
_, cfg, err := Load(testScheme, tc.configFile, false)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

restCfg := &rest.Config{QPS: 5, Burst: 10}
ApplyClientConnection(restCfg, &cfg)

if restCfg.QPS != tc.wantQPS {
t.Errorf("QPS = %v, want %v", restCfg.QPS, tc.wantQPS)
}
if restCfg.Burst != tc.wantBurst {
t.Errorf("Burst = %v, want %v", restCfg.Burst, tc.wantBurst)
}
})
}
}

func TestLoadHTTP2(t *testing.T) {
testScheme := runtime.NewScheme()
if err := configapi.AddToScheme(testScheme); err != nil {
Expand Down
Loading