diff --git a/internal/informer/manager.go b/internal/informer/manager.go
new file mode 100644
index 00000000..17b8a799
--- /dev/null
+++ b/internal/informer/manager.go
@@ -0,0 +1,210 @@
+// Package informer provides a shared informer manager for Kubernetes resources.
+package informer
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	listerv1 "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
+
+	"github.com/castai/cluster-controller/internal/metrics"
+)
+
+const (
+	defaultCacheSyncTimeout = 30 * time.Second
+)
+
+// Manager manages the global SharedInformerFactory and provides
+// access to specific informers and listers.
+type Manager struct {
+	log              logrus.FieldLogger
+	factory          informers.SharedInformerFactory
+	cacheSyncTimeout time.Duration
+
+	nodes *nodeInformer
+	pods  *podInformer
+
+	started    bool
+	cancelFunc context.CancelFunc
+	mu         sync.RWMutex
+}
+
+// Option is a functional option for configuring the Manager.
+type Option func(*Manager)
+
+// WithCacheSyncTimeout sets the timeout for waiting for informer caches to sync.
+func WithCacheSyncTimeout(timeout time.Duration) Option {
+	return func(m *Manager) {
+		m.cacheSyncTimeout = timeout
+	}
+}
+
+// WithNodeIndexers sets custom indexers for the node informer.
+func WithNodeIndexers(indexers cache.Indexers) Option {
+	return func(m *Manager) {
+		m.nodes.indexers = indexers
+	}
+}
+
+// WithPodIndexers sets custom indexers for the pod informer.
+func WithPodIndexers(indexers cache.Indexers) Option {
+	return func(m *Manager) {
+		m.pods.indexers = indexers
+	}
+}
+
+// NewManager creates a new Manager with the given clientset and resync period.
+func NewManager(
+	log logrus.FieldLogger,
+	clientset kubernetes.Interface,
+	resyncPeriod time.Duration,
+	opts ...Option,
+) *Manager {
+	factory := informers.NewSharedInformerFactory(clientset, resyncPeriod)
+
+	nodes := &nodeInformer{
+		informer: factory.Core().V1().Nodes().Informer(),
+		lister:   factory.Core().V1().Nodes().Lister(),
+	}
+
+	pods := &podInformer{
+		informer: factory.Core().V1().Pods().Informer(),
+		lister:   factory.Core().V1().Pods().Lister(),
+	}
+
+	m := &Manager{
+		log:              log,
+		factory:          factory,
+		cacheSyncTimeout: defaultCacheSyncTimeout,
+		nodes:            nodes,
+		pods:             pods,
+	}
+
+	for _, opt := range opts {
+		opt(m)
+	}
+
+	return m
+}
+
+// Start starts the informer factory and waits for all caches to sync.
+// This method blocks until caches are synchronized or the context is canceled.
+func (m *Manager) Start(ctx context.Context) error {
+	if m.started {
+		m.log.Warn("informer manager already started")
+		return nil
+	}
+
+	ctx, cancel := context.WithCancel(ctx)
+	m.cancelFunc = cancel
+
+	if err := m.addIndexers(); err != nil {
+		cancel()
+		return fmt.Errorf("adding indexers: %w", err)
+	}
+
+	m.log.Info("starting shared informer factory...")
+	m.factory.Start(ctx.Done())
+
+	syncCtx, syncCancel := context.WithTimeout(ctx, m.cacheSyncTimeout)
+	defer syncCancel()
+
+	m.log.Info("waiting for informer caches to sync...")
+	if !cache.WaitForCacheSync(syncCtx.Done(), m.nodes.HasSynced, m.pods.HasSynced) {
+		cancel()
+		return fmt.Errorf("failed to sync informer caches within %v", m.cacheSyncTimeout)
+	}
+
+	metrics.IncrementInformerCacheSyncs("node", "success")
+	metrics.IncrementInformerCacheSyncs("pod", "success")
+
+	m.started = true
+
+	m.log.Info("informer caches synced successfully")
+
+	go m.reportCacheSize(ctx)
+
+	return nil
+}
+
+// Stop gracefully stops the informer factory.
+func (m *Manager) Stop() {
+	if !m.started {
+		return
+	}
+
+	m.log.Info("stopping informer manager...")
+	if m.cancelFunc != nil {
+		m.cancelFunc()
+		m.cancelFunc = nil
+	}
+	m.started = false
+	m.log.Info("informer manager stopped")
+}
+
+// GetNodeLister returns the node lister for querying the node cache.
+func (m *Manager) GetNodeLister() listerv1.NodeLister {
+	return m.nodes.Lister()
+}
+
+// GetNodeInformer returns the node informer for watching node events.
+func (m *Manager) GetNodeInformer() cache.SharedIndexInformer {
+	return m.nodes.Informer()
+}
+
+// GetPodLister returns the pod lister for querying the pod cache.
+func (m *Manager) GetPodLister() listerv1.PodLister {
+	return m.pods.Lister()
+}
+
+// GetPodInformer returns the pod informer for watching pod events.
+func (m *Manager) GetPodInformer() cache.SharedIndexInformer {
+	return m.pods.Informer()
+}
+
+// GetFactory returns the underlying SharedInformerFactory for advanced use cases.
+func (m *Manager) GetFactory() informers.SharedInformerFactory {
+	return m.factory
+}
+
+func (m *Manager) addIndexers() error {
+	if m.nodes.indexers != nil {
+		if err := m.nodes.informer.AddIndexers(m.nodes.indexers); err != nil {
+			return fmt.Errorf("adding node indexers: %w", err)
+		}
+	}
+	if m.pods.indexers != nil {
+		if err := m.pods.informer.AddIndexers(m.pods.indexers); err != nil {
+			return fmt.Errorf("adding pod indexers: %w", err)
+		}
+	}
+	return nil
+}
+
+func (m *Manager) reportCacheSize(ctx context.Context) {
+	ticker := time.NewTicker(30 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			nodes := m.nodes.Informer().GetStore().ListKeys()
+			size := len(nodes)
+			m.log.WithField("cache_size", size).Debug("node informer cache size")
+			metrics.SetInformerCacheSize("node", size)
+
+			pods := m.pods.Informer().GetStore().ListKeys()
+			size = len(pods)
+			m.log.WithField("cache_size", size).Debug("pod informer cache size")
+			metrics.SetInformerCacheSize("pod", size)
+		}
+	}
+}
diff --git a/internal/informer/manager_test.go b/internal/informer/manager_test.go
new file mode 100644
index 00000000..1ecd1da7
--- /dev/null
+++ b/internal/informer/manager_test.go
@@ -0,0 +1,219 @@
+package informer
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	"github.com/stretchr/testify/require"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/kubernetes/fake"
+)
+
+func TestNewManager(t *testing.T) {
+	t.Parallel()
+
+	log := logrus.New()
+	clientset := fake.NewClientset()
+
+	manager := NewManager(log, clientset, time.Hour)
+
+	require.NotNil(t, manager)
+	require.NotNil(t, manager.GetFactory())
+	require.NotNil(t, manager.GetNodeLister())
+	require.NotNil(t, manager.GetNodeInformer())
+	require.NotNil(t, manager.GetPodLister())
+	require.NotNil(t, manager.GetPodInformer())
+}
+
+func TestManager_Start(t *testing.T) {
+	t.Parallel()
+	tests := []struct {
+		name          string
+		objects       []runtime.Object
+		setupCtx      func(t *testing.T) (context.Context, context.CancelFunc)
+		wantErr       bool
+		errContains   string
+		expectedNodes int
+		expectedPods  int
+	}{
+		{
+			name: "success with resources",
+			objects: []runtime.Object{
+				&corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}},
+				&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", Namespace: "default"}},
+			},
+			setupCtx: func(t *testing.T) (context.Context, context.CancelFunc) {
+				return context.WithTimeout(t.Context(), 5*time.Second)
+			},
+			wantErr:       false,
+			expectedNodes: 1,
+			expectedPods:  1,
+		},
+		{
+			name:    "success with empty cluster",
+			objects: nil,
+			setupCtx: func(t *testing.T) (context.Context, context.CancelFunc) {
+				return context.WithTimeout(t.Context(), 5*time.Second)
+			},
+			wantErr:       false,
+			expectedNodes: 0,
+			expectedPods:  0,
+		},
+		{
+			name:    "context canceled",
+			objects: nil,
+			setupCtx: func(t *testing.T) (context.Context, context.CancelFunc) {
+				ctx, cancel := context.WithCancel(t.Context())
+				cancel()
+				return ctx, func() {}
+			},
+			wantErr:     true,
+			errContains: "failed to sync informer caches",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+
+			log := logrus.New()
+			log.SetLevel(logrus.ErrorLevel)
+
+			clientset := fake.NewClientset(tt.objects...)
+			manager := NewManager(log, clientset, 0)
+
+			ctx, cancel := tt.setupCtx(t)
+			defer cancel()
+
+			err := manager.Start(ctx)
+
+			if tt.wantErr {
+				require.Error(t, err)
+				if tt.errContains != "" {
+					require.Contains(t, err.Error(), tt.errContains)
+				}
+				return
+			}
+
+			require.NoError(t, err)
+			defer manager.Stop()
+
+			nodes, err := manager.GetNodeLister().List(labels.Everything())
+			require.NoError(t, err)
+			require.Len(t, nodes, tt.expectedNodes)
+
+			pods, err := manager.GetPodLister().List(labels.Everything())
+			require.NoError(t, err)
+			require.Len(t, pods, tt.expectedPods)
+		})
+	}
+}
+
+func TestManager_Start_AlreadyStarted(t *testing.T) {
+	t.Parallel()
+
+	log := logrus.New()
+	log.SetLevel(logrus.ErrorLevel)
+
+	clientset := fake.NewClientset()
+	manager := NewManager(log, clientset, 0)
+
+	ctx := t.Context()
+
+	err := manager.Start(ctx)
+	require.NoError(t, err)
+	defer manager.Stop()
+
+	err = manager.Start(ctx)
+	require.NoError(t, err)
+}
+
+func TestManager_Stop(t *testing.T) {
+	t.Parallel()
+
+	tests := []struct {
+		name  string
+		setup func(t *testing.T, m *Manager)
+	}{
+		{
+			name:  "stop before start",
+			setup: func(t *testing.T, m *Manager) {},
+		},
+		{
+			name: "stop after start",
+			setup: func(t *testing.T, m *Manager) {
+				ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
+				defer cancel()
+				err := m.Start(ctx)
+				require.NoError(t, err)
+			},
+		},
+		{
+			name: "stop multiple times",
+			setup: func(t *testing.T, m *Manager) {
+				ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
+				defer cancel()
+				err := m.Start(ctx)
+				require.NoError(t, err)
+				m.Stop()
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+
+			log := logrus.New()
+			log.SetLevel(logrus.ErrorLevel)
+
+			clientset := fake.NewClientset()
+			manager := NewManager(log, clientset, 0)
+
+			tt.setup(t, manager)
+			manager.Stop()
+		})
+	}
+}
+
+func TestManager_CacheUpdates(t *testing.T) {
+	t.Parallel()
+
+	log := logrus.New()
+	log.SetLevel(logrus.ErrorLevel)
+
+	node := &corev1.Node{
+		ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
+	}
+
+	clientset := fake.NewClientset(node)
+	manager := NewManager(log, clientset, 0)
+
+	ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
+	defer cancel()
+
+	err := manager.Start(ctx)
+	require.NoError(t, err)
+	defer manager.Stop()
+
+	nodes, err := manager.GetNodeLister().List(labels.Everything())
+	require.NoError(t, err)
+	require.Len(t, nodes, 1)
+
+	node2 := &corev1.Node{
+		ObjectMeta: metav1.ObjectMeta{Name: "node-2"},
+	}
+	_, err = clientset.CoreV1().Nodes().Create(t.Context(), node2, metav1.CreateOptions{})
+	require.NoError(t, err)
+
+	time.Sleep(100 * time.Millisecond)
+
+	nodes, err = manager.GetNodeLister().List(labels.Everything())
+	require.NoError(t, err)
+	require.Len(t, nodes, 2)
+}
diff --git a/internal/informer/node_informer.go b/internal/informer/node_informer.go
new file mode 100644
index 00000000..ea8fc787
--- /dev/null
+++ b/internal/informer/node_informer.go
@@ -0,0 +1,24 @@
+package informer
+
+import (
+	listerv1 "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
+)
+
+type nodeInformer struct {
+	informer cache.SharedIndexInformer
+	lister   listerv1.NodeLister
+	indexers cache.Indexers
+}
+
+func (n *nodeInformer) Informer() cache.SharedIndexInformer {
+	return n.informer
+}
+
+func (n *nodeInformer) Lister() listerv1.NodeLister {
+	return n.lister
+}
+
+func (n *nodeInformer) HasSynced() bool {
+	return n.informer.HasSynced()
+}
diff --git a/internal/informer/node_informer_test.go b/internal/informer/node_informer_test.go
new file mode 100644
index 00000000..3995f9c1
--- /dev/null
+++ b/internal/informer/node_informer_test.go
@@ -0,0 +1,25 @@
+package informer
+
+import (
+	"testing"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	"github.com/stretchr/testify/require"
+	"k8s.io/client-go/kubernetes/fake"
+)
+
+func TestNodeInformer_Informer(t *testing.T) {
+	t.Parallel()
+
+	log := logrus.New()
+	clientset := fake.NewClientset()
+	manager := NewManager(log, clientset, time.Hour)
+
+	informer := manager.nodes.Informer()
+	lister := manager.nodes.Lister()
+
+	require.False(t, manager.nodes.HasSynced())
+	require.NotNil(t, informer)
+	require.NotNil(t, lister)
+}
diff --git a/internal/informer/pod_informer.go b/internal/informer/pod_informer.go
new file mode 100644
index 00000000..f30bf5a3
--- /dev/null
+++ b/internal/informer/pod_informer.go
@@ -0,0 +1,24 @@
+package informer
+
+import (
+	listerv1 "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
+)
+
+type podInformer struct {
+	informer cache.SharedIndexInformer
+	lister   listerv1.PodLister
+	indexers cache.Indexers
+}
+
+func (p *podInformer) Informer() cache.SharedIndexInformer {
+	return p.informer
+}
+
+func (p *podInformer) Lister() listerv1.PodLister {
+	return p.lister
+}
+
+func (p *podInformer) HasSynced() bool {
+	return p.informer.HasSynced()
+}
diff --git a/internal/informer/pod_informer_test.go b/internal/informer/pod_informer_test.go
new file mode 100644
index 00000000..a413ed1c
--- /dev/null
+++ b/internal/informer/pod_informer_test.go
@@ -0,0 +1,25 @@
+package informer
+
+import (
+	"testing"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	"github.com/stretchr/testify/require"
+	"k8s.io/client-go/kubernetes/fake"
+)
+
+func TestPodInformer_Informer(t *testing.T) {
+	t.Parallel()
+
+	log := logrus.New()
+	clientset := fake.NewClientset()
+	manager := NewManager(log, clientset, time.Hour)
+
+	informer := manager.pods.Informer()
+	lister := manager.pods.Lister()
+
+	require.NotNil(t, informer)
+	require.NotNil(t, lister)
+	require.False(t, manager.pods.HasSynced())
+}
diff --git a/internal/metrics/custom_metrics.go b/internal/metrics/custom_metrics.go
index 9c041d25..d1701e06 100644
--- a/internal/metrics/custom_metrics.go
+++ b/internal/metrics/custom_metrics.go
@@ -51,3 +51,29 @@ func ActionFinished(actionType string, success bool, duration time.Duration) {
 	actionExecutedCounter.With(prometheus.Labels{"success": strconv.FormatBool(success), "type": actionType}).Inc()
 	actionExecutedDuration.With(prometheus.Labels{"type": actionType}).Observe(duration.Seconds())
 }
+
+// informerCacheSize tracks the size of informer caches.
+var informerCacheSize = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Name: "informer_cache_size",
+		Help: "Number of objects in informer cache by resource type.",
+	},
+	[]string{"resource"},
+)
+
+// informerCacheSyncs tracks informer cache sync attempts.
+var informerCacheSyncs = prometheus.NewCounterVec(
+	prometheus.CounterOpts{
+		Name: "informer_cache_syncs_total",
+		Help: "Informer cache sync attempts by resource and status.",
+	},
+	[]string{"resource", "status"},
+)
+
+func SetInformerCacheSize(resource string, size int) {
+	informerCacheSize.With(prometheus.Labels{"resource": resource}).Set(float64(size))
+}
+
+func IncrementInformerCacheSyncs(resource, status string) {
+	informerCacheSyncs.With(prometheus.Labels{"resource": resource, "status": status}).Inc()
+}
diff --git a/internal/metrics/register.go b/internal/metrics/register.go
index 047d0dca..84fada03 100644
--- a/internal/metrics/register.go
+++ b/internal/metrics/register.go
@@ -9,5 +9,7 @@ func RegisterCustomMetrics() {
 		actionStartedCounter,
 		actionExecutedCounter,
 		actionExecutedDuration,
+		informerCacheSize,
+		informerCacheSyncs,
 	)
 }
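
For reference, a minimal sketch (not part of the patch) of how a caller could wire up the new Manager. The in-cluster config, logger setup, resync period, and the "podsByNode" indexer are illustrative assumptions; only NewManager, the With* options, Start, Stop, and the lister getters come from the code above.

// usage_sketch.go — illustrative only, assumptions noted in comments.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"

	"github.com/castai/cluster-controller/internal/informer"
)

func main() {
	ctx := context.Background()

	// Assumption: the controller runs in-cluster; any rest.Config would do here.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		log.Fatal(err)
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Hypothetical indexer ("podsByNode") so callers can look up pods by node name.
	podIndexers := cache.Indexers{
		"podsByNode": func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*corev1.Pod)
			if !ok || pod.Spec.NodeName == "" {
				return nil, nil
			}
			return []string{pod.Spec.NodeName}, nil
		},
	}

	mgr := informer.NewManager(
		logrus.New(),
		clientset,
		30*time.Minute, // resync period for the shared factory (assumed value)
		informer.WithCacheSyncTimeout(time.Minute),
		informer.WithPodIndexers(podIndexers),
	)

	// Start blocks until the node and pod caches sync or the timeout expires.
	if err := mgr.Start(ctx); err != nil {
		log.Fatal(err)
	}
	defer mgr.Stop()

	// Reads now come from the shared caches instead of the API server.
	nodes, err := mgr.GetNodeLister().List(labels.Everything())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("cluster has %d nodes in cache\n", len(nodes))
}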