diff --git a/cached_collector.go b/cached_collector.go
new file mode 100644
index 0000000..b97dde2
--- /dev/null
+++ b/cached_collector.go
@@ -0,0 +1,133 @@
+package main
+
+import (
+	"context"
+	"log/slog"
+	"sync"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// MetricCache holds cached metrics and related metadata
+type MetricCache struct {
+	metrics     []prometheus.Metric
+	lastUpdated time.Time
+	updating    bool
+	mu          sync.RWMutex
+}
+
+// CachedExporter wraps the original Exporter with caching capability
+type CachedExporter struct {
+	*Exporter
+	cache         *MetricCache
+	cacheInterval time.Duration
+	ctx           context.Context
+	cancel        context.CancelFunc
+}
+
+// NewCachedExporter creates a new exporter with caching capabilities
+func NewCachedExporter(connectionString string, namespace string, logger *slog.Logger, filterEmptyPools bool, cacheInterval time.Duration) *CachedExporter {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	cached := &CachedExporter{
+		Exporter: NewExporter(connectionString, namespace, logger, filterEmptyPools),
+		cache: &MetricCache{
+			metrics: make([]prometheus.Metric, 0),
+		},
+		cacheInterval: cacheInterval,
+		ctx:           ctx,
+		cancel:        cancel,
+	}
+
+	// Start the background cache updater
+	go cached.updateMetricsLoop()
+
+	// Perform initial cache population
+	cached.updateCache()
+
+	return cached
+}
+
+// updateMetricsLoop runs a periodic update of the cached metrics
+func (ce *CachedExporter) updateMetricsLoop() {
+	ticker := time.NewTicker(ce.cacheInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ce.ctx.Done():
+			return
+		case <-ticker.C:
+			ce.updateCache()
+		}
+	}
+}
+
+// updateCache performs the actual metrics collection and updates the cache
+func (ce *CachedExporter) updateCache() {
+	ce.cache.mu.Lock()
+	if ce.cache.updating {
+		ce.cache.mu.Unlock()
+		return
+	}
+	ce.cache.updating = true
+	ce.cache.mu.Unlock()
+
+	// Create a channel to collect metrics
+	ch := make(chan prometheus.Metric)
+	done := make(chan struct{})
+
+	// Collect metrics in a separate goroutine
+	collected := make([]prometheus.Metric, 0)
+	go func() {
+		for metric := range ch {
+			collected = append(collected, metric)
+		}
+		close(done)
+	}()
+
+	// Perform collection using parent's Collect
+	ce.Exporter.Collect(ch)
+	close(ch)
+	<-done
+
+	// Update the cache with new metrics
+	ce.cache.mu.Lock()
+	ce.cache.metrics = collected
+	ce.cache.lastUpdated = time.Now()
+	ce.cache.updating = false
+	ce.cache.mu.Unlock()
+}
+
+// Collect implements prometheus.Collector interface using cached metrics
+func (ce *CachedExporter) Collect(ch chan<- prometheus.Metric) {
+	ce.cache.mu.RLock()
+	defer ce.cache.mu.RUnlock()
+
+	// Send cached metrics
+	for _, m := range ce.cache.metrics {
+		ch <- m
+	}
+
+	// Add a metric for cache age
+	ch <- prometheus.MustNewConstMetric(
+		prometheus.NewDesc(
+			prometheus.BuildFQName(namespace, "", "cache_age_seconds"),
+			"Number of seconds since the metrics cache was last updated",
+			nil, nil,
+		),
+		prometheus.GaugeValue,
+		time.Since(ce.cache.lastUpdated).Seconds(),
+	)
+}
+
+// Describe implements prometheus.Collector interface
+func (ce *CachedExporter) Describe(ch chan<- *prometheus.Desc) {
+	ce.Exporter.Describe(ch)
+}
+
+// Close stops the background updater
+func (ce *CachedExporter) Close() {
+	ce.cancel()
+}
diff --git a/collector.go b/collector.go
index fb527dd..e78a1a7 100644
--- a/collector.go
+++ b/collector.go
@@ -133,7 +133,7 @@ var (
 	)
 )
 
-func NewExporter(connectionString string, namespace string, logger *slog.Logger) *Exporter {
+func NewExporter(connectionString string, namespace string, logger *slog.Logger, filterEmptyPools bool) *Exporter {
 	db, err := getDB(connectionString)
 
@@ -143,9 +143,10 @@ func NewExporter(connectionString string, namespace string, logger *slog.Logger)
 	}
 
 	return &Exporter{
-		metricMap: makeDescMap(metricMaps, namespace, logger),
-		db:        db,
-		logger:    logger,
+		metricMap:        makeDescMap(metricMaps, namespace, logger),
+		db:               db,
+		logger:           logger,
+		filterEmptyPools: filterEmptyPools,
 	}
 }
 
@@ -235,9 +236,34 @@ func queryShowConfig(ch chan<- prometheus.Metric, db *sql.DB, logger *slog.Logge
 	return nil
 }
 
+func hasActiveConnections(columnData []interface{}, columnIdx map[string]int) (bool, error) {
+	// Connection metrics to check
+	metricsToCheck := []string{
+		"cl_active",
+		"cl_waiting",
+		"sv_active",
+		"sv_idle",
+		"sv_used",
+		"sv_tested",
+		"sv_login",
+	}
+
+	sum := float64(0)
+
+	for _, metric := range metricsToCheck {
+		if idx, ok := columnIdx[metric]; ok {
+			if value, ok := dbToFloat64(columnData[idx], 1.0); ok {
+				sum += value
+			}
+		}
+	}
+
+	return sum > 0, nil
+}
+
 // Query within a namespace mapping and emit metrics. Returns fatal errors if
 // the scrape fails, and a slice of errors if they were non-fatal.
-func queryNamespaceMapping(ch chan<- prometheus.Metric, db *sql.DB, namespace string, mapping MetricMapNamespace, logger *slog.Logger) ([]error, error) {
+func queryNamespaceMapping(ch chan<- prometheus.Metric, db *sql.DB, namespace string, mapping MetricMapNamespace, filterEmptyPools bool, logger *slog.Logger) ([]error, error) {
 	query := fmt.Sprintf("SHOW %s;", namespace)
 
 	// Don't fail on a bad scrape of one metric
@@ -275,6 +301,17 @@ func queryNamespaceMapping(ch chan<- prometheus.Metric, db *sql.DB, namespace st
 			return []error{}, fmt.Errorf("error retrieving rows: %v, error: %w", namespace, err)
 		}
 
+		if namespace == "pools" && filterEmptyPools {
+			hasConnections, err := hasActiveConnections(columnData, columnIdx)
+			if err != nil {
+				nonfatalErrors = append(nonfatalErrors, fmt.Errorf("error checking active connections: %w", err))
+				continue
+			}
+			if !hasConnections {
+				continue // Skip this pool
+			}
+		}
+
 		for i, label := range mapping.labels {
 			for idx, columnName := range columnNames {
 				if columnName == label {
@@ -333,19 +370,26 @@ func queryNamespaceMapping(ch chan<- prometheus.Metric, db *sql.DB, namespace st
 }
 
 func getDB(conn string) (*sql.DB, error) {
+	// Open the database connection
 	db, err := sql.Open("postgres", conn)
 	if err != nil {
 		return nil, err
 	}
+
+	// Configure connection pool settings
+	db.SetMaxOpenConns(1)                  // Limit to single connection since we're dealing with PgBouncer
+	db.SetMaxIdleConns(1)                  // Keep one idle connection ready
+	db.SetConnMaxLifetime(5 * time.Minute) // Recycle connections every 5 minutes to prevent staleness
+	db.SetConnMaxIdleTime(1 * time.Minute) // Close idle connections after 1 minute
+
+	// Verify the connection is working
 	rows, err := db.Query("SHOW STATS")
 	if err != nil {
+		db.Close() // Clean up if connection test fails
 		return nil, fmt.Errorf("error pinging pgbouncer: %w", err)
 	}
	defer rows.Close()
 
-	db.SetMaxOpenConns(1)
-	db.SetMaxIdleConns(1)
-
 	return db, nil
 }
 
@@ -381,22 +425,22 @@ func dbToFloat64(t interface{}, factor float64) (float64, bool) {
 }
 
 // Iterate through all the namespace mappings in the exporter and run their queries.
-func queryNamespaceMappings(ch chan<- prometheus.Metric, db *sql.DB, metricMap map[string]MetricMapNamespace, logger *slog.Logger) map[string]error {
+func (e *Exporter) queryNamespaceMappings(ch chan<- prometheus.Metric, db *sql.DB, metricMap map[string]MetricMapNamespace) map[string]error {
 	// Return a map of namespace -> errors
 	namespaceErrors := make(map[string]error)
 
 	for namespace, mapping := range metricMap {
-		logger.Debug("Querying namespace", "namespace", namespace)
-		nonFatalErrors, err := queryNamespaceMapping(ch, db, namespace, mapping, logger)
+		e.logger.Debug("Querying namespace", "namespace", namespace)
+		nonFatalErrors, err := queryNamespaceMapping(ch, db, namespace, mapping, e.filterEmptyPools, e.logger)
 		// Serious error - a namespace disappeared
 		if err != nil {
 			namespaceErrors[namespace] = err
-			logger.Info("namespace disappeared", "err", err.Error())
+			e.logger.Info("namespace disappeared", "err", err.Error())
 		}
 		// Non-serious errors - likely version or parsing problems.
 		if len(nonFatalErrors) > 0 {
 			for _, err := range nonFatalErrors {
-				logger.Info("error parsing", "err", err.Error())
+				e.logger.Info("error parsing", "err", err.Error())
 			}
 		}
 	}
@@ -469,36 +513,62 @@ func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
 
 // Collect implements prometheus.Collector.
 func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
-	e.logger.Info("Starting scrape")
+	start := time.Now()
+	e.logger.Info("Starting scrape", "timestamp", start)
 
 	var up = 1.0
 
+	// Version query timing
+	versionStart := time.Now()
 	err := queryVersion(ch, e.db)
 	if err != nil {
 		e.logger.Error("error getting version", "err", err.Error())
 		up = 0
 	}
+	e.logger.Info("Version query completed",
+		"duration_ms", time.Since(versionStart).Milliseconds(),
+		"timestamp", time.Now())
 
+	// SHOW LISTS timing
+	listsStart := time.Now()
 	if err = queryShowLists(ch, e.db, e.logger); err != nil {
 		e.logger.Error("error getting SHOW LISTS", "err", err.Error())
 		up = 0
 	}
+	e.logger.Info("SHOW LISTS query completed",
+		"duration_ms", time.Since(listsStart).Milliseconds(),
+		"timestamp", time.Now())
 
+	// SHOW CONFIG timing
+	configStart := time.Now()
 	if err = queryShowConfig(ch, e.db, e.logger); err != nil {
 		e.logger.Error("error getting SHOW CONFIG", "err", err.Error())
 		up = 0
 	}
+	e.logger.Info("SHOW CONFIG query completed",
+		"duration_ms", time.Since(configStart).Milliseconds(),
+		"timestamp", time.Now())
 
-	errMap := queryNamespaceMappings(ch, e.db, e.metricMap, e.logger)
+	// Namespace mappings timing
+	mappingsStart := time.Now()
+	errMap := e.queryNamespaceMappings(ch, e.db, e.metricMap)
 	if len(errMap) > 0 {
 		e.logger.Error("error querying namespace mappings", "err", errMap)
 		up = 0
 	}
+	e.logger.Info("Namespace mappings completed",
+		"duration_ms", time.Since(mappingsStart).Milliseconds(),
+		"timestamp", time.Now())
 
 	if len(errMap) == len(e.metricMap) {
 		up = 0
 	}
+
 	ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, up)
+
+	e.logger.Info("Scrape completed",
+		"total_duration_ms", time.Since(start).Milliseconds(),
+		"timestamp", time.Now())
 }
 
 // Turn the MetricMap column mapping into a prometheus descriptor mapping.
diff --git a/pgbouncer_exporter.go b/pgbouncer_exporter.go
index a7e112b..3743c38 100644
--- a/pgbouncer_exporter.go
+++ b/pgbouncer_exporter.go
@@ -14,6 +14,7 @@ package main
 
 import (
+	"encoding/json"
 	"net/http"
 	"os"
 
@@ -31,6 +32,12 @@ import (
 
 const namespace = "pgbouncer"
 
+type Config struct {
+	ConnectionString string `json:"pgBouncer.connectionString"`
+	MetricsPath      string `json:"web.telemetry-path"`
+	PidFilePath      string `json:"pgBouncer.pid-file"`
+}
+
 func main() {
 	const pidFileHelpText = `Path to PgBouncer pid file.
 
@@ -48,6 +55,9 @@ func main() {
 		connectionStringPointer = kingpin.Flag("pgBouncer.connectionString", "Connection string for accessing pgBouncer.").Default("postgres://postgres:@localhost:6543/pgbouncer?sslmode=disable").Envar("PGBOUNCER_EXPORTER_CONNECTION_STRING").String()
 		metricsPath             = kingpin.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String()
 		pidFilePath             = kingpin.Flag("pgBouncer.pid-file", pidFileHelpText).Default("").String()
+		configFilePath          = kingpin.Flag("config.file", "Path to config file.").Default("").String()
+		filterEmptyPools        = kingpin.Flag("filter.empty_pools", "Filter out pools with no active clients").Default("false").Bool()
+		cacheInterval           = kingpin.Flag("collector.cache-interval", "Interval at which to update metrics cache").Default("2m").Duration()
 	)
 	toolkitFlags := kingpinflag.AddFlags(kingpin.CommandLine, ":9127")
 
@@ -58,12 +68,42 @@ func main() {
 
 	logger := promslog.New(promslogConfig)
 
+	// Read configuration from file if configFilePath is provided
+	if *configFilePath != "" {
+		file, err := os.ReadFile(*configFilePath)
+		if err != nil {
+			logger.Error("Error reading config file", "file", *configFilePath, "err", err.Error())
+			os.Exit(1)
+		}
+		var config Config
+		if err := json.Unmarshal(file, &config); err != nil {
+			logger.Error("Error parsing config file", "file", *configFilePath, "err", err.Error())
+			os.Exit(1)
+		}
+		// Override flags with config file values
+		if config.ConnectionString != "" {
+			*connectionStringPointer = config.ConnectionString
+		}
+		if config.MetricsPath != "" {
+			*metricsPath = config.MetricsPath
+		}
+		if config.PidFilePath != "" {
+			*pidFilePath = config.PidFilePath
+		}
+	}
+
 	connectionString := *connectionStringPointer
-	exporter := NewExporter(connectionString, namespace, logger)
+
+	// Create cached exporter instead of regular exporter
+	exporter := NewCachedExporter(connectionString, namespace, logger, *filterEmptyPools, *cacheInterval)
+	defer exporter.Close() // Ensure background worker is stopped on exit
+
 	prometheus.MustRegister(exporter)
 	prometheus.MustRegister(versioncollector.NewCollector("pgbouncer_exporter"))
 
-	logger.Info("Starting pgbouncer_exporter", "version", version.Info())
+	logger.Info("Starting pgbouncer_exporter",
+		"version", version.Info(),
+		"cache_interval", cacheInterval.String())
 	logger.Info("Build context", "build_context", version.BuildContext())
 
 	if *pidFilePath != "" {
@@ -76,11 +116,12 @@ func main() {
 		prometheus.MustRegister(procExporter)
 	}
 
+	// Set up HTTP server
 	http.Handle(*metricsPath, promhttp.Handler())
 	if *metricsPath != "/" && *metricsPath != "" {
 		landingConfig := web.LandingConfig{
 			Name:        "PgBouncer Exporter",
-			Description: "Prometheus Exporter for PgBouncer servers",
+			Description: "Prometheus Exporter for PgBouncer servers with metric caching",
 			Version:     version.Info(),
 			Links: []web.LandingLinks{
 				{
diff --git a/struct.go b/struct.go
index 27d4a9f..464a9d9 100644
--- a/struct.go
+++ b/struct.go
@@ -107,4 +107,6 @@ type Exporter struct {
 	db *sql.DB
 
 	logger *slog.Logger
+
+	filterEmptyPools bool
 }
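
For reference, a minimal standalone sketch (not part of the patch) of the JSON layout the new --config.file flag would accept. The three keys come straight from the json tags on the Config struct added in pgbouncer_exporter.go; the example values, the /var/run/pgbouncer/pgbouncer.pid path, and the file name mentioned afterwards are illustrative placeholders only.

package main

import (
	"encoding/json"
	"fmt"
)

// Config mirrors the struct added in pgbouncer_exporter.go; the JSON keys
// deliberately match the corresponding flag names.
type Config struct {
	ConnectionString string `json:"pgBouncer.connectionString"`
	MetricsPath      string `json:"web.telemetry-path"`
	PidFilePath      string `json:"pgBouncer.pid-file"`
}

func main() {
	// Example config-file contents; the connection string is the flag default,
	// the pid-file path is a placeholder.
	raw := []byte(`{
		"pgBouncer.connectionString": "postgres://postgres:@localhost:6543/pgbouncer?sslmode=disable",
		"web.telemetry-path": "/metrics",
		"pgBouncer.pid-file": "/var/run/pgbouncer/pgbouncer.pid"
	}`)

	var cfg Config
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	// In main(), any non-empty field overrides the matching command-line flag.
	fmt.Printf("%+v\n", cfg)
}

With such a file in place, the exporter would be started with something like ./pgbouncer_exporter --config.file=config.json --filter.empty_pools --collector.cache-interval=2m (the binary name is assumed); keys left out of the file keep their command-line or default values.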