Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/config/saturation_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type AnalyzerScoreConfig struct {
Name string `yaml:"name"`
Enabled *bool `yaml:"enabled,omitempty"` // default true
Score float64 `yaml:"score,omitempty"` // default 1.0
ScaleUpThreshold *float64 `yaml:"scaleUpThreshold,omitempty"` // overrides global
ScaleUpThreshold *float64 `yaml:"scaleUpThreshold,omitempty"` // overrides global
ScaleDownBoundary *float64 `yaml:"scaleDownBoundary,omitempty"` // overrides global
}

Expand Down
86 changes: 82 additions & 4 deletions internal/controller/configmap_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@ package controller

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/config"
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/datastore"
Expand Down Expand Up @@ -92,11 +99,62 @@ func (r *ConfigMapReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

// SetupWithManager sets up the controller with the Manager.
// Instead of watching all ConfigMaps and filtering with predicates + cache-level
// label selectors, this creates a dedicated informer per well-known ConfigMap name.
// Each informer uses a server-side field selector (metadata.name=<name>) so only
// matching objects are sent over the wire, avoiding the need for labels on ConfigMaps
// and preventing the controller from caching every ConfigMap in the cluster.
func (r *ConfigMapReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.ConfigMap{}).
WithEventFilter(ConfigMapPredicate(r.Datastore, r.Config)).
Complete(r)
clientset, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
return fmt.Errorf("creating clientset for ConfigMap informers: %w", err)
}

wellKnownNames := []string{
config.SaturationConfigMapName(),
config.DefaultScaleToZeroConfigMapName,
config.QMAnalyzerConfigMapName(),
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Scope informers to the watch namespace if set, otherwise watch all namespaces.
watchNS := r.Config.WatchNamespace()
if watchNS == "" {
watchNS = metav1.NamespaceAll
}

bldr := ctrl.NewControllerManagedBy(mgr).
Named("configmap").
WithEventFilter(ConfigMapPredicate(r.Datastore, r.Config))
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

for _, name := range wellKnownNames {
cmName := name // capture loop variable
lw := toolscache.NewFilteredListWatchFromClient(
clientset.CoreV1().RESTClient(),
"configmaps",
watchNS,
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", cmName).String()
},
)

informer := toolscache.NewSharedIndexInformer(
lw,
&corev1.ConfigMap{},
0,
toolscache.Indexers{},
)

if err := mgr.Add(&informerRunnable{informer: informer}); err != nil {
return fmt.Errorf("registering informer for ConfigMap %q: %w", cmName, err)
}

bldr.WatchesRawSource(&source.Informer{
Informer: informer,
Handler: &handler.EnqueueRequestForObject{},
})
}

return bldr.Complete(r)
}

// handleConfigMapDeletion handles ConfigMap deletion events.
Expand Down Expand Up @@ -215,3 +273,23 @@ func (r *ConfigMapReconciler) handleQMAnalyzerConfigMap(ctx context.Context, cm
logger.Info("Updated namespace-local queueing model config from ConfigMap", "namespace", namespace, "entries", count)
}
}

// informerRunnable wraps a SharedIndexInformer so it starts in the "Others"
// runnable group (NeedLeaderElection=false), which the manager starts before
// the LeaderElection group where controllers live. This ensures informer.Run()
// is called before source.Informer.Start() calls WaitForCacheSync.
//
// One informerRunnable is registered with the manager per watched ConfigMap
// informer.
type informerRunnable struct {
// informer is the wrapped informer; Start calls its Run method.
informer toolscache.SharedIndexInformer
}

// Start runs the wrapped informer, handing it ctx.Done() as the stop channel
// so the informer keeps running until ctx is cancelled. It always returns nil.
func (ir *informerRunnable) Start(ctx context.Context) error {
	stopCh := ctx.Done()
	ir.informer.Run(stopCh)
	return nil
}

// NeedLeaderElection always reports false, so the manager places this runnable
// in the "Others" group, which starts before the LeaderElection group
// (controllers).
func (ir *informerRunnable) NeedLeaderElection() bool {
	const requiresLeaderElection = false
	return requiresLeaderElection
}
3 changes: 1 addition & 2 deletions internal/controller/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ func ConfigMapPredicate(ds datastore.Datastore, cfg *config.Config) predicate.Pr
namespace := obj.GetNamespace()
systemNamespace := config.SystemNamespace()

// Well-known ConfigMap names
// Well-known ConfigMap names that have reconcile handlers
wellKnownNames := map[string]bool{
config.ConfigMapName(): true,
config.SaturationConfigMapName(): true,
config.DefaultScaleToZeroConfigMapName: true,
config.QMAnalyzerConfigMapName(): true,
Expand Down
1 change: 0 additions & 1 deletion internal/engines/pipeline/enforcer.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,3 @@ func updateDecisionAction(d *interfaces.VariantDecision, optimizerName string) {
}
d.Reason = fmt.Sprintf("V2 %s (optimizer: %s, enforced)", d.Action, optimizerName)
}

16 changes: 8 additions & 8 deletions internal/engines/pipeline/greedy_score_optimizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,10 +967,10 @@ var _ = Describe("GreedyByScoreOptimizer", func() {
ModelID: "model-1",
Namespace: "default",
Result: &interfaces.AnalyzerResult{
ModelID: "model-1",
Namespace: "default",
AnalyzedAt: time.Now(),
SpareCapacity: 50000,
ModelID: "model-1",
Namespace: "default",
AnalyzedAt: time.Now(),
SpareCapacity: 50000,
VariantCapacities: []interfaces.VariantCapacity{
{VariantName: "expensive", AcceleratorName: "A100", Cost: 15.0, ReplicaCount: 3, PerReplicaCapacity: 20000},
{VariantName: "cheap", AcceleratorName: "A100", Cost: 5.0, ReplicaCount: 3, PerReplicaCapacity: 10000},
Expand Down Expand Up @@ -998,10 +998,10 @@ var _ = Describe("GreedyByScoreOptimizer", func() {
ModelID: "model-1",
Namespace: "default",
Result: &interfaces.AnalyzerResult{
ModelID: "model-1",
Namespace: "default",
AnalyzedAt: time.Now(),
SpareCapacity: 80000, // enough to remove all
ModelID: "model-1",
Namespace: "default",
AnalyzedAt: time.Now(),
SpareCapacity: 80000, // enough to remove all
VariantCapacities: []interfaces.VariantCapacity{
{VariantName: "keep-alive", AcceleratorName: "A100", Cost: 15.0, ReplicaCount: 2, PerReplicaCapacity: 20000},
{VariantName: "expendable", AcceleratorName: "A100", Cost: 5.0, ReplicaCount: 3, PerReplicaCapacity: 10000},
Expand Down
2 changes: 1 addition & 1 deletion internal/engines/saturation/engine_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (e *Engine) runAnalyzersAndScore(
switch aw.Name {
case "saturation":
totalWeighted += baseResult.RequiredCapacity * aw.Score
// future: case "throughput", "slo"
// future: case "throughput", "slo"
}
}

Expand Down
Loading