Migrate virtual instances using virtctl (KubeVirt) #1094

Status: Open. Wants to merge 1 commit into base: main.
3 changes: 2 additions & 1 deletion Dockerfile
@@ -20,6 +20,7 @@ RUN set -ex \
&& cp /dist/kured_${TARGETOS}_${TARGETARCH}${SUFFIX}/kured /dist/kured;

FROM alpine:3.21.3@sha256:a8560b36e8b8210634f77d9f7f9efd7ffa463e380b75e2e74aff4511df3ef88c
-RUN apk update --no-cache && apk upgrade --no-cache && apk add --no-cache ca-certificates tzdata
+RUN echo https://dl-cdn.alpinelinux.org/alpine/edge/testing >> /etc/apk/repositories && \
+    apk update --no-cache && apk upgrade --no-cache && apk add --no-cache ca-certificates tzdata virtctl
COPY --from=bin /dist/kured /usr/bin/kured
ENTRYPOINT ["/usr/bin/kured"]
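
Since the evacuator shells out to the bundled virtctl binary (see pkg/evacuators/kubevirt.go below), a fail-fast check at kured startup could surface a broken image immediately. A minimal sketch, not part of this diff; the checkVirtctl name and its call site are assumptions:

// Hypothetical startup guard: verify that the virtctl binary baked into the
// image is actually reachable before relying on it for live migration.
func checkVirtctl() {
	if _, err := exec.LookPath("virtctl"); err != nil {
		log.Fatalf("virtctl not found in PATH: %v", err)
	}
}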
38 changes: 38 additions & 0 deletions cmd/kured/main.go
@@ -20,6 +20,7 @@ import (
"github.com/kubereboot/kured/pkg/checkers"
"github.com/kubereboot/kured/pkg/daemonsetlock"
"github.com/kubereboot/kured/pkg/delaytick"
"github.com/kubereboot/kured/pkg/evacuators"
"github.com/kubereboot/kured/pkg/reboot"
"github.com/kubereboot/kured/pkg/taints"
"github.com/kubereboot/kured/pkg/timewindow"
@@ -404,6 +405,38 @@ func stripQuotes(str string) string {
return str
}

func evacuate(client *kubernetes.Clientset, node *v1.Node) error {
var err error

nodename := node.GetName()

drainer := &kubectldrain.Helper{
Client: client,
Ctx: context.Background(),
GracePeriodSeconds: drainGracePeriod,
PodSelector: drainPodSelector,
SkipWaitForDeleteTimeoutSeconds: skipWaitForDeleteTimeoutSeconds,
Force: true,
DeleteEmptyDirData: true,
IgnoreAllDaemonSets: true,
ErrOut: os.Stderr,
Out: os.Stdout,
Timeout: drainTimeout,
}

if err = kubectldrain.RunCordonOrUncordon(drainer, node, true); err != nil {
log.Errorf("Error cordonning %s: %v", nodename, err)
return err
}

kubeVirtEvacuator, err := evacuators.NewKubeVirtEvacuator(nodename, client)
if err != nil {
return err
}

return kubeVirtEvacuator.Evacuate()
}

func drain(client *kubernetes.Clientset, node *v1.Node) error {
nodename := node.GetName()

@@ -676,6 +709,11 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
continue
}
}
// Live-migrate KubeVirt instances off the node before draining it
err = evacuate(client, node)
if err != nil {
log.Errorf("Error trying to live migrate VMs: %v", err)
}

err = drain(client, node)
if err != nil {
4 changes: 4 additions & 0 deletions kured-rbac.yaml
@@ -6,6 +6,7 @@ metadata:
rules:
# Allow kured to read spec.unschedulable
# Allow kubectl to drain/uncordon
# Allow virtctl to migrate KubeVirt instances
#
# NB: These permissions are tightly coupled to the bundled version of kubectl; the ones below
# match https://github.com/kubernetes/kubernetes/blob/v1.19.4/staging/src/k8s.io/kubectl/pkg/cmd/drain/drain.go
@@ -22,6 +23,9 @@ rules:
- apiGroups: [""]
resources: ["pods/eviction"]
verbs: ["create"]
- apiGroups: [ "subresources.kubevirt.io" ]
resources: [ "virtualmachines/migrate" ]
verbs: ["update" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
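For context, the new rule authorizes the subresource request that virtctl migrate issues. A rough sketch of the equivalent direct API call, with the path assumed from KubeVirt's subresources.kubevirt.io/v1 API; namespace and vmName are placeholders:

// Hypothetical equivalent of `virtctl migrate <vm>`: an update (PUT) on the
// virtualmachines/migrate subresource that the rule above permits.
err := client.CoreV1().RESTClient().Put().
	AbsPath("/apis/subresources.kubevirt.io/v1", "namespaces", namespace, "virtualmachines", vmName, "migrate").
	Body([]byte("{}")).
	Do(context.TODO()).
	Error()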
6 changes: 6 additions & 0 deletions pkg/evacuators/evacuator.go
@@ -0,0 +1,6 @@
package evacuators

// Evacuator is an interface for the business logic some components must run before a node is rebooted
type Evacuator interface {
Evacuate() error
}
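
The interface keeps the reboot path decoupled from KubeVirt specifics, so other integrations can slot in later. An illustrative implementation, not part of this PR:

// noopEvacuator is a hypothetical Evacuator for clusters with nothing to
// live-migrate; it satisfies the interface and always succeeds.
type noopEvacuator struct{}

func (n *noopEvacuator) Evacuate() error {
	return nil
}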
194 changes: 194 additions & 0 deletions pkg/evacuators/kubevirt.go
@@ -0,0 +1,194 @@
package evacuators

import (
"context"
"fmt"
"os/exec"
"sync"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

log "github.com/sirupsen/logrus"
)

// KubeVirtEvacuator implements Evacuator interface, managing KubeVirt instances
type KubeVirtEvacuator struct {
client *kubernetes.Clientset // Kubernetes client object inherited from the caller
nodeID string // Kubernetes Node ID
	errors []error // Errors collected from the evacuation goroutines

	sleepingTime time.Duration // Pause between two checks of the VMs' migration state
	timeoutCounter int // Number of state checks before giving up

	mutex sync.Mutex // Serializes command execution and access to errors
}

// NewKubeVirtEvacuator is the constructor
func NewKubeVirtEvacuator(nodeID string, client *kubernetes.Clientset) (*KubeVirtEvacuator, error) {
	if client == nil {
		return nil, fmt.Errorf("NewKubeVirtEvacuator: the given clientset is nil")
	}

	if len(nodeID) == 0 {
		return nil, fmt.Errorf("NewKubeVirtEvacuator: the given nodeID is empty")
	}

	return &KubeVirtEvacuator{
		nodeID:         nodeID,
		client:         client,
		sleepingTime:   30 * time.Second, // 40 checks every 30 seconds: a 20-minute budget
		timeoutCounter: 40,
	}, nil
}

// Evacuate starts the live migration of the hosted virtual instances
func (k *KubeVirtEvacuator) Evacuate() (err error) {
	log.Infof("Evacuate: migration configuration is %v retries every %v", k.timeoutCounter, k.sleepingTime)

vms, err := k.getVMRunningOnNode()

if err == nil {
k.startAsyncEvacuation(vms)

for {
if k.timeoutCounter == 0 {
err = fmt.Errorf("Evacuate: timeout exceeded")
break
}

log.Infof("EvacuateVM: %v retries left. %v remaining instances on the node", k.timeoutCounter, vms.Size())

vms, err = k.getVMRunningOnNode()
if err != nil {
err = fmt.Errorf("%v errors occured", len(k.errors))
break
}

if len(vms.Items) == 0 { // Size() is the protobuf byte size, not the item count
log.Info("Evacuate: Completed.")
break
}

k.countDown()
}
}

return err
}

// startAsyncEvacuation starts one evacuateVM goroutine per VM
func (k *KubeVirtEvacuator) startAsyncEvacuation(vms *v1.PodList) {
	for i := range vms.Items {
		go k.evacuateVM(&vms.Items[i]) // index the slice: &vm would alias the loop variable across goroutines
}
}

// countDown sleeps once and counts down the retry budget
func (k *KubeVirtEvacuator) countDown() {
	time.Sleep(k.sleepingTime)
	k.timeoutCounter--
}

// getVMRunningOnNode gets the virt-launcher pods running on the node
func (k *KubeVirtEvacuator) getVMRunningOnNode() (*v1.PodList, error) {
labelSelector := "kubevirt.io=virt-launcher"
fieldSelector := fmt.Sprintf("spec.nodeName=%s", k.nodeID)

return k.client.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
LabelSelector: labelSelector,
FieldSelector: fieldSelector,
})
}

// evacuateVM starts and monitors the migration of the given virtual instance
func (k *KubeVirtEvacuator) evacuateVM(vm *v1.Pod) {
var newNode string
var err error

vmName := vm.Labels["kubevirt.io/vm"]

if len(vmName) > 0 {
logPrefix := fmt.Sprintf("evacuateVM: %s (ns: %s, pod %s).", vmName, vm.Namespace, vm.Name)
shellCommand := exec.Command("/usr/bin/virtctl", "migrate", vmName, "-n", vm.Namespace)

log.Infof("%s Evacuating from %s", logPrefix, k.nodeID)

err = k.execCommand(shellCommand)
if err != nil {
err = fmt.Errorf("%s %v", logPrefix, err)
} else {
for {
newNode, err = k.getNodeOfVM(vmName)
if err != nil {
break
}

if k.checkMigrationCompletion(logPrefix, newNode) {
time.Sleep(k.sleepingTime)
} else {
break
}
}
}

k.appendError(err)
} else {
log.Infof("given pod %s (ns %s) has an empty VM name. Skipping", vm.Name, vm.Namespace)
}
}

// checkMigrationCompletion returns true once the migration has completed
func (k *KubeVirtEvacuator) checkMigrationCompletion(logPrefix, newNode string) (result bool) {
if k.nodeID == newNode {
log.Infof("%s Still on %v", logPrefix, newNode)
} else {
log.Infof("%s Completed.", logPrefix)
result = true
}

return result
}

// appendError appends the given error to the internal errors slice in a threadsafe way
func (k *KubeVirtEvacuator) appendError(err error) {
if err != nil {
k.mutex.Lock()
k.errors = append(k.errors, err) // appending to a shared slice is not goroutine-safe on its own, hence the mutex
k.mutex.Unlock()
}
}

// execCommand runs the given command to completion, serialized by the mutex
func (k *KubeVirtEvacuator) execCommand(command *exec.Cmd) (err error) {
k.mutex.Lock()
defer k.mutex.Unlock()

return command.Run()
}

// getNodeOfVM provides the node ID hosting the given virtual instance
func (k *KubeVirtEvacuator) getNodeOfVM(vmName string) (result string, err error) {
var podList *v1.PodList

if len(vmName) == 0 {
err = fmt.Errorf("getNodeOfVM: the given VM name is empty")
} else {
labelSelector := fmt.Sprintf("kubevirt.io/vm=%s", vmName)

podList, err = k.client.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
LabelSelector: labelSelector,
})
}

	if err == nil && podList != nil {
		if len(podList.Items) == 0 {
			err = fmt.Errorf("getNodeOfVM: no virt-launcher pod found for VM %s", vmName)
		} else {
			result = podList.Items[0].Spec.NodeName
		}
	}

return result, err
}