Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 31 additions & 25 deletions pkg/pool-manager/pod_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (p *PoolManager) allocatePodFromPool(network *multus.NetworkSelectionElemen
return macAddr.String(), nil
}

// paginatePodsInManagedNamespaces performs pod list requests with pagination, but only for managed namespaces
// paginatePodsInManagedNamespaces performs pod list requests with pagination cluster-wide, filtering for managed namespaces
func (p *PoolManager) paginatePodsInManagedNamespaces(limit int64, f func(pods *corev1.PodList) error) error {
managedNamespaces, err := p.getManagedNamespaces(podsWebhookName)
if err != nil {
Expand All @@ -205,48 +205,54 @@ func (p *PoolManager) paginatePodsInManagedNamespaces(limit int64, f func(pods *
return nil
}

for _, namespace := range managedNamespaces {
log.V(1).Info("processing pods in managed namespace", "namespace", namespace)
err := p.paginatePodsInNamespace(namespace, limit, f)
if err != nil {
return errors.Wrapf(err, "failed to process pods in namespace %s", namespace)
}
}

return nil
}

// paginatePodsInNamespace performs pods list request with pagination for a specific namespace
func (p *PoolManager) paginatePodsInNamespace(namespace string, limit int64, f func(pods *corev1.PodList) error) error {
continueFlag := ""
totalProcessed := 0
totalFiltered := 0

for {
pods := corev1.PodList{}
err := p.kubeClient.List(context.TODO(), &pods, &client.ListOptions{
Namespace: namespace,
Limit: limit,
Continue: continueFlag,
pods := &corev1.PodList{}
err := p.kubeClient.List(context.TODO(), pods, &client.ListOptions{
Limit: limit,
Continue: continueFlag,
})
if err != nil {
return err
return errors.Wrap(err, "failed to list pods cluster-wide")
}

err = f(&pods)
if err != nil {
return err
filteredPods := &corev1.PodList{}
for _, pod := range pods.Items {
if _, ok := managedNamespaces[pod.Namespace]; ok {
filteredPods.Items = append(filteredPods.Items, pod)
}
}

totalProcessed += len(pods.Items)
totalFiltered += len(filteredPods.Items)

if len(filteredPods.Items) > 0 {
err = f(filteredPods)
if err != nil {
return err
}
}

continueFlag = pods.GetContinue()
log.V(1).Info("limit Pod list in namespace", "namespace", namespace, "pods len", len(pods.Items), "remaining", pods.GetRemainingItemCount(), "continue", continueFlag)
if continueFlag == "" {
break
}
}

log.Info("completed pod listing",
"totalPodsProcessed", totalProcessed,
"totalPodsInManagedNamespaces", totalFiltered,
"managedNamespaces", len(managedNamespaces))

return nil
}

func (p *PoolManager) initPodMap() error {
log.V(1).Info("start InitMaps to reserve existing mac addresses before allocation new ones")
err := p.paginatePodsInManagedNamespaces(100, func(pods *corev1.PodList) error {
err := p.paginatePodsInManagedNamespaces(500, func(pods *corev1.PodList) error {
for _, pod := range pods.Items {
log.V(1).Info("InitMaps for pod", "podName", pod.Name, "podNamespace", pod.Namespace)
if pod.Annotations == nil {
Expand Down
41 changes: 27 additions & 14 deletions pkg/pool-manager/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (p *PoolManager) IsReady() bool {
}

// getManagedNamespaces pre-computes which namespaces are managed by kubemacpool for a specific webhook
func (p *PoolManager) getManagedNamespaces(webhookName string) ([]string, error) {
func (p *PoolManager) getManagedNamespaces(webhookName string) (map[string]struct{}, error) {
log.V(1).Info("computing managed namespaces for initialization", "webhookName", webhookName)

webhook, err := p.lookupWebhookInMutatingWebhookConfig(mutatingWebhookConfigName, webhookName)
Expand All @@ -159,25 +159,38 @@ func (p *PoolManager) getManagedNamespaces(webhookName string) ([]string, error)
return nil, errors.Wrapf(err, "failed to get opt-mode for webhook %s", webhookName)
}

namespaces := &v1.NamespaceList{}
err = p.kubeClient.List(context.TODO(), namespaces)
if err != nil {
return nil, errors.Wrapf(err, "failed to list namespaces for webhook %s", webhookName)
}
managedNamespaces := make(map[string]struct{})
continueFlag := ""
const pageSize int64 = 500

var managedNamespaces []string
for _, ns := range namespaces.Items {
managed, err := isNamespaceManagedFromObject(&ns, webhook.NamespaceSelector, vmOptMode)
for {
namespaces := &v1.NamespaceList{}
err = p.kubeClient.List(context.TODO(), namespaces, &client.ListOptions{
Limit: pageSize,
Continue: continueFlag,
})
if err != nil {
log.Error(err, "failed to check if namespace is managed, skipping", "namespace", ns.Name, "webhookName", webhookName)
continue
return nil, errors.Wrapf(err, "failed to list namespaces for webhook %s", webhookName)
}

for _, ns := range namespaces.Items {
managed, err := isNamespaceManagedFromObject(&ns, webhook.NamespaceSelector, vmOptMode)
if err != nil {
log.Error(err, "failed to check if namespace is managed, skipping", "namespace", ns.Name, "webhookName", webhookName)
continue
}
if managed {
managedNamespaces[ns.Name] = struct{}{}
}
}
if managed {
managedNamespaces = append(managedNamespaces, ns.Name)

continueFlag = namespaces.GetContinue()
if continueFlag == "" {
break
}
}

log.Info("computed managed namespaces", "webhookName", webhookName, "count", len(managedNamespaces), "namespaces", managedNamespaces)
log.Info("computed managed namespaces", "webhookName", webhookName, "count", len(managedNamespaces))
return managedNamespaces, nil
}

Expand Down
52 changes: 29 additions & 23 deletions pkg/pool-manager/virtualmachine_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (p *PoolManager) initMacMapFromCluster(parentLogger logr.Logger) error {
return nil
}

// paginateVmsInManagedNamespaces performs VM list requests with pagination, but only for managed namespaces
// paginateVmsInManagedNamespaces performs VM list requests with pagination cluster-wide, filtering for managed namespaces
func (p *PoolManager) paginateVmsInManagedNamespaces(limit int64, vmsFunc func(vms *kubevirt.VirtualMachineList) error) error {
managedNamespaces, err := p.getManagedNamespaces(virtualMachnesWebhookName)
if err != nil {
Expand All @@ -321,49 +321,55 @@ func (p *PoolManager) paginateVmsInManagedNamespaces(limit int64, vmsFunc func(v
return nil
}

for _, namespace := range managedNamespaces {
log.V(1).Info("processing VMs in managed namespace", "namespace", namespace)
err := p.paginateVmsInNamespace(namespace, limit, vmsFunc)
if err != nil {
return errors.Wrapf(err, "failed to process VMs in namespace %s", namespace)
}
}

return nil
}

// paginateVmsInNamespace performs VM list request with pagination for a specific namespace
func (p *PoolManager) paginateVmsInNamespace(namespace string, limit int64, vmsFunc func(vms *kubevirt.VirtualMachineList) error) error {
continueFlag := ""
totalProcessed := 0
totalFiltered := 0

for {
vms := &kubevirt.VirtualMachineList{}
err := p.kubeClient.List(context.TODO(), vms, &client.ListOptions{
Namespace: namespace,
Limit: limit,
Continue: continueFlag,
Limit: limit,
Continue: continueFlag,
})
if err != nil {
return err
return errors.Wrap(err, "failed to list VMs cluster-wide")
}

err = vmsFunc(vms)
if err != nil {
return err
filteredVMs := &kubevirt.VirtualMachineList{}
for _, vm := range vms.Items {
if _, ok := managedNamespaces[vm.Namespace]; ok {
filteredVMs.Items = append(filteredVMs.Items, vm)
}
}

totalProcessed += len(vms.Items)
totalFiltered += len(filteredVMs.Items)

if len(filteredVMs.Items) > 0 {
err = vmsFunc(filteredVMs)
if err != nil {
return err
}
}

continueFlag = vms.GetContinue()
log.V(1).Info("limit vms list in namespace", "namespace", namespace, "vms len", len(vms.Items), "remaining", vms.GetRemainingItemCount(), "continue", continueFlag)
if continueFlag == "" {
break
}
}

log.Info("completed VM listing",
"totalVMsProcessed", totalProcessed,
"totalVMsInManagedNamespaces", totalFiltered,
"managedNamespaces", len(managedNamespaces))

return nil
}

// forEachManagedVmInterfaceInClusterRunFunction gets all the macs from all the supported interfaces in all the managed cluster vms, and runs
// a function vmInterfacesFunc on it
func (p *PoolManager) forEachManagedVmInterfaceInClusterRunFunction(vmInterfacesFunc func(vmFullName string, iface kubevirt.Interface, networks map[string]kubevirt.Network) error) error {
err := p.paginateVmsInManagedNamespaces(100, func(vms *kubevirt.VirtualMachineList) error {
err := p.paginateVmsInManagedNamespaces(500, func(vms *kubevirt.VirtualMachineList) error {
logger := log.WithName("forEachManagedVmInterfaceInClusterRunFunction")
for _, vm := range vms.Items {
vmFullName := VmNamespaced(&vm)
Expand Down