Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions charts/kube-ovn-v2/templates/rbac/ovn-CR.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ rules:
verbs:
- get
- list
- watch
- apiGroups:
- "policy.networking.k8s.io"
resources:
Expand Down
1 change: 1 addition & 0 deletions charts/kube-ovn/templates/ovn-CR.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ rules:
verbs:
- get
- list
- watch
- apiGroups:
- "policy.networking.k8s.io"
resources:
Expand Down
1 change: 1 addition & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3593,6 +3593,7 @@ rules:
verbs:
- get
- list
- watch
- apiGroups:
- "policy.networking.k8s.io"
resources:
Expand Down
18 changes: 5 additions & 13 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ type Controller struct {
csrSynced cache.InformerSynced
addOrUpdateCsrQueue workqueue.TypedRateLimitingInterface[string]

vmiMigrationSynced cache.InformerSynced
addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string]
deleteVMQueue workqueue.TypedRateLimitingInterface[string]
Comment thread
oilbeater marked this conversation as resolved.
kubevirtInformerFactory informer.KubeVirtInformerFactory

recorder record.EventRecorder
Expand Down Expand Up @@ -367,7 +367,6 @@ func Run(ctx context.Context, config *Configuration) {
anpInformer := anpInformerFactory.Policy().V1alpha1().AdminNetworkPolicies()
banpInformer := anpInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies()
csrInformer := informerFactory.Certificates().V1().CertificateSigningRequests()
vmiMigrationInformer := kubevirtInformerFactory.VirtualMachineInstanceMigration()

numKeyLocks := max(runtime.NumCPU()*2, config.WorkerNum*2)
controller := &Controller{
Expand Down Expand Up @@ -550,8 +549,8 @@ func Run(ctx context.Context, config *Configuration) {
csrSynced: csrInformer.Informer().HasSynced,
addOrUpdateCsrQueue: newTypedRateLimitingQueue[string]("AddOrUpdateCSR", custCrdRateLimiter),

vmiMigrationSynced: vmiMigrationInformer.HasSynced,
addOrUpdateVMIMigrationQueue: newTypedRateLimitingQueue[string]("AddOrUpdateVMIMigration", nil),
deleteVMQueue: newTypedRateLimitingQueue[string]("DeleteVM", nil),
Comment thread
oilbeater marked this conversation as resolved.
kubevirtInformerFactory: kubevirtInformerFactory,

recorder: recorder,
Expand Down Expand Up @@ -638,6 +637,7 @@ func Run(ctx context.Context, config *Configuration) {
controller.deployInformerFactory.Start(ctx.Done())
controller.kubeovnInformerFactory.Start(ctx.Done())
controller.anpInformerFactory.Start(ctx.Done())
controller.StartKubevirtInformerFactory(ctx, kubevirtInformerFactory)
Comment thread
oilbeater marked this conversation as resolved.

klog.Info("Waiting for informer caches to sync")
cacheSyncs := []cache.InformerSynced{
Expand Down Expand Up @@ -913,16 +913,6 @@ func Run(ctx context.Context, config *Configuration) {
}
}

if config.EnableLiveMigrationOptimize {
if _, err = vmiMigrationInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueAddVMIMigration,
UpdateFunc: controller.enqueueUpdateVMIMigration,
}); err != nil {
util.LogFatalAndExit(err, "failed to add VMI Migration event handler")
}
controller.StartMigrationInformerFactory(ctx, kubevirtInformerFactory)
}

controller.Run(ctx)
}

Expand Down Expand Up @@ -1374,6 +1364,8 @@ func (c *Controller) startWorkers(ctx context.Context) {
go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done())
}

go wait.Until(runWorker("delete vm", c.deleteVMQueue, c.handleDeleteVM), time.Second, ctx.Done())
Comment thread
oilbeater marked this conversation as resolved.

go wait.Until(c.dbStatus, 15*time.Second, ctx.Done())
}

Expand Down
83 changes: 73 additions & 10 deletions pkg/controller/kubevirt.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -34,6 +35,48 @@ func (c *Controller) enqueueUpdateVMIMigration(oldObj, newObj any) {
}
}

func (c *Controller) enqueueDeleteVM(obj any) {
key := cache.MetaObjectToName(obj.(*kubevirtv1.VirtualMachine)).String()
klog.Infof("enqueue add VM %s", key)
c.deleteVMQueue.Add(key)
}

func (c *Controller) handleDeleteVM(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid vm key: %s", key))
return nil
}
vmKey := fmt.Sprintf("%s/%s", namespace, name)

ports, err := c.OVNNbClient.ListNormalLogicalSwitchPorts(true, map[string]string{"pod": vmKey})
if err != nil {
klog.Errorf("failed to list lsps of vm %s: %v", vmKey, err)
return err
}

for _, port := range ports {
if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), port.Name, metav1.DeleteOptions{}); err != nil {
if !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", port.Name, err)
return err
}
}

subnetName := port.ExternalIDs["ls"]
if subnetName != "" {
c.ipam.ReleaseAddressByNic(vmKey, port.Name, subnetName)
}

if err := c.OVNNbClient.DeleteLogicalSwitchPort(port.Name); err != nil {
klog.Errorf("failed to delete lsp %s, %v", port.Name, err)
return err
}
}

return nil
}

func (c *Controller) handleAddOrUpdateVMIMigration(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
Expand Down Expand Up @@ -127,27 +170,47 @@ func (c *Controller) handleAddOrUpdateVMIMigration(key string) error {
return nil
}

func (c *Controller) isVMIMigrationCRDInstalled() bool {
_, err := c.config.ExtClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), "virtualmachineinstancemigrations.kubevirt.io", metav1.GetOptions{})
if err != nil {
return false
func (c *Controller) isKubevirtCRDInstalled() bool {
for _, crd := range util.KubeVirtCRD {
_, err := c.config.ExtClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), crd, metav1.GetOptions{})
if err != nil {
return false
}
}
klog.Info("Found KubeVirt VMI Migration CRD")
klog.Info("Found KubeVirt CRDs")
return true
}

func (c *Controller) StartMigrationInformerFactory(ctx context.Context, kubevirtInformerFactory informer.KubeVirtInformerFactory) {
func (c *Controller) StartKubevirtInformerFactory(ctx context.Context, kubevirtInformerFactory informer.KubeVirtInformerFactory) {
ticker := time.NewTicker(10 * time.Second)
go func() {
defer ticker.Stop()
for {
select {
case <-ticker.C:
if c.isVMIMigrationCRDInstalled() {
klog.Info("Start VMI migration informer")
if c.isKubevirtCRDInstalled() {
klog.Info("Start kubevirt informer")
vmiMigrationInformer := kubevirtInformerFactory.VirtualMachineInstanceMigration()
vmInformer := kubevirtInformerFactory.VirtualMachine()

kubevirtInformerFactory.Start(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), c.vmiMigrationSynced) {
util.LogFatalAndExit(nil, "failed to wait for vmi migration caches to sync")
if !cache.WaitForCacheSync(ctx.Done(), vmiMigrationInformer.HasSynced, vmInformer.HasSynced) {
util.LogFatalAndExit(nil, "failed to wait for kubevirt caches to sync")
}

if c.config.EnableLiveMigrationOptimize {
if _, err := vmiMigrationInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueAddVMIMigration,
UpdateFunc: c.enqueueUpdateVMIMigration,
}); err != nil {
util.LogFatalAndExit(err, "failed to add VMI Migration event handler")
}
}

if _, err := vmInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: c.enqueueDeleteVM,
}); err != nil {
util.LogFatalAndExit(err, "failed to add vm event handler")
}
return
Comment thread
oilbeater marked this conversation as resolved.
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/informer/kubevirt.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type KubeVirtInformerFactory interface {

// Watches VirtualMachineInstanceMigration objects
VirtualMachineInstanceMigration() cache.SharedIndexInformer

// Watches VirtualMachine objects
VirtualMachine() cache.SharedIndexInformer
Comment thread
oilbeater marked this conversation as resolved.
}

type kubeInformerFactory struct {
Expand Down Expand Up @@ -150,6 +153,13 @@ func (f *kubeInformerFactory) VirtualMachineInstanceMigration() cache.SharedInde
})
}

func (f *kubeInformerFactory) VirtualMachine() cache.SharedIndexInformer {
return f.getInformer("vmInformer", func() cache.SharedIndexInformer {
lw := cache.NewListWatchFromClient(f.restClient, "virtualmachines", k8sv1.NamespaceAll, fields.Everything())
return cache.NewSharedIndexInformer(lw, &kubev1.VirtualMachine{}, f.defaultResync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
})
Comment thread
oilbeater marked this conversation as resolved.
}

// resyncPeriod computes the time interval a shared informer waits before resyncing with the api server
func resyncPeriod(minResyncPeriod time.Duration) time.Duration {
// #nosec no need for better randomness
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,5 @@ const (
MasqueradeExternalLBAccessMac = "00:00:00:01:00:01"
MasqueradeCheckIP = "0.0.0.0"
)

var KubeVirtCRD = []string{"virtualmachineinstancemigrations.kubevirt.io", "virtualmachines.kubevirt.io"}
Loading