Skip to content

Commit 9522222

Browse files
committed
fix(controller): multi node snapshot
1 parent 4febb4b commit 9522222

5 files changed

Lines changed: 76 additions & 31 deletions

File tree

api/v1beta1/snapshot_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ const (
116116

117117
type SnapShotStatusNode struct {
118118
Name string `json:"name"`
119-
Addr string `json:"addr"`
119+
DeamonsetAddr string `json:"deamonsetAddr"`
120120
DeamonsetPort int32 `json:"deamonsetPort"`
121121
KubeletPort int32 `json:"kubeletPort"`
122122
}

config/crd/bases/stove8s.bud.studio_snapshots.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ spec:
4949
format: int64
5050
type: integer
5151
policy:
52+
description: SnapShotInputPolicy will(TODO:) add support for Replace,
53+
etc ...
5254
type: string
5355
required:
5456
- delay
@@ -109,7 +111,7 @@ spec:
109111
type: string
110112
node:
111113
properties:
112-
addr:
114+
deamonsetAddr:
113115
type: string
114116
deamonsetPort:
115117
format: int32
@@ -120,7 +122,7 @@ spec:
120122
name:
121123
type: string
122124
required:
123-
- addr
125+
- deamonsetAddr
124126
- deamonsetPort
125127
- kubeletPort
126128
- name

dist/chart/templates/crd/stove8s.bud.studio_snapshots.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ spec:
5555
format: int64
5656
type: integer
5757
policy:
58+
description: SnapShotInputPolicy will(TODO:) add support for Replace,
59+
etc ...
5860
type: string
5961
required:
6062
- delay
@@ -115,7 +117,7 @@ spec:
115117
type: string
116118
node:
117119
properties:
118-
addr:
120+
deamonsetAddr:
119121
type: string
120122
deamonsetPort:
121123
format: int32
@@ -126,7 +128,7 @@ spec:
126128
name:
127129
type: string
128130
required:
129-
- addr
131+
- deamonsetAddr
130132
- deamonsetPort
131133
- kubeletPort
132134
- name

dist/chart/templates/daemonset/service.yaml

Lines changed: 0 additions & 18 deletions
This file was deleted.

internal/controller/snapshot_controller.go

Lines changed: 67 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ import (
2828
"os"
2929
"slices"
3030

31+
appsv1 "k8s.io/api/apps/v1"
3132
corev1 "k8s.io/api/core/v1"
3233
apierrors "k8s.io/apimachinery/pkg/api/errors"
34+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3335
"k8s.io/apimachinery/pkg/runtime"
3436
apitypes "k8s.io/apimachinery/pkg/types"
3537
ctrl "sigs.k8s.io/controller-runtime"
@@ -43,9 +45,10 @@ import (
4345
)
4446

4547
const (
46-
podCaCertPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
47-
podTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
48-
daemonsetNodePort = 31008
48+
podCaCertPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
49+
podTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
50+
daemonsetPort = 80
51+
daemonsetName = "stove8s-daemonset"
4952
)
5053

5154
// SnapShotReconciler reconciles a SnapShot object
@@ -193,15 +196,26 @@ func (r *SnapShotReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
193196
}
194197

195198
if snapshot.Status.Node == (stove8sv1beta1.SnapShotStatusNode{}) {
196-
nodeName, nodeAddr, kubeletPort, err := r.kubeletEndpointFromPod(ctx, pod)
199+
daemonSetPodIP, err := r.getDaemonSetPodIPOnNode(
200+
ctx,
201+
snapshot.Namespace,
202+
daemonsetName,
203+
pod.Spec.NodeName,
204+
)
205+
if err != nil {
206+
log.Error(err, "unable to get deamonset endpoint for pod")
207+
return ctrl.Result{}, err
208+
}
209+
nodeName, _, kubeletPort, err := r.kubeletEndpointFromPod(ctx, pod)
197210
if err != nil {
198211
log.Error(err, "unable to get kubelet endpoint for pod")
199212
return ctrl.Result{}, err
200213
}
214+
201215
snapshot.Status.Node = stove8sv1beta1.SnapShotStatusNode{
202216
Name: nodeName,
203-
Addr: nodeAddr,
204-
DeamonsetPort: daemonsetNodePort,
217+
DeamonsetAddr: daemonSetPodIP,
218+
DeamonsetPort: daemonsetPort,
205219
KubeletPort: kubeletPort,
206220
}
207221
if err := r.Status().Update(ctx, snapshot); err != nil {
@@ -311,7 +325,7 @@ func daemonsetStausFetch(
311325
jobID string,
312326
node stove8sv1beta1.SnapShotStatusNode,
313327
) (*oci.Status, error) {
314-
ociEndpoint := fmt.Sprintf("http://%s:%v/oci/%s", node.Addr, node.DeamonsetPort, jobID)
328+
ociEndpoint := fmt.Sprintf("http://%s:%v/oci/%s", node.DeamonsetAddr, node.DeamonsetPort, jobID)
315329
req, err := http.NewRequest(http.MethodGet, ociEndpoint, nil)
316330
if err != nil {
317331
return nil, err
@@ -353,7 +367,7 @@ func daemonsetInit(
353367
return "", err
354368
}
355369

356-
ociEndpoint := fmt.Sprintf("http://%s:%v/oci", node.Addr, node.DeamonsetPort)
370+
ociEndpoint := fmt.Sprintf("http://%s:%v/oci", node.DeamonsetAddr, node.DeamonsetPort)
357371
req, err := http.NewRequest(http.MethodPost, ociEndpoint, bytes.NewBuffer(jsonData))
358372
if err != nil {
359373
return "", err
@@ -397,6 +411,51 @@ func (r *SnapShotReconciler) kubeletEndpointFromPod(ctx context.Context, pod *co
397411
return name, addr, port, nil
398412
}
399413

414+
func (r *SnapShotReconciler) getDaemonSetPodIPOnNode(
415+
ctx context.Context,
416+
daemonSetNamespace string,
417+
daemonSetName string,
418+
nodeName string,
419+
) (string, error) {
420+
ds := &appsv1.DaemonSet{}
421+
err := r.Get(ctx, apitypes.NamespacedName{
422+
Namespace: daemonSetNamespace,
423+
Name: daemonSetName,
424+
}, ds)
425+
if err != nil {
426+
return "", fmt.Errorf("failed to get DaemonSet %s/%s: %w", daemonSetNamespace, daemonSetName, err)
427+
}
428+
429+
selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
430+
if err != nil {
431+
return "", fmt.Errorf("invalid selector in DaemonSet %s/%s: %w", daemonSetNamespace, daemonSetName, err)
432+
}
433+
434+
podList := &corev1.PodList{}
435+
err = r.List(ctx, podList,
436+
client.InNamespace(daemonSetName),
437+
client.MatchingLabelsSelector{Selector: selector},
438+
client.MatchingFields{".spec.nodeName": nodeName},
439+
)
440+
if err != nil {
441+
return "", fmt.Errorf("failed to list pods for DaemonSet %s on node %s: %w", daemonSetName, nodeName, err)
442+
}
443+
444+
if len(podList.Items) == 0 {
445+
return "", fmt.Errorf("no pod found for DaemonSet %s on node %s", daemonSetName, nodeName)
446+
} else if len(podList.Items) > 1 {
447+
return "", fmt.Errorf("Multiple pods found for DaemonSet %s on node %s", daemonSetName, nodeName)
448+
}
449+
450+
pod := &podList.Items[0]
451+
// looking for 0.0.0.0 to check if CNI fails (IP exhaustion, etc...)
452+
if pod.Status.PodIP == "" || pod.Status.PodIP == "0.0.0.0" {
453+
return "", fmt.Errorf("pod %s/%s has no assigned IP yet", daemonSetNamespace, pod.Name)
454+
}
455+
456+
return pod.Status.PodIP, nil
457+
}
458+
400459
func (r *SnapShotReconciler) checkpoint(ctx context.Context, pod *corev1.Pod, containerName string) (string, error) {
401460
_, nodeAddr, kubeletPort, err := r.kubeletEndpointFromPod(ctx, pod)
402461
if err != nil {

0 commit comments

Comments
 (0)