@@ -26,6 +26,7 @@ import (
2626 "os"
2727 "path/filepath"
2828 "strings"
29+ "sync"
2930 "time"
3031
3132 "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/cloud_provider/clientset"
@@ -56,14 +57,20 @@ type manager struct {
5657 metricsEndpoint string
5758 fuseSocketDir string
5859 clientset clientset.Interface
60+
61+ maximumNumberOfCollectors int
62+ registeredCollectorsCount int
63+ mutex sync.Mutex
5964}
6065
// NewMetricsManager builds a manager holding a newly created Prometheus
// registry together with the given metricsEndpoint, fuseSocketDir and
// clientset, and returns it as a Manager.
//
// maximumNumberOfCollectors caps how many collectors RegisterMetricsCollector
// will register; a negative value disables the cap. registeredCollectorsCount
// is not set here and therefore starts at its zero value.
61- func NewMetricsManager (metricsEndpoint , fuseSocketDir string , clientset clientset.Interface ) Manager {
66+ func NewMetricsManager (metricsEndpoint , fuseSocketDir string , maximumNumberOfCollectors int , clientset clientset.Interface ) Manager {
6267 mm := & manager {
63- registry : prometheus .NewRegistry (),
64- metricsEndpoint : metricsEndpoint ,
65- fuseSocketDir : fuseSocketDir ,
66- clientset : clientset ,
68+ registry : prometheus .NewRegistry (),
69+ metricsEndpoint : metricsEndpoint ,
70+ fuseSocketDir : fuseSocketDir ,
71+ clientset : clientset ,
72+ maximumNumberOfCollectors : maximumNumberOfCollectors ,
// NOTE(review): a zero-value sync.Mutex is already usable, so this explicit
// literal is equivalent to omitting the field.
73+ mutex : sync.Mutex {},
6774 }
6875
6976 return mm
@@ -115,8 +122,25 @@ func (mm *manager) RegisterMetricsCollector(targetPath, podNamespace, podName, b
115122 "bucket_name" : bucketName ,
116123 "pod_uid" : podUID ,
117124 }, mm .clientset )
118- if err := mm .registry .Register (c ); err != nil && ! strings .Contains (err .Error (), prometheus.AlreadyRegisteredError {}.Error ()) {
119- klog .Errorf ("failed to register metrics collector for pod %v/%v, volume %q, bucket %q: %v" , podNamespace , podName , volumeName , bucketName , err )
125+
126+ // Lock the number of registered collectors while we attempt to register a new collector.
127+ mm .mutex .Lock ()
128+ defer mm .mutex .Unlock ()
129+
130+ // We skip registration once the maximum number of metrics collectors is already registered. If the
131+ // limit is unset (maximumNumberOfCollectors less than zero), we proceed with registration.
132+ if mm .maximumNumberOfCollectors >= 0 && mm .registeredCollectorsCount >= mm .maximumNumberOfCollectors {
133+ return
134+ }
135+
136+ err = mm .registry .Register (c )
137+ if err != nil {
138+ if ! strings .Contains (err .Error (), prometheus.AlreadyRegisteredError {}.Error ()) {
139+ klog .Errorf ("failed to register metrics collector for pod %v/%v, volume %q, bucket %q: %v" , podNamespace , podName , volumeName , bucketName , err )
140+ }
141+ } else {
142+ mm .registeredCollectorsCount += 1
143+ klog .Infof ("successfully registered a new metrics collector: podUID: %s, volume: %s. there's %d collectors registerd." , podUID , bucketName , mm .registeredCollectorsCount )
120144 }
121145}
122146
@@ -126,8 +150,16 @@ func (mm *manager) UnregisterMetricsCollector(targetPath string) {
126150
127151 // metricsCollector uses a hash of pod UID and volume name as an identifier.
128152 c := NewMetricsCollector ("" , "" , "" , "" , podUID , volumeName , nil , nil )
153+
154+ // Lock the number of registered collectors while we attempt to unregister a collector.
155+ mm .mutex .Lock ()
156+ defer mm .mutex .Unlock ()
157+
129158 if ok := mm .registry .Unregister (c ); ! ok {
130159 klog .Infof ("Unregister metrics collector for targetPath %q is not needed since the collector is not registered" , targetPath )
160+ } else {
161+ mm .registeredCollectorsCount -= 1
162+ klog .Infof ("successfully unregistered a metrics collector: podUID: %s, volume: %s. there's %d collectors registerd." , podUID , volumeName , mm .registeredCollectorsCount )
131163 }
132164}
133165
0 commit comments