Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 45 additions & 22 deletions cmd/testserver/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@ import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"os"
"sync"
"time"

"github.com/sirupsen/logrus"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/flowcontrol"

"github.com/castai/cluster-controller/internal/helm"
"github.com/castai/cluster-controller/loadtest"
"github.com/castai/cluster-controller/loadtest/scenarios"
)
Expand All @@ -27,10 +33,11 @@ func run(ctx context.Context) error {
TimeoutWaitingForActions: 60 * time.Second,
})

clientSet, err := createK8SClient(cfg, logger)
clientSet, dynamicClient, apiExtClient, helmClient, err := createK8SClients(cfg, logger)
if err != nil {
return err
}
logger.Info(fmt.Sprintf("Created %d clients", len([]any{clientSet, dynamicClient, apiExtClient, helmClient})))

go func() {
logger.Info("Starting HTTP server for test")
Expand All @@ -41,10 +48,10 @@ func run(ctx context.Context) error {
}
}()

// Choose scenarios below by adding/removing/etc. instances of scenarios.XXX()
// All scenarios in the list run in parallel (but not necessarily at the same time if preparation takes different time).
testScenarios := []scenarios.TestScenario{
// scenarios.PodEvents(5000, logger),
// scenarios.StuckDrain(100, 60, logger),
scenarios.StuckDrain(10, 1, logger),
scenarios.CheckNodeDeletedStuck(300, logger),
}

var wg sync.WaitGroup
Expand Down Expand Up @@ -76,34 +83,50 @@ func run(ctx context.Context) error {
return errors.Join(receivedErrors...)
}

func createK8SClient(cfg loadtest.Config, logger *slog.Logger) (*kubernetes.Clientset, error) {
if cfg.KubeConfig == "" {
logger.Info("Using in-cluster configuration")
restConfig, err := rest.InClusterConfig()
func createK8SClients(cfg loadtest.Config, logger *slog.Logger) (*kubernetes.Clientset, *dynamic.DynamicClient, *apiextensionsclientset.Clientset, helm.Client, error) {
rateLimiter := flowcontrol.NewTokenBucketRateLimiter(20, 50)

var restConfig *rest.Config
var err error

switch {
case cfg.KubeConfig != "":
logger.Info(fmt.Sprintf("Using kubeconfig from %q", cfg.KubeConfig))
data, err := os.ReadFile(cfg.KubeConfig)
if err != nil {
return nil, fmt.Errorf("error creating in-cluster config: %w", err)
return nil, nil, nil, nil, fmt.Errorf("reading kubeconfig at %s: %w", cfg.KubeConfig, err)
}
clientSet, err := kubernetes.NewForConfig(restConfig)

restConfig, err = clientcmd.RESTConfigFromKubeConfig(data)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("creating rest config from %q: %w", cfg.KubeConfig, err)
}
default:
logger.Info("Using in-cluster configuration")
restConfig, err = rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("obtaining kubernetes clientset: %w", err)
return nil, nil, nil, nil, fmt.Errorf("error creating in-cluster config: %w", err)
}
return clientSet, nil
}

logger.Info(fmt.Sprintf("Using kubeconfig from %q", cfg.KubeConfig))
data, err := os.ReadFile(cfg.KubeConfig)
restConfig.RateLimiter = rateLimiter

clientSet, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("reading kubeconfig at %s: %w", cfg.KubeConfig, err)
return nil, nil, nil, nil, fmt.Errorf("obtaining kubernetes clientset: %w", err)
}

restConfig, err := clientcmd.RESTConfigFromKubeConfig(data)
dynamicClient, err := dynamic.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("creating rest config from %q: %w", cfg.KubeConfig, err)
return nil, nil, nil, nil, fmt.Errorf("obtaining dynamic client: %w", err)
}

clientSet, err := kubernetes.NewForConfig(restConfig)
apiextensionsClient, err := apiextensionsclientset.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("obtaining kubernetes clientset: %w", err)
return nil, nil, nil, nil, fmt.Errorf("obtaining apiextensions client: %w", err)
}
return clientSet, nil

discard := logrus.New()
discard.Out = io.Discard
helmClient := helm.NewClient(discard, helm.NewChartLoader(discard), restConfig)

return clientSet, dynamicClient, apiextensionsClient, helmClient, nil
}
121 changes: 121 additions & 0 deletions loadtest/scenarios/check_node_deleted_stuck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package scenarios

import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"time"

"github.com/google/uuid"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/castai/cluster-controller/internal/castai"
)

// CheckNodeDeletedStuck builds a scenario in which created nodes are never
// actually deleted, so every check-node-deleted action is expected to get
// stuck and time out.
func CheckNodeDeletedStuck(nodeCount int, log *slog.Logger) TestScenario {
	scenario := checkNodeDeletedStuckScenario{
		log:       log,
		nodeCount: nodeCount,
	}
	return &scenario
}

// checkNodeDeletedStuckScenario implements TestScenario for the
// check-node-deleted load test.
type checkNodeDeletedStuckScenario struct {
	// nodeCount is the number of fake kwok nodes created during Preparation.
	nodeCount int
	// log receives progress and warning messages for the scenario.
	log *slog.Logger

	// nodes holds the node objects created in Preparation; read by Run and Cleanup.
	nodes []*corev1.Node
}

// Name returns the human-readable identifier of this scenario.
func (s *checkNodeDeletedStuckScenario) Name() string {
	const scenarioName = "check node deleted"
	return scenarioName
}

// Preparation creates s.nodeCount fake kwok nodes in parallel and records them
// in s.nodes so Run can target them and Cleanup can delete them.
// Already-existing nodes (leftovers from a previous run) are reused with a
// warning instead of failing the whole preparation. Returns the first creation
// error encountered, if any.
func (s *checkNodeDeletedStuckScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
	s.nodes = make([]*corev1.Node, 0, s.nodeCount)

	// lock guards s.nodes, which is appended to from multiple goroutines.
	var lock sync.Mutex
	errGroup, ctx := errgroup.WithContext(ctx)

	for i := range s.nodeCount {
		errGroup.Go(func() error {
			nodeName := fmt.Sprintf("kwok-check-deleted-%d", i)
			s.log.Info(fmt.Sprintf("Creating node %s", nodeName))
			node := NewKwokNode(KwokConfig{}, nodeName)

			_, err := clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
			// Evaluate the error once instead of repeating `err != nil &&` checks.
			switch {
			case apierrors.IsAlreadyExists(err):
				s.log.Warn("node already exists, will reuse but potential conflict between test runs", "nodeName", nodeName)
			case err != nil:
				return fmt.Errorf("failed to create fake node: %w", err)
			}

			lock.Lock()
			s.nodes = append(s.nodes, node)
			lock.Unlock()

			return nil
		})
	}

	return errGroup.Wait()
}

// Cleanup deletes all nodes created in Preparation. Deletions run in parallel
// and failures are collected rather than aborting, so as many nodes as
// possible are removed; collected errors are returned joined.
func (s *checkNodeDeletedStuckScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
	// lock guards errs, which is appended to from multiple goroutines.
	var lock sync.Mutex
	var errs []error
	var wg sync.WaitGroup

	wg.Add(len(s.nodes))
	// We iterate through all nodes as they are not deleted with the ns and can leak => so we want to delete as many as possible.
	for _, n := range s.nodes {
		go func() {
			defer wg.Done()

			s.log.Info(fmt.Sprintf("Deleting node %s", n.Name))
			err := clientset.CoreV1().Nodes().Delete(ctx, n.Name, metav1.DeleteOptions{})
			// Already-gone nodes are fine; anything else is recorded but does not stop the other deletions.
			if err != nil && !apierrors.IsNotFound(err) {
				// Include the error itself so the failure is diagnosable from logs.
				s.log.Warn("failed to delete fake node, will continue with other nodes", "nodeName", n.Name, "error", err)
				lock.Lock()
				errs = append(errs, err)
				lock.Unlock()
			}
		}()
	}

	wg.Wait()

	if len(errs) > 0 {
		return errors.Join(errs...)
	}

	// Fixed copy-paste: this is the check-node-deleted scenario, not the status check.
	s.log.Info("Finished up cleaning nodes for check node deleted.")
	return nil
}

// Run builds one CheckNodeDeleted action per prepared node and hands the whole
// batch to the executor.
func (s *checkNodeDeletedStuckScenario) Run(ctx context.Context, _ string, _ kubernetes.Interface, executor ActionExecutor) error {
	s.log.Info(fmt.Sprintf("Starting check node deleted action with %d nodes", len(s.nodes)))

	// Note: there is no code that should delete the node so each action should fail with timeout
	// -> this puts more load than "expected" to simulate such edge case.
	actions := make([]castai.ClusterAction, len(s.nodes))
	for i, node := range s.nodes {
		actions[i] = castai.ClusterAction{
			ID:        uuid.NewString(),
			CreatedAt: time.Now().UTC(),
			ActionCheckNodeDeleted: &castai.ActionCheckNodeDeleted{
				NodeName: node.Name,
			},
		}
	}

	executor.ExecuteActions(ctx, actions)

	return nil
}
119 changes: 119 additions & 0 deletions loadtest/scenarios/check_node_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package scenarios

import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"time"

"github.com/google/uuid"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/castai/cluster-controller/internal/castai"
)

// CheckNodeStatus builds a scenario that issues a check-node-status action
// (expecting READY) for each of nodeCount fake nodes.
func CheckNodeStatus(nodeCount int, log *slog.Logger) TestScenario {
	scenario := checkNodeStatusScenario{
		log:       log,
		nodeCount: nodeCount,
	}
	return &scenario
}

// checkNodeStatusScenario implements TestScenario for the check-node-status
// load test.
type checkNodeStatusScenario struct {
	// nodeCount is the number of fake kwok nodes created during Preparation.
	nodeCount int
	// log receives progress and warning messages for the scenario.
	log *slog.Logger

	// nodes holds the node objects created in Preparation; read by Run and Cleanup.
	nodes []*corev1.Node
}

// Name returns the human-readable identifier of this scenario.
func (s *checkNodeStatusScenario) Name() string {
	const scenarioName = "check node status"
	return scenarioName
}

// Preparation creates s.nodeCount fake kwok nodes in parallel and records them
// in s.nodes so Run can target them and Cleanup can delete them.
// Already-existing nodes (leftovers from a previous run) are reused with a
// warning instead of failing the whole preparation. Returns the first creation
// error encountered, if any.
func (s *checkNodeStatusScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
	s.nodes = make([]*corev1.Node, 0, s.nodeCount)

	// lock guards s.nodes, which is appended to from multiple goroutines.
	var lock sync.Mutex
	errGroup, ctx := errgroup.WithContext(ctx)

	for i := range s.nodeCount {
		errGroup.Go(func() error {
			nodeName := fmt.Sprintf("kwok-check-status-%d", i)
			s.log.Info(fmt.Sprintf("Creating node %s", nodeName))
			node := NewKwokNode(KwokConfig{}, nodeName)

			_, err := clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
			// Evaluate the error once instead of repeating `err != nil &&` checks.
			switch {
			case apierrors.IsAlreadyExists(err):
				s.log.Warn("node already exists, will reuse but potential conflict between test runs", "nodeName", nodeName)
			case err != nil:
				return fmt.Errorf("failed to create fake node: %w", err)
			}

			lock.Lock()
			s.nodes = append(s.nodes, node)
			lock.Unlock()

			return nil
		})
	}

	return errGroup.Wait()
}

// Cleanup deletes all nodes created in Preparation. Deletions run in parallel
// and failures are collected rather than aborting, so as many nodes as
// possible are removed; collected errors are returned joined.
func (s *checkNodeStatusScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
	// lock guards errs, which is appended to from multiple goroutines.
	var lock sync.Mutex
	var errs []error
	var wg sync.WaitGroup

	wg.Add(len(s.nodes))
	// We iterate through all nodes as they are not deleted with the ns and can leak => so we want to delete as many as possible.
	for _, n := range s.nodes {
		go func() {
			defer wg.Done()

			s.log.Info(fmt.Sprintf("Deleting node %s", n.Name))
			err := clientset.CoreV1().Nodes().Delete(ctx, n.Name, metav1.DeleteOptions{})
			// Already-gone nodes are fine; anything else is recorded but does not stop the other deletions.
			if err != nil && !apierrors.IsNotFound(err) {
				// Include the error itself so the failure is diagnosable from logs.
				s.log.Warn("failed to delete fake node, will continue with other nodes", "nodeName", n.Name, "error", err)
				lock.Lock()
				errs = append(errs, err)
				lock.Unlock()
			}
		}()
	}

	wg.Wait()

	if len(errs) > 0 {
		return errors.Join(errs...)
	}

	s.log.Info("Finished up cleaning nodes for status check.")
	return nil
}

// Run builds one CheckNodeStatus action (expecting READY) per prepared node
// and hands the whole batch to the executor.
func (s *checkNodeStatusScenario) Run(ctx context.Context, _ string, _ kubernetes.Interface, executor ActionExecutor) error {
	s.log.Info(fmt.Sprintf("Starting check node status action with %d nodes", len(s.nodes)))

	actions := make([]castai.ClusterAction, len(s.nodes))
	for i, node := range s.nodes {
		actions[i] = castai.ClusterAction{
			ID:        uuid.NewString(),
			CreatedAt: time.Now().UTC(),
			ActionCheckNodeStatus: &castai.ActionCheckNodeStatus{
				NodeName:   node.Name,
				NodeStatus: castai.ActionCheckNodeStatus_READY,
			},
		}
	}

	executor.ExecuteActions(ctx, actions)

	return nil
}
Loading
Loading