Merged
8 changes: 5 additions & 3 deletions cmd/csi-provisioner/csi-provisioner.go
@@ -449,6 +449,7 @@ func main() {
 	)

 	var capacityController *capacity.Controller
+	var topologyInformer topology.Informer
 	if *enableCapacity {
 		// Publishing storage capacity information uses its own client
 		// with separate rate limiting.
@@ -483,7 +484,6 @@ func main() {
 			klog.Infof("using %s/%s %s as owner of CSIStorageCapacity objects", controller.APIVersion, controller.Kind, controller.Name)
 		}

-		var topologyInformer topology.Informer
 		if nodeDeployment == nil {
 			topologyRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax)
 			topologyInformer = topology.NewNodeTopology(
@@ -503,7 +503,6 @@ func main() {
 			klog.Infof("producing CSIStorageCapacity objects with fixed topology segment %s", segment)
 			topologyInformer = topology.NewFixedNodeTopology(&segment)
 		}
-		go topologyInformer.RunWorker(ctx)

 		managedByID := "external-provisioner"
 		if *enableNodeDeployment {
@@ -662,10 +661,13 @@ func main() {

 	factory.Start(ctx.Done())
 	if factoryForNamespace != nil {
-		// Starting is enough, the capacity controller will
+		// Starting is enough, the capacityController and topologyInformer will
 		// wait for sync.
 		factoryForNamespace.Start(ctx.Done())
 	}
+	if topologyInformer != nil {
Contributor:

Why is it important that topologyInformer gets started here vs. where it was started before?

If it's important, then let's add a comment explaining why. If it's not important, then let's not change it.

Contributor Author (@huww98, Apr 22, 2026):

All other informers are started here, so it is natural to start this one here as well.
I think we should only start topologyInformer after factoryForNamespace.Start, to avoid cache.WaitForCacheSync polling informers that have not been started, which would waste a little CPU while waiting for leader election.

There is already a comment just before:

			// Starting is enough, the capacityController and topologyInformer will
			// wait for sync.

Do you think this is enough?

Contributor:

I think both approaches would be fine and in general I prefer to avoid drive-by enhancements that aren't related to what a PR is primarily trying to do (in this case "fix duplicate topology"). It causes churn and makes reviews harder.

But we can keep it.

+		go topologyInformer.RunWorker(ctx)
+	}

pohly marked this conversation as resolved.
 	cacheSyncResult := factory.WaitForCacheSync(ctx.Done())
 	for _, v := range cacheSyncResult {
 		if !v {
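The ordering question raised in the thread above comes down to how client-go's cache.WaitForCacheSync behaves: it polls each HasSynced function (roughly every 100ms) until all of them return true, so a predicate backed by a never-started informer keeps the poll loop spinning. Below is a minimal, self-contained sketch of that behavior; the wiring is illustrative, not the provisioner's actual main():

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)
	nodes := factory.Core().V1().Nodes().Informer()

	// Start the factory before waiting. With these two calls reversed, the
	// informer never syncs and WaitForCacheSync keeps polling HasSynced until
	// ctx expires: the small busy-wait the review discussion refers to.
	factory.Start(ctx.Done())
	if !cache.WaitForCacheSync(ctx.Done(), nodes.HasSynced) {
		fmt.Println("caches did not sync before the deadline")
		return
	}
	fmt.Println("caches synced")
}
```

Starting RunWorker only after both factories have been started keeps its internal WaitForCacheSync on this fast path.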
28 changes: 20 additions & 8 deletions pkg/capacity/topology/nodes.go
@@ -21,6 +21,7 @@ import (
 	"reflect"
 	"sort"
 	"sync"
+	"sync/atomic"

 	v1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
@@ -159,6 +160,7 @@ type nodeTopology struct {
 	nodeInformer    coreinformersv1.NodeInformer
 	csiNodeInformer storageinformersv1.CSINodeInformer
 	queue           workqueue.TypedRateLimitingInterface[string]
+	hasSynced       atomic.Bool

 	mutex sync.Mutex
 	// segments hold a list of all currently known topology segments.
@@ -201,23 +203,32 @@ func (nt *nodeTopology) List() []*Segment {
 	return segments
 }

+// RunWorker starts a worker that processes topology updates from the queue.
+//
+// It must only be called once per instance. Calling it more than once would
+// result in simultaneous sync() calls that produce duplicate topology segments
+// and pass them to callbacks. Consumers depend on the address of a given
+// topology segment staying consistent for efficient hashing.
 func (nt *nodeTopology) RunWorker(ctx context.Context) {
 	klog.Info("Started node topology worker")
 	defer klog.Info("Shutting node topology worker")

+	if !cache.WaitForCacheSync(ctx.Done(),
+		nt.nodeInformer.Informer().HasSynced, nt.csiNodeInformer.Informer().HasSynced) {
+		return
+	}
+
+	go func() {
+		<-ctx.Done()
+		nt.queue.ShutDown()
+	}()
Comment on lines +221 to +224 (Contributor Author):

synctest checks for leaked goroutines, so I have to clean it up.

nt.queue.Add("") // Initial sync to ensure HasSynced() will become true.
for nt.processNextWorkItem(ctx) {
}
}

func (nt *nodeTopology) HasSynced() bool {
if nt.nodeInformer.Informer().HasSynced() &&
nt.csiNodeInformer.Informer().HasSynced() {
// Now that both informers are up-to-date, use that
// information to update our own view of the world.
nt.sync(context.Background())
return true
}
return false
return nt.hasSynced.Load()
}

@@ -227,6 +238,7 @@ func (nt *nodeTopology) processNextWorkItem(ctx context.Context) bool {
 	}
 	defer nt.queue.Done(obj)
 	nt.sync(ctx)
+	nt.hasSynced.Store(true)
 	return true
 }

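With this change, HasSynced becomes a side-effect-free predicate: the worker performs the first sync (triggered by the initial queue.Add("")) and only then flips the atomic flag, instead of running sync() from whichever goroutine happened to ask. A hedged sketch of how a consumer can rely on that contract; this is a hypothetical helper, not code from this PR, and the topology import path is assumed:

```go
package example

import (
	"context"

	"k8s.io/client-go/tools/cache"

	"github.com/kubernetes-csi/external-provisioner/v5/pkg/capacity/topology"
)

// waitForFirstTopologySync shows the new contract: HasSynced just reads an
// atomic flag, so it is cheap to poll from cache.WaitForCacheSync and never
// triggers a sync itself.
func waitForFirstTopologySync(ctx context.Context, informer topology.Informer) []*topology.Segment {
	if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
		return nil // ctx cancelled before the worker finished its first sync
	}
	// After the first sync, List returns de-duplicated segments whose
	// addresses stay stable, which is what callback consumers hash on.
	return informer.List()
}
```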
34 changes: 34 additions & 0 deletions pkg/capacity/topology/nodes_test.go
@@ -22,6 +22,7 @@ import (
 	"maps"
 	"sort"
 	"testing"
+	"testing/synctest"
 	"time"

 	v1 "k8s.io/api/core/v1"
@@ -559,6 +560,39 @@ func TestNodeTopology(t *testing.T) {
 		}
 	}
 }

+func TestHasSynced(t *testing.T) {
+	synctest.Test(t, func(t *testing.T) {
+		client := fakeclientset.NewSimpleClientset()
+		informerFactory := informers.NewSharedInformerFactory(client, 0)
+		nodeInformer := informerFactory.Core().V1().Nodes()
+		csiNodeInformer := informerFactory.Storage().V1().CSINodes()
+		rateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Second, 2*time.Second)
+		queue := workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{Name: "items"})
+
+		nt := NewNodeTopology(
+			driverName,
+			client,
+			nodeInformer,
+			csiNodeInformer,
+			queue,
+		).(*nodeTopology)
+
+		ctx := t.Context()
+		go nt.RunWorker(ctx)
+		time.Sleep(10 * time.Second)
+		if nt.HasSynced() {
+			t.Fatalf("upstream informer not started yet, expected HasSynced to return false")
+		}
+
+		informerFactory.Start(ctx.Done())
+		informerFactory.WaitForCacheSync(ctx.Done())
+		synctest.Wait()
+		if !nt.HasSynced() {
+			t.Fatalf("nt should be synced now")
+		}
+	})
+}

 type segmentsFound map[*Segment]bool

 func (sf segmentsFound) Found() []*Segment {
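The new test leans on two testing/synctest properties (Go 1.25+): inside the bubble, time is synthetic, so time.Sleep(10 * time.Second) returns as soon as every goroutine in the bubble is durably blocked, and synctest.Wait blocks until the bubble quiesces before the assertions run. A minimal sketch of just the clock behavior, under those assumptions:

```go
package example

import (
	"testing"
	"testing/synctest"
	"time"
)

// TestSyntheticClock sketches the synctest behavior TestHasSynced relies on:
// the 10s sleep completes without real waiting because the bubble's fake
// clock advances once all goroutines in it are durably blocked.
func TestSyntheticClock(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		start := time.Now()
		time.Sleep(10 * time.Second) // returns almost instantly in wall time
		if elapsed := time.Since(start); elapsed != 10*time.Second {
			t.Fatalf("expected exactly 10s of synthetic time, got %v", elapsed)
		}
	})
}
```

synctest also fails a test whose goroutines outlive the bubble, which is why RunWorker now shuts down its queue goroutine on ctx.Done(), as discussed in the review thread above.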