Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 66 additions & 18 deletions cluster-autoscaler/capacitybuffer/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ import (
// PodTemplateRefIndex is the name of the index for buffers referencing a pod template
const PodTemplateRefIndex = "podTemplateRef"

// ScalableRefIndex is the name of the index for buffers referencing a scalable object
const ScalableRefIndex = "scalableRef"

// CapacityBufferClient represents client for v1 capacitybuffer CRD.
type CapacityBufferClient struct {
buffersClient capacitybuffer.Interface
Expand All @@ -71,9 +74,14 @@ type CapacityBufferClient struct {
rqLister corev1listers.ResourceQuotaLister

// Informers
bufferInformer cache.SharedIndexInformer
podTemplateInformer cache.SharedIndexInformer
resourceQuotaInformer cache.SharedIndexInformer
bufferInformer cache.SharedIndexInformer
podTemplateInformer cache.SharedIndexInformer
resourceQuotaInformer cache.SharedIndexInformer
deploymentInformer cache.SharedIndexInformer
replicaSetInformer cache.SharedIndexInformer
statefulSetInformer cache.SharedIndexInformer
jobInformer cache.SharedIndexInformer
replicationControllerInformer cache.SharedIndexInformer
}

// NewCapacityBufferClient returns a capacityBufferClient.
Expand Down Expand Up @@ -152,6 +160,16 @@ func NewCapacityBufferClientFromClients(buffersClient capacitybuffer.Interface,
}
return []string{}, nil
},
ScalableRefIndex: func(obj interface{}) ([]string, error) {
buffer, ok := obj.(*v1.CapacityBuffer)
if !ok {
return []string{}, nil
}
if buffer.Spec.ScalableRef != nil {
return []string{buffer.Spec.ScalableRef.Name}, nil
}
return []string{}, nil
},
})
if err != nil {
return nil, fmt.Errorf("failed to add indexers: %v", err)
Expand All @@ -168,21 +186,26 @@ func NewCapacityBufferClientFromClients(buffersClient capacitybuffer.Interface,

factory := informers.NewSharedInformerFactory(kubernetesClient, defaultResyncPeriod)
bufferClient := &CapacityBufferClient{
buffersClient: buffersClient,
kubernetesClient: kubernetesClient,
scaleGetter: scaleGetter,
scaleMapper: scaleMapper,
buffersLister: buffersLister,
podTemplateLister: factory.Core().V1().PodTemplates().Lister(),
replicaSetsLister: factory.Apps().V1().ReplicaSets().Lister(),
statefulSetsLister: factory.Apps().V1().StatefulSets().Lister(),
jobsLister: factory.Batch().V1().Jobs().Lister(),
deploymentLister: factory.Apps().V1().Deployments().Lister(),
replicationContLister: factory.Core().V1().ReplicationControllers().Lister(),
rqLister: factory.Core().V1().ResourceQuotas().Lister(),
bufferInformer: bufferInformer,
podTemplateInformer: factory.Core().V1().PodTemplates().Informer(),
resourceQuotaInformer: factory.Core().V1().ResourceQuotas().Informer(),
buffersClient: buffersClient,
kubernetesClient: kubernetesClient,
scaleGetter: scaleGetter,
scaleMapper: scaleMapper,
buffersLister: buffersLister,
podTemplateLister: factory.Core().V1().PodTemplates().Lister(),
replicaSetsLister: factory.Apps().V1().ReplicaSets().Lister(),
statefulSetsLister: factory.Apps().V1().StatefulSets().Lister(),
jobsLister: factory.Batch().V1().Jobs().Lister(),
deploymentLister: factory.Apps().V1().Deployments().Lister(),
replicationContLister: factory.Core().V1().ReplicationControllers().Lister(),
rqLister: factory.Core().V1().ResourceQuotas().Lister(),
bufferInformer: bufferInformer,
podTemplateInformer: factory.Core().V1().PodTemplates().Informer(),
resourceQuotaInformer: factory.Core().V1().ResourceQuotas().Informer(),
deploymentInformer: factory.Apps().V1().Deployments().Informer(),
replicaSetInformer: factory.Apps().V1().ReplicaSets().Informer(),
statefulSetInformer: factory.Apps().V1().StatefulSets().Informer(),
jobInformer: factory.Batch().V1().Jobs().Informer(),
replicationControllerInformer: factory.Core().V1().ReplicationControllers().Informer(),
}
factory.Start(stopChannel)
informersSynced = factory.WaitForCacheSync(stopChannel)
Expand All @@ -209,6 +232,31 @@ func (c *CapacityBufferClient) GetResourceQuotaInformer() cache.SharedIndexInfor
return c.resourceQuotaInformer
}

// GetDeploymentInformer returns the informer for Deployment resource.
func (c *CapacityBufferClient) GetDeploymentInformer() cache.SharedIndexInformer {
return c.deploymentInformer
}

// GetReplicaSetInformer returns the informer for ReplicaSet resource.
func (c *CapacityBufferClient) GetReplicaSetInformer() cache.SharedIndexInformer {
return c.replicaSetInformer
}

// GetStatefulSetInformer returns the informer for StatefulSet resource.
func (c *CapacityBufferClient) GetStatefulSetInformer() cache.SharedIndexInformer {
return c.statefulSetInformer
}

// GetJobInformer returns the informer for Job resource.
func (c *CapacityBufferClient) GetJobInformer() cache.SharedIndexInformer {
return c.jobInformer
}

// GetReplicationControllerInformer returns the informer for ReplicationController resource.
func (c *CapacityBufferClient) GetReplicationControllerInformer() cache.SharedIndexInformer {
return c.replicationControllerInformer
}

// ListCapacityBuffers lists all Capacity buffer.
func (c *CapacityBufferClient) ListCapacityBuffers(namespace string) ([]*v1.CapacityBuffer, error) {
if c.buffersLister == nil {
Expand Down
154 changes: 153 additions & 1 deletion cluster-autoscaler/capacitybuffer/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
Expand All @@ -39,6 +40,7 @@ import (
filters "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/filters"
cbmetrics "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/metrics"
translators "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/translators"
scalableobject "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/translators/scalable_objects"
updater "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/updater"
"k8s.io/utils/clock"
)
Expand Down Expand Up @@ -205,7 +207,126 @@ func (c *bufferController) configureEventHandlers() {
c.enqueueBuffersReferencingPodTemplate(obj)
},
})
// TODO: scalable objects

// Deployment Informer
c.client.GetDeploymentInformer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.DeploymentKind)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldMeta, err := meta.Accessor(oldObj)
if err != nil {
return
}
newMeta, err := meta.Accessor(newObj)
if err != nil {
return
}
if oldMeta.GetGeneration() == newMeta.GetGeneration() {
return
}
c.enqueueBuffersReferencingScalableObject(newObj, scalableobject.ApiGroupApps, scalableobject.DeploymentKind)
},
DeleteFunc: func(obj interface{}) {
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.DeploymentKind)
},
})

// ReplicaSet Informer
c.client.GetReplicaSetInformer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.ReplicaSetKind)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldMeta, err := meta.Accessor(oldObj)
if err != nil {
return
}
newMeta, err := meta.Accessor(newObj)
if err != nil {
return
}
if oldMeta.GetGeneration() == newMeta.GetGeneration() {
return
}
c.enqueueBuffersReferencingScalableObject(newObj, scalableobject.ApiGroupApps, scalableobject.ReplicaSetKind)
},
DeleteFunc: func(obj interface{}) {
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.ReplicaSetKind)
},
})

// StatefulSet Informer
c.client.GetStatefulSetInformer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.StatefulSetKind)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldMeta, err := meta.Accessor(oldObj)
if err != nil {
return
}
newMeta, err := meta.Accessor(newObj)
if err != nil {
return
}
if oldMeta.GetGeneration() == newMeta.GetGeneration() {
return
}
c.enqueueBuffersReferencingScalableObject(newObj, scalableobject.ApiGroupApps, scalableobject.StatefulSetKind)
},
DeleteFunc: func(obj interface{}) {
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.StatefulSetKind)
},
})

// Job Informer
c.client.GetJobInformer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupBatch, scalableobject.JobKind)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldMeta, err := meta.Accessor(oldObj)
if err != nil {
return
}
newMeta, err := meta.Accessor(newObj)
if err != nil {
return
}
if oldMeta.GetGeneration() == newMeta.GetGeneration() {
return
}
c.enqueueBuffersReferencingScalableObject(newObj, scalableobject.ApiGroupBatch, scalableobject.JobKind)
},
DeleteFunc: func(obj interface{}) {
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupBatch, scalableobject.JobKind)
},
})

// ReplicationController Informer
c.client.GetReplicationControllerInformer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupCore, scalableobject.ReplicationControllerKind)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldMeta, err := meta.Accessor(oldObj)
if err != nil {
return
}
newMeta, err := meta.Accessor(newObj)
if err != nil {
return
}
if oldMeta.GetGeneration() == newMeta.GetGeneration() {
return
}
c.enqueueBuffersReferencingScalableObject(newObj, scalableobject.ApiGroupCore, scalableobject.ReplicationControllerKind)
},
DeleteFunc: func(obj interface{}) {
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupCore, scalableobject.ReplicationControllerKind)
},
})
}

func (c *bufferController) enqueueNamespace(obj interface{}) {
Expand Down Expand Up @@ -253,6 +374,37 @@ func (c *bufferController) enqueueBuffersReferencingPodTemplate(obj interface{})
}
}

func (c *bufferController) enqueueBuffersReferencingScalableObject(obj interface{}, apiGroup, kind string) {
object, err := meta.Accessor(obj)
if err != nil {
// handle tombstone
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
if cast, ok := tombstone.Obj.(metav1.Object); ok {
object = cast
}
}
}
if object == nil {
return
}

// Use indexer to find buffers referencing this scalable object
buffers, err := c.client.GetBufferInformer().GetIndexer().ByIndex(cbclient.ScalableRefIndex, object.GetName())
if err != nil {
runtime.HandleError(fmt.Errorf("error looking up buffers for scalable object %s/%s: %w", kind, object.GetName(), err))
return
}

for _, obj := range buffers {
buffer := obj.(*v1.CapacityBuffer)
if buffer.Namespace == object.GetNamespace() && buffer.Spec.ScalableRef != nil &&
buffer.Spec.ScalableRef.Kind == kind && buffer.Spec.ScalableRef.APIGroup == apiGroup {
c.queue.Add(buffer.Namespace)
return // we reconcile the whole namespace, so finding one buffer is enough to trigger it.
}
}
}

// Run to run the controller reconcile loop
func (c *bufferController) Run(stopCh <-chan struct{}) {
defer runtime.HandleCrash()
Expand Down
Loading