Skip to content

Commit d8e4a5e

Browse files
committed
Add cert watcher for target allocator TLS config
1 parent 47132e6 commit d8e4a5e

File tree

1 file changed

+73
-8
lines changed
  • receiver/prometheusreceiver/targetallocator

1 file changed

+73
-8
lines changed

receiver/prometheusreceiver/targetallocator/manager.go

Lines changed: 73 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"sort"
1616
"time"
1717

18+
"github.com/fsnotify/fsnotify"
1819
commonconfig "github.com/prometheus/common/config"
1920
"github.com/prometheus/common/model"
2021
promconfig "github.com/prometheus/prometheus/config"
@@ -36,6 +37,8 @@ type Manager struct {
3637
scrapeManager *scrape.Manager
3738
discoveryManager *discovery.Manager
3839
enableNativeHistograms bool
40+
watcher *fsnotify.Watcher
41+
host component.Host
3942
}
4043

4144
func NewManager(set receiver.Settings, cfg *Config, promCfg *promconfig.Config, enableNativeHistograms bool) *Manager {
@@ -52,6 +55,8 @@ func NewManager(set receiver.Settings, cfg *Config, promCfg *promconfig.Config,
5255
func (m *Manager) Start(ctx context.Context, host component.Host, sm *scrape.Manager, dm *discovery.Manager) error {
5356
m.scrapeManager = sm
5457
m.discoveryManager = dm
58+
m.host = host
59+
5560
err := m.applyCfg()
5661
if err != nil {
5762
m.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
@@ -61,23 +66,24 @@ func (m *Manager) Start(ctx context.Context, host component.Host, sm *scrape.Man
6166
// the target allocator is disabled
6267
return nil
6368
}
64-
httpClient, err := m.cfg.ClientConfig.ToClient(ctx, host, m.settings.TelemetrySettings)
65-
if err != nil {
66-
m.settings.Logger.Error("Failed to create http client", zap.Error(err))
67-
return err
68-
}
6969
m.settings.Logger.Info("Starting target allocator discovery")
7070
// immediately sync jobs, not waiting for the first tick
71-
savedHash, err := m.sync(uint64(0), httpClient)
71+
savedHash, err := m.sync(ctx, uint64(0))
7272
if err != nil {
7373
m.settings.Logger.Error("Failed to sync target allocator", zap.Error(err))
7474
}
75+
76+
// Setup fsnotify watchers for TLS files
77+
if err := m.setupTLSWatchers(ctx); err != nil {
78+
m.settings.Logger.Error("Error setting up TLS watchers", zap.Error(err))
79+
}
80+
7581
go func() {
7682
targetAllocatorIntervalTicker := time.NewTicker(m.cfg.Interval)
7783
for {
7884
select {
7985
case <-targetAllocatorIntervalTicker.C:
80-
hash, newErr := m.sync(savedHash, httpClient)
86+
hash, newErr := m.sync(ctx, savedHash)
8187
if newErr != nil {
8288
m.settings.Logger.Error(newErr.Error())
8389
continue
@@ -95,14 +101,73 @@ func (m *Manager) Start(ctx context.Context, host component.Host, sm *scrape.Man
95101

96102
func (m *Manager) Shutdown() {
97103
close(m.shutdown)
104+
if err := m.watcher.Close(); err != nil {
105+
m.settings.Logger.Warn("Error closing fsnotify watcher", zap.Error(err))
106+
}
107+
}
108+
109+
// setupTLSWatchers creates one fsnotify watcher and adds CAFile, CertFile, KeyFile
110+
// so that we automatically re‐sync whenever they change.
111+
func (m *Manager) setupTLSWatchers(ctx context.Context) error {
112+
watcher, err := fsnotify.NewWatcher()
113+
if err != nil {
114+
return fmt.Errorf("failed to create fsnotify watcher: %w", err)
115+
}
116+
m.watcher = watcher
117+
118+
addFile := func(path string) {
119+
if path == "" {
120+
return
121+
}
122+
if err := watcher.Add(path); err != nil {
123+
m.settings.Logger.Error("Failed to watch TLS file", zap.String("file", path), zap.Error(err))
124+
}
125+
}
126+
127+
addFile(m.cfg.TLSSetting.CAFile)
128+
addFile(m.cfg.TLSSetting.CertFile)
129+
addFile(m.cfg.TLSSetting.KeyFile)
130+
131+
go func() {
132+
for {
133+
select {
134+
case event, ok := <-watcher.Events:
135+
if !ok {
136+
return
137+
}
138+
if event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Rename|fsnotify.Remove) != 0 {
139+
m.settings.Logger.Info("TLS file changed; re-syncing",
140+
zap.String("file", event.Name), zap.String("op", event.Op.String()))
141+
if _, err := m.sync(ctx, uint64(0)); err != nil {
142+
m.settings.Logger.Error("Failed to sync after TLS file change", zap.Error(err))
143+
}
144+
}
145+
case err, ok := <-watcher.Errors:
146+
if !ok {
147+
return
148+
}
149+
m.settings.Logger.Error("fsnotify watcher error", zap.Error(err))
150+
case <-m.shutdown:
151+
return
152+
}
153+
}
154+
}()
155+
156+
return nil
98157
}
99158

100159
// sync request jobs from targetAllocator and update underlying receiver, if the response does not match the provided compareHash.
101160
// baseDiscoveryCfg can be used to provide additional ScrapeConfigs which will be added to the retrieved jobs.
102-
func (m *Manager) sync(compareHash uint64, httpClient *http.Client) (uint64, error) {
161+
func (m *Manager) sync(ctx context.Context, compareHash uint64) (uint64, error) {
103162
m.settings.Logger.Debug("Syncing target allocator jobs")
104163
m.settings.Logger.Debug("endpoint", zap.String("endpoint", m.cfg.Endpoint))
105164

165+
httpClient, err := m.cfg.ClientConfig.ToClient(ctx, m.host, m.settings.TelemetrySettings)
166+
if err != nil {
167+
m.settings.Logger.Error("Failed to create http client", zap.Error(err))
168+
return 0, err
169+
}
170+
106171
scrapeConfigsResponse, err := getScrapeConfigsResponse(httpClient, m.cfg.Endpoint)
107172
if err != nil {
108173
m.settings.Logger.Error("Failed to retrieve job list", zap.Error(err))

0 commit comments

Comments
 (0)