|
8 | 8 | "math/rand" |
9 | 9 | "os" |
10 | 10 | "strings" |
| 11 | + "time" |
11 | 12 |
|
12 | 13 | "github.com/platform9/vjailbreak/v2v-helper/pkg/utils" |
13 | 14 | corev1 "k8s.io/api/core/v1" |
@@ -233,24 +234,48 @@ func (r *Reporter) GetCutoverLabel() (string, error) { |
233 | 234 |
|
234 | 235 | func (r *Reporter) WatchPodLabels(ctx context.Context, ch chan<- string) { |
235 | 236 | go func() { |
236 | | - watch, err := r.Clientset.CoreV1().Pods(r.PodNamespace).Watch(ctx, metav1.ListOptions{ |
237 | | - FieldSelector: fmt.Sprintf("metadata.name=%s", r.PodName), |
238 | | - }) |
239 | | - if err != nil { |
240 | | - fmt.Printf("Failed to watch pod labels: %v\n", err) |
241 | | - } |
242 | | - defer watch.Stop() |
243 | | - originalStartCutover := "no" |
244 | | - for event := range watch.ResultChan() { |
245 | | - pod, ok := event.Object.(*corev1.Pod) |
246 | | - if !ok { |
247 | | - continue |
248 | | - } |
249 | | - if cutover, ok := pod.Labels["startCutover"]; ok { |
250 | | - if cutover != originalStartCutover { |
251 | | - ch <- cutover |
252 | | - originalStartCutover = cutover |
| 237 | + for { |
| 238 | + select { |
| 239 | + case <-ctx.Done(): |
| 240 | + fmt.Printf("Error: Context canceled for pod %s: %v\n", r.PodName, ctx.Err()) |
| 241 | + return |
| 242 | + default: |
| 243 | + fmt.Printf("Info: Starting watch for pod %s in namespace %s\n", r.PodName, r.PodNamespace) |
| 244 | + timeoutSeconds := int64(172800) |
| 245 | + watch, err := r.Clientset.CoreV1().Pods(r.PodNamespace).Watch(ctx, metav1.ListOptions{ |
| 246 | + FieldSelector: fmt.Sprintf("metadata.name=%s", r.PodName), |
| 247 | + TimeoutSeconds: &timeoutSeconds, |
| 248 | + }) |
| 249 | + if err != nil { |
| 250 | + fmt.Printf("Error: Failed to start watch for pod %s: %v\n", r.PodName, err) |
| 251 | + time.Sleep(5 * time.Second) |
| 252 | + continue |
| 253 | + } |
| 254 | + fmt.Printf("Info: Watch established for pod %s with timeout %d seconds\n", r.PodName, timeoutSeconds) |
| 255 | + defer watch.Stop() |
| 256 | + originalStartCutover := "no" |
| 257 | + fmt.Printf("Info: Entering event loop for pod %s\n", r.PodName) |
| 258 | + for event := range watch.ResultChan() { |
| 259 | + pod, ok := event.Object.(*corev1.Pod) |
| 260 | + if !ok { |
| 261 | + fmt.Printf("Error: Received non-pod event for pod %s: %v\n", r.PodName, event.Object) |
| 262 | + continue |
| 263 | + } |
| 264 | + if cutover, ok := pod.Labels["startCutover"]; ok { |
| 265 | + if cutover != originalStartCutover { |
| 266 | + fmt.Printf("Info: Label changed for pod %s: %s -> %s\n", r.PodName, originalStartCutover, cutover) |
| 267 | + select { |
| 268 | + case ch <- cutover: |
| 269 | + fmt.Printf("Info: Sent label %s for pod %s to channel\n", cutover, r.PodName) |
| 270 | + default: |
| 271 | + fmt.Printf("Error: Channel blocked when sending label %s for pod %s\n", cutover, r.PodName) |
| 272 | + } |
| 273 | + originalStartCutover = cutover |
| 274 | + } |
| 275 | + } |
253 | 276 | } |
| 277 | + fmt.Printf("Info: Watch channel closed for pod %s after ~%d seconds, retrying...\n", r.PodName, timeoutSeconds) |
| 278 | + time.Sleep(5 * time.Second) |
254 | 279 | } |
255 | 280 | } |
256 | 281 | }() |
|
0 commit comments