-
Notifications
You must be signed in to change notification settings - Fork 526
Expand file tree
/
Copy pathkubevirt.go
More file actions
222 lines (194 loc) · 7.62 KB
/
kubevirt.go
File metadata and controls
222 lines (194 loc) · 7.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package controller
import (
"context"
"fmt"
"time"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
kubevirtv1 "kubevirt.io/api/core/v1"
"github.com/kubeovn/kube-ovn/pkg/informer"
"github.com/kubeovn/kube-ovn/pkg/ovs"
"github.com/kubeovn/kube-ovn/pkg/util"
)
// enqueueAddVMIMigration queues a newly observed VirtualMachineInstanceMigration
// for reconciliation by its namespace/name key.
func (c *Controller) enqueueAddVMIMigration(obj any) {
	migration := obj.(*kubevirtv1.VirtualMachineInstanceMigration)
	key := cache.MetaObjectToName(migration).String()
	klog.Infof("enqueue add VMI migration %s", key)
	c.addOrUpdateVMIMigrationQueue.Add(key)
}
// enqueueUpdateVMIMigration queues a VMI migration when it is being deleted
// or its status phase has changed; all other updates are ignored.
func (c *Controller) enqueueUpdateVMIMigration(oldObj, newObj any) {
	oldMigration := oldObj.(*kubevirtv1.VirtualMachineInstanceMigration)
	newMigration := newObj.(*kubevirtv1.VirtualMachineInstanceMigration)
	deleting := !newMigration.DeletionTimestamp.IsZero()
	phaseChanged := oldMigration.Status.Phase != newMigration.Status.Phase
	if !deleting && !phaseChanged {
		return
	}
	key := cache.MetaObjectToName(newMigration).String()
	klog.Infof("enqueue update VMI migration %s", key)
	c.addOrUpdateVMIMigrationQueue.Add(key)
}
// enqueueDeleteVM queues a deleted VirtualMachine so that its leftover
// logical switch ports and IP allocations can be cleaned up.
func (c *Controller) enqueueDeleteVM(obj any) {
	key := cache.MetaObjectToName(obj.(*kubevirtv1.VirtualMachine)).String()
	// Fixed copy-paste bug: this is the DeleteFunc handler, but the log line
	// previously said "enqueue add VM", which made deletion flows hard to trace.
	klog.Infof("enqueue delete VM %s", key)
	c.deleteVMQueue.Add(key)
}
// handleDeleteVM cleans up the network resources left behind by a deleted
// VirtualMachine: for every OVN logical switch port tagged with the VM's
// namespace/name it deletes the IP custom resource, releases the IPAM
// allocation, and removes the logical switch port itself.
// Returns an error to trigger a requeue if any cleanup step fails.
func (c *Controller) handleDeleteVM(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		// A malformed key can never become valid; report it and drop instead of retrying.
		utilruntime.HandleError(fmt.Errorf("invalid vm key: %s", key))
		return nil
	}
	vmKey := fmt.Sprintf("%s/%s", namespace, name)
	// LSPs created for the VM carry an external ID "pod" set to namespace/name.
	ports, err := c.OVNNbClient.ListNormalLogicalSwitchPorts(true, map[string]string{"pod": vmKey})
	if err != nil {
		klog.Errorf("failed to list lsps of vm %s: %v", vmKey, err)
		return err
	}
	for _, port := range ports {
		// Delete the IP custom resource first; a missing IP is fine (already gone).
		if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), port.Name, metav1.DeleteOptions{}); err != nil {
			if !k8serrors.IsNotFound(err) {
				klog.Errorf("failed to delete ip %s, %v", port.Name, err)
				return err
			}
		}
		// The "ls" external ID records the subnet (logical switch) the port belongs to;
		// release the IPAM allocation only when it is present.
		subnetName := port.ExternalIDs["ls"]
		if subnetName != "" {
			c.ipam.ReleaseAddressByNic(vmKey, port.Name, subnetName)
		}
		// Finally remove the logical switch port from the OVN northbound DB.
		if err := c.OVNNbClient.DeleteLogicalSwitchPort(port.Name); err != nil {
			klog.Errorf("failed to delete lsp %s, %v", port.Name, err)
			return err
		}
	}
	return nil
}
// handleAddOrUpdateVMIMigration keeps OVN logical switch port migrate options
// in sync with a KubeVirt live migration: it sets the options while the
// migration is running and resets them when the migration succeeds or fails,
// so traffic follows the VM between the source and target nodes.
func (c *Controller) handleAddOrUpdateVMIMigration(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		// A malformed key can never become valid; report it and drop instead of retrying.
		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}
	vmiMigration, err := c.config.KubevirtClient.VirtualMachineInstanceMigration(namespace).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("failed to get VMI migration by key %s: %w", key, err))
		return err
	}
	// Nothing to do until KubeVirt has populated the migration state.
	if vmiMigration.Status.MigrationState == nil {
		klog.V(3).Infof("VirtualMachineInstanceMigration %s migration state is nil, skipping", key)
		return nil
	}
	// A completed migration was already handled on an earlier pass.
	if vmiMigration.Status.MigrationState.Completed {
		klog.V(3).Infof("VirtualMachineInstanceMigration %s migration state is completed, skipping", key)
		return nil
	}
	vmi, err := c.config.KubevirtClient.VirtualMachineInstance(namespace).Get(context.TODO(), vmiMigration.Spec.VMIName, metav1.GetOptions{})
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("failed to get VMI by name %s: %w", vmiMigration.Spec.VMIName, err))
		return err
	}
	if vmi.Status.MigrationState == nil {
		klog.Infof("VMI instance %s migration state is nil, skipping", key)
		return nil
	}
	if vmi.Status.MigrationState.SourcePod == "" {
		klog.Infof("VMI instance %s source pod is nil, skipping", key)
		return nil
	}
	// use VirtualMachineInstance's MigrationState because VirtualMachineInstanceMigration's MigrationState is not updated until migration finished
	klog.Infof("current vmiMigration %s status %s, target Node %s, source Node %s, target Pod %s, source Pod %s", key,
		vmiMigration.Status.Phase,
		vmi.Status.MigrationState.TargetNode,
		vmi.Status.MigrationState.SourceNode,
		vmi.Status.MigrationState.TargetPod,
		vmi.Status.MigrationState.SourcePod)
	// The source pod determines which kube-ovn networks (and thus which LSPs)
	// must be updated for the migration.
	sourcePodName := vmi.Status.MigrationState.SourcePod
	sourcePod, err := c.config.KubeClient.CoreV1().Pods(namespace).Get(context.TODO(), sourcePodName, metav1.GetOptions{})
	if err != nil {
		err = fmt.Errorf("failed to get source pod %s, %w", sourcePodName, err)
		klog.Error(err)
		return err
	}
	podNets, err := c.getPodKubeovnNets(sourcePod)
	if err != nil {
		err = fmt.Errorf("failed to get pod nets %w", err)
		klog.Error(err)
		return err
	}
	needAllocatePodNets := needAllocateSubnets(sourcePod, podNets)
	for _, podNet := range needAllocatePodNets {
		// The LSP is named after the VMI (not the pod) so it is stable across migrations.
		portName := ovs.PodNameToPortName(vmiMigration.Spec.VMIName, vmiMigration.Namespace, podNet.ProviderName)
		srcNodeName := vmi.Status.MigrationState.SourceNode
		targetNodeName := vmi.Status.MigrationState.TargetNode
		switch vmiMigration.Status.Phase {
		case kubevirtv1.MigrationRunning:
			// Migration in flight: install migrate options on the LSP.
			klog.Infof("migrate start set options for lsp %s from %s to %s", portName, srcNodeName, targetNodeName)
			if err := c.OVNNbClient.SetLogicalSwitchPortMigrateOptions(portName, srcNodeName, targetNodeName); err != nil {
				err = fmt.Errorf("failed to set migrate options for lsp %s, %w", portName, err)
				klog.Error(err)
				return err
			}
		case kubevirtv1.MigrationSucceeded:
			// NOTE(review): the final bool presumably selects whether to fall back to the
			// source node (false on success, true on failure) — confirm against
			// ResetLogicalSwitchPortMigrateOptions.
			klog.Infof("migrate end reset options for lsp %s from %s to %s, migrated succeed", portName, srcNodeName, targetNodeName)
			if err := c.OVNNbClient.ResetLogicalSwitchPortMigrateOptions(portName, srcNodeName, targetNodeName, false); err != nil {
				err = fmt.Errorf("failed to clean migrate options for lsp %s, %w", portName, err)
				klog.Error(err)
				return err
			}
		case kubevirtv1.MigrationFailed:
			klog.Infof("migrate end reset options for lsp %s from %s to %s, migrated fail", portName, srcNodeName, targetNodeName)
			if err := c.OVNNbClient.ResetLogicalSwitchPortMigrateOptions(portName, srcNodeName, targetNodeName, true); err != nil {
				err = fmt.Errorf("failed to clean migrate options for lsp %s, %w", portName, err)
				klog.Error(err)
				return err
			}
		}
	}
	return nil
}
// isKubevirtCRDInstalled reports whether every KubeVirt CRD named in
// util.KubeVirtCRD is present in the cluster; any lookup failure counts
// as "not installed".
func (c *Controller) isKubevirtCRDInstalled() bool {
	crdClient := c.config.ExtClient.ApiextensionsV1().CustomResourceDefinitions()
	for _, crdName := range util.KubeVirtCRD {
		if _, err := crdClient.Get(context.TODO(), crdName, metav1.GetOptions{}); err != nil {
			return false
		}
	}
	klog.Info("Found KubeVirt CRDs")
	return true
}
// StartKubevirtInformerFactory polls every 10 seconds until the KubeVirt CRDs
// are installed, then starts the KubeVirt informers, waits for their caches to
// sync, and registers the migration/VM event handlers. The background
// goroutine exits after a successful start or when ctx is cancelled.
func (c *Controller) StartKubevirtInformerFactory(ctx context.Context, kubevirtInformerFactory informer.KubeVirtInformerFactory) {
	ticker := time.NewTicker(10 * time.Second)
	go func() {
		// Stop the ticker on any exit path so its resources are released.
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// KubeVirt may be installed after kube-ovn starts; keep polling
				// until the CRDs appear.
				if c.isKubevirtCRDInstalled() {
					klog.Info("Start kubevirt informer")
					// Obtain informers before Start so the factory knows which
					// resource types to watch.
					vmiMigrationInformer := kubevirtInformerFactory.VirtualMachineInstanceMigration()
					vmInformer := kubevirtInformerFactory.VirtualMachine()
					kubevirtInformerFactory.Start(ctx.Done())
					if !cache.WaitForCacheSync(ctx.Done(), vmiMigrationInformer.HasSynced, vmInformer.HasSynced) {
						util.LogFatalAndExit(nil, "failed to wait for kubevirt caches to sync")
					}
					// Migration handlers are only needed when live-migration
					// optimization is enabled.
					if c.config.EnableLiveMigrationOptimize {
						if _, err := vmiMigrationInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
							AddFunc: c.enqueueAddVMIMigration,
							UpdateFunc: c.enqueueUpdateVMIMigration,
						}); err != nil {
							util.LogFatalAndExit(err, "failed to add VMI Migration event handler")
						}
					}
					if _, err := vmInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
						DeleteFunc: c.enqueueDeleteVM,
					}); err != nil {
						util.LogFatalAndExit(err, "failed to add vm event handler")
					}
					// Informers are running; no more polling is needed.
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
}