Skip to content

Commit bade8dc

Browse files
authored
pod informer added / node and manager update (#225)
1 parent 1c1502e commit bade8dc

File tree

6 files changed

+898
-63
lines changed

6 files changed

+898
-63
lines changed

internal/informer/manager.go

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/sirupsen/logrus"
1111
authorizationv1 "k8s.io/api/authorization/v1"
12+
core "k8s.io/api/core/v1"
1213
storagev1 "k8s.io/api/storage/v1"
1314
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1415
"k8s.io/client-go/informers"
@@ -42,7 +43,7 @@ type Manager struct {
4243
factory informers.SharedInformerFactory
4344
cacheSyncTimeout time.Duration
4445

45-
nodes NodeInformer
46+
nodes *nodeInformer
4647
pods *podInformer
4748
volumeAttachments *vaInformer
4849

@@ -63,10 +64,11 @@ func WithCacheSyncTimeout(timeout time.Duration) Option {
6364

6465
func EnablePodInformer() Option {
6566
return func(m *Manager) {
66-
m.pods = &podInformer{
67-
informer: m.factory.Core().V1().Pods().Informer(),
68-
lister: m.factory.Core().V1().Pods().Lister(),
69-
}
67+
m.pods = NewPodInformer(
68+
m.factory.Core().V1().Pods().Informer(),
69+
m.factory.Core().V1().Pods().Lister(),
70+
nil,
71+
)
7072
}
7173
}
7274

@@ -82,7 +84,26 @@ func EnableNodeInformer() Option {
8284
// WithNodeIndexers sets custom indexers for the node informer.
8385
func WithNodeIndexers(indexers cache.Indexers) Option {
8486
return func(n *Manager) {
85-
n.nodes.SetIndexers(indexers)
87+
n.nodes.indexers = indexers
88+
}
89+
}
90+
91+
func WithDefaultPodNodeNameIndexer() Option {
92+
return WithPodIndexers(cache.Indexers{
93+
PodIndexerName: func(obj any) ([]string, error) {
94+
pod, ok := obj.(*core.Pod)
95+
if !ok {
96+
return nil, nil
97+
}
98+
return []string{pod.Spec.NodeName}, nil
99+
},
100+
})
101+
}
102+
103+
// WithPodIndexers sets custom indexers for the pod informer.
104+
func WithPodIndexers(indexers cache.Indexers) Option {
105+
return func(n *Manager) {
106+
n.pods.indexers = indexers
86107
}
87108
}
88109

@@ -168,7 +189,7 @@ func (m *Manager) Start(ctx context.Context) error {
168189
}
169190

170191
m.log.Info("waiting for node informer cache to sync...")
171-
if !cache.WaitForCacheSync(syncCtx.Done(), m.nodes.HasSynced) {
192+
if !cache.WaitForCacheSync(syncCtx.Done(), m.nodes.informer.HasSynced) {
172193
cancel()
173194
return fmt.Errorf("failed to sync node informer cache within %v", m.cacheSyncTimeout)
174195
}
@@ -178,7 +199,7 @@ func (m *Manager) Start(ctx context.Context) error {
178199

179200
if m.pods != nil {
180201
m.log.Info("waiting for pod informer cache to sync...")
181-
if !cache.WaitForCacheSync(syncCtx.Done(), m.pods.HasSynced) {
202+
if !cache.WaitForCacheSync(syncCtx.Done(), m.pods.informer.HasSynced) {
182203
cancel()
183204
return fmt.Errorf("failed to sync pod informer cache within %v", m.cacheSyncTimeout)
184205
}
@@ -247,7 +268,7 @@ func (m *Manager) GetNodeLister() listerv1.NodeLister {
247268
if m.nodes == nil {
248269
return nil
249270
}
250-
return m.nodes.Lister()
271+
return m.nodes.lister
251272
}
252273

253274
// GetNodeInformer returns the node informer for watching node events.
@@ -258,20 +279,26 @@ func (m *Manager) GetNodeInformer() NodeInformer {
258279
return m.nodes
259280
}
260281

282+
// Nodes returns the node informer for node domain operations.
283+
// This is a convenience method that returns the same as GetNodeInformer().
284+
func (m *Manager) Nodes() NodeInformer {
285+
return m.GetNodeInformer()
286+
}
287+
261288
// GetPodLister returns the pod lister for querying the pod cache.
262289
func (m *Manager) GetPodLister() listerv1.PodLister {
263290
if m.pods == nil {
264291
return nil
265292
}
266-
return m.pods.Lister()
293+
return m.pods.lister
267294
}
268295

269-
// GetPodInformer returns the pod informer for watching pod events.
270-
func (m *Manager) GetPodInformer() cache.SharedIndexInformer {
296+
// Pods returns the pod informer for pod domain operations.
297+
func (m *Manager) Pods() PodInformer {
271298
if m.pods == nil {
272299
return nil
273300
}
274-
return m.pods.Informer()
301+
return m.pods
275302
}
276303

277304
// IsVAAvailable indicates whether the VolumeAttachment informer is available.
@@ -346,8 +373,8 @@ func (m *Manager) checkVAPermissions(ctx context.Context) error {
346373
}
347374

348375
func (m *Manager) addIndexers() error {
349-
if m.nodes != nil && m.nodes.Indexers() != nil {
350-
if err := m.nodes.Informer().AddIndexers(m.nodes.Indexers()); err != nil {
376+
if m.nodes != nil && m.nodes.indexers != nil {
377+
if err := m.nodes.informer.AddIndexers(m.nodes.indexers); err != nil {
351378
return fmt.Errorf("adding node indexers: %w", err)
352379
}
353380
}
@@ -374,14 +401,14 @@ func (m *Manager) reportCacheSize(ctx context.Context) {
374401
return
375402
case <-ticker.C:
376403
if m.nodes != nil {
377-
nodes := m.nodes.Informer().GetStore().ListKeys()
404+
nodes := m.nodes.informer.GetStore().ListKeys()
378405
size := len(nodes)
379406
m.log.WithField("cache_size", size).Debug("node informer cache size")
380407
metrics.SetInformerCacheSize("node", size)
381408
}
382409

383410
if m.pods != nil {
384-
pods := m.pods.Informer().GetStore().ListKeys()
411+
pods := m.pods.informer.GetStore().ListKeys()
385412
size := len(pods)
386413
m.log.WithField("cache_size", size).Debug("pod informer cache size")
387414
metrics.SetInformerCacheSize("pod", size)

internal/informer/manager_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func TestNewManager(t *testing.T) {
3030
require.NotNil(t, manager.GetFactory())
3131
require.NotNil(t, manager.GetNodeInformer())
3232
require.NotNil(t, manager.GetPodLister())
33-
require.NotNil(t, manager.GetPodInformer())
33+
require.NotNil(t, manager.Pods())
3434

3535
// VA getters return nil before Start() because vaAvailable is false by default
3636
require.False(t, manager.IsVAAvailable())
@@ -260,17 +260,17 @@ func TestManager_VAInformer(t *testing.T) {
260260
require.True(t, manager.IsVAAvailable())
261261

262262
// Test lister
263-
vas, err := manager.GetVALister().List(labels.Everything())
263+
volumeAttachments, err := manager.GetVALister().List(labels.Everything())
264264
require.NoError(t, err)
265-
require.Len(t, vas, 1)
266-
require.Equal(t, "va-1", vas[0].Name)
265+
require.Len(t, volumeAttachments, 1)
266+
require.Equal(t, "va-1", volumeAttachments[0].Name)
267267

268268
// Test indexer with node name lookup
269269
indexed, err := manager.GetVAIndexer().ByIndex(VANodeNameIndexer, "node-1")
270270
require.NoError(t, err)
271271
require.Len(t, indexed, 1)
272272

273-
// Verify no VAs on non-existent node
273+
// Verify no volumeAttachments on non-existent node
274274
indexed, err = manager.GetVAIndexer().ByIndex(VANodeNameIndexer, "node-2")
275275
require.NoError(t, err)
276276
require.Len(t, indexed, 0)
@@ -324,7 +324,7 @@ func TestManager_DisabledInformers(t *testing.T) {
324324
require.Nil(t, manager.GetNodeLister())
325325
require.Nil(t, manager.GetNodeInformer())
326326
require.Nil(t, manager.GetPodLister())
327-
require.Nil(t, manager.GetPodInformer())
327+
require.Nil(t, manager.Pods())
328328

329329
ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
330330
defer cancel()
@@ -338,7 +338,7 @@ func TestManager_DisabledInformers(t *testing.T) {
338338
require.Nil(t, manager.GetNodeLister())
339339
require.Nil(t, manager.GetNodeInformer())
340340
require.Nil(t, manager.GetPodLister())
341-
require.Nil(t, manager.GetPodInformer())
341+
require.Nil(t, manager.Pods())
342342

343343
// VA should still work (always enabled)
344344
require.True(t, manager.IsVAAvailable())

internal/informer/node_informer.go

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,26 @@ import (
77

88
"github.com/sirupsen/logrus"
99
corev1 "k8s.io/api/core/v1"
10+
"k8s.io/apimachinery/pkg/labels"
1011
listerv1 "k8s.io/client-go/listers/core/v1"
1112
"k8s.io/client-go/tools/cache"
1213
)
1314

1415
type Predicate func(node *corev1.Node) (bool, error)
1516

17+
// NodeInformer provides domain operations for nodes.
18+
// Lifecycle (start/stop/sync) is managed by Manager.
1619
type NodeInformer interface {
17-
Start(ctx context.Context) error
20+
// Get retrieves a node by name from cache
21+
Get(name string) (*corev1.Node, error)
22+
23+
// List returns all nodes from cache
24+
List() ([]*corev1.Node, error)
25+
26+
// Wait watches for a node to meet a condition.
27+
// Returns a channel that signals when condition is met or context is canceled.
28+
// Used for event-driven waiting (e.g., wait for node to become ready).
1829
Wait(ctx context.Context, name string, condition Predicate) chan error
19-
Informer() cache.SharedIndexInformer
20-
Lister() listerv1.NodeLister
21-
Indexers() cache.Indexers
22-
SetIndexers(indexers cache.Indexers)
23-
HasSynced() bool
2430
}
2531

2632
type observable struct {
@@ -44,7 +50,7 @@ type nodeInformer struct {
4450
func NewNodeInformer(
4551
informer cache.SharedIndexInformer,
4652
lister listerv1.NodeLister,
47-
) NodeInformer {
53+
) *nodeInformer {
4854
n := &nodeInformer{
4955
informer: informer,
5056
lister: lister,
@@ -123,6 +129,18 @@ func (n *nodeInformer) onEvent(object any) {
123129
}
124130
}
125131

132+
func (n *nodeInformer) Get(name string) (*corev1.Node, error) {
133+
return n.lister.Get(name)
134+
}
135+
136+
func (n *nodeInformer) List() ([]*corev1.Node, error) {
137+
nodes, err := n.lister.List(labels.Everything())
138+
if err != nil {
139+
return nil, err
140+
}
141+
return nodes, nil
142+
}
143+
126144
func (n *nodeInformer) Wait(ctx context.Context, name string, condition Predicate) chan error {
127145
done := make(chan error, 1)
128146

@@ -165,23 +183,3 @@ func (n *nodeInformer) Wait(ctx context.Context, name string, condition Predicat
165183

166184
return done
167185
}
168-
169-
func (n *nodeInformer) Informer() cache.SharedIndexInformer {
170-
return n.informer
171-
}
172-
173-
func (n *nodeInformer) Lister() listerv1.NodeLister {
174-
return n.lister
175-
}
176-
177-
func (n *nodeInformer) Indexers() cache.Indexers {
178-
return n.indexers
179-
}
180-
181-
func (n *nodeInformer) SetIndexers(indexers cache.Indexers) {
182-
n.indexers = indexers
183-
}
184-
185-
func (n *nodeInformer) HasSynced() bool {
186-
return n.informer.HasSynced()
187-
}

0 commit comments

Comments
 (0)