Skip to content

Add cert watcher for target allocator TLS config #290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: aws-cwa-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion receiver/prometheusreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/promet
go 1.22.0

require (
github.com/fsnotify/fsnotify v1.8.0
github.com/go-kit/log v0.2.1
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
Expand Down Expand Up @@ -72,7 +73,6 @@ require (
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down
83 changes: 75 additions & 8 deletions receiver/prometheusreceiver/targetallocator/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sort"
"time"

"github.com/fsnotify/fsnotify"
commonconfig "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
promconfig "github.com/prometheus/prometheus/config"
Expand All @@ -36,6 +37,8 @@ type Manager struct {
scrapeManager *scrape.Manager
discoveryManager *discovery.Manager
enableNativeHistograms bool
watcher *fsnotify.Watcher
host component.Host
}

func NewManager(set receiver.Settings, cfg *Config, promCfg *promconfig.Config, enableNativeHistograms bool) *Manager {
Expand All @@ -52,6 +55,8 @@ func NewManager(set receiver.Settings, cfg *Config, promCfg *promconfig.Config,
func (m *Manager) Start(ctx context.Context, host component.Host, sm *scrape.Manager, dm *discovery.Manager) error {
m.scrapeManager = sm
m.discoveryManager = dm
m.host = host

err := m.applyCfg()
if err != nil {
m.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
Expand All @@ -61,23 +66,24 @@ func (m *Manager) Start(ctx context.Context, host component.Host, sm *scrape.Man
// the target allocator is disabled
return nil
}
httpClient, err := m.cfg.ClientConfig.ToClient(ctx, host, m.settings.TelemetrySettings)
if err != nil {
m.settings.Logger.Error("Failed to create http client", zap.Error(err))
return err
}
m.settings.Logger.Info("Starting target allocator discovery")
// immediately sync jobs, not waiting for the first tick
savedHash, err := m.sync(uint64(0), httpClient)
savedHash, err := m.sync(ctx, uint64(0))
if err != nil {
m.settings.Logger.Error("Failed to sync target allocator", zap.Error(err))
}

// Setup fsnotify watchers for TLS files
if err := m.setupTLSWatchers(ctx); err != nil {
m.settings.Logger.Error("Error setting up TLS watchers", zap.Error(err))
}

go func() {
targetAllocatorIntervalTicker := time.NewTicker(m.cfg.Interval)
for {
select {
case <-targetAllocatorIntervalTicker.C:
hash, newErr := m.sync(savedHash, httpClient)
hash, newErr := m.sync(ctx, savedHash)
if newErr != nil {
m.settings.Logger.Error(newErr.Error())
continue
Expand All @@ -95,14 +101,75 @@ func (m *Manager) Start(ctx context.Context, host component.Host, sm *scrape.Man

func (m *Manager) Shutdown() {
close(m.shutdown)
if m.watcher != nil {
if err := m.watcher.Close(); err != nil {
m.settings.Logger.Warn("Error closing fsnotify watcher", zap.Error(err))
}
}
}

// setupTLSWatchers creates one fsnotify watcher and adds CAFile, CertFile, KeyFile
// so that we automatically re‐sync whenever they change.
func (m *Manager) setupTLSWatchers(ctx context.Context) error {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("failed to create fsnotify watcher: %w", err)
}
m.watcher = watcher

addFile := func(path string) {
if path == "" {
return
}
if err := watcher.Add(path); err != nil {
m.settings.Logger.Error("Failed to watch TLS file", zap.String("file", path), zap.Error(err))
}
}

addFile(m.cfg.TLSSetting.CAFile)
addFile(m.cfg.TLSSetting.CertFile)
addFile(m.cfg.TLSSetting.KeyFile)

go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
if event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Rename|fsnotify.Remove) != 0 {
m.settings.Logger.Info("TLS file changed; re-syncing",
zap.String("file", event.Name), zap.String("op", event.Op.String()))
if _, err := m.sync(ctx, uint64(0)); err != nil {
m.settings.Logger.Error("Failed to sync after TLS file change", zap.Error(err))
}
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
m.settings.Logger.Error("fsnotify watcher error", zap.Error(err))
case <-m.shutdown:
return
}
}
}()

return nil
}

// sync request jobs from targetAllocator and update underlying receiver, if the response does not match the provided compareHash.
// baseDiscoveryCfg can be used to provide additional ScrapeConfigs which will be added to the retrieved jobs.
func (m *Manager) sync(compareHash uint64, httpClient *http.Client) (uint64, error) {
func (m *Manager) sync(ctx context.Context, compareHash uint64) (uint64, error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just pass the Https client directly why do need to make one client every sync

m.settings.Logger.Debug("Syncing target allocator jobs")
m.settings.Logger.Debug("endpoint", zap.String("endpoint", m.cfg.Endpoint))

httpClient, err := m.cfg.ClientConfig.ToClient(ctx, m.host, m.settings.TelemetrySettings)
if err != nil {
m.settings.Logger.Error("Failed to create http client", zap.Error(err))
return 0, err
}

scrapeConfigsResponse, err := getScrapeConfigsResponse(httpClient, m.cfg.Endpoint)
if err != nil {
m.settings.Logger.Error("Failed to retrieve job list", zap.Error(err))
Expand Down
Loading