feat(bouncer): add multiple improvements such as : ignore unused pool… #197

Open · wants to merge 1 commit into base: master
133 changes: 133 additions & 0 deletions cached_collector.go
@@ -0,0 +1,133 @@
package main

import (
	"context"
	"log/slog"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// MetricCache holds cached metrics and related metadata
type MetricCache struct {
	metrics     []prometheus.Metric
	lastUpdated time.Time
	updating    bool
	mu          sync.RWMutex
}

// CachedExporter wraps the original Exporter with caching capability
type CachedExporter struct {
	*Exporter
	cache         *MetricCache
	cacheInterval time.Duration
	ctx           context.Context
	cancel        context.CancelFunc
}

// NewCachedExporter creates a new exporter with caching capabilities
func NewCachedExporter(connectionString string, namespace string, logger *slog.Logger, filterEmptyPools bool, cacheInterval time.Duration) *CachedExporter {
	ctx, cancel := context.WithCancel(context.Background())

	cached := &CachedExporter{
		Exporter: NewExporter(connectionString, namespace, logger, filterEmptyPools),
		cache: &MetricCache{
			metrics: make([]prometheus.Metric, 0),
		},
		cacheInterval: cacheInterval,
		ctx:           ctx,
		cancel:        cancel,
	}

	// Start the background cache updater
	go cached.updateMetricsLoop()

	// Perform initial cache population
	cached.updateCache()

	return cached
}

// updateMetricsLoop runs a periodic update of the cached metrics
func (ce *CachedExporter) updateMetricsLoop() {
	ticker := time.NewTicker(ce.cacheInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ce.ctx.Done():
			return
		case <-ticker.C:
			ce.updateCache()
		}
	}
}

// updateCache performs the actual metrics collection and updates the cache
func (ce *CachedExporter) updateCache() {
	ce.cache.mu.Lock()
	if ce.cache.updating {
		ce.cache.mu.Unlock()
		return
	}
	ce.cache.updating = true
	ce.cache.mu.Unlock()

	// Create a channel to collect metrics
	ch := make(chan prometheus.Metric)
	done := make(chan struct{})

	// Collect metrics in a separate goroutine
	collected := make([]prometheus.Metric, 0)
	go func() {
		for metric := range ch {
			collected = append(collected, metric)
		}
		close(done)
	}()

	// Perform collection using parent's Collect
	ce.Exporter.Collect(ch)
	close(ch)
	<-done

	// Update the cache with new metrics
	ce.cache.mu.Lock()
	ce.cache.metrics = collected
	ce.cache.lastUpdated = time.Now()
	ce.cache.updating = false
	ce.cache.mu.Unlock()
}

// Collect implements prometheus.Collector interface using cached metrics
func (ce *CachedExporter) Collect(ch chan<- prometheus.Metric) {
	ce.cache.mu.RLock()
	defer ce.cache.mu.RUnlock()

	// Send cached metrics
	for _, m := range ce.cache.metrics {
		ch <- m
	}

	// Add a metric for cache age
	ch <- prometheus.MustNewConstMetric(
		prometheus.NewDesc(
			prometheus.BuildFQName(namespace, "", "cache_age_seconds"),
			"Number of seconds since the metrics cache was last updated",
			nil, nil,
		),
		prometheus.GaugeValue,
		time.Since(ce.cache.lastUpdated).Seconds(),
	)
}

// Describe implements prometheus.Collector interface
func (ce *CachedExporter) Describe(ch chan<- *prometheus.Desc) {
	ce.Exporter.Describe(ch)
}

// Close stops the background updater
func (ce *CachedExporter) Close() {
	ce.cancel()
}
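
The caching layer above only takes effect once CachedExporter replaces the plain Exporter at startup. A minimal wiring sketch, assuming a hypothetical main() that is not part of this diff; the connection string, the "pgbouncer" namespace value, the cache interval, and the :9127 listen address are illustrative assumptions:

package main

import (
	"log/slog"
	"net/http"
	"os"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	// Assumed values; in the real exporter these would come from flags or env vars.
	connectionString := "postgres://postgres@localhost:6432/pgbouncer?sslmode=disable"
	filterEmptyPools := true
	cacheInterval := 30 * time.Second

	// NewCachedExporter starts the background updater and performs the initial collection.
	exporter := NewCachedExporter(connectionString, "pgbouncer", logger, filterEmptyPools, cacheInterval)
	defer exporter.Close() // stop the background cache updater on shutdown

	prometheus.MustRegister(exporter)

	http.Handle("/metrics", promhttp.Handler())
	if err := http.ListenAndServe(":9127", nil); err != nil {
		logger.Error("http server failed", "err", err.Error())
		os.Exit(1)
	}
}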
100 changes: 85 additions & 15 deletions collector.go
@@ -133,7 +133,7 @@ var (
	)
)

func NewExporter(connectionString string, namespace string, logger *slog.Logger) *Exporter {
func NewExporter(connectionString string, namespace string, logger *slog.Logger, filterEmptyPools bool) *Exporter {

	db, err := getDB(connectionString)

@@ -143,9 +143,10 @@ func NewExporter(connectionString string, namespace string, logger *slog.Logger)
	}

	return &Exporter{
		metricMap: makeDescMap(metricMaps, namespace, logger),
		db:        db,
		logger:    logger,
		metricMap:        makeDescMap(metricMaps, namespace, logger),
		db:               db,
		logger:           logger,
		filterEmptyPools: filterEmptyPools,
	}
}

@@ -235,9 +236,34 @@ func queryShowConfig(ch chan<- prometheus.Metric, db *sql.DB, logger *slog.Logge
	return nil
}

func hasActiveConnections(columnData []interface{}, columnIdx map[string]int) (bool, error) {
	// Connection metrics to check
	metricsToCheck := []string{
		"cl_active",
		"cl_waiting",
		"sv_active",
		"sv_idle",
		"sv_used",
		"sv_tested",
		"sv_login",
	}

	sum := float64(0)

	for _, metric := range metricsToCheck {
		if idx, ok := columnIdx[metric]; ok {
			if value, ok := dbToFloat64(columnData[idx], 1.0); ok {
				sum += value
			}
		}
	}

	return sum > 0, nil
}

// Query within a namespace mapping and emit metrics. Returns fatal errors if
// the scrape fails, and a slice of errors if they were non-fatal.
func queryNamespaceMapping(ch chan<- prometheus.Metric, db *sql.DB, namespace string, mapping MetricMapNamespace, logger *slog.Logger) ([]error, error) {
func queryNamespaceMapping(ch chan<- prometheus.Metric, db *sql.DB, namespace string, mapping MetricMapNamespace, filterEmptyPools bool, logger *slog.Logger) ([]error, error) {
	query := fmt.Sprintf("SHOW %s;", namespace)

	// Don't fail on a bad scrape of one metric
@@ -275,6 +301,17 @@ func queryNamespaceMapping(ch chan<- prometheus.Metric, db *sql.DB, namespace st
			return []error{}, fmt.Errorf("error retrieving rows: %v, error: %w", namespace, err)
		}

		if namespace == "pools" && filterEmptyPools {
			hasConnections, err := hasActiveConnections(columnData, columnIdx)
			if err != nil {
				nonfatalErrors = append(nonfatalErrors, fmt.Errorf("error checking active connections: %w", err))
				continue
			}
			if !hasConnections {
				continue // Skip this pool
			}
		}

		for i, label := range mapping.labels {
			for idx, columnName := range columnNames {
				if columnName == label {
@@ -333,19 +370,26 @@ func queryNamespaceMapping(ch chan<- prometheus.Metric, db *sql.DB, namespace st
}

func getDB(conn string) (*sql.DB, error) {
	// Open the database connection
	db, err := sql.Open("postgres", conn)
	if err != nil {
		return nil, err
	}

	// Configure connection pool settings
	db.SetMaxOpenConns(1)                  // Limit to single connection since we're dealing with PgBouncer
	db.SetMaxIdleConns(1)                  // Keep one idle connection ready
	db.SetConnMaxLifetime(5 * time.Minute) // Recycle connections every 5 minutes to prevent staleness
	db.SetConnMaxIdleTime(1 * time.Minute) // Close idle connections after 1 minute

	// Verify the connection is working
	rows, err := db.Query("SHOW STATS")
	if err != nil {
		db.Close() // Clean up if connection test fails
		return nil, fmt.Errorf("error pinging pgbouncer: %w", err)
	}
	defer rows.Close()

	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(1)

	return db, nil
}

@@ -381,22 +425,22 @@ func dbToFloat64(t interface{}, factor float64) (float64, bool) {
}

// Iterate through all the namespace mappings in the exporter and run their queries.
func queryNamespaceMappings(ch chan<- prometheus.Metric, db *sql.DB, metricMap map[string]MetricMapNamespace, logger *slog.Logger) map[string]error {
func (e *Exporter) queryNamespaceMappings(ch chan<- prometheus.Metric, db *sql.DB, metricMap map[string]MetricMapNamespace) map[string]error {
	// Return a map of namespace -> errors
	namespaceErrors := make(map[string]error)

	for namespace, mapping := range metricMap {
		logger.Debug("Querying namespace", "namespace", namespace)
		nonFatalErrors, err := queryNamespaceMapping(ch, db, namespace, mapping, logger)
		e.logger.Debug("Querying namespace", "namespace", namespace)
		nonFatalErrors, err := queryNamespaceMapping(ch, db, namespace, mapping, e.filterEmptyPools, e.logger)
		// Serious error - a namespace disappeared
		if err != nil {
			namespaceErrors[namespace] = err
			logger.Info("namespace disappeared", "err", err.Error())
			e.logger.Info("namespace disappeared", "err", err.Error())
		}
		// Non-serious errors - likely version or parsing problems.
		if len(nonFatalErrors) > 0 {
			for _, err := range nonFatalErrors {
				logger.Info("error parsing", "err", err.Error())
				e.logger.Info("error parsing", "err", err.Error())
			}
		}
	}
@@ -469,36 +513,62 @@ func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {

// Collect implements prometheus.Collector.
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
	e.logger.Info("Starting scrape")
	start := time.Now()
	e.logger.Info("Starting scrape", "timestamp", start)

	var up = 1.0

	// Version query timing
	versionStart := time.Now()
	err := queryVersion(ch, e.db)
	if err != nil {
		e.logger.Error("error getting version", "err", err.Error())
		up = 0
	}
	e.logger.Info("Version query completed",
		"duration_ms", time.Since(versionStart).Milliseconds(),
		"timestamp", time.Now())

	// SHOW LISTS timing
	listsStart := time.Now()
	if err = queryShowLists(ch, e.db, e.logger); err != nil {
		e.logger.Error("error getting SHOW LISTS", "err", err.Error())
		up = 0
	}
	e.logger.Info("SHOW LISTS query completed",
		"duration_ms", time.Since(listsStart).Milliseconds(),
		"timestamp", time.Now())

	// SHOW CONFIG timing
	configStart := time.Now()
	if err = queryShowConfig(ch, e.db, e.logger); err != nil {
		e.logger.Error("error getting SHOW CONFIG", "err", err.Error())
		up = 0
	}
	e.logger.Info("SHOW CONFIG query completed",
		"duration_ms", time.Since(configStart).Milliseconds(),
		"timestamp", time.Now())

	errMap := queryNamespaceMappings(ch, e.db, e.metricMap, e.logger)
	// Namespace mappings timing
	mappingsStart := time.Now()
	errMap := e.queryNamespaceMappings(ch, e.db, e.metricMap)
	if len(errMap) > 0 {
		e.logger.Error("error querying namespace mappings", "err", errMap)
		up = 0
	}
	e.logger.Info("Namespace mappings completed",
		"duration_ms", time.Since(mappingsStart).Milliseconds(),
		"timestamp", time.Now())

	if len(errMap) == len(e.metricMap) {
		up = 0
	}

	ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, up)

	e.logger.Info("Scrape completed",
		"total_duration_ms", time.Since(start).Milliseconds(),
		"timestamp", time.Now())
}

// Turn the MetricMap column mapping into a prometheus descriptor mapping.
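
For reference, the new filter above skips any SHOW POOLS row whose client and server connection counters (cl_active, cl_waiting, sv_active, sv_idle, sv_used, sv_tested, sv_login) sum to zero. A small, self-contained illustration, not part of the PR; the column layout is invented, and it assumes dbToFloat64 accepts plain float64 values:

package main

import "fmt"

// exampleEmptyPoolFilter shows how hasActiveConnections classifies two
// hypothetical SHOW POOLS rows when filterEmptyPools is enabled.
func exampleEmptyPoolFilter() {
	// Invented column layout for the example.
	columnIdx := map[string]int{
		"database":   0,
		"user":       1,
		"cl_active":  2,
		"cl_waiting": 3,
		"sv_idle":    4,
	}

	// All connection counters are zero: this pool would be skipped.
	emptyPool := []interface{}{"mydb", "app", float64(0), float64(0), float64(0)}
	// One waiting client: this pool is still exported.
	busyPool := []interface{}{"mydb", "app", float64(0), float64(1), float64(0)}

	emptyHas, _ := hasActiveConnections(emptyPool, columnIdx) // false -> row skipped
	busyHas, _ := hasActiveConnections(busyPool, columnIdx)   // true  -> row exported
	fmt.Println("export empty pool:", emptyHas, "export busy pool:", busyHas)
}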