Skip to content

Commit 0bf8475

Browse files
authored
Optimize init pool: Reduce VM/Pod API calls (#577)
* pool/initMaps: Add pagination to getManagedNamespaces to reduce memory usage Introduce pagination with 500 namespaces per page when gathering managed namespaces during initialization. This prevents loading all namespaces into memory at once in clusters with thousands of namespaces. Signed-off-by: Ram Lavi <ralavi@redhat.com> * Refactor getManagedNamespaces to return a set Change getManagedNamespaces to return map[string]struct{} instead of []string, making it more idiomatic Go. This set will be used in future commits to easily check whether a namespace is managed or not. Signed-off-by: Ram Lavi <ralavi@redhat.com> * Optimize VM initialization with cluster-wide listing Currently during VM pool map init, only managed namespaces are queried, but each requires an API call, which can be slow in clusters with a large number of namespaces. Replace per-namespace VM listing with cluster-wide listing and in-memory filtering, while keeping the pagination to avoid overusing memory. This dramatically reduces API calls during initialization. The trade-off is fetching VMs from unmanaged namespaces and then filtering them out, but the network overhead is negligible compared to the latency savings from fewer round trips. Signed-off-by: Ram Lavi <ralavi@redhat.com> * Increase pagination limit to 500 for VM initialization Increase the pagination limit from 100 to 500 when fetching VMs during initialization. This further reduces API calls by fetching larger batches. Signed-off-by: Ram Lavi <ralavi@redhat.com> * Optimize Pod initialization with cluster-wide listing Currently during Pod pool map init, only managed namespaces are queried, but each requires an API call, which can be slow in clusters with a large number of namespaces. Replace per-namespace Pod listing with cluster-wide listing and in-memory filtering, while keeping the pagination to avoid overusing memory. This dramatically reduces API calls during initialization. 
The trade-off is fetching Pods from unmanaged namespaces and then filtering them out, but the network overhead is negligible compared to the latency savings from fewer round trips. Signed-off-by: Ram Lavi <ralavi@redhat.com> * Increase pagination limit to 500 for Pod initialization Increase the pagination limit from 100 to 500 when fetching Pods during initialization. This further reduces API calls by fetching larger batches. Signed-off-by: Ram Lavi <ralavi@redhat.com> --------- Signed-off-by: Ram Lavi <ralavi@redhat.com>
1 parent d6411fd commit 0bf8475

File tree

3 files changed

+87
-62
lines changed

3 files changed

+87
-62
lines changed

pkg/pool-manager/pod_pool.go

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func (p *PoolManager) allocatePodFromPool(network *multus.NetworkSelectionElemen
193193
return macAddr.String(), nil
194194
}
195195

196-
// paginatePodsInManagedNamespaces performs pod list requests with pagination, but only for managed namespaces
196+
// paginatePodsInManagedNamespaces performs pod list requests with pagination cluster-wide, filtering for managed namespaces
197197
func (p *PoolManager) paginatePodsInManagedNamespaces(limit int64, f func(pods *corev1.PodList) error) error {
198198
managedNamespaces, err := p.getManagedNamespaces(podsWebhookName)
199199
if err != nil {
@@ -205,48 +205,54 @@ func (p *PoolManager) paginatePodsInManagedNamespaces(limit int64, f func(pods *
205205
return nil
206206
}
207207

208-
for _, namespace := range managedNamespaces {
209-
log.V(1).Info("processing pods in managed namespace", "namespace", namespace)
210-
err := p.paginatePodsInNamespace(namespace, limit, f)
211-
if err != nil {
212-
return errors.Wrapf(err, "failed to process pods in namespace %s", namespace)
213-
}
214-
}
215-
216-
return nil
217-
}
218-
219-
// paginatePodsInNamespace performs pods list request with pagination for a specific namespace
220-
func (p *PoolManager) paginatePodsInNamespace(namespace string, limit int64, f func(pods *corev1.PodList) error) error {
221208
continueFlag := ""
209+
totalProcessed := 0
210+
totalFiltered := 0
211+
222212
for {
223-
pods := corev1.PodList{}
224-
err := p.kubeClient.List(context.TODO(), &pods, &client.ListOptions{
225-
Namespace: namespace,
226-
Limit: limit,
227-
Continue: continueFlag,
213+
pods := &corev1.PodList{}
214+
err := p.kubeClient.List(context.TODO(), pods, &client.ListOptions{
215+
Limit: limit,
216+
Continue: continueFlag,
228217
})
229218
if err != nil {
230-
return err
219+
return errors.Wrap(err, "failed to list pods cluster-wide")
231220
}
232221

233-
err = f(&pods)
234-
if err != nil {
235-
return err
222+
filteredPods := &corev1.PodList{}
223+
for _, pod := range pods.Items {
224+
if _, ok := managedNamespaces[pod.Namespace]; ok {
225+
filteredPods.Items = append(filteredPods.Items, pod)
226+
}
227+
}
228+
229+
totalProcessed += len(pods.Items)
230+
totalFiltered += len(filteredPods.Items)
231+
232+
if len(filteredPods.Items) > 0 {
233+
err = f(filteredPods)
234+
if err != nil {
235+
return err
236+
}
236237
}
237238

238239
continueFlag = pods.GetContinue()
239-
log.V(1).Info("limit Pod list in namespace", "namespace", namespace, "pods len", len(pods.Items), "remaining", pods.GetRemainingItemCount(), "continue", continueFlag)
240240
if continueFlag == "" {
241241
break
242242
}
243243
}
244+
245+
log.Info("completed pod listing",
246+
"totalPodsProcessed", totalProcessed,
247+
"totalPodsInManagedNamespaces", totalFiltered,
248+
"managedNamespaces", len(managedNamespaces))
249+
244250
return nil
245251
}
246252

247253
func (p *PoolManager) initPodMap() error {
248254
log.V(1).Info("start InitMaps to reserve existing mac addresses before allocation new ones")
249-
err := p.paginatePodsInManagedNamespaces(100, func(pods *corev1.PodList) error {
255+
err := p.paginatePodsInManagedNamespaces(500, func(pods *corev1.PodList) error {
250256
for _, pod := range pods.Items {
251257
log.V(1).Info("InitMaps for pod", "podName", pod.Name, "podNamespace", pod.Namespace)
252258
if pod.Annotations == nil {

pkg/pool-manager/pool.go

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func (p *PoolManager) IsReady() bool {
146146
}
147147

148148
// getManagedNamespaces pre-computes which namespaces are managed by kubemacpool for a specific webhook
149-
func (p *PoolManager) getManagedNamespaces(webhookName string) ([]string, error) {
149+
func (p *PoolManager) getManagedNamespaces(webhookName string) (map[string]struct{}, error) {
150150
log.V(1).Info("computing managed namespaces for initialization", "webhookName", webhookName)
151151

152152
webhook, err := p.lookupWebhookInMutatingWebhookConfig(mutatingWebhookConfigName, webhookName)
@@ -159,25 +159,38 @@ func (p *PoolManager) getManagedNamespaces(webhookName string) ([]string, error)
159159
return nil, errors.Wrapf(err, "failed to get opt-mode for webhook %s", webhookName)
160160
}
161161

162-
namespaces := &v1.NamespaceList{}
163-
err = p.kubeClient.List(context.TODO(), namespaces)
164-
if err != nil {
165-
return nil, errors.Wrapf(err, "failed to list namespaces for webhook %s", webhookName)
166-
}
162+
managedNamespaces := make(map[string]struct{})
163+
continueFlag := ""
164+
const pageSize int64 = 500
167165

168-
var managedNamespaces []string
169-
for _, ns := range namespaces.Items {
170-
managed, err := isNamespaceManagedFromObject(&ns, webhook.NamespaceSelector, vmOptMode)
166+
for {
167+
namespaces := &v1.NamespaceList{}
168+
err = p.kubeClient.List(context.TODO(), namespaces, &client.ListOptions{
169+
Limit: pageSize,
170+
Continue: continueFlag,
171+
})
171172
if err != nil {
172-
log.Error(err, "failed to check if namespace is managed, skipping", "namespace", ns.Name, "webhookName", webhookName)
173-
continue
173+
return nil, errors.Wrapf(err, "failed to list namespaces for webhook %s", webhookName)
174+
}
175+
176+
for _, ns := range namespaces.Items {
177+
managed, err := isNamespaceManagedFromObject(&ns, webhook.NamespaceSelector, vmOptMode)
178+
if err != nil {
179+
log.Error(err, "failed to check if namespace is managed, skipping", "namespace", ns.Name, "webhookName", webhookName)
180+
continue
181+
}
182+
if managed {
183+
managedNamespaces[ns.Name] = struct{}{}
184+
}
174185
}
175-
if managed {
176-
managedNamespaces = append(managedNamespaces, ns.Name)
186+
187+
continueFlag = namespaces.GetContinue()
188+
if continueFlag == "" {
189+
break
177190
}
178191
}
179192

180-
log.Info("computed managed namespaces", "webhookName", webhookName, "count", len(managedNamespaces), "namespaces", managedNamespaces)
193+
log.Info("computed managed namespaces", "webhookName", webhookName, "count", len(managedNamespaces))
181194
return managedNamespaces, nil
182195
}
183196

pkg/pool-manager/virtualmachine_pool.go

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ func (p *PoolManager) initMacMapFromCluster(parentLogger logr.Logger) error {
309309
return nil
310310
}
311311

312-
// paginateVmsInManagedNamespaces performs VM list requests with pagination, but only for managed namespaces
312+
// paginateVmsInManagedNamespaces performs VM list requests with pagination cluster-wide, filtering for managed namespaces
313313
func (p *PoolManager) paginateVmsInManagedNamespaces(limit int64, vmsFunc func(vms *kubevirt.VirtualMachineList) error) error {
314314
managedNamespaces, err := p.getManagedNamespaces(virtualMachnesWebhookName)
315315
if err != nil {
@@ -321,49 +321,55 @@ func (p *PoolManager) paginateVmsInManagedNamespaces(limit int64, vmsFunc func(v
321321
return nil
322322
}
323323

324-
for _, namespace := range managedNamespaces {
325-
log.V(1).Info("processing VMs in managed namespace", "namespace", namespace)
326-
err := p.paginateVmsInNamespace(namespace, limit, vmsFunc)
327-
if err != nil {
328-
return errors.Wrapf(err, "failed to process VMs in namespace %s", namespace)
329-
}
330-
}
331-
332-
return nil
333-
}
334-
335-
// paginateVmsInNamespace performs VM list request with pagination for a specific namespace
336-
func (p *PoolManager) paginateVmsInNamespace(namespace string, limit int64, vmsFunc func(vms *kubevirt.VirtualMachineList) error) error {
337324
continueFlag := ""
325+
totalProcessed := 0
326+
totalFiltered := 0
327+
338328
for {
339329
vms := &kubevirt.VirtualMachineList{}
340330
err := p.kubeClient.List(context.TODO(), vms, &client.ListOptions{
341-
Namespace: namespace,
342-
Limit: limit,
343-
Continue: continueFlag,
331+
Limit: limit,
332+
Continue: continueFlag,
344333
})
345334
if err != nil {
346-
return err
335+
return errors.Wrap(err, "failed to list VMs cluster-wide")
347336
}
348337

349-
err = vmsFunc(vms)
350-
if err != nil {
351-
return err
338+
filteredVMs := &kubevirt.VirtualMachineList{}
339+
for _, vm := range vms.Items {
340+
if _, ok := managedNamespaces[vm.Namespace]; ok {
341+
filteredVMs.Items = append(filteredVMs.Items, vm)
342+
}
343+
}
344+
345+
totalProcessed += len(vms.Items)
346+
totalFiltered += len(filteredVMs.Items)
347+
348+
if len(filteredVMs.Items) > 0 {
349+
err = vmsFunc(filteredVMs)
350+
if err != nil {
351+
return err
352+
}
352353
}
353354

354355
continueFlag = vms.GetContinue()
355-
log.V(1).Info("limit vms list in namespace", "namespace", namespace, "vms len", len(vms.Items), "remaining", vms.GetRemainingItemCount(), "continue", continueFlag)
356356
if continueFlag == "" {
357357
break
358358
}
359359
}
360+
361+
log.Info("completed VM listing",
362+
"totalVMsProcessed", totalProcessed,
363+
"totalVMsInManagedNamespaces", totalFiltered,
364+
"managedNamespaces", len(managedNamespaces))
365+
360366
return nil
361367
}
362368

363369
// forEachManagedVmInterfaceInClusterRunFunction gets all the macs from all the supported interfaces in all the managed cluster vms, and runs
364370
// a function vmInterfacesFunc on it
365371
func (p *PoolManager) forEachManagedVmInterfaceInClusterRunFunction(vmInterfacesFunc func(vmFullName string, iface kubevirt.Interface, networks map[string]kubevirt.Network) error) error {
366-
err := p.paginateVmsInManagedNamespaces(100, func(vms *kubevirt.VirtualMachineList) error {
372+
err := p.paginateVmsInManagedNamespaces(500, func(vms *kubevirt.VirtualMachineList) error {
367373
logger := log.WithName("forEachManagedVmInterfaceInClusterRunFunction")
368374
for _, vm := range vms.Items {
369375
vmFullName := VmNamespaced(&vm)

0 commit comments

Comments
 (0)