Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 42 additions & 17 deletions v2v-helper/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math/rand"
"os"
"strings"
"time"

"github.com/platform9/vjailbreak/v2v-helper/pkg/utils"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -233,24 +234,48 @@ func (r *Reporter) GetCutoverLabel() (string, error) {

func (r *Reporter) WatchPodLabels(ctx context.Context, ch chan<- string) {
go func() {
watch, err := r.Clientset.CoreV1().Pods(r.PodNamespace).Watch(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", r.PodName),
})
if err != nil {
fmt.Printf("Failed to watch pod labels: %v\n", err)
}
defer watch.Stop()
originalStartCutover := "no"
for event := range watch.ResultChan() {
pod, ok := event.Object.(*corev1.Pod)
if !ok {
continue
}
if cutover, ok := pod.Labels["startCutover"]; ok {
if cutover != originalStartCutover {
ch <- cutover
originalStartCutover = cutover
for {
select {
case <-ctx.Done():
fmt.Printf("Error: Context canceled for pod %s: %v\n", r.PodName, ctx.Err())
return
default:
fmt.Printf("Info: Starting watch for pod %s in namespace %s\n", r.PodName, r.PodNamespace)
timeoutSeconds := int64(172800)
watch, err := r.Clientset.CoreV1().Pods(r.PodNamespace).Watch(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", r.PodName),
TimeoutSeconds: &timeoutSeconds,
})
if err != nil {
fmt.Printf("Error: Failed to start watch for pod %s: %v\n", r.PodName, err)
time.Sleep(5 * time.Second)
continue
}
Comment on lines +245 to +253
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@spai-p9 Won't this create infinite watchers?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will hog resources on the pod

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only creates one watcher and the inner loop is a blocking for loop which does not exits until the channel gets closed, hence only one watch is present at a given particular time. the outer for loop is to ensure that if a channel gets closed due to some reason we create a new watch so we don't miss any update.

fmt.Printf("Info: Watch established for pod %s with timeout %d seconds\n", r.PodName, timeoutSeconds)
defer watch.Stop()
Comment thread
OmkarDeshpande7 marked this conversation as resolved.
originalStartCutover := "no"
fmt.Printf("Info: Entering event loop for pod %s\n", r.PodName)
for event := range watch.ResultChan() {
pod, ok := event.Object.(*corev1.Pod)
if !ok {
fmt.Printf("Error: Received non-pod event for pod %s: %v\n", r.PodName, event.Object)
continue
}
if cutover, ok := pod.Labels["startCutover"]; ok {
if cutover != originalStartCutover {
fmt.Printf("Info: Label changed for pod %s: %s -> %s\n", r.PodName, originalStartCutover, cutover)
select {
case ch <- cutover:
fmt.Printf("Info: Sent label %s for pod %s to channel\n", cutover, r.PodName)
default:
fmt.Printf("Error: Channel blocked when sending label %s for pod %s\n", cutover, r.PodName)
}
originalStartCutover = cutover
}
}
}
fmt.Printf("Info: Watch channel closed for pod %s after ~%d seconds, retrying...\n", r.PodName, timeoutSeconds)
time.Sleep(5 * time.Second)
}
}
}()
Expand Down
Loading