Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ data:
SKIP_SSL_VERIFY: "#{SKIP_SSL_VERIFY}"
CUSTOM_TLS_CERT: "#{CUSTOM_TLS_CERT}"
IS_INFRA_CONFIRMED: #{IS_INFRA_CONFIRMED}
K8S_CLIENT_QPS: "50"
K8S_CLIENT_BURST: "100"
K8S_CLIENT_TIMEOUT: "30"
COMPONENTS: |
DEPLOYMENTS: #{INFRA_DEPLOYMENTS}
---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ data:
SKIP_SSL_VERIFY: "#{SKIP_SSL_VERIFY}"
CUSTOM_TLS_CERT: "#{CUSTOM_TLS_CERT}"
IS_INFRA_CONFIRMED: #{IS_INFRA_CONFIRMED}
K8S_CLIENT_QPS: "50"
K8S_CLIENT_BURST: "100"
K8S_CLIENT_TIMEOUT: "30"
COMPONENTS: |
DEPLOYMENTS: #{INFRA_DEPLOYMENTS}
---
Expand Down
6 changes: 4 additions & 2 deletions chaoscenter/subscriber/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# BUILD STAGE
FROM golang:1.24 AS builder
FROM golang:1.26 AS builder

LABEL maintainer="LitmusChaos"

Expand All @@ -17,12 +17,14 @@ RUN CGO_ENABLED=0 go build -o /output/subscriber -v

# Packaging stage
# Use RedHat UBI minimal image as base
FROM registry.access.redhat.com/ubi9/ubi-minimal:9.6
FROM registry.access.redhat.com/ubi9/ubi-minimal:9.7

LABEL maintainer="LitmusChaos"

ENV APP_DIR="/litmus"

RUN microdnf update -y && microdnf clean all

COPY --from=builder /output/subscriber $APP_DIR/
RUN chown 65534:0 $APP_DIR/subscriber && chmod 755 $APP_DIR/subscriber

Expand Down
2 changes: 1 addition & 1 deletion chaoscenter/subscriber/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module subscriber

go 1.24.0
go 1.26.0

require (
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24
Expand Down
76 changes: 69 additions & 7 deletions chaoscenter/subscriber/pkg/k8s/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package k8s

import (
"os"
"strconv"
"sync"
"time"

wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
v1alpha12 "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"k8s.io/client-go/discovery"
Expand All @@ -10,24 +15,81 @@ import (
"k8s.io/client-go/tools/clientcmd"
)

var KubeConfig *string
const (
// Default values for K8s client configuration
DefaultQPS = 50.0
DefaultBurst = 100
DefaultTimeout = 30 // seconds
)

// getEnvAsFloat reads a float64 from the given env variable, returning defaultVal if not set or invalid
func getEnvAsFloat(name string, defaultVal float64) float64 {
valueStr := os.Getenv(name)
if value, err := strconv.ParseFloat(valueStr, 64); err == nil {
return value
}
return defaultVal
}

// getEnvAsInt reads an int from the given env variable, returning defaultVal if not set or invalid
func getEnvAsInt(name string, defaultVal int) int {
valueStr := os.Getenv(name)
if value, err := strconv.Atoi(valueStr); err == nil {
return value
}
return defaultVal
}

var (
KubeConfig *string

// Singleton clientset to avoid creating new clients per request
clientsetOnce sync.Once
clientsetInstance *kubernetes.Clientset
clientsetErr error
)

// getKubeConfig setup the config for access cluster resource
func (k8s *k8sSubscriber) GetKubeConfig() (*rest.Config, error) {
var config *rest.Config
var err error

// Use in-cluster config if kubeconfig path is not specified
if *KubeConfig == "" {
return rest.InClusterConfig()
config, err = rest.InClusterConfig()
} else {
config, err = clientcmd.BuildConfigFromFlags("", *KubeConfig)
}
return clientcmd.BuildConfigFromFlags("", *KubeConfig)
}

func (k8s *k8sSubscriber) GetGenericK8sClient() (*kubernetes.Clientset, error) {
config, err := k8s.GetKubeConfig()
if err != nil {
return nil, err
}

return kubernetes.NewForConfig(config)
// K8s client tuning - configurable via env variables for cluster-specific needs
// Default QPS=5 and Burst=10 are too low for clusters with 100+ namespaces
config.QPS = float32(getEnvAsFloat("K8S_CLIENT_QPS", DefaultQPS))
config.Burst = getEnvAsInt("K8S_CLIENT_BURST", DefaultBurst)
config.Timeout = time.Duration(getEnvAsInt("K8S_CLIENT_TIMEOUT", DefaultTimeout)) * time.Second

return config, nil
}

func (k8s *k8sSubscriber) GetGenericK8sClient() (*kubernetes.Clientset, error) {
// This eliminates TCP handshake and TLS negotiation overhead
clientsetOnce.Do(func() {
config, err := k8s.GetKubeConfig()
if err != nil {
clientsetErr = err
return
}
clientsetInstance, clientsetErr = kubernetes.NewForConfig(config)
})

if clientsetErr != nil {
return nil, clientsetErr
}

return clientsetInstance, nil
}

// This function returns dynamic client and discovery client
Expand Down
45 changes: 28 additions & 17 deletions chaoscenter/subscriber/pkg/k8s/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/sirupsen/logrus"

Expand All @@ -15,7 +16,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
)

var (
Expand All @@ -35,30 +35,41 @@ func (k8s *k8sSubscriber) GetKubernetesNamespaces(request types.KubeNamespaceReq
}
namespaceData = append(namespaceData, KubeNamespace)
} else {
// In case of cluster scope, get all the namespaces
conf, err := k8s.GetKubeConfig()
if err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(conf)
// Cached clientset instead of creating new one
clientset, err := k8s.GetGenericK8sClient()
if err != nil {
return nil, err
}

namespace, err := clientset.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
// Add context timeout to prevent indefinite hangs
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Add pagination support for clusters with many namespaces
listOpts := metav1.ListOptions{
Limit: 500, // Fetch in batches of 500
}
if len(namespace.Items) > 0 {
for _, namespace := range namespace.Items {

KubeNamespace := &types.KubeNamespace{
Name: namespace.GetName(),
}
for {
namespaceList, err := clientset.CoreV1().Namespaces().List(ctx, listOpts)
if err != nil {
return nil, err
}

for _, ns := range namespaceList.Items {
namespaceData = append(namespaceData, &types.KubeNamespace{
Name: ns.GetName(),
})
}

namespaceData = append(namespaceData, KubeNamespace)
// Check if there are more results to fetch
if namespaceList.Continue == "" {
break
}
} else {
listOpts.Continue = namespaceList.Continue
}

if len(namespaceData) == 0 {
return nil, errors.New("No namespace available")
}
}
Expand Down
Loading
Loading