diff --git a/internal/config/saturation_scaling.go b/internal/config/saturation_scaling.go index 257bb3f9a..5d6b40699 100644 --- a/internal/config/saturation_scaling.go +++ b/internal/config/saturation_scaling.go @@ -67,7 +67,7 @@ type AnalyzerScoreConfig struct { Name string `yaml:"name"` Enabled *bool `yaml:"enabled,omitempty"` // default true Score float64 `yaml:"score,omitempty"` // default 1.0 - ScaleUpThreshold *float64 `yaml:"scaleUpThreshold,omitempty"` // overrides global + ScaleUpThreshold *float64 `yaml:"scaleUpThreshold,omitempty"` // overrides global ScaleDownBoundary *float64 `yaml:"scaleDownBoundary,omitempty"` // overrides global } diff --git a/internal/controller/configmap_reconciler.go b/internal/controller/configmap_reconciler.go index 04d2e1a0e..400946e73 100644 --- a/internal/controller/configmap_reconciler.go +++ b/internal/controller/configmap_reconciler.go @@ -18,14 +18,22 @@ package controller import ( "context" + "fmt" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + toolscache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/source" "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/config" "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/datastore" @@ -92,11 +100,75 @@ func (r *ConfigMapReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } // SetupWithManager sets up the controller with the Manager. +// Instead of watching all ConfigMaps and filtering with predicates + cache-level +// label selectors, this creates a dedicated informer per well-known ConfigMap name. +// Each informer uses a server-side field selector (metadata.name=) so only +// matching objects are sent over the wire, avoiding the need for labels on ConfigMaps +// and preventing the controller from caching every ConfigMap in the cluster. func (r *ConfigMapReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&corev1.ConfigMap{}). - WithEventFilter(ConfigMapPredicate(r.Datastore, r.Config)). - Complete(r) + clientset, err := kubernetes.NewForConfig(mgr.GetConfig()) + if err != nil { + return fmt.Errorf("creating clientset for ConfigMap informers: %w", err) + } + + wellKnownNames := []string{ + config.SaturationConfigMapName(), + config.DefaultScaleToZeroConfigMapName, + config.QMAnalyzerConfigMapName(), + } + + // Determine which namespaces to watch for ConfigMaps. + // In single-namespace mode, watch both the target namespace and the system + // namespace (where global ConfigMaps live). In multi-namespace mode, watch + // all namespaces. + watchNamespaces := []string{metav1.NamespaceAll} + if ns := r.Config.WatchNamespace(); ns != "" { + watchNamespaces = []string{ns} + if sysNS := config.SystemNamespace(); sysNS != ns { + watchNamespaces = append(watchNamespaces, sysNS) + } + } + + cmPredicate := ConfigMapPredicate(r.Datastore, r.Config) + + bldr := ctrl.NewControllerManagedBy(mgr). + Named("configmap") + + for _, name := range wellKnownNames { + for _, ns := range watchNamespaces { + cmName := name // capture loop variable + lw := toolscache.NewFilteredListWatchFromClient( + clientset.CoreV1().RESTClient(), + "configmaps", + ns, + func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", cmName).String() + }, + ) + + informer := toolscache.NewSharedIndexInformer( + lw, + &corev1.ConfigMap{}, + 0, + toolscache.Indexers{}, + ) + + if err := mgr.Add(&informerRunnable{informer: informer}); err != nil { + return fmt.Errorf("registering informer for ConfigMap %q in namespace %q: %w", cmName, ns, err) + } + + // Predicates must be passed directly to the source because + // WithEventFilter (globalPredicates) does not apply to raw sources + // in controller-runtime v0.22.5. + bldr.WatchesRawSource(&source.Informer{ + Informer: informer, + Handler: &handler.EnqueueRequestForObject{}, + Predicates: []predicate.Predicate{cmPredicate}, + }) + } + } + + return bldr.Complete(r) } // handleConfigMapDeletion handles ConfigMap deletion events. @@ -215,3 +287,23 @@ func (r *ConfigMapReconciler) handleQMAnalyzerConfigMap(ctx context.Context, cm logger.Info("Updated namespace-local queueing model config from ConfigMap", "namespace", namespace, "entries", count) } } + +// informerRunnable wraps a SharedIndexInformer so it starts in the "Others" +// runnable group (NeedLeaderElection=false), which the manager starts before +// the LeaderElection group where controllers live. This ensures informer.Run() +// is called before source.Informer.Start() calls WaitForCacheSync. +type informerRunnable struct { + informer toolscache.SharedIndexInformer +} + +// Start runs the informer until the context is cancelled. +func (ir *informerRunnable) Start(ctx context.Context) error { + ir.informer.Run(ctx.Done()) + return nil +} + +// NeedLeaderElection returns false so the manager places this runnable in the +// "Others" group, which starts before the LeaderElection group (controllers). +func (ir *informerRunnable) NeedLeaderElection() bool { + return false +} diff --git a/internal/controller/predicates.go b/internal/controller/predicates.go index 4fb30c3e2..440963910 100644 --- a/internal/controller/predicates.go +++ b/internal/controller/predicates.go @@ -34,9 +34,8 @@ func ConfigMapPredicate(ds datastore.Datastore, cfg *config.Config) predicate.Pr namespace := obj.GetNamespace() systemNamespace := config.SystemNamespace() - // Well-known ConfigMap names + // Well-known ConfigMap names that have reconcile handlers wellKnownNames := map[string]bool{ - config.ConfigMapName(): true, config.SaturationConfigMapName(): true, config.DefaultScaleToZeroConfigMapName: true, config.QMAnalyzerConfigMapName(): true, diff --git a/internal/engines/pipeline/enforcer.go b/internal/engines/pipeline/enforcer.go index 7336eefa8..9bf1dcaa5 100644 --- a/internal/engines/pipeline/enforcer.go +++ b/internal/engines/pipeline/enforcer.go @@ -185,4 +185,3 @@ func updateDecisionAction(d *interfaces.VariantDecision, optimizerName string) { } d.Reason = fmt.Sprintf("V2 %s (optimizer: %s, enforced)", d.Action, optimizerName) } - diff --git a/internal/engines/pipeline/greedy_score_optimizer_test.go b/internal/engines/pipeline/greedy_score_optimizer_test.go index 7fce61cad..67589f513 100644 --- a/internal/engines/pipeline/greedy_score_optimizer_test.go +++ b/internal/engines/pipeline/greedy_score_optimizer_test.go @@ -967,10 +967,10 @@ var _ = Describe("GreedyByScoreOptimizer", func() { ModelID: "model-1", Namespace: "default", Result: &interfaces.AnalyzerResult{ - ModelID: "model-1", - Namespace: "default", - AnalyzedAt: time.Now(), - SpareCapacity: 50000, + ModelID: "model-1", + Namespace: "default", + AnalyzedAt: time.Now(), + SpareCapacity: 50000, VariantCapacities: []interfaces.VariantCapacity{ {VariantName: "expensive", AcceleratorName: "A100", Cost: 15.0, ReplicaCount: 3, PerReplicaCapacity: 20000}, {VariantName: "cheap", AcceleratorName: "A100", Cost: 5.0, ReplicaCount: 3, PerReplicaCapacity: 10000}, @@ -998,10 +998,10 @@ var _ = Describe("GreedyByScoreOptimizer", func() { ModelID: "model-1", Namespace: "default", Result: &interfaces.AnalyzerResult{ - ModelID: "model-1", - Namespace: "default", - AnalyzedAt: time.Now(), - SpareCapacity: 80000, // enough to remove all + ModelID: "model-1", + Namespace: "default", + AnalyzedAt: time.Now(), + SpareCapacity: 80000, // enough to remove all VariantCapacities: []interfaces.VariantCapacity{ {VariantName: "keep-alive", AcceleratorName: "A100", Cost: 15.0, ReplicaCount: 2, PerReplicaCapacity: 20000}, {VariantName: "expendable", AcceleratorName: "A100", Cost: 5.0, ReplicaCount: 3, PerReplicaCapacity: 10000}, diff --git a/internal/engines/saturation/engine_v2.go b/internal/engines/saturation/engine_v2.go index 0e5a4e3a6..4631bc937 100644 --- a/internal/engines/saturation/engine_v2.go +++ b/internal/engines/saturation/engine_v2.go @@ -115,7 +115,7 @@ func (e *Engine) runAnalyzersAndScore( switch aw.Name { case "saturation": totalWeighted += baseResult.RequiredCapacity * aw.Score - // future: case "throughput", "slo" + // future: case "throughput", "slo" } }