From c74330acf2ee5b49baa1b323169101124d75812d Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Tue, 11 Mar 2025 16:36:59 +0200 Subject: [PATCH 01/22] Starting loadtest code --- loadtest/castai.go | 110 ++++++++++++++++++++++++++++++++++++++++++++ loadtest/config.go | 19 ++++++++ loadtest/http.go | 71 ++++++++++++++++++++++++++++ loadtest/prepare.go | 28 +++++++++++ 4 files changed, 228 insertions(+) create mode 100644 loadtest/castai.go create mode 100644 loadtest/config.go create mode 100644 loadtest/http.go create mode 100644 loadtest/prepare.go diff --git a/loadtest/castai.go b/loadtest/castai.go new file mode 100644 index 00000000..46af9c84 --- /dev/null +++ b/loadtest/castai.go @@ -0,0 +1,110 @@ +package loadtest + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/samber/lo" + + "github.com/castai/cluster-controller/internal/castai" +) + +// CastAITestServer acts as simple cluster hub mock replacement. +// It exposes a way to "push" actions to the cluster controller via GetActionsPushChannel +// and can be used as an implementation of the server interface that cluster controller expects to call. +type CastAITestServer struct { + log *slog.Logger + actionsPushChannel chan castai.ClusterAction + cfg TestServerConfig +} + +type TestServerConfig struct { + MaxActionsPerCall int + TimeoutWaitingForActions time.Duration + BufferSize int +} + +func NewTestServer(logger *slog.Logger, cfg TestServerConfig) *CastAITestServer { + return &CastAITestServer{ + log: logger, + actionsPushChannel: make(chan castai.ClusterAction, cfg.BufferSize), + cfg: cfg, + } +} + +// GetActionsPushChannel returns a channel that can be used to push actions into the queue to be picked up by CC. 
+func (c *CastAITestServer) GetActionsPushChannel() chan<- castai.ClusterAction { + return c.actionsPushChannel +} + +func (c *CastAITestServer) GetActions(ctx context.Context, _ string) ([]*castai.ClusterAction, error) { + c.log.Info(fmt.Sprintf("GetActions called, have %d items in buffer", len(c.actionsPushChannel))) + actionsToReturn := make([]*castai.ClusterAction, 0) + + // Wait for at least one action to arrive from whoever is pushing them. + // If none arrive, we simulate the "empty poll" case of cluster-hub and return empty list. + select { + case x := <-c.actionsPushChannel: + actionsToReturn = append(actionsToReturn, &x) + case <-time.After(c.cfg.TimeoutWaitingForActions): + c.log.Info("No actions to return in %v", c.cfg.TimeoutWaitingForActions) + return nil, nil + case <-ctx.Done(): + return nil, fmt.Errorf("context done with cause (%v), err (%v)", context.Cause(ctx), ctx.Err()) + } + + // Attempt to drain up to max items from the channel. + for len(actionsToReturn) <= c.cfg.MaxActionsPerCall { + select { + case x := <-c.actionsPushChannel: + actionsToReturn = append(actionsToReturn, &x) + case <-time.After(50 * time.Millisecond): + // If we haven't received enough items, just flush. 
+ return actionsToReturn, nil + case <-ctx.Done(): + return nil, fmt.Errorf("context done with cause (%v), err (%v)", context.Cause(ctx), ctx.Err()) + } + } + + return actionsToReturn, nil +} + +func (c *CastAITestServer) AckAction(ctx context.Context, actionID string, req *castai.AckClusterActionRequest) error { + errMsg := lo.FromPtr(req.Error) + c.log.DebugContext(ctx, "action %q acknowledged; has error: %v; error: %v", actionID, req.Error != nil, errMsg) + + return nil +} + +func (c *CastAITestServer) SendAKSInitData(ctx context.Context, req *castai.AKSInitDataRequest) error { + return fmt.Errorf("not implemented; obsolete") +} + +func (c *CastAITestServer) SendLog(ctx context.Context, e *castai.LogEntry) error { + var slogLvl slog.Level + switch e.Level { + case "INFO": + slogLvl = slog.LevelInfo + case "DEBUG": + slogLvl = slog.LevelDebug + case "WARN": + slogLvl = slog.LevelWarn + case "ERROR": + slogLvl = slog.LevelError + default: + slogLvl = 100 // Some arbitrary value + } + + attrs := make([]slog.Attr, 0, len(e.Fields)) + for k, v := range e.Fields { + attrs = append(attrs, slog.Any(k, v)) + } + + msg := fmt.Sprintf("log from controller: %s", e.Message) + + c.log.LogAttrs(ctx, slogLvl, msg, attrs...) + + return nil +} diff --git a/loadtest/config.go b/loadtest/config.go new file mode 100644 index 00000000..ef46c4b1 --- /dev/null +++ b/loadtest/config.go @@ -0,0 +1,19 @@ +package loadtest + +import ( + "reflect" +) + +type Config struct { + Port int +} + +type ActionLoadTestConfig struct { + // TotalActions is the overall amount of actions of this type that will be created. + TotalActions int + // ActionsPerCall controls how many actions of the type to return on a given call. + // In the real world, this parameter is in cluster-hub side to determine how many max actions to return. 
+ ActionsPerCall int + + ActionType reflect.Type +} diff --git a/loadtest/http.go b/loadtest/http.go new file mode 100644 index 00000000..2e85f6bb --- /dev/null +++ b/loadtest/http.go @@ -0,0 +1,71 @@ +package loadtest + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/castai/cluster-controller/internal/castai" +) + +func NewHttpServer(cfg Config, testServer *CastAITestServer) error { + + http.HandleFunc("/v1/kubernetes/clusters/{cluster_id}/actions", func(w http.ResponseWriter, r *http.Request) { + result, err := testServer.GetActions(r.Context(), "") + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + response := &castai.GetClusterActionsResponse{ + Items: result, + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(response); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + return + }) + + http.HandleFunc("/v1/kubernetes/clusters/{cluster_id}/actions/{action_id}/ack", func(w http.ResponseWriter, r *http.Request) { + actionID := r.PathValue("action_id") + var req castai.AckClusterActionRequest + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + err = testServer.AckAction(r.Context(), actionID, &req) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + return + }) + + http.HandleFunc("/v1/kubernetes/clusters/{cluster_id}/actions/logs", func(w http.ResponseWriter, r *http.Request) { + var req castai.LogEntry + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + err = testServer.SendLog(r.Context(), &req) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + return + }) + + return http.ListenAndServe(fmt.Sprintf(":%d", cfg.Port), nil) + +} diff --git 
a/loadtest/prepare.go b/loadtest/prepare.go new file mode 100644 index 00000000..0ae7b608 --- /dev/null +++ b/loadtest/prepare.go @@ -0,0 +1,28 @@ +package loadtest + +import ( + v1 "k8s.io/api/apps/v1" + policyv1 "k8s.io/api/policy/v1" + "k8s.io/client-go/kubernetes" +) + +func Apply(client kubernetes.Interface, obj kubernetes.Interface) error { + return nil +} + +// DeploymentWithUnsatisfiablePDB generates a 1-replica deployment and a PDB that does not allow any disruption. +// Useful to test "stuck" drains. +func DeploymentWithUnsatisfiablePDB(client kubernetes.Interface) (*v1.Deployment, *policyv1.PodDisruptionBudget) { + //a := v1.Deployment{ + // TypeMeta: metav1.TypeMeta{}, + // ObjectMeta: metav1.ObjectMeta{}, + // Spec: v1.DeploymentSpec{}, + // Status: v1.DeploymentStatus{}, + //} + //b := policyv1.PodDisruptionBudget{} + // + ////Spec. + // + //v1.De + return nil, nil +} From f740b06f139028dfe5ec9c8299578de32c27a0e0 Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Wed, 12 Mar 2025 18:08:18 +0200 Subject: [PATCH 02/22] Manual load test of drain node --- cmd/loadtest/main.go | 61 +++++++++++++++ loadtest/castai.go | 56 ++++++------- loadtest/config.go | 13 +++ loadtest/kwok.go | 183 +++++++++++++++++++++++++++++++++++++++++++ loadtest/prepare.go | 141 ++++++++++++++++++++++++++++----- 5 files changed, 404 insertions(+), 50 deletions(-) create mode 100644 cmd/loadtest/main.go create mode 100644 loadtest/kwok.go diff --git a/cmd/loadtest/main.go b/cmd/loadtest/main.go new file mode 100644 index 00000000..2cffb77a --- /dev/null +++ b/cmd/loadtest/main.go @@ -0,0 +1,61 @@ +package main + +import ( + "fmt" + "log/slog" + "os" + "time" + + "github.com/castai/cluster-controller/loadtest" +) + +// TODO: Move to corba + +func main() { + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + cfg := loadtest.Config{ + Port: 8080, + } + + logger.Info("creating test server") + // TODO: Defaults... 
+ testServer := loadtest.NewTestServer(logger, loadtest.TestServerConfig{ + BufferSize: 1000, + MaxActionsPerCall: 500, + TimeoutWaitingForActions: 60 * time.Second, + }) + + go func() { + ch := testServer.GetActionsPushChannel() + + loadtest.TestStuckDrain(ch) + //for range 10000 { + // ch <- castai.ClusterAction{ + // ID: uuid.NewString(), + // ActionCreateEvent: &castai.ActionCreateEvent{ + // Reporter: "provisioning.cast.ai", + // ObjectRef: corev1.ObjectReference{ + // Kind: "Pod", + // Namespace: "default", + // Name: "Dummy-pod", + // UID: types.UID(uuid.New().String()), + // APIVersion: "v1", + // }, + // EventTime: time.Now(), + // EventType: "Warning", + // //Reason: fmt.Sprintf("Just because! %d", i), + // Reason: "Reason", + // Action: "During node creation.", + // Message: "Oh common, you can do better.", + // }, + // } + //} + }() + + logger.Info(fmt.Sprintf("starting http server on port %d", cfg.Port)) + // TODO: Cleanup + err := loadtest.NewHttpServer(cfg, testServer) + if err != nil { + panic(err) + } +} diff --git a/loadtest/castai.go b/loadtest/castai.go index 46af9c84..aa27a6bd 100644 --- a/loadtest/castai.go +++ b/loadtest/castai.go @@ -20,12 +20,6 @@ type CastAITestServer struct { cfg TestServerConfig } -type TestServerConfig struct { - MaxActionsPerCall int - TimeoutWaitingForActions time.Duration - BufferSize int -} - func NewTestServer(logger *slog.Logger, cfg TestServerConfig) *CastAITestServer { return &CastAITestServer{ log: logger, @@ -35,6 +29,7 @@ func NewTestServer(logger *slog.Logger, cfg TestServerConfig) *CastAITestServer } // GetActionsPushChannel returns a channel that can be used to push actions into the queue to be picked up by CC. +// Don't close the returned channel or pay the consequences! func (c *CastAITestServer) GetActionsPushChannel() chan<- castai.ClusterAction { return c.actionsPushChannel } @@ -47,9 +42,10 @@ func (c *CastAITestServer) GetActions(ctx context.Context, _ string) ([]*castai. 
// If none arrive, we simulate the "empty poll" case of cluster-hub and return empty list. select { case x := <-c.actionsPushChannel: + c.log.Info("Received action", "action", *x.ActionDrainNode) actionsToReturn = append(actionsToReturn, &x) case <-time.After(c.cfg.TimeoutWaitingForActions): - c.log.Info("No actions to return in %v", c.cfg.TimeoutWaitingForActions) + c.log.Info(fmt.Sprintf("No actions to return in %v", c.cfg.TimeoutWaitingForActions)) return nil, nil case <-ctx.Done(): return nil, fmt.Errorf("context done with cause (%v), err (%v)", context.Cause(ctx), ctx.Err()) @@ -73,7 +69,7 @@ func (c *CastAITestServer) GetActions(ctx context.Context, _ string) ([]*castai. func (c *CastAITestServer) AckAction(ctx context.Context, actionID string, req *castai.AckClusterActionRequest) error { errMsg := lo.FromPtr(req.Error) - c.log.DebugContext(ctx, "action %q acknowledged; has error: %v; error: %v", actionID, req.Error != nil, errMsg) + c.log.DebugContext(ctx, fmt.Sprintf("action %q acknowledged; has error: %v; error: %v", actionID, req.Error != nil, errMsg)) return nil } @@ -83,28 +79,28 @@ func (c *CastAITestServer) SendAKSInitData(ctx context.Context, req *castai.AKSI } func (c *CastAITestServer) SendLog(ctx context.Context, e *castai.LogEntry) error { - var slogLvl slog.Level - switch e.Level { - case "INFO": - slogLvl = slog.LevelInfo - case "DEBUG": - slogLvl = slog.LevelDebug - case "WARN": - slogLvl = slog.LevelWarn - case "ERROR": - slogLvl = slog.LevelError - default: - slogLvl = 100 // Some arbitrary value - } - - attrs := make([]slog.Attr, 0, len(e.Fields)) - for k, v := range e.Fields { - attrs = append(attrs, slog.Any(k, v)) - } - - msg := fmt.Sprintf("log from controller: %s", e.Message) - - c.log.LogAttrs(ctx, slogLvl, msg, attrs...) 
+ //var slogLvl slog.Level + //switch e.Level { + //case "INFO": + // slogLvl = slog.LevelInfo + //case "DEBUG": + // slogLvl = slog.LevelDebug + //case "WARN": + // slogLvl = slog.LevelWarn + //case "ERROR": + // slogLvl = slog.LevelError + //default: + // slogLvl = 100 // Some arbitrary value + //} + // + //attrs := make([]slog.Attr, 0, len(e.Fields)) + //for k, v := range e.Fields { + // attrs = append(attrs, slog.Any(k, v)) + //} + // + //msg := fmt.Sprintf("log from controller: %s", e.Message) + // + //c.log.LogAttrs(ctx, slogLvl, msg, attrs...) return nil } diff --git a/loadtest/config.go b/loadtest/config.go index ef46c4b1..a9c8cd65 100644 --- a/loadtest/config.go +++ b/loadtest/config.go @@ -2,12 +2,25 @@ package loadtest import ( "reflect" + "time" ) +// Config for the HTTP server. type Config struct { Port int } +// TestServerConfig has settings for the mock server instance. +type TestServerConfig struct { + // MaxActionsPerCall is the upper limit of actions to return in one CastAITestServer.GetActions call. + MaxActionsPerCall int + // TimeoutWaitingForActions controls how long to wait for at least 1 action to appear on server side. + // This mimics CH behavior of not returning early if there are no pending actions and keeping the request "running". + TimeoutWaitingForActions time.Duration + // BufferSize controls the input channel size. + BufferSize int +} + type ActionLoadTestConfig struct { // TotalActions is the overall amount of actions of this type that will be created. 
TotalActions int diff --git a/loadtest/kwok.go b/loadtest/kwok.go new file mode 100644 index 00000000..00066048 --- /dev/null +++ b/loadtest/kwok.go @@ -0,0 +1,183 @@ +package loadtest + +import ( + "fmt" + + "github.com/samber/lo" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +const ( + DefaultKwokMarker = "kwok.x-k8s.io/node" + KwokMarkerValue = "fake-node" +) + +// Kwok wraps +type Kwok struct { +} + +type KwokConfig struct { + // Label should match what kwok is configured to use via --manage-nodes-with-label-selector + // Default is DefaultKwokMarker. Value is always KwokMarkerValue. + Label string + + // Annotation should match what kwok is configured to use via --manage-nodes-with-annotation-selector + // Default is DefaultKwokMarker. Value is always KwokMarkerValue. + Annotation string +} + +// Should be able to create kwok-owned nodes +// And scheduled deployments, etc on them. + +// Run fake server + kwok controller +// Either as two processes or as two container in the same deployment. + +// NewKwokNode creates a fake node with reasonable defaults. +// Can be customized but the marker label/annotation must be present. +// Tainted by default with DefaultKwokMarker to avoid running real pods on it. +// Requires that a kwok-controller is running to actually simulate the node on apply. 
+func NewKwokNode(cfg KwokConfig, nodeName string) *corev1.Node { + kwokLabel := DefaultKwokMarker + if cfg.Label != "" { + kwokLabel = cfg.Label + } + kwokAnnotation := DefaultKwokMarker + if cfg.Annotation != "" { + kwokAnnotation = cfg.Annotation + } + + defaultLabels := map[string]string{ + kwokLabel: KwokMarkerValue, + corev1.LabelArchStable: "amd64", + corev1.LabelHostname: nodeName, + corev1.LabelOSStable: "linux", + "kubernetes.io/role": "fake-node", + "type": "kwok", + } + + defaultAnnotations := map[string]string{ + kwokAnnotation: KwokMarkerValue, + } + + return &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + Labels: defaultLabels, + Annotations: defaultAnnotations, + }, + Spec: corev1.NodeSpec{ + Taints: []corev1.Taint{ + { + Key: DefaultKwokMarker, + Value: KwokMarkerValue, + Effect: corev1.TaintEffectNoSchedule, + }, + }, + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewMilliQuantity(32, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(256*1024*1024*1024, resource.BinarySI), + corev1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewMilliQuantity(32, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(256*1024*1024*1024, resource.BinarySI), + corev1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + NodeInfo: corev1.NodeSystemInfo{ + KubeletVersion: "kwok", + OperatingSystem: "linux", + Architecture: "amd64", + }, + Phase: corev1.NodeRunning, + }, + } +} + +func ForceDeploymentOnNode(deployment *appsv1.Deployment, nodeName string) { + deployment.Spec.Template.Spec.NodeName = nodeName +} + +// DeploymentWithStuckPDB creates a 1-replica deployment with "stuck PDB" that is never satisfiable, i.e. no pods can be evicted. +// Deployment cannot run in reality, it uses fake container. +// Deployment deploys on kwok fake nodes by default. 
+func DeploymentWithStuckPDB(deploymentName string) (*appsv1.Deployment, *policyv1.PodDisruptionBudget) { + labelApp := "appname" + labelValue := fmt.Sprintf("%s-stuck-pdb", deploymentName) + + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: deploymentName, + Namespace: metav1.NamespaceDefault, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: lo.ToPtr(int32(1)), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + labelApp: labelValue, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + labelApp: labelValue, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "fake-container", + Image: "does-not-exist", + }, + }, + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: DefaultKwokMarker, + Operator: corev1.NodeSelectorOpIn, + Values: []string{KwokMarkerValue}, + }, + }, + }, + }, + }, + }, + }, + Tolerations: []corev1.Toleration{ + { + Key: DefaultKwokMarker, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoSchedule, + }, + }, + }, + }, + }, + } + + pdb := &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-pdb", deploymentName), + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + labelApp: labelValue, + }, + }, + MaxUnavailable: lo.ToPtr(intstr.FromInt32(0)), + }, + } + + return deployment, pdb +} diff --git a/loadtest/prepare.go b/loadtest/prepare.go index 0ae7b608..a81aabd4 100644 --- a/loadtest/prepare.go +++ b/loadtest/prepare.go @@ -1,28 +1,129 @@ package loadtest import ( - v1 "k8s.io/api/apps/v1" - policyv1 "k8s.io/api/policy/v1" + "context" + "fmt" + "log/slog" + "path/filepath" + "time" + + 
"github.com/google/uuid" + "github.com/samber/lo" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/homedir" + + "github.com/castai/cluster-controller/internal/castai" ) -func Apply(client kubernetes.Interface, obj kubernetes.Interface) error { - return nil -} +func TestStuckDrain(actionChannel chan<- castai.ClusterAction) { + namespaceForTest := "loadtest" + ctx := context.Background() + // TODO: Use custom namespace + // TODO: Proper cleanup + + // Prepare - create node per drain operation. + + var kubeconfig string + if home := homedir.HomeDir(); home != "" { + kubeconfig = filepath.Join(home, ".kube", "config") + } + + // Build the Kubernetes configuration from the kubeconfig file. + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + panic(fmt.Errorf("failed to build config: %v", err)) + } + + // Create a clientset based on the configuration. 
+ clientset, err := kubernetes.NewForConfig(config) + if err != nil { + panic(fmt.Errorf("failed to create clientset: %v", err)) + } + + // prep clean up + nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", DefaultKwokMarker, KwokMarkerValue), + }) + if err != nil { + panic(fmt.Errorf("failed to list nodes: %v", err)) + } + + for _, n := range nodes.Items { + err = clientset.CoreV1().Nodes().Delete(ctx, n.Name, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + panic(err) + } + } + + slog.Info(fmt.Sprintf("Deleting namespace %v", namespaceForTest)) + err = clientset.CoreV1().Namespaces().Delete(ctx, namespaceForTest, metav1.DeleteOptions{ + GracePeriodSeconds: lo.ToPtr(int64(0)), + PropagationPolicy: lo.ToPtr(metav1.DeletePropagationBackground), + }) + if err != nil && !apierrors.IsNotFound(err) { + panic(err) + } + + for range 300 { + _, err = clientset.CoreV1().Namespaces().Get(ctx, namespaceForTest, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + break + } + time.Sleep(1 * time.Second) + } + + slog.Info(fmt.Sprintf("Recreating namespace %v", namespaceForTest)) + _, err = clientset.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespaceForTest, + }, + }, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + panic(err) + } + + var nodesToEvict []*corev1.Node + + for i := range 500 { + node := NewKwokNode(KwokConfig{}, fmt.Sprintf("kwok-stuck-drain-%d", i)) + + _, err = clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) + if err != nil { + panic(err) + } + nodesToEvict = append(nodesToEvict, node) + + deployment, pdb := DeploymentWithStuckPDB(fmt.Sprintf("fake-deployment-%s-%d", node.Name, i)) + deployment.ObjectMeta.Namespace = namespaceForTest + deployment.Spec.Replicas = lo.ToPtr(int32(500)) + pdb.ObjectMeta.Namespace = namespaceForTest + + _, err = 
clientset.AppsV1().Deployments(namespaceForTest).Create(ctx, deployment, metav1.CreateOptions{}) + if err != nil { + panic(err) + } + + _, err = clientset.PolicyV1().PodDisruptionBudgets(namespaceForTest).Create(ctx, pdb, metav1.CreateOptions{}) + if err != nil { + panic(err) + } + } -// DeploymentWithUnsatisfiablePDB generates a 1-replica deployment and a PDB that does not allow any disruption. -// Useful to test "stuck" drains. -func DeploymentWithUnsatisfiablePDB(client kubernetes.Interface) (*v1.Deployment, *policyv1.PodDisruptionBudget) { - //a := v1.Deployment{ - // TypeMeta: metav1.TypeMeta{}, - // ObjectMeta: metav1.ObjectMeta{}, - // Spec: v1.DeploymentSpec{}, - // Status: v1.DeploymentStatus{}, - //} - //b := policyv1.PodDisruptionBudget{} - // - ////Spec. - // - //v1.De - return nil, nil + slog.Info(fmt.Sprintf("Starting drain action creation")) + for _, node := range nodesToEvict { + actionChannel <- castai.ClusterAction{ + ID: uuid.NewString(), + CreatedAt: time.Now().UTC(), + ActionDrainNode: &castai.ActionDrainNode{ + NodeName: node.Name, + NodeID: "", + DrainTimeoutSeconds: 60, + Force: false, + }, + } + } } From 2305c70a5a340f869b6caa23be81acdecc0a942a Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Thu, 13 Mar 2025 09:59:31 +0200 Subject: [PATCH 03/22] Basic skeleton of running load test scenarios --- cmd/loadtest/main.go | 113 +++++++++++---- loadtest/castai.go | 1 - loadtest/config.go | 11 -- loadtest/prepare.go | 129 ------------------ .../{kwok.go => scenarios/k8s_objects.go} | 16 +-- loadtest/scenarios/pod_events.go | 58 ++++++++ loadtest/scenarios/scenario.go | 17 +++ loadtest/scenarios/stuck_drain.go | 122 +++++++++++++++++ 8 files changed, 288 insertions(+), 179 deletions(-) delete mode 100644 loadtest/prepare.go rename loadtest/{kwok.go => scenarios/k8s_objects.go} (92%) create mode 100644 loadtest/scenarios/pod_events.go create mode 100644 loadtest/scenarios/scenario.go create mode 100644 loadtest/scenarios/stuck_drain.go diff --git 
a/cmd/loadtest/main.go b/cmd/loadtest/main.go index 2cffb77a..536af62c 100644 --- a/cmd/loadtest/main.go +++ b/cmd/loadtest/main.go @@ -1,12 +1,23 @@ package main import ( + "context" "fmt" "log/slog" "os" + "path/filepath" "time" + "github.com/samber/lo" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/homedir" + "github.com/castai/cluster-controller/loadtest" + "github.com/castai/cluster-controller/loadtest/scenarios" ) // TODO: Move to corba @@ -25,36 +36,92 @@ func main() { TimeoutWaitingForActions: 60 * time.Second, }) + var kubeconfig string + if home := homedir.HomeDir(); home != "" { + kubeconfig = filepath.Join(home, ".kube", "config") + } + + // Build the Kubernetes configuration from the kubeconfig file. + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + panic(fmt.Errorf("failed to build config: %v", err)) + } + + // Create a clientset based on the configuration. 
+ clientset, err := kubernetes.NewForConfig(config) + if err != nil { + panic(fmt.Errorf("failed to create clientset: %v", err)) + } + + namespaceForTest := "loadtest-1" + ctx := context.Background() + + // TODO: Extract + logger.Info(fmt.Sprintf("Deleting namespace %v", namespaceForTest)) + err = clientset.CoreV1().Namespaces().Delete(ctx, namespaceForTest, metav1.DeleteOptions{ + GracePeriodSeconds: lo.ToPtr(int64(0)), + PropagationPolicy: lo.ToPtr(metav1.DeletePropagationBackground), + }) + if err != nil && !apierrors.IsNotFound(err) { + panic(err) + } + + for range 300 { + _, err = clientset.CoreV1().Namespaces().Get(ctx, namespaceForTest, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + break + } + time.Sleep(1 * time.Second) + } + + logger.Info(fmt.Sprintf("Recreating namespace %v", namespaceForTest)) + _, err = clientset.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespaceForTest, + }, + }, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + panic(err) + } + // end + go func() { + ctx := context.Background() + ch := testServer.GetActionsPushChannel() - loadtest.TestStuckDrain(ch) - //for range 10000 { - // ch <- castai.ClusterAction{ - // ID: uuid.NewString(), - // ActionCreateEvent: &castai.ActionCreateEvent{ - // Reporter: "provisioning.cast.ai", - // ObjectRef: corev1.ObjectReference{ - // Kind: "Pod", - // Namespace: "default", - // Name: "Dummy-pod", - // UID: types.UID(uuid.New().String()), - // APIVersion: "v1", - // }, - // EventTime: time.Now(), - // EventType: "Warning", - // //Reason: fmt.Sprintf("Just because! 
%d", i), - // Reason: "Reason", - // Action: "During node creation.", - // Message: "Oh common, you can do better.", - // }, - // } - //} + _, _, eventsScenario := scenarios.PodEvents(2000, logger)() + + prepareDrain, cleanupDrain, drainScenario := scenarios.StuckDrain(100, 1, logger)() + + err := prepareDrain(ctx, namespaceForTest, clientset) + if err != nil { + panic(err) + } + + go func() { + err := eventsScenario(ctx, ch) + if err != nil { + logger.Error(fmt.Sprintf("failed events scenario with %v", err)) + } + }() + + go func() { + err := drainScenario(ctx, ch) + if err != nil { + logger.Error(fmt.Sprintf("failed to drain with %v", err)) + } + + err = cleanupDrain(ctx, namespaceForTest, clientset) + if err != nil { + logger.Error(fmt.Sprintf("failed to cleanup with %v", err)) + } + }() }() logger.Info(fmt.Sprintf("starting http server on port %d", cfg.Port)) // TODO: Cleanup - err := loadtest.NewHttpServer(cfg, testServer) + err = loadtest.NewHttpServer(cfg, testServer) if err != nil { panic(err) } diff --git a/loadtest/castai.go b/loadtest/castai.go index aa27a6bd..ccba85b8 100644 --- a/loadtest/castai.go +++ b/loadtest/castai.go @@ -42,7 +42,6 @@ func (c *CastAITestServer) GetActions(ctx context.Context, _ string) ([]*castai. // If none arrive, we simulate the "empty poll" case of cluster-hub and return empty list. select { case x := <-c.actionsPushChannel: - c.log.Info("Received action", "action", *x.ActionDrainNode) actionsToReturn = append(actionsToReturn, &x) case <-time.After(c.cfg.TimeoutWaitingForActions): c.log.Info(fmt.Sprintf("No actions to return in %v", c.cfg.TimeoutWaitingForActions)) diff --git a/loadtest/config.go b/loadtest/config.go index a9c8cd65..2f027609 100644 --- a/loadtest/config.go +++ b/loadtest/config.go @@ -1,7 +1,6 @@ package loadtest import ( - "reflect" "time" ) @@ -20,13 +19,3 @@ type TestServerConfig struct { // BufferSize controls the input channel size. 
BufferSize int } - -type ActionLoadTestConfig struct { - // TotalActions is the overall amount of actions of this type that will be created. - TotalActions int - // ActionsPerCall controls how many actions of the type to return on a given call. - // In the real world, this parameter is in cluster-hub side to determine how many max actions to return. - ActionsPerCall int - - ActionType reflect.Type -} diff --git a/loadtest/prepare.go b/loadtest/prepare.go deleted file mode 100644 index a81aabd4..00000000 --- a/loadtest/prepare.go +++ /dev/null @@ -1,129 +0,0 @@ -package loadtest - -import ( - "context" - "fmt" - "log/slog" - "path/filepath" - "time" - - "github.com/google/uuid" - "github.com/samber/lo" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/util/homedir" - - "github.com/castai/cluster-controller/internal/castai" -) - -func TestStuckDrain(actionChannel chan<- castai.ClusterAction) { - namespaceForTest := "loadtest" - ctx := context.Background() - // TODO: Use custom namespace - // TODO: Proper cleanup - - // Prepare - create node per drain operation. - - var kubeconfig string - if home := homedir.HomeDir(); home != "" { - kubeconfig = filepath.Join(home, ".kube", "config") - } - - // Build the Kubernetes configuration from the kubeconfig file. - config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) - if err != nil { - panic(fmt.Errorf("failed to build config: %v", err)) - } - - // Create a clientset based on the configuration. 
- clientset, err := kubernetes.NewForConfig(config) - if err != nil { - panic(fmt.Errorf("failed to create clientset: %v", err)) - } - - // prep clean up - nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{ - LabelSelector: fmt.Sprintf("%s=%s", DefaultKwokMarker, KwokMarkerValue), - }) - if err != nil { - panic(fmt.Errorf("failed to list nodes: %v", err)) - } - - for _, n := range nodes.Items { - err = clientset.CoreV1().Nodes().Delete(ctx, n.Name, metav1.DeleteOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - panic(err) - } - } - - slog.Info(fmt.Sprintf("Deleting namespace %v", namespaceForTest)) - err = clientset.CoreV1().Namespaces().Delete(ctx, namespaceForTest, metav1.DeleteOptions{ - GracePeriodSeconds: lo.ToPtr(int64(0)), - PropagationPolicy: lo.ToPtr(metav1.DeletePropagationBackground), - }) - if err != nil && !apierrors.IsNotFound(err) { - panic(err) - } - - for range 300 { - _, err = clientset.CoreV1().Namespaces().Get(ctx, namespaceForTest, metav1.GetOptions{}) - if apierrors.IsNotFound(err) { - break - } - time.Sleep(1 * time.Second) - } - - slog.Info(fmt.Sprintf("Recreating namespace %v", namespaceForTest)) - _, err = clientset.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: namespaceForTest, - }, - }, metav1.CreateOptions{}) - if err != nil && !apierrors.IsAlreadyExists(err) { - panic(err) - } - - var nodesToEvict []*corev1.Node - - for i := range 500 { - node := NewKwokNode(KwokConfig{}, fmt.Sprintf("kwok-stuck-drain-%d", i)) - - _, err = clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) - if err != nil { - panic(err) - } - nodesToEvict = append(nodesToEvict, node) - - deployment, pdb := DeploymentWithStuckPDB(fmt.Sprintf("fake-deployment-%s-%d", node.Name, i)) - deployment.ObjectMeta.Namespace = namespaceForTest - deployment.Spec.Replicas = lo.ToPtr(int32(500)) - pdb.ObjectMeta.Namespace = namespaceForTest - - _, err = 
clientset.AppsV1().Deployments(namespaceForTest).Create(ctx, deployment, metav1.CreateOptions{}) - if err != nil { - panic(err) - } - - _, err = clientset.PolicyV1().PodDisruptionBudgets(namespaceForTest).Create(ctx, pdb, metav1.CreateOptions{}) - if err != nil { - panic(err) - } - } - - slog.Info(fmt.Sprintf("Starting drain action creation")) - for _, node := range nodesToEvict { - actionChannel <- castai.ClusterAction{ - ID: uuid.NewString(), - CreatedAt: time.Now().UTC(), - ActionDrainNode: &castai.ActionDrainNode{ - NodeName: node.Name, - NodeID: "", - DrainTimeoutSeconds: 60, - Force: false, - }, - } - } -} diff --git a/loadtest/kwok.go b/loadtest/scenarios/k8s_objects.go similarity index 92% rename from loadtest/kwok.go rename to loadtest/scenarios/k8s_objects.go index 00066048..90c6c0af 100644 --- a/loadtest/kwok.go +++ b/loadtest/scenarios/k8s_objects.go @@ -1,4 +1,4 @@ -package loadtest +package scenarios import ( "fmt" @@ -17,10 +17,6 @@ const ( KwokMarkerValue = "fake-node" ) -// Kwok wraps -type Kwok struct { -} - type KwokConfig struct { // Label should match what kwok is configured to use via --manage-nodes-with-label-selector // Default is DefaultKwokMarker. Value is always KwokMarkerValue. @@ -31,12 +27,6 @@ type KwokConfig struct { Annotation string } -// Should be able to create kwok-owned nodes -// And scheduled deployments, etc on them. - -// Run fake server + kwok controller -// Either as two processes or as two container in the same deployment. - // NewKwokNode creates a fake node with reasonable defaults. // Can be customized but the marker label/annotation must be present. // Tainted by default with DefaultKwokMarker to avoid running real pods on it. 
@@ -100,10 +90,6 @@ func NewKwokNode(cfg KwokConfig, nodeName string) *corev1.Node { } } -func ForceDeploymentOnNode(deployment *appsv1.Deployment, nodeName string) { - deployment.Spec.Template.Spec.NodeName = nodeName -} - // DeploymentWithStuckPDB creates a 1-replica deployment with "stuck PDB" that is never satisfiable, i.e. no pods can be evicted. // Deployment cannot run in reality, it uses fake container. // Deployment deploys on kwok fake nodes by default. diff --git a/loadtest/scenarios/pod_events.go b/loadtest/scenarios/pod_events.go new file mode 100644 index 00000000..bc3499a3 --- /dev/null +++ b/loadtest/scenarios/pod_events.go @@ -0,0 +1,58 @@ +package scenarios + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/google/uuid" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + + "github.com/castai/cluster-controller/internal/castai" +) + +func PodEvents(count int, log *slog.Logger) TestScenario { + return func() (Preparation, Cleanup, TestRun) { + prepare := func(ctx context.Context, namespace string, clientset kubernetes.Interface) error { + // nothing to prepare for this test, pod does not have to exist to create events. + return nil + } + + cleanup := func(ctx context.Context, namespace string, clientset kubernetes.Interface) error { + // nothing to clean for this test, events are dropped automatically after certain time. 
+ + return nil + } + + run := func(_ context.Context, actionChannel chan<- castai.ClusterAction) error { + log.Info(fmt.Sprintf("Starting creating %d events for different pods", count)) + for i := range count { + actionChannel <- castai.ClusterAction{ + ID: uuid.NewString(), + ActionCreateEvent: &castai.ActionCreateEvent{ + Reporter: "provisioning.cast.ai", + ObjectRef: corev1.ObjectReference{ + Kind: "Pod", + Namespace: "default", + Name: "Dummy-pod", + UID: types.UID(uuid.New().String()), + APIVersion: "v1", + }, + EventTime: time.Now(), + EventType: "Warning", + // Reason is different so events won't be aggregated by CC's event broadcaster. + Reason: fmt.Sprintf("Just because! %d", i), + Action: "During node creation.", + Message: "Oh common, you can do better.", + }, + } + } + + return nil + } + return prepare, cleanup, run + } +} diff --git a/loadtest/scenarios/scenario.go b/loadtest/scenarios/scenario.go new file mode 100644 index 00000000..e516eb67 --- /dev/null +++ b/loadtest/scenarios/scenario.go @@ -0,0 +1,17 @@ +package scenarios + +import ( + "context" + + "k8s.io/client-go/kubernetes" + + "github.com/castai/cluster-controller/internal/castai" +) + +type Preparation func(ctx context.Context, namespace string, clientset kubernetes.Interface) error + +type Cleanup func(ctx context.Context, namespace string, clientset kubernetes.Interface) error + +type TestRun func(ctx context.Context, actionChannel chan<- castai.ClusterAction) error + +type TestScenario func() (Preparation, Cleanup, TestRun) diff --git a/loadtest/scenarios/stuck_drain.go b/loadtest/scenarios/stuck_drain.go new file mode 100644 index 00000000..7697c756 --- /dev/null +++ b/loadtest/scenarios/stuck_drain.go @@ -0,0 +1,122 @@ +package scenarios + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/google/uuid" + "github.com/samber/lo" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + 
"k8s.io/client-go/kubernetes" + + "github.com/castai/cluster-controller/internal/castai" +) + +func StuckDrain(nodeCount int, deploymentReplicas int, log *slog.Logger) TestScenario { + return func() (Preparation, Cleanup, TestRun) { + var nodesToDrain []*corev1.Node + + // Creates node per action + 1 deployment and PDB for each node. + prepare := func(ctx context.Context, namespace string, clientset kubernetes.Interface) error { + for i := range nodeCount { + nodeName := fmt.Sprintf("kwok-stuck-drain-%d", i) + log.Info(fmt.Sprintf("Creating node %s", nodeName)) + node := NewKwokNode(KwokConfig{}, nodeName) + + _, err := clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create fake node: %w", err) + } + if err != nil && apierrors.IsAlreadyExists(err) { + log.Warn("node already exists, will reuse but potential conflict between test runs", "nodeName", nodeName) + } + nodesToDrain = append(nodesToDrain, node) + + deployment, pdb := DeploymentWithStuckPDB(fmt.Sprintf("fake-deployment-%s-%d", node.Name, i)) + deployment.ObjectMeta.Namespace = namespace + deployment.Spec.Replicas = lo.ToPtr(int32(deploymentReplicas)) + deployment.Spec.Template.Spec.NodeName = nodeName + pdb.ObjectMeta.Namespace = namespace + + _, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create fake deployment: %w", err) + } + + _, err = clientset.PolicyV1().PodDisruptionBudgets(namespace).Create(ctx, pdb, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create fake pod disruption budget: %w", err) + } + } + + return nil + } + + cleanup := func(ctx context.Context, namespace string, clientset kubernetes.Interface) error { + var errs []error + // We iterate through all nodes as they are not deleted with the ns and can leak => so we want do delete as many as possible. 
+ for _, n := range nodesToDrain { + err := clientset.CoreV1().Nodes().Delete(ctx, n.Name, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + log.Warn("failed to delete fake node, will continue with other nodes", "nodeName", n.Name) + errs = append(errs, err) + } + } + if len(errs) > 0 { + return errors.Join(errs...) + } + + // We assume no other tests are using the same NS so just delete all. + deploymentsInNS, err := clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to list deployments: %w", err) + } + + for _, deployment := range deploymentsInNS.Items { + err = clientset.AppsV1().Deployments(namespace).Delete(ctx, deployment.Name, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to delete fake deployment: %w", err) + } + } + + pdbsInNS, err := clientset.PolicyV1().PodDisruptionBudgets(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to list pod disruption budgets: %w", err) + } + + for _, pdb := range pdbsInNS.Items { + err = clientset.PolicyV1().PodDisruptionBudgets(namespace).Delete(ctx, pdb.Name, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to delete fake pod disruption budget: %w", err) + } + } + + log.Info("Finished up cleaning nodes for drain, deployments and PDBs.") + return nil + } + + run := func(ctx context.Context, actionChannel chan<- castai.ClusterAction) error { + log.Info(fmt.Sprintf("Starting drain action creation with %d nodes", len(nodesToDrain))) + for _, node := range nodesToDrain { + actionChannel <- castai.ClusterAction{ + ID: uuid.NewString(), + CreatedAt: time.Now().UTC(), + ActionDrainNode: &castai.ActionDrainNode{ + NodeName: node.Name, + NodeID: "", + DrainTimeoutSeconds: 60, + Force: false, + }, + } + } + + return nil + } + return prepare, cleanup, run + } +} From 
68f62c91117d664f1e4e33306f9d75c60945758d Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Thu, 13 Mar 2025 14:33:04 +0200 Subject: [PATCH 04/22] Add testserver cobra command --- cmd/loadtest/main.go | 128 --------------------------------- cmd/root.go | 2 + cmd/testserver/command.go | 16 +++++ cmd/testserver/run.go | 91 +++++++++++++++++++++++ loadtest/scenarios/scenario.go | 79 ++++++++++++++++++++ 5 files changed, 188 insertions(+), 128 deletions(-) delete mode 100644 cmd/loadtest/main.go create mode 100644 cmd/testserver/command.go create mode 100644 cmd/testserver/run.go diff --git a/cmd/loadtest/main.go b/cmd/loadtest/main.go deleted file mode 100644 index 536af62c..00000000 --- a/cmd/loadtest/main.go +++ /dev/null @@ -1,128 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log/slog" - "os" - "path/filepath" - "time" - - "github.com/samber/lo" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/util/homedir" - - "github.com/castai/cluster-controller/loadtest" - "github.com/castai/cluster-controller/loadtest/scenarios" -) - -// TODO: Move to corba - -func main() { - logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) - cfg := loadtest.Config{ - Port: 8080, - } - - logger.Info("creating test server") - // TODO: Defaults... - testServer := loadtest.NewTestServer(logger, loadtest.TestServerConfig{ - BufferSize: 1000, - MaxActionsPerCall: 500, - TimeoutWaitingForActions: 60 * time.Second, - }) - - var kubeconfig string - if home := homedir.HomeDir(); home != "" { - kubeconfig = filepath.Join(home, ".kube", "config") - } - - // Build the Kubernetes configuration from the kubeconfig file. - config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) - if err != nil { - panic(fmt.Errorf("failed to build config: %v", err)) - } - - // Create a clientset based on the configuration. 
- clientset, err := kubernetes.NewForConfig(config) - if err != nil { - panic(fmt.Errorf("failed to create clientset: %v", err)) - } - - namespaceForTest := "loadtest-1" - ctx := context.Background() - - // TODO: Extract - logger.Info(fmt.Sprintf("Deleting namespace %v", namespaceForTest)) - err = clientset.CoreV1().Namespaces().Delete(ctx, namespaceForTest, metav1.DeleteOptions{ - GracePeriodSeconds: lo.ToPtr(int64(0)), - PropagationPolicy: lo.ToPtr(metav1.DeletePropagationBackground), - }) - if err != nil && !apierrors.IsNotFound(err) { - panic(err) - } - - for range 300 { - _, err = clientset.CoreV1().Namespaces().Get(ctx, namespaceForTest, metav1.GetOptions{}) - if apierrors.IsNotFound(err) { - break - } - time.Sleep(1 * time.Second) - } - - logger.Info(fmt.Sprintf("Recreating namespace %v", namespaceForTest)) - _, err = clientset.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: namespaceForTest, - }, - }, metav1.CreateOptions{}) - if err != nil && !apierrors.IsAlreadyExists(err) { - panic(err) - } - // end - - go func() { - ctx := context.Background() - - ch := testServer.GetActionsPushChannel() - - _, _, eventsScenario := scenarios.PodEvents(2000, logger)() - - prepareDrain, cleanupDrain, drainScenario := scenarios.StuckDrain(100, 1, logger)() - - err := prepareDrain(ctx, namespaceForTest, clientset) - if err != nil { - panic(err) - } - - go func() { - err := eventsScenario(ctx, ch) - if err != nil { - logger.Error(fmt.Sprintf("failed events scenario with %v", err)) - } - }() - - go func() { - err := drainScenario(ctx, ch) - if err != nil { - logger.Error(fmt.Sprintf("failed to drain with %v", err)) - } - - err = cleanupDrain(ctx, namespaceForTest, clientset) - if err != nil { - logger.Error(fmt.Sprintf("failed to cleanup with %v", err)) - } - }() - }() - - logger.Info(fmt.Sprintf("starting http server on port %d", cfg.Port)) - // TODO: Cleanup - err = loadtest.NewHttpServer(cfg, testServer) - if err != nil { - 
panic(err) - } -} diff --git a/cmd/root.go b/cmd/root.go index 16410faa..316a049b 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -9,6 +9,7 @@ import ( "github.com/castai/cluster-controller/cmd/controller" "github.com/castai/cluster-controller/cmd/monitor" + "github.com/castai/cluster-controller/cmd/testserver" ) var rootCmd = &cobra.Command{ @@ -40,6 +41,7 @@ func Execute(ctx context.Context) { func init() { rootCmd.AddCommand(controller.NewCmd()) rootCmd.AddCommand(monitor.NewCmd()) + rootCmd.AddCommand(testserver.NewCmd()) } func fatal(err error) { diff --git a/cmd/testserver/command.go b/cmd/testserver/command.go new file mode 100644 index 00000000..1007e0a7 --- /dev/null +++ b/cmd/testserver/command.go @@ -0,0 +1,16 @@ +package testserver + +import "github.com/spf13/cobra" + +const Use = "test-server" + +func NewCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: Use, + RunE: func(cmd *cobra.Command, args []string) error { + return run(cmd.Context()) + }, + } + + return cmd +} diff --git a/cmd/testserver/run.go b/cmd/testserver/run.go new file mode 100644 index 00000000..70fae32d --- /dev/null +++ b/cmd/testserver/run.go @@ -0,0 +1,91 @@ +package testserver + +import ( + "context" + "errors" + "fmt" + "io" + "log/slog" + "os" + "sync" + "time" + + "github.com/sirupsen/logrus" + "k8s.io/client-go/kubernetes" + + "github.com/castai/cluster-controller/internal/config" + "github.com/castai/cluster-controller/loadtest" + "github.com/castai/cluster-controller/loadtest/scenarios" +) + +func run(ctx context.Context) error { + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + // TODO: Export as envVars + cfg := loadtest.Config{ + Port: 8080, + } + logger.Info("creating test server") + // TODO: Defaults... 
+ testServer := loadtest.NewTestServer(logger, loadtest.TestServerConfig{ + BufferSize: 1000, + MaxActionsPerCall: 500, + TimeoutWaitingForActions: 60 * time.Second, + }) + + // Not ideal but fast + discardLogger := logrus.New() + discardLogger.Out = io.Discard + restConfig, err := config.RetrieveKubeConfig(discardLogger) + if err != nil { + return fmt.Errorf("failed to get kubeconfig: %w", err) + } + + clientSet, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return fmt.Errorf("obtaining kubernetes clientset: %w", err) + } + + go func() { + logger.Info("Starting HTTP server for test") + err = loadtest.NewHttpServer(ctx, cfg, testServer) + if err != nil { + logger.Error("", "err", err) + panic(err) + } + }() + + ch := testServer.GetActionsPushChannel() + + testScenarios := []scenarios.TestScenario{ + scenarios.PodEvents(2000, logger), + scenarios.StuckDrain(100, 1, logger), + } + + var wg sync.WaitGroup + wg.Add(len(testScenarios)) + errs := make(chan error, len(testScenarios)) + + for i, test := range testScenarios { + go func() { + defer wg.Done() + logger.Info(fmt.Sprintf("Starting test scenario %d", i)) + + err := scenarios.RunScenario(ctx, test, ch, logger, clientSet) + errs <- err + }() + } + + logger.Info("Waiting for test scenarios to finish") + wg.Wait() + close(errs) + receivedErrors := make([]error, 0) + for err := range errs { + if err != nil { + receivedErrors = append(receivedErrors, err) + } + } + logger.Info(fmt.Sprintf("All test scenarios are done, received (%d) errors, exiting", len(receivedErrors))) + + // TODO: Wait for server channel to be empty as well + return errors.Join(receivedErrors...) 
+} diff --git a/loadtest/scenarios/scenario.go b/loadtest/scenarios/scenario.go index e516eb67..206f5307 100644 --- a/loadtest/scenarios/scenario.go +++ b/loadtest/scenarios/scenario.go @@ -2,7 +2,15 @@ package scenarios import ( "context" + "fmt" + "log/slog" + "math/rand" + "time" + "github.com/samber/lo" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "github.com/castai/cluster-controller/internal/castai" @@ -15,3 +23,74 @@ type Cleanup func(ctx context.Context, namespace string, clientset kubernetes.In type TestRun func(ctx context.Context, actionChannel chan<- castai.ClusterAction) error type TestScenario func() (Preparation, Cleanup, TestRun) + +func RunScenario( + ctx context.Context, + scenario TestScenario, + actionChannel chan<- castai.ClusterAction, + logger *slog.Logger, + clientset kubernetes.Interface, +) error { + namespaceForTest := fmt.Sprintf("test-namespace-%d", rand.Int31()) + logger = logger.With("namespace", namespaceForTest) + + // Prepare the namespace to run the test in. 
+ logger.Info("Preparing namespace for test") + _, err := clientset.CoreV1().Namespaces().Get(ctx, namespaceForTest, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to get namespace for test %v: %w", namespaceForTest, err) + } + if !apierrors.IsNotFound(err) { + return fmt.Errorf("namespace %v already exists and could be in use, cannot continue", namespaceForTest) + } + + logger.Info("Namespace does not exist, will create") + _, err = clientset.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespaceForTest, + }, + }, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create namespace %v: %w", namespaceForTest, err) + } + defer func() { + logger.Info("Deleting namespace for test") + err := clientset.CoreV1().Namespaces().Delete(ctx, namespaceForTest, metav1.DeleteOptions{ + GracePeriodSeconds: lo.ToPtr(int64(0)), + PropagationPolicy: lo.ToPtr(metav1.DeletePropagationBackground), + }) + if err != nil { + logger.Error(fmt.Sprintf("Failed to delete namespace for test %v: %v", namespaceForTest, err)) + return + } + logger.Info("Successfully deleted namespace for test") + }() + logger.Info("Namespace created") + + logger.Info("Starting test scenario") + prepare, cleanup, run := scenario() + + logger.Info("Running preparation function") + err = prepare(ctx, namespaceForTest, clientset) + if err != nil { + return fmt.Errorf("failed to run preparation function: %w", err) + } + defer func() { + logger.Info("Running cleanup function") + err := cleanup(ctx, namespaceForTest, clientset) + if err != nil { + logger.Error("failed ot run cleanup", "error", err) + } + }() + + scenarioCtx, cancel := context.WithTimeout(ctx, 30*time.Minute) + defer cancel() + + logger.Info("Starting scenario execution") + err = run(scenarioCtx, actionChannel) + if err != nil { + return fmt.Errorf("failed to run scenario: %w", err) + } + + return nil +} From 
41b35075e323d72194408e8522918346548f8de0 Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Thu, 13 Mar 2025 14:49:54 +0200 Subject: [PATCH 05/22] Simple make step for now --- Makefile | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Makefile b/Makefile index df427822..19d25a4f 100644 --- a/Makefile +++ b/Makefile @@ -38,3 +38,8 @@ test: generate-e2e-client: go generate ./e2e/client .PHONY: generate-e2e-client + +# TODO: Make this less simplistic +run-loadtest: + # TODO: Required because of reusing config + API_KEY=dummy API_URL=http://example.com CLUSTER_ID=D30A163C-C5DF-4CC8-985C-D1449398295E KUBECONFIG=~/.kube/config LOG_LEVEL=4 go run . test-server \ No newline at end of file From 1929c51c1b1d7f1f3ebce162e84805c6945473c4 Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Thu, 13 Mar 2025 16:11:53 +0200 Subject: [PATCH 06/22] Wait for ready deployments in stuck drain test --- loadtest/scenarios/stuck_drain.go | 14 ++++++++++++++ loadtest/scenarios/util.go | 24 ++++++++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 loadtest/scenarios/util.go diff --git a/loadtest/scenarios/stuck_drain.go b/loadtest/scenarios/stuck_drain.go index 7697c756..552036c0 100644 --- a/loadtest/scenarios/stuck_drain.go +++ b/loadtest/scenarios/stuck_drain.go @@ -37,6 +37,7 @@ func StuckDrain(nodeCount int, deploymentReplicas int, log *slog.Logger) TestSce } nodesToDrain = append(nodesToDrain, node) + log.Info(fmt.Sprintf("Creating deployment on node %s", nodeName)) deployment, pdb := DeploymentWithStuckPDB(fmt.Sprintf("fake-deployment-%s-%d", node.Name, i)) deployment.ObjectMeta.Namespace = namespace deployment.Spec.Replicas = lo.ToPtr(int32(deploymentReplicas)) @@ -48,6 +49,19 @@ func StuckDrain(nodeCount int, deploymentReplicas int, log *slog.Logger) TestSce return fmt.Errorf("failed to create fake deployment: %w", err) } + // Wait for deployment to become ready, otherwise we might start draining before the pod is up. 
+ progressed := WaitUntil(ctx, 30*time.Second, func() bool { + d, err := clientset.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{}) + if err != nil { + log.Warn("failed to get deployment after creating", "err", err) + return false + } + return d.Status.ReadyReplicas == *d.Spec.Replicas + }) + if !progressed { + return fmt.Errorf("deployment %s did not progress to ready state in time", deployment.Name) + } + _, err = clientset.PolicyV1().PodDisruptionBudgets(namespace).Create(ctx, pdb, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("failed to create fake pod disruption budget: %w", err) diff --git a/loadtest/scenarios/util.go b/loadtest/scenarios/util.go new file mode 100644 index 00000000..900304f3 --- /dev/null +++ b/loadtest/scenarios/util.go @@ -0,0 +1,24 @@ +package scenarios + +import ( + "context" + "time" +) + +func WaitUntil(ctx context.Context, duration time.Duration, condition func() bool) bool { + start := time.Now() + for { + select { + case <-ctx.Done(): + return false + default: + } + if time.Now().Sub(start) > duration { + return false + } + if condition() { + return true + } + time.Sleep(500 * time.Millisecond) + } +} From c39232db1d445bd627050abee66827823c11ade9 Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Fri, 14 Mar 2025 08:38:47 +0200 Subject: [PATCH 07/22] Change how test server works so scenario can wait for actions to be acked and not cleanup early. --- cmd/testserver/run.go | 8 ++--- loadtest/castai.go | 81 ++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 80 insertions(+), 9 deletions(-) diff --git a/cmd/testserver/run.go b/cmd/testserver/run.go index 70fae32d..e9b86336 100644 --- a/cmd/testserver/run.go +++ b/cmd/testserver/run.go @@ -27,7 +27,7 @@ func run(ctx context.Context) error { logger.Info("creating test server") // TODO: Defaults... 
testServer := loadtest.NewTestServer(logger, loadtest.TestServerConfig{ - BufferSize: 1000, + BufferSize: 0, MaxActionsPerCall: 500, TimeoutWaitingForActions: 60 * time.Second, }) @@ -54,10 +54,8 @@ func run(ctx context.Context) error { } }() - ch := testServer.GetActionsPushChannel() - testScenarios := []scenarios.TestScenario{ - scenarios.PodEvents(2000, logger), + //scenarios.PodEvents(2000, logger), scenarios.StuckDrain(100, 1, logger), } @@ -70,7 +68,7 @@ func run(ctx context.Context) error { defer wg.Done() logger.Info(fmt.Sprintf("Starting test scenario %d", i)) - err := scenarios.RunScenario(ctx, test, ch, logger, clientSet) + err := scenarios.RunScenario(ctx, test, testServer, logger, clientSet) errs <- err }() } diff --git a/loadtest/castai.go b/loadtest/castai.go index ccba85b8..3d864558 100644 --- a/loadtest/castai.go +++ b/loadtest/castai.go @@ -4,8 +4,10 @@ import ( "context" "fmt" "log/slog" + "sync" "time" + "github.com/google/uuid" "github.com/samber/lo" "github.com/castai/cluster-controller/internal/castai" @@ -18,6 +20,9 @@ type CastAITestServer struct { log *slog.Logger actionsPushChannel chan castai.ClusterAction cfg TestServerConfig + + logMx sync.Mutex + actionsLog map[string]chan string } func NewTestServer(logger *slog.Logger, cfg TestServerConfig) *CastAITestServer { @@ -25,15 +30,48 @@ func NewTestServer(logger *slog.Logger, cfg TestServerConfig) *CastAITestServer log: logger, actionsPushChannel: make(chan castai.ClusterAction, cfg.BufferSize), cfg: cfg, + actionsLog: make(map[string]chan string), } } -// GetActionsPushChannel returns a channel that can be used to push actions into the queue to be picked up by CC. -// Don't close the returned channel or pay the consequences! -func (c *CastAITestServer) GetActionsPushChannel() chan<- castai.ClusterAction { - return c.actionsPushChannel +func (c *CastAITestServer) Shutdown() { + // Drain } +// ExecuteActions pushes the list of actions to the queue for cluster controller to process. 
+// This method returns when all actions are acked or context is cancelled. +func (c *CastAITestServer) ExecuteActions(ctx context.Context, actions []castai.ClusterAction) { + // owner channel has 1:n relationship with the actions. It handles the ack + ownerChannel := make(chan string, len(actions)) + + for _, action := range actions { + if action.ID == "" { + action.ID = uuid.NewString() + } + c.addActionToStore(action.ID, ownerChannel) + c.actionsPushChannel <- action + } + + // Read from owner channel until len(actions) times, then close and return. + finished := 0 + for { + select { + case <-ctx.Done(): + // TODO: Clean up all actions? + return + case finishedAction := <-ownerChannel: + c.removeActionFromStore(finishedAction) + finished++ + if finished == len(actions) { + close(ownerChannel) + return + } + } + } +} + +/* Start Cluster-hub mock implementation */ + func (c *CastAITestServer) GetActions(ctx context.Context, _ string) ([]*castai.ClusterAction, error) { c.log.Info(fmt.Sprintf("GetActions called, have %d items in buffer", len(c.actionsPushChannel))) actionsToReturn := make([]*castai.ClusterAction, 0) @@ -70,6 +108,13 @@ func (c *CastAITestServer) AckAction(ctx context.Context, actionID string, req * errMsg := lo.FromPtr(req.Error) c.log.DebugContext(ctx, fmt.Sprintf("action %q acknowledged; has error: %v; error: %v", actionID, req.Error != nil, errMsg)) + receiver := c.getActionReceiver(actionID) + if receiver == nil { + return fmt.Errorf("action %q does not have a receiver", actionID) + } + // Notify owner that this action was done. 
+ receiver <- actionID + return nil } @@ -103,3 +148,31 @@ func (c *CastAITestServer) SendLog(ctx context.Context, e *castai.LogEntry) erro return nil } + +/* End Cluster-hub mock implementation */ + +func (c *CastAITestServer) addActionToStore(actionID string, receiver chan string) { + c.logMx.Lock() + defer c.logMx.Unlock() + + c.actionsLog[actionID] = receiver +} + +func (c *CastAITestServer) removeActionFromStore(actionID string) { + c.logMx.Lock() + defer c.logMx.Unlock() + + delete(c.actionsLog, actionID) +} + +func (c *CastAITestServer) getActionReceiver(actionID string) chan string { + c.logMx.Lock() + defer c.logMx.Unlock() + + receiver, ok := c.actionsLog[actionID] + if !ok { + c.log.Error(fmt.Sprintf("Receiver for action %s is no longer there, possibly shutting down", actionID)) + return nil + } + return receiver +} From e7f6132273b49c67f8d6bee75be6f12bfa2830bb Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Fri, 14 Mar 2025 08:39:48 +0200 Subject: [PATCH 08/22] Cleanup --- loadtest/scenarios/pod_events.go | 8 +++++--- loadtest/scenarios/scenario.go | 13 ++++++++++--- loadtest/scenarios/stuck_drain.go | 20 ++++++++++++-------- 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/loadtest/scenarios/pod_events.go b/loadtest/scenarios/pod_events.go index bc3499a3..d7814792 100644 --- a/loadtest/scenarios/pod_events.go +++ b/loadtest/scenarios/pod_events.go @@ -27,10 +27,11 @@ func PodEvents(count int, log *slog.Logger) TestScenario { return nil } - run := func(_ context.Context, actionChannel chan<- castai.ClusterAction) error { + run := func(ctx context.Context, executor ActionExecutor) error { log.Info(fmt.Sprintf("Starting creating %d events for different pods", count)) + actions := make([]castai.ClusterAction, 0, count) for i := range count { - actionChannel <- castai.ClusterAction{ + actions = append(actions, castai.ClusterAction{ ID: uuid.NewString(), ActionCreateEvent: &castai.ActionCreateEvent{ Reporter: "provisioning.cast.ai", @@ 
-48,8 +49,9 @@ func PodEvents(count int, log *slog.Logger) TestScenario { Action: "During node creation.", Message: "Oh common, you can do better.", }, - } + }) } + executor.ExecuteActions(ctx, actions) return nil } diff --git a/loadtest/scenarios/scenario.go b/loadtest/scenarios/scenario.go index 206f5307..da045d6b 100644 --- a/loadtest/scenarios/scenario.go +++ b/loadtest/scenarios/scenario.go @@ -16,18 +16,25 @@ import ( "github.com/castai/cluster-controller/internal/castai" ) +// TODO Spend more than 2 seconds thinking about names + +type ActionExecutor interface { + // ExecuteActions is expected to execute all actions and wait for ack before returning; otherwise cleanups might run too early. + ExecuteActions(ctx context.Context, actions []castai.ClusterAction) +} + type Preparation func(ctx context.Context, namespace string, clientset kubernetes.Interface) error type Cleanup func(ctx context.Context, namespace string, clientset kubernetes.Interface) error -type TestRun func(ctx context.Context, actionChannel chan<- castai.ClusterAction) error +type TestRun func(ctx context.Context, executor ActionExecutor) error type TestScenario func() (Preparation, Cleanup, TestRun) func RunScenario( ctx context.Context, scenario TestScenario, - actionChannel chan<- castai.ClusterAction, + actioner ActionExecutor, logger *slog.Logger, clientset kubernetes.Interface, ) error { @@ -87,7 +94,7 @@ func RunScenario( defer cancel() logger.Info("Starting scenario execution") - err = run(scenarioCtx, actionChannel) + err = run(scenarioCtx, actioner) if err != nil { return fmt.Errorf("failed to run scenario: %w", err) } diff --git a/loadtest/scenarios/stuck_drain.go b/loadtest/scenarios/stuck_drain.go index 552036c0..c05d26f0 100644 --- a/loadtest/scenarios/stuck_drain.go +++ b/loadtest/scenarios/stuck_drain.go @@ -49,6 +49,11 @@ func StuckDrain(nodeCount int, deploymentReplicas int, log *slog.Logger) TestSce return fmt.Errorf("failed to create fake deployment: %w", err) } + _, err = 
clientset.PolicyV1().PodDisruptionBudgets(namespace).Create(ctx, pdb, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create fake pod disruption budget: %w", err) + } + // Wait for deployment to become ready, otherwise we might start draining before the pod is up. progressed := WaitUntil(ctx, 30*time.Second, func() bool { d, err := clientset.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{}) @@ -61,11 +66,6 @@ func StuckDrain(nodeCount int, deploymentReplicas int, log *slog.Logger) TestSce if !progressed { return fmt.Errorf("deployment %s did not progress to ready state in time", deployment.Name) } - - _, err = clientset.PolicyV1().PodDisruptionBudgets(namespace).Create(ctx, pdb, metav1.CreateOptions{}) - if err != nil { - return fmt.Errorf("failed to create fake pod disruption budget: %w", err) - } } return nil @@ -114,10 +114,12 @@ func StuckDrain(nodeCount int, deploymentReplicas int, log *slog.Logger) TestSce return nil } - run := func(ctx context.Context, actionChannel chan<- castai.ClusterAction) error { + run := func(ctx context.Context, executor ActionExecutor) error { log.Info(fmt.Sprintf("Starting drain action creation with %d nodes", len(nodesToDrain))) + + actions := make([]castai.ClusterAction, 0, len(nodesToDrain)) for _, node := range nodesToDrain { - actionChannel <- castai.ClusterAction{ + actions = append(actions, castai.ClusterAction{ ID: uuid.NewString(), CreatedAt: time.Now().UTC(), ActionDrainNode: &castai.ActionDrainNode{ @@ -126,9 +128,11 @@ func StuckDrain(nodeCount int, deploymentReplicas int, log *slog.Logger) TestSce DrainTimeoutSeconds: 60, Force: false, }, - } + }) } + executor.ExecuteActions(ctx, actions) + return nil } return prepare, cleanup, run From db940fbf65a1f6c8de4da1b31618971abc6ca93f Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Fri, 14 Mar 2025 11:38:42 +0200 Subject: [PATCH 09/22] Move scenarios to interface and struct --- loadtest/scenarios/pod_events.go | 96 
+++++++------ loadtest/scenarios/scenario.go | 22 ++- loadtest/scenarios/stuck_drain.go | 228 ++++++++++++++++-------------- 3 files changed, 190 insertions(+), 156 deletions(-) diff --git a/loadtest/scenarios/pod_events.go b/loadtest/scenarios/pod_events.go index d7814792..af8ad46b 100644 --- a/loadtest/scenarios/pod_events.go +++ b/loadtest/scenarios/pod_events.go @@ -15,46 +15,60 @@ import ( ) func PodEvents(count int, log *slog.Logger) TestScenario { - return func() (Preparation, Cleanup, TestRun) { - prepare := func(ctx context.Context, namespace string, clientset kubernetes.Interface) error { - // nothing to prepare for this test, pod does not have to exist to create events. - return nil - } - - cleanup := func(ctx context.Context, namespace string, clientset kubernetes.Interface) error { - // nothing to clean for this test, events are dropped automatically after certain time. - - return nil - } - - run := func(ctx context.Context, executor ActionExecutor) error { - log.Info(fmt.Sprintf("Starting creating %d events for different pods", count)) - actions := make([]castai.ClusterAction, 0, count) - for i := range count { - actions = append(actions, castai.ClusterAction{ - ID: uuid.NewString(), - ActionCreateEvent: &castai.ActionCreateEvent{ - Reporter: "provisioning.cast.ai", - ObjectRef: corev1.ObjectReference{ - Kind: "Pod", - Namespace: "default", - Name: "Dummy-pod", - UID: types.UID(uuid.New().String()), - APIVersion: "v1", - }, - EventTime: time.Now(), - EventType: "Warning", - // Reason is different so events won't be aggregated by CC's event broadcaster. - Reason: fmt.Sprintf("Just because! 
%d", i), - Action: "During node creation.", - Message: "Oh common, you can do better.", - }, - }) - } - executor.ExecuteActions(ctx, actions) - - return nil - } - return prepare, cleanup, run + return &podEventsScenario{ + totalEvents: count, + log: log, } } + +type podEventsScenario struct { + totalEvents int + log *slog.Logger +} + +func (p *podEventsScenario) Name() string { + return "pod events" +} + +func (p *podEventsScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error { + // nothing to prepare for this test, pod does not have to exist to create events. + return nil +} + +func (p *podEventsScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error { + // nothing to clean for this test, events are dropped automatically after certain time. + + return nil +} + +func (p *podEventsScenario) Run(ctx context.Context, namespace string, _ kubernetes.Interface, executor ActionExecutor) error { + p.log.Info(fmt.Sprintf("Starting creating %d events for different pods", p.totalEvents)) + actions := make([]castai.ClusterAction, 0, p.totalEvents) + for i := range p.totalEvents { + actions = append(actions, castai.ClusterAction{ + ID: uuid.NewString(), + ActionCreateEvent: &castai.ActionCreateEvent{ + Reporter: "provisioning.cast.ai", + ObjectRef: corev1.ObjectReference{ + Kind: "Pod", + // Actions are executed async on CC, meaning they are acked even if rejected by server. + // This means we can't rely on the test namespace as it'll disappear before all events are processed. + // So we use a namespace that _will_ be there. + Namespace: corev1.NamespaceDefault, + Name: "Dummy-pod", + UID: types.UID(uuid.New().String()), + APIVersion: "v1", + }, + EventTime: time.Now(), + EventType: "Warning", + // Reason is different so events won't be aggregated by CC's event broadcaster. + Reason: fmt.Sprintf("Just because! 
%d", i), + Action: "During node creation.", + Message: "Oh common, you can do better.", + }, + }) + } + executor.ExecuteActions(ctx, actions) + + return nil +} diff --git a/loadtest/scenarios/scenario.go b/loadtest/scenarios/scenario.go index da045d6b..64c6cd13 100644 --- a/loadtest/scenarios/scenario.go +++ b/loadtest/scenarios/scenario.go @@ -23,13 +23,12 @@ type ActionExecutor interface { ExecuteActions(ctx context.Context, actions []castai.ClusterAction) } -type Preparation func(ctx context.Context, namespace string, clientset kubernetes.Interface) error - -type Cleanup func(ctx context.Context, namespace string, clientset kubernetes.Interface) error - -type TestRun func(ctx context.Context, executor ActionExecutor) error - -type TestScenario func() (Preparation, Cleanup, TestRun) +type TestScenario interface { + Name() string + Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error + Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error + Run(ctx context.Context, namespace string, clientset kubernetes.Interface, executor ActionExecutor) error +} func RunScenario( ctx context.Context, @@ -39,7 +38,7 @@ func RunScenario( clientset kubernetes.Interface, ) error { namespaceForTest := fmt.Sprintf("test-namespace-%d", rand.Int31()) - logger = logger.With("namespace", namespaceForTest) + logger = logger.With("namespace", namespaceForTest, "scenario", scenario.Name()) // Prepare the namespace to run the test in. 
logger.Info("Preparing namespace for test") @@ -75,16 +74,15 @@ func RunScenario( logger.Info("Namespace created") logger.Info("Starting test scenario") - prepare, cleanup, run := scenario() logger.Info("Running preparation function") - err = prepare(ctx, namespaceForTest, clientset) + err = scenario.Preparation(ctx, namespaceForTest, clientset) if err != nil { return fmt.Errorf("failed to run preparation function: %w", err) } defer func() { logger.Info("Running cleanup function") - err := cleanup(ctx, namespaceForTest, clientset) + err := scenario.Cleanup(ctx, namespaceForTest, clientset) if err != nil { logger.Error("failed ot run cleanup", "error", err) } @@ -94,7 +92,7 @@ func RunScenario( defer cancel() logger.Info("Starting scenario execution") - err = run(scenarioCtx, actioner) + err = scenario.Run(scenarioCtx, namespaceForTest, clientset, actioner) if err != nil { return fmt.Errorf("failed to run scenario: %w", err) } diff --git a/loadtest/scenarios/stuck_drain.go b/loadtest/scenarios/stuck_drain.go index c05d26f0..885040c9 100644 --- a/loadtest/scenarios/stuck_drain.go +++ b/loadtest/scenarios/stuck_drain.go @@ -18,123 +18,145 @@ import ( ) func StuckDrain(nodeCount int, deploymentReplicas int, log *slog.Logger) TestScenario { - return func() (Preparation, Cleanup, TestRun) { - var nodesToDrain []*corev1.Node - - // Creates node per action + 1 deployment and PDB for each node. 
- prepare := func(ctx context.Context, namespace string, clientset kubernetes.Interface) error { - for i := range nodeCount { - nodeName := fmt.Sprintf("kwok-stuck-drain-%d", i) - log.Info(fmt.Sprintf("Creating node %s", nodeName)) - node := NewKwokNode(KwokConfig{}, nodeName) - - _, err := clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) - if err != nil && !apierrors.IsAlreadyExists(err) { - return fmt.Errorf("failed to create fake node: %w", err) - } - if err != nil && apierrors.IsAlreadyExists(err) { - log.Warn("node already exists, will reuse but potential conflict between test runs", "nodeName", nodeName) - } - nodesToDrain = append(nodesToDrain, node) - - log.Info(fmt.Sprintf("Creating deployment on node %s", nodeName)) - deployment, pdb := DeploymentWithStuckPDB(fmt.Sprintf("fake-deployment-%s-%d", node.Name, i)) - deployment.ObjectMeta.Namespace = namespace - deployment.Spec.Replicas = lo.ToPtr(int32(deploymentReplicas)) - deployment.Spec.Template.Spec.NodeName = nodeName - pdb.ObjectMeta.Namespace = namespace - - _, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{}) - if err != nil { - return fmt.Errorf("failed to create fake deployment: %w", err) - } - - _, err = clientset.PolicyV1().PodDisruptionBudgets(namespace).Create(ctx, pdb, metav1.CreateOptions{}) - if err != nil { - return fmt.Errorf("failed to create fake pod disruption budget: %w", err) - } - - // Wait for deployment to become ready, otherwise we might start draining before the pod is up. 
- progressed := WaitUntil(ctx, 30*time.Second, func() bool { - d, err := clientset.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{}) - if err != nil { - log.Warn("failed to get deployment after creating", "err", err) - return false - } - return d.Status.ReadyReplicas == *d.Spec.Replicas - }) - if !progressed { - return fmt.Errorf("deployment %s did not progress to ready state in time", deployment.Name) - } - } + return &stuckDrainScenario{ + nodeCount: nodeCount, + deploymentReplicas: deploymentReplicas, + log: log, + } +} - return nil - } +type stuckDrainScenario struct { + nodeCount int + deploymentReplicas int + log *slog.Logger - cleanup := func(ctx context.Context, namespace string, clientset kubernetes.Interface) error { - var errs []error - // We iterate through all nodes as they are not deleted with the ns and can leak => so we want do delete as many as possible. - for _, n := range nodesToDrain { - err := clientset.CoreV1().Nodes().Delete(ctx, n.Name, metav1.DeleteOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - log.Warn("failed to delete fake node, will continue with other nodes", "nodeName", n.Name) - errs = append(errs, err) - } - } - if len(errs) > 0 { - return errors.Join(errs...) - } + nodesToDrain []*corev1.Node +} - // We assume no other tests are using the same NS so just delete all. 
- deploymentsInNS, err := clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{}) - if err != nil { - return fmt.Errorf("failed to list deployments: %w", err) - } +func (s *stuckDrainScenario) Name() string { + return "drain node with stuck pdb" +} - for _, deployment := range deploymentsInNS.Items { - err = clientset.AppsV1().Deployments(namespace).Delete(ctx, deployment.Name, metav1.DeleteOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - return fmt.Errorf("failed to delete fake deployment: %w", err) - } - } +func (s *stuckDrainScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error { + nodes := make([]*corev1.Node, 0, s.nodeCount) + for i := range s.nodeCount { + nodeName := fmt.Sprintf("kwok-stuck-drain-%d", i) + s.log.Info(fmt.Sprintf("Creating node %s", nodeName)) + node := NewKwokNode(KwokConfig{}, nodeName) + + _, err := clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create fake node: %w", err) + } + if err != nil && apierrors.IsAlreadyExists(err) { + s.log.Warn("node already exists, will reuse but potential conflict between test runs", "nodeName", nodeName) + } + nodes = append(nodes, node) + + s.log.Info(fmt.Sprintf("Creating deployment on node %s", nodeName)) + deployment, pdb := DeploymentWithStuckPDB(fmt.Sprintf("fake-deployment-%s-%d", node.Name, i)) + deployment.ObjectMeta.Namespace = namespace + deployment.Spec.Replicas = lo.ToPtr(int32(s.deploymentReplicas)) + deployment.Spec.Template.Spec.NodeName = nodeName + pdb.ObjectMeta.Namespace = namespace + + _, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create fake deployment: %w", err) + } + + _, err = clientset.PolicyV1().PodDisruptionBudgets(namespace).Create(ctx, pdb, metav1.CreateOptions{}) + if err != nil { + return 
fmt.Errorf("failed to create fake pod disruption budget: %w", err) + } - pdbsInNS, err := clientset.PolicyV1().PodDisruptionBudgets(namespace).List(ctx, metav1.ListOptions{}) + // Wait for deployment to become ready, otherwise we might start draining before the pod is up. + progressed := WaitUntil(ctx, 30*time.Second, func() bool { + d, err := clientset.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("failed to list pod disruption budgets: %w", err) + s.log.Warn("failed to get deployment after creating", "err", err) + return false } + return d.Status.ReadyReplicas == *d.Spec.Replicas + }) + if !progressed { + return fmt.Errorf("deployment %s did not progress to ready state in time", deployment.Name) + } + } + s.nodesToDrain = nodes - for _, pdb := range pdbsInNS.Items { - err = clientset.PolicyV1().PodDisruptionBudgets(namespace).Delete(ctx, pdb.Name, metav1.DeleteOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - return fmt.Errorf("failed to delete fake pod disruption budget: %w", err) - } - } + return nil +} - log.Info("Finished up cleaning nodes for drain, deployments and PDBs.") +func (s *stuckDrainScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error { + var errs []error + // We iterate through all nodes as they are not deleted with the ns and can leak => so we want do delete as many as possible. + for _, n := range s.nodesToDrain { + if n == nil { + s.log.Error("NODE IS NIL") + continue + } + if clientset == nil { + s.log.Error("CLIENTSET IS NIL") return nil } + err := clientset.CoreV1().Nodes().Delete(ctx, n.Name, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + s.log.Warn("failed to delete fake node, will continue with other nodes", "nodeName", n.Name) + errs = append(errs, err) + } + } + if len(errs) > 0 { + return errors.Join(errs...) 
+ } - run := func(ctx context.Context, executor ActionExecutor) error { - log.Info(fmt.Sprintf("Starting drain action creation with %d nodes", len(nodesToDrain))) - - actions := make([]castai.ClusterAction, 0, len(nodesToDrain)) - for _, node := range nodesToDrain { - actions = append(actions, castai.ClusterAction{ - ID: uuid.NewString(), - CreatedAt: time.Now().UTC(), - ActionDrainNode: &castai.ActionDrainNode{ - NodeName: node.Name, - NodeID: "", - DrainTimeoutSeconds: 60, - Force: false, - }, - }) - } + // We assume no other tests are using the same NS so just delete all. + deploymentsInNS, err := clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to list deployments: %w", err) + } - executor.ExecuteActions(ctx, actions) + for _, deployment := range deploymentsInNS.Items { + err = clientset.AppsV1().Deployments(namespace).Delete(ctx, deployment.Name, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to delete fake deployment: %w", err) + } + } - return nil + pdbsInNS, err := clientset.PolicyV1().PodDisruptionBudgets(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to list pod disruption budgets: %w", err) + } + + for _, pdb := range pdbsInNS.Items { + err = clientset.PolicyV1().PodDisruptionBudgets(namespace).Delete(ctx, pdb.Name, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to delete fake pod disruption budget: %w", err) } - return prepare, cleanup, run } + + s.log.Info("Finished up cleaning nodes for drain, deployments and PDBs.") + return nil +} + +func (s *stuckDrainScenario) Run(ctx context.Context, _ string, _ kubernetes.Interface, executor ActionExecutor) error { + s.log.Info(fmt.Sprintf("Starting drain action creation with %d nodes", len(s.nodesToDrain))) + + actions := make([]castai.ClusterAction, 0, len(s.nodesToDrain)) + for _, node := range 
s.nodesToDrain { + actions = append(actions, castai.ClusterAction{ + ID: uuid.NewString(), + CreatedAt: time.Now().UTC(), + ActionDrainNode: &castai.ActionDrainNode{ + NodeName: node.Name, + NodeID: "", + DrainTimeoutSeconds: 60, + Force: false, + }, + }) + } + + executor.ExecuteActions(ctx, actions) + + return nil } From 3bba12f37f5e5bf66669b49a1074d49bd0d1bd7e Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Fri, 14 Mar 2025 11:39:52 +0200 Subject: [PATCH 10/22] Re-enable events in default run --- cmd/testserver/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/testserver/run.go b/cmd/testserver/run.go index e9b86336..49a910a2 100644 --- a/cmd/testserver/run.go +++ b/cmd/testserver/run.go @@ -55,7 +55,7 @@ func run(ctx context.Context) error { }() testScenarios := []scenarios.TestScenario{ - //scenarios.PodEvents(2000, logger), + scenarios.PodEvents(2000, logger), scenarios.StuckDrain(100, 1, logger), } From 0c6bf4c2b3968a618f47b5c259060a0445b1db32 Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Tue, 18 Mar 2025 16:05:37 +0200 Subject: [PATCH 11/22] Linter --- loadtest/castai.go | 4 ++-- loadtest/http.go | 12 +++--------- loadtest/scenarios/scenario.go | 1 + loadtest/scenarios/stuck_drain.go | 3 ++- loadtest/scenarios/util.go | 2 +- 5 files changed, 9 insertions(+), 13 deletions(-) diff --git a/loadtest/castai.go b/loadtest/castai.go index 3d864558..f0f7fcd3 100644 --- a/loadtest/castai.go +++ b/loadtest/castai.go @@ -85,7 +85,7 @@ func (c *CastAITestServer) GetActions(ctx context.Context, _ string) ([]*castai. c.log.Info(fmt.Sprintf("No actions to return in %v", c.cfg.TimeoutWaitingForActions)) return nil, nil case <-ctx.Done(): - return nil, fmt.Errorf("context done with cause (%v), err (%v)", context.Cause(ctx), ctx.Err()) + return nil, fmt.Errorf("context done with cause (%w), err (%w)", context.Cause(ctx), ctx.Err()) } // Attempt to drain up to max items from the channel. 
@@ -97,7 +97,7 @@ func (c *CastAITestServer) GetActions(ctx context.Context, _ string) ([]*castai. // If we haven't received enough items, just flush. return actionsToReturn, nil case <-ctx.Done(): - return nil, fmt.Errorf("context done with cause (%v), err (%v)", context.Cause(ctx), ctx.Err()) + return nil, fmt.Errorf("context done with cause (%w), err (%w)", context.Cause(ctx), ctx.Err()) } } diff --git a/loadtest/http.go b/loadtest/http.go index 2e85f6bb..c0e106ec 100644 --- a/loadtest/http.go +++ b/loadtest/http.go @@ -1,6 +1,7 @@ package loadtest import ( + "context" "encoding/json" "fmt" "net/http" @@ -8,8 +9,7 @@ import ( "github.com/castai/cluster-controller/internal/castai" ) -func NewHttpServer(cfg Config, testServer *CastAITestServer) error { - +func NewHttpServer(ctx context.Context, cfg Config, testServer *CastAITestServer) error { http.HandleFunc("/v1/kubernetes/clusters/{cluster_id}/actions", func(w http.ResponseWriter, r *http.Request) { result, err := testServer.GetActions(r.Context(), "") if err != nil { @@ -27,8 +27,6 @@ func NewHttpServer(cfg Config, testServer *CastAITestServer) error { http.Error(w, err.Error(), http.StatusInternalServerError) return } - - return }) http.HandleFunc("/v1/kubernetes/clusters/{cluster_id}/actions/{action_id}/ack", func(w http.ResponseWriter, r *http.Request) { @@ -45,8 +43,6 @@ func NewHttpServer(cfg Config, testServer *CastAITestServer) error { http.Error(w, err.Error(), http.StatusInternalServerError) return } - - return }) http.HandleFunc("/v1/kubernetes/clusters/{cluster_id}/actions/logs", func(w http.ResponseWriter, r *http.Request) { @@ -62,10 +58,8 @@ func NewHttpServer(cfg Config, testServer *CastAITestServer) error { http.Error(w, err.Error(), http.StatusInternalServerError) return } - - return }) + //nolint:gosec // Missing timeouts are not a real issue here. 
return http.ListenAndServe(fmt.Sprintf(":%d", cfg.Port), nil) - } diff --git a/loadtest/scenarios/scenario.go b/loadtest/scenarios/scenario.go index 64c6cd13..c67a7298 100644 --- a/loadtest/scenarios/scenario.go +++ b/loadtest/scenarios/scenario.go @@ -37,6 +37,7 @@ func RunScenario( logger *slog.Logger, clientset kubernetes.Interface, ) error { + //nolint:gosec // No point to use crypto/rand. namespaceForTest := fmt.Sprintf("test-namespace-%d", rand.Int31()) logger = logger.With("namespace", namespaceForTest, "scenario", scenario.Name()) diff --git a/loadtest/scenarios/stuck_drain.go b/loadtest/scenarios/stuck_drain.go index 885040c9..d4758e27 100644 --- a/loadtest/scenarios/stuck_drain.go +++ b/loadtest/scenarios/stuck_drain.go @@ -17,7 +17,7 @@ import ( "github.com/castai/cluster-controller/internal/castai" ) -func StuckDrain(nodeCount int, deploymentReplicas int, log *slog.Logger) TestScenario { +func StuckDrain(nodeCount, deploymentReplicas int, log *slog.Logger) TestScenario { return &stuckDrainScenario{ nodeCount: nodeCount, deploymentReplicas: deploymentReplicas, @@ -56,6 +56,7 @@ func (s *stuckDrainScenario) Preparation(ctx context.Context, namespace string, s.log.Info(fmt.Sprintf("Creating deployment on node %s", nodeName)) deployment, pdb := DeploymentWithStuckPDB(fmt.Sprintf("fake-deployment-%s-%d", node.Name, i)) deployment.ObjectMeta.Namespace = namespace + //nolint:gosec // Not afraid of overflow here. 
deployment.Spec.Replicas = lo.ToPtr(int32(s.deploymentReplicas)) deployment.Spec.Template.Spec.NodeName = nodeName pdb.ObjectMeta.Namespace = namespace diff --git a/loadtest/scenarios/util.go b/loadtest/scenarios/util.go index 900304f3..8ad3f034 100644 --- a/loadtest/scenarios/util.go +++ b/loadtest/scenarios/util.go @@ -13,7 +13,7 @@ func WaitUntil(ctx context.Context, duration time.Duration, condition func() boo return false default: } - if time.Now().Sub(start) > duration { + if time.Since(start) > duration { return false } if condition() { From 628d25b95de3109685cb8c57a1179462270a0baf Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Thu, 20 Mar 2025 09:31:28 +0200 Subject: [PATCH 12/22] Remove SendAKSINitData --- loadtest/castai.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/loadtest/castai.go b/loadtest/castai.go index f0f7fcd3..2f613aca 100644 --- a/loadtest/castai.go +++ b/loadtest/castai.go @@ -118,10 +118,6 @@ func (c *CastAITestServer) AckAction(ctx context.Context, actionID string, req * return nil } -func (c *CastAITestServer) SendAKSInitData(ctx context.Context, req *castai.AKSInitDataRequest) error { - return fmt.Errorf("not implemented; obsolete") -} - func (c *CastAITestServer) SendLog(ctx context.Context, e *castai.LogEntry) error { //var slogLvl slog.Level //switch e.Level { From c88c5af903b6a002be548307d3574ce0bc28799b Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Thu, 20 Mar 2025 10:07:08 +0200 Subject: [PATCH 13/22] Remove dependency on main config file and use loadtest one. 
--- cmd/testserver/run.go | 58 +++++++++++++++++++++++++++++-------------- loadtest/config.go | 30 ++++++++++++++++++++++ 2 files changed, 70 insertions(+), 18 deletions(-) diff --git a/cmd/testserver/run.go b/cmd/testserver/run.go index 49a910a2..d16ef616 100644 --- a/cmd/testserver/run.go +++ b/cmd/testserver/run.go @@ -4,16 +4,15 @@ import ( "context" "errors" "fmt" - "io" "log/slog" "os" "sync" "time" - "github.com/sirupsen/logrus" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" - "github.com/castai/cluster-controller/internal/config" "github.com/castai/cluster-controller/loadtest" "github.com/castai/cluster-controller/loadtest/scenarios" ) @@ -21,9 +20,8 @@ import ( func run(ctx context.Context) error { logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) // TODO: Export as envVars - cfg := loadtest.Config{ - Port: 8080, - } + cfg := loadtest.GetConfig() + logger.Info(fmt.Sprintf("%v", cfg)) logger.Info("creating test server") // TODO: Defaults... 
testServer := loadtest.NewTestServer(logger, loadtest.TestServerConfig{ @@ -32,17 +30,9 @@ func run(ctx context.Context) error { TimeoutWaitingForActions: 60 * time.Second, }) - // Not ideal but fast - discardLogger := logrus.New() - discardLogger.Out = io.Discard - restConfig, err := config.RetrieveKubeConfig(discardLogger) - if err != nil { - return fmt.Errorf("failed to get kubeconfig: %w", err) - } - - clientSet, err := kubernetes.NewForConfig(restConfig) + clientSet, err := createK8SClient(cfg, logger) if err != nil { - return fmt.Errorf("obtaining kubernetes clientset: %w", err) + return err } go func() { @@ -55,8 +45,8 @@ func run(ctx context.Context) error { }() testScenarios := []scenarios.TestScenario{ - scenarios.PodEvents(2000, logger), - scenarios.StuckDrain(100, 1, logger), + scenarios.PodEvents(100000, logger), + //scenarios.StuckDrain(100, 1, logger), } var wg sync.WaitGroup @@ -87,3 +77,35 @@ func run(ctx context.Context) error { // TODO: Wait for server channel to be empty as well return errors.Join(receivedErrors...) 
} + +func createK8SClient(cfg loadtest.Config, logger *slog.Logger) (*kubernetes.Clientset, error) { + if cfg.KubeConfig == "" { + logger.Info("Using in-cluster configuration") + restConfig, err := rest.InClusterConfig() + if err != nil { + return nil, fmt.Errorf("error creating in-cluster config: %w", err) + } + clientSet, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return nil, fmt.Errorf("obtaining kubernetes clientset: %w", err) + } + return clientSet, nil + } + + logger.Info(fmt.Sprintf("Using kubeconfig from %q", cfg.KubeConfig)) + data, err := os.ReadFile(cfg.KubeConfig) + if err != nil { + return nil, fmt.Errorf("reading kubeconfig at %s: %w", cfg.KubeConfig, err) + } + + restConfig, err := clientcmd.RESTConfigFromKubeConfig(data) + if err != nil { + return nil, fmt.Errorf("creating rest config from %q: %w", cfg.KubeConfig, err) + } + + clientSet, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return nil, fmt.Errorf("obtaining kubernetes clientset: %w", err) + } + return clientSet, nil +} diff --git a/loadtest/config.go b/loadtest/config.go index 2f027609..cd6bdb89 100644 --- a/loadtest/config.go +++ b/loadtest/config.go @@ -1,12 +1,19 @@ package loadtest import ( + "fmt" "time" + + "github.com/spf13/viper" ) // Config for the HTTP server. type Config struct { + // Port where the mock server to listen on. Port int + + // KubeConfig can point to a kubeconfig file. If empty, InCluster client will be assumed. + KubeConfig string } // TestServerConfig has settings for the mock server instance. @@ -19,3 +26,26 @@ type TestServerConfig struct { // BufferSize controls the input channel size. BufferSize int } + +var singletonCfg *Config + +func GetConfig() Config { + // not thread safe, but you will not put this under concurrent pressure, right? 
+ if singletonCfg != nil { + return *singletonCfg + } + + _ = viper.BindEnv("port", "PORT") + _ = viper.BindEnv("kubeconfig", "KUBECONFIG") + + singletonCfg = &Config{} + if err := viper.Unmarshal(&singletonCfg); err != nil { + panic(fmt.Errorf("parsing configuration: %w", err)) + } + + if singletonCfg.Port == 0 { + panic(fmt.Errorf("test server port must be set")) + } + + return *singletonCfg +} From 80433c66210b0b3a0ea88754b1256937b91b36f9 Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Thu, 20 Mar 2025 10:15:16 +0200 Subject: [PATCH 14/22] Quick docs on running locally. --- README.md | 3 +++ loadtest/README.md | 28 ++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) create mode 100644 loadtest/README.md diff --git a/README.md b/README.md index 9981360d..657277ab 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,9 @@ Option 1. Deploy controller in Kind cluster. --set clusterID="your-cluster-id" | kubectl apply -f - -n castai-agent ``` +### Load tests +See [docs](loadtest/README.md) + ## Community - [Twitter](https://twitter.com/cast_ai) diff --git a/loadtest/README.md b/loadtest/README.md new file mode 100644 index 00000000..9628522e --- /dev/null +++ b/loadtest/README.md @@ -0,0 +1,28 @@ +# Load testing Cluster controller + +Load test requires 3 components: +- Test server that simulates cluster-hub and the scenarios. +- Kwok controller to simulate nodes/pods +- Cluster controller itself. + +Right now this is extremely basic and you have to run those manually locally. + +Start kwok: +``` + kwok --kubeconfig=~/.kube/config \ + --manage-all-nodes=false \ + --manage-nodes-with-annotation-selector=kwok.x-k8s.io/node=fake-node \ + --node-lease-duration-seconds=40 \ + --cidr=10.0.0.1/24 \ + --node-ip=10.0.0.1 +``` + +Run the test server on port 8080 against your current kubeconfig context: +``` +KUBECONFIG=~/.kube/config PORT=8080 go run . 
test-server +``` + +After starting, start cluster controller with some dummy values and point it to the test server: +``` +API_KEY=dummy API_URL=http://localhost:8080 CLUSTER_ID=D30A163C-C5DF-4CC8-985C-D1449398295E KUBECONFIG=~/.kube/config LOG_LEVEL=4 LEADER_ELECTION_NAMESPACE=default METRICS_ENABLED=true go run . +``` From 976b82d1d18a34dc9f1bc91f1e1183923a15ba75 Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Thu, 20 Mar 2025 12:53:46 +0200 Subject: [PATCH 15/22] Use separate context for cleanups --- loadtest/scenarios/scenario.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/loadtest/scenarios/scenario.go b/loadtest/scenarios/scenario.go index c67a7298..c3cd6766 100644 --- a/loadtest/scenarios/scenario.go +++ b/loadtest/scenarios/scenario.go @@ -61,8 +61,12 @@ func RunScenario( return fmt.Errorf("failed to create namespace %v: %w", namespaceForTest, err) } defer func() { + // Cleanup uses different context so it runs even when the overall one is already cancelled + ctxForCleanup, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + logger.Info("Deleting namespace for test") - err := clientset.CoreV1().Namespaces().Delete(ctx, namespaceForTest, metav1.DeleteOptions{ + err := clientset.CoreV1().Namespaces().Delete(ctxForCleanup, namespaceForTest, metav1.DeleteOptions{ GracePeriodSeconds: lo.ToPtr(int64(0)), PropagationPolicy: lo.ToPtr(metav1.DeletePropagationBackground), }) @@ -82,8 +86,12 @@ func RunScenario( return fmt.Errorf("failed to run preparation function: %w", err) } defer func() { + // Cleanup uses different context so it runs even when the overall one is already cancelled + ctxForCleanup, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + logger.Info("Running cleanup function") - err := scenario.Cleanup(ctx, namespaceForTest, clientset) + err := scenario.Cleanup(ctxForCleanup, namespaceForTest, clientset) if err != nil { logger.Error("failed ot 
run cleanup", "error", err) } From cc09dda1c7284276ae3e6956201dfb484e42bf05 Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Thu, 20 Mar 2025 12:54:29 +0200 Subject: [PATCH 16/22] Remove buffer config --- cmd/testserver/run.go | 12 +++++------- loadtest/castai.go | 2 +- loadtest/config.go | 2 -- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/cmd/testserver/run.go b/cmd/testserver/run.go index d16ef616..3c75450d 100644 --- a/cmd/testserver/run.go +++ b/cmd/testserver/run.go @@ -19,14 +19,11 @@ import ( func run(ctx context.Context) error { logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) - // TODO: Export as envVars cfg := loadtest.GetConfig() - logger.Info(fmt.Sprintf("%v", cfg)) logger.Info("creating test server") - // TODO: Defaults... + testServer := loadtest.NewTestServer(logger, loadtest.TestServerConfig{ - BufferSize: 0, - MaxActionsPerCall: 500, + MaxActionsPerCall: 1000, TimeoutWaitingForActions: 60 * time.Second, }) @@ -45,8 +42,8 @@ func run(ctx context.Context) error { }() testScenarios := []scenarios.TestScenario{ - scenarios.PodEvents(100000, logger), - //scenarios.StuckDrain(100, 1, logger), + scenarios.PodEvents(5000, logger), + //scenarios.StuckDrain(100, 60, logger), } var wg sync.WaitGroup @@ -65,6 +62,7 @@ func run(ctx context.Context) error { logger.Info("Waiting for test scenarios to finish") wg.Wait() + close(errs) receivedErrors := make([]error, 0) for err := range errs { diff --git a/loadtest/castai.go b/loadtest/castai.go index 2f613aca..53c2ba08 100644 --- a/loadtest/castai.go +++ b/loadtest/castai.go @@ -28,7 +28,7 @@ type CastAITestServer struct { func NewTestServer(logger *slog.Logger, cfg TestServerConfig) *CastAITestServer { return &CastAITestServer{ log: logger, - actionsPushChannel: make(chan castai.ClusterAction, cfg.BufferSize), + actionsPushChannel: make(chan castai.ClusterAction, 10000), cfg: cfg, actionsLog: make(map[string]chan string), } diff --git a/loadtest/config.go b/loadtest/config.go index 
cd6bdb89..61cfbb5b 100644 --- a/loadtest/config.go +++ b/loadtest/config.go @@ -23,8 +23,6 @@ type TestServerConfig struct { // TimeoutWaitingForActions controls how long to wait for at least 1 action to appear on server side. // This mimics CH behavior of not returning early if there are no pending actions and keeping the request "running". TimeoutWaitingForActions time.Duration - // BufferSize controls the input channel size. - BufferSize int } var singletonCfg *Config From a8393f286310d90f5cae06e3158839093c7d7995 Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Thu, 20 Mar 2025 12:55:35 +0200 Subject: [PATCH 17/22] Remove unused commented code --- loadtest/castai.go | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/loadtest/castai.go b/loadtest/castai.go index 53c2ba08..40e51ae1 100644 --- a/loadtest/castai.go +++ b/loadtest/castai.go @@ -119,29 +119,7 @@ func (c *CastAITestServer) AckAction(ctx context.Context, actionID string, req * } func (c *CastAITestServer) SendLog(ctx context.Context, e *castai.LogEntry) error { - //var slogLvl slog.Level - //switch e.Level { - //case "INFO": - // slogLvl = slog.LevelInfo - //case "DEBUG": - // slogLvl = slog.LevelDebug - //case "WARN": - // slogLvl = slog.LevelWarn - //case "ERROR": - // slogLvl = slog.LevelError - //default: - // slogLvl = 100 // Some arbitrary value - //} - // - //attrs := make([]slog.Attr, 0, len(e.Fields)) - //for k, v := range e.Fields { - // attrs = append(attrs, slog.Any(k, v)) - //} - // - //msg := fmt.Sprintf("log from controller: %s", e.Message) - // - //c.log.LogAttrs(ctx, slogLvl, msg, attrs...) - + // No-op for now, maybe track metrics in the future? return nil } From 2f315edd069f3407390773b5a780e7e5abe25f9c Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Thu, 20 Mar 2025 13:12:04 +0200 Subject: [PATCH 18/22] Defer cleanup before running prepare so it cleans up half-baked prepares. 
--- loadtest/scenarios/scenario.go | 15 +++++++++++---- loadtest/scenarios/stuck_drain.go | 16 +++++----------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/loadtest/scenarios/scenario.go b/loadtest/scenarios/scenario.go index c3cd6766..c59770a9 100644 --- a/loadtest/scenarios/scenario.go +++ b/loadtest/scenarios/scenario.go @@ -25,7 +25,12 @@ type ActionExecutor interface { type TestScenario interface { Name() string + // Preparation should create any necessary resources in the cluster for the test so it runs in realistic env. Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error + // Cleanup should delete any items created by the preparation or the test itself. + // It might be called even if Preparation or Run did not complete so it should handle those cases gracefully. + // The scenario's namespace is deleted at the end but ideally scenarios delete their resources as well, + // otherwise namespace deletion can take very long to propagate. Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error Run(ctx context.Context, namespace string, clientset kubernetes.Interface, executor ActionExecutor) error } @@ -81,10 +86,7 @@ func RunScenario( logger.Info("Starting test scenario") logger.Info("Running preparation function") - err = scenario.Preparation(ctx, namespaceForTest, clientset) - if err != nil { - return fmt.Errorf("failed to run preparation function: %w", err) - } + // We defer the cleanup before running preparation or run because each can "fail" in the middle and leave hanging resources. 
defer func() { // Cleanup uses different context so it runs even when the overall one is already cancelled ctxForCleanup, cancel := context.WithTimeout(context.Background(), 5*time.Minute) @@ -97,6 +99,11 @@ func RunScenario( } }() + err = scenario.Preparation(ctx, namespaceForTest, clientset) + if err != nil { + return fmt.Errorf("failed to run preparation function: %w", err) + } + scenarioCtx, cancel := context.WithTimeout(ctx, 30*time.Minute) defer cancel() diff --git a/loadtest/scenarios/stuck_drain.go b/loadtest/scenarios/stuck_drain.go index d4758e27..96d15c3d 100644 --- a/loadtest/scenarios/stuck_drain.go +++ b/loadtest/scenarios/stuck_drain.go @@ -38,7 +38,7 @@ func (s *stuckDrainScenario) Name() string { } func (s *stuckDrainScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error { - nodes := make([]*corev1.Node, 0, s.nodeCount) + s.nodesToDrain = make([]*corev1.Node, 0, s.nodeCount) for i := range s.nodeCount { nodeName := fmt.Sprintf("kwok-stuck-drain-%d", i) s.log.Info(fmt.Sprintf("Creating node %s", nodeName)) @@ -51,7 +51,7 @@ func (s *stuckDrainScenario) Preparation(ctx context.Context, namespace string, if err != nil && apierrors.IsAlreadyExists(err) { s.log.Warn("node already exists, will reuse but potential conflict between test runs", "nodeName", nodeName) } - nodes = append(nodes, node) + s.nodesToDrain = append(s.nodesToDrain, node) s.log.Info(fmt.Sprintf("Creating deployment on node %s", nodeName)) deployment, pdb := DeploymentWithStuckPDB(fmt.Sprintf("fake-deployment-%s-%d", node.Name, i)) @@ -84,7 +84,6 @@ func (s *stuckDrainScenario) Preparation(ctx context.Context, namespace string, return fmt.Errorf("deployment %s did not progress to ready state in time", deployment.Name) } } - s.nodesToDrain = nodes return nil } @@ -93,14 +92,7 @@ func (s *stuckDrainScenario) Cleanup(ctx context.Context, namespace string, clie var errs []error // We iterate through all nodes as they are not deleted with the ns 
and can leak => so we want to delete as many as possible. for _, n := range s.nodesToDrain { - if n == nil { - s.log.Error("NODE IS NIL") - continue - } - if clientset == nil { - s.log.Error("CLIENTSET IS NIL") - return nil - } + s.log.Info(fmt.Sprintf("Deleting node %s", n.Name)) err := clientset.CoreV1().Nodes().Delete(ctx, n.Name, metav1.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { s.log.Warn("failed to delete fake node, will continue with other nodes", "nodeName", n.Name) @@ -112,6 +104,8 @@ func (s *stuckDrainScenario) Cleanup(ctx context.Context, namespace string, clie } // We assume no other tests are using the same NS so just delete all. + s.log.Info("Deleting deployments from stuck drain nodes") + deploymentsInNS, err := clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{}) if err != nil { return fmt.Errorf("failed to list deployments: %w", err) From 9f66e96cd7ad72700442c4fd6b9ea196dbf58f64 Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Thu, 20 Mar 2025 14:46:28 +0200 Subject: [PATCH 19/22] Logging adjustment. --- loadtest/scenarios/stuck_drain.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/loadtest/scenarios/stuck_drain.go b/loadtest/scenarios/stuck_drain.go index 96d15c3d..905d16de 100644 --- a/loadtest/scenarios/stuck_drain.go +++ b/loadtest/scenarios/stuck_drain.go @@ -104,14 +104,13 @@ func (s *stuckDrainScenario) Cleanup(ctx context.Context, namespace string, clie } // We assume no other tests are using the same NS so just delete all. 
- s.log.Info("Deleting deployments from stuck drain nodes") - deploymentsInNS, err := clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{}) if err != nil { return fmt.Errorf("failed to list deployments: %w", err) } for _, deployment := range deploymentsInNS.Items { + s.log.Info(fmt.Sprintf("Deleting deployment %s", deployment.Name)) err = clientset.AppsV1().Deployments(namespace).Delete(ctx, deployment.Name, metav1.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { return fmt.Errorf("failed to delete fake deployment: %w", err) @@ -124,6 +123,7 @@ func (s *stuckDrainScenario) Cleanup(ctx context.Context, namespace string, clie } for _, pdb := range pdbsInNS.Items { + s.log.Info(fmt.Sprintf("Deleting PDB %s", pdb.Name)) err = clientset.PolicyV1().PodDisruptionBudgets(namespace).Delete(ctx, pdb.Name, metav1.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { return fmt.Errorf("failed to delete fake pod disruption budget: %w", err) From e5772688ea59e771f16341012b393f3b3625fca6 Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Thu, 20 Mar 2025 14:57:55 +0200 Subject: [PATCH 20/22] Small adjustments --- cmd/testserver/run.go | 4 ++-- loadtest/castai.go | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cmd/testserver/run.go b/cmd/testserver/run.go index 3c75450d..f142ac3f 100644 --- a/cmd/testserver/run.go +++ b/cmd/testserver/run.go @@ -42,8 +42,9 @@ func run(ctx context.Context) error { }() testScenarios := []scenarios.TestScenario{ - scenarios.PodEvents(5000, logger), + //scenarios.PodEvents(5000, logger), //scenarios.StuckDrain(100, 60, logger), + scenarios.StuckDrain(10, 1, logger), } var wg sync.WaitGroup @@ -72,7 +73,6 @@ func run(ctx context.Context) error { } logger.Info(fmt.Sprintf("All test scenarios are done, received (%d) errors, exiting", len(receivedErrors))) - // TODO: Wait for server channel to be empty as well return errors.Join(receivedErrors...) 
} diff --git a/loadtest/castai.go b/loadtest/castai.go index 40e51ae1..20c5423c 100644 --- a/loadtest/castai.go +++ b/loadtest/castai.go @@ -57,7 +57,7 @@ func (c *CastAITestServer) ExecuteActions(ctx context.Context, actions []castai. for { select { case <-ctx.Done(): - // TODO: Clean up all actions? + c.log.Info(fmt.Sprintf("Received signal to stop finished with cause (%q) and err (%v). Closing executor.", context.Cause(ctx), ctx.Err())) return case finishedAction := <-ownerChannel: c.removeActionFromStore(finishedAction) @@ -89,11 +89,12 @@ func (c *CastAITestServer) GetActions(ctx context.Context, _ string) ([]*castai. } // Attempt to drain up to max items from the channel. - for len(actionsToReturn) <= c.cfg.MaxActionsPerCall { + for len(actionsToReturn) < c.cfg.MaxActionsPerCall { select { case x := <-c.actionsPushChannel: actionsToReturn = append(actionsToReturn, &x) case <-time.After(50 * time.Millisecond): + c.log.Info(fmt.Sprintf("Returning %d actions for processing", len(actionsToReturn))) // If we haven't received enough items, just flush. return actionsToReturn, nil case <-ctx.Done(): @@ -101,6 +102,7 @@ func (c *CastAITestServer) GetActions(ctx context.Context, _ string) ([]*castai. } } + c.log.Info(fmt.Sprintf("Returning %d actions for processing", len(actionsToReturn))) return actionsToReturn, nil } From dc12745452e563b417009bea6ae0f6d1a9278329 Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Thu, 20 Mar 2025 15:10:17 +0200 Subject: [PATCH 21/22] Linter fix. 
--- cmd/testserver/run.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/testserver/run.go b/cmd/testserver/run.go index f142ac3f..a658665f 100644 --- a/cmd/testserver/run.go +++ b/cmd/testserver/run.go @@ -42,8 +42,8 @@ func run(ctx context.Context) error { }() testScenarios := []scenarios.TestScenario{ - //scenarios.PodEvents(5000, logger), - //scenarios.StuckDrain(100, 60, logger), + // scenarios.PodEvents(5000, logger), + // scenarios.StuckDrain(100, 60, logger), scenarios.StuckDrain(10, 1, logger), } From bb5195f9fbc4a347f4453b74310dd600f52be537 Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Thu, 20 Mar 2025 15:57:31 +0200 Subject: [PATCH 22/22] Remove empty func --- loadtest/castai.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/loadtest/castai.go b/loadtest/castai.go index 20c5423c..71d5e4b6 100644 --- a/loadtest/castai.go +++ b/loadtest/castai.go @@ -34,10 +34,6 @@ func NewTestServer(logger *slog.Logger, cfg TestServerConfig) *CastAITestServer } } -func (c *CastAITestServer) Shutdown() { - // Drain -} - // ExecuteActions pushes the list of actions to the queue for cluster controller to process. // This method returns when all actions are acked or context is cancelled. func (c *CastAITestServer) ExecuteActions(ctx context.Context, actions []castai.ClusterAction) {