From d08bc18f6a9e37a1fb4a8754b3bb5b155204aded Mon Sep 17 00:00:00 2001 From: Brian Akins Date: Tue, 16 Apr 2019 11:57:21 -0400 Subject: [PATCH 1/2] Initial support for multiple php-fpm targets --- cmd/php-fpm-exporter/main.go | 13 ++- collector.go | 161 ++++++++++++++++++++++++----------- exporter.go | 47 ++++------ 3 files changed, 136 insertions(+), 85 deletions(-) diff --git a/cmd/php-fpm-exporter/main.go b/cmd/php-fpm-exporter/main.go index a193256..945180e 100644 --- a/cmd/php-fpm-exporter/main.go +++ b/cmd/php-fpm-exporter/main.go @@ -13,6 +13,7 @@ func main() { endpoint = kingpin.Flag("endpoint", "url for php-fpm status").Default("http://127.0.0.1:9000/status").String() fcgiEndpoint = kingpin.Flag("fastcgi", "fastcgi url. If this is set, fastcgi will be used instead of HTTP").String() metricsEndpoint = kingpin.Flag("web.telemetry-path", "Path under which to expose metrics. Cannot be /").Default("/metrics").String() + targets = kingpin.Flag("targets", "targets for scraping in the form name=url").StringMap() ) kingpin.HelpFlag.Short('h') @@ -23,10 +24,18 @@ func main() { panic(err) } + t := *targets + if len(t) == 0 { + if *fcgiEndpoint != "" { + t["default"] = *fcgiEndpoint + } else { + t["default"] = *endpoint + } + } + e, err := exporter.New( exporter.SetAddress(*addr), - exporter.SetEndpoint(*endpoint), - exporter.SetFastcgi(*fcgiEndpoint), + exporter.SetTargets(t), exporter.SetLogger(logger), exporter.SetMetricsEndpoint(*metricsEndpoint), ) diff --git a/collector.go b/collector.go index 882283c..655b608 100644 --- a/collector.go +++ b/collector.go @@ -1,11 +1,13 @@ package exporter import ( + "container/list" "io/ioutil" "net/http" "net/url" "regexp" "strconv" + "sync" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -29,7 +31,7 @@ type collector struct { maxChildrenReached *prometheus.Desc slowRequests *prometheus.Desc scrapeFailures *prometheus.Desc - failureCount int + targets []*targetCollector } const metricsNamespace = "phpfpm" @@ -42,19 +44,24 @@ func newFuncMetric(metricName string, docString string, labels []string) *promet } func (e *Exporter) newCollector() *collector { - return &collector{ + c := &collector{ exporter: e, - up: newFuncMetric("up", "able to contact php-fpm", nil), - acceptedConn: newFuncMetric("accepted_connections_total", "Total number of accepted connections", nil), - listenQueue: newFuncMetric("listen_queue_connections", "Number of connections that have been initiated but not yet accepted", nil), - maxListenQueue: newFuncMetric("listen_queue_max_connections", "Max number of connections the listen queue has reached since FPM start", nil), - listenQueueLength: newFuncMetric("listen_queue_length_connections", "The length of the socket queue, dictating maximum number of pending connections", nil), - phpProcesses: newFuncMetric("processes_total", "process count", []string{"state"}), - maxActiveProcesses: newFuncMetric("active_max_processes", "Maximum active process count", nil), - maxChildrenReached: newFuncMetric("max_children_reached_total", "Number of times the process limit has been reached", nil), - slowRequests: newFuncMetric("slow_requests_total", "Number of requests that exceed request_slowlog_timeout", nil), - scrapeFailures: newFuncMetric("scrape_failures_total", "Number of errors while scraping php_fpm", nil), + up: newFuncMetric("up", "able to contact php-fpm", []string{"target"}), + acceptedConn: newFuncMetric("accepted_connections_total", "Total number of accepted connections", []string{"target"}), + listenQueue: newFuncMetric("listen_queue_connections", "Number of connections that have been initiated but not yet accepted", []string{"target"}), + maxListenQueue: newFuncMetric("listen_queue_max_connections", "Max number of connections the listen queue has reached since FPM start", []string{"target"}), + listenQueueLength: newFuncMetric("listen_queue_length_connections", "The length of the socket queue, dictating maximum number of pending connections", []string{"target"}), + phpProcesses: newFuncMetric("processes_total", "process count", []string{"target", "state"}), + maxActiveProcesses: newFuncMetric("active_max_processes", "Maximum active process count", []string{"target"}), + maxChildrenReached: newFuncMetric("max_children_reached_total", "Number of times the process limit has been reached", []string{"target"}), + slowRequests: newFuncMetric("slow_requests_total", "Number of requests that exceed request_slowlog_timeout", []string{"target"}), + scrapeFailures: newFuncMetric("scrape_failures_total", "Number of errors while scraping php_fpm", []string{"target"}), } + + for k, v := range e.targets { + c.targets = append(c.targets, newTargetCollector(k, v, c)) + } + return c } func (c *collector) Describe(ch chan<- *prometheus.Desc) { @@ -143,37 +150,96 @@ func getDataHTTP(u *url.URL) ([]byte, error) { } func (c *collector) Collect(ch chan<- prometheus.Metric) { + var wg sync.WaitGroup + + // simple "queue" of metrics + metrics := list.New() + var mutex sync.Mutex + + for _, t := range c.targets { + t := t + wg.Add(1) + go func() { + defer wg.Done() + out, err := t.collect() + if err != nil { + c.exporter.logger.Error("error collecting php-fpm metrics", zap.String("target", t.name), zap.Error(err)) + } + mutex.Lock() + defer mutex.Unlock() + + for _, m := range out { + metrics.PushBack(m) + } + }() + } + + wg.Wait() + + // should be no writers to the list, but just in case + mutex.Lock() + defer mutex.Unlock() + + for e := metrics.Front(); e != nil; e = e.Next() { + m := e.Value.(prometheus.Metric) + ch <- m + } +} + +type targetCollector struct { + name string + url *url.URL + failures int + up bool + collector *collector +} + +func newTargetCollector(name string, url *url.URL, collector *collector) *targetCollector { + return &targetCollector{ + name: name, + url: url, + collector: collector, + } +} + +func (t *targetCollector) collect() ([]prometheus.Metric, error) { up := 1.0 var ( body []byte err error + out []prometheus.Metric ) - if c.exporter.fcgiEndpoint != nil && c.exporter.fcgiEndpoint.String() != "" { - body, err = getDataFastcgi(c.exporter.fcgiEndpoint) - } else { - body, err = getDataHTTP(c.exporter.endpoint) + switch t.url.Scheme { + case "http", "https": + body, err = getDataHTTP(t.url) + default: + body, err = getDataFastcgi(t.url) } if err != nil { up = 0.0 - c.exporter.logger.Error("failed to get php-fpm status", zap.Error(err)) - c.failureCount++ + t.failures++ } - ch <- prometheus.MustNewConstMetric( - c.up, - prometheus.GaugeValue, - up, - ) - ch <- prometheus.MustNewConstMetric( - c.scrapeFailures, - prometheus.CounterValue, - float64(c.failureCount), - ) + out = append(out, + prometheus.MustNewConstMetric( + t.collector.up, + prometheus.GaugeValue, + up, + t.name, + )) + + out = append(out, + prometheus.MustNewConstMetric( + t.collector.scrapeFailures, + prometheus.CounterValue, + float64(t.failures), + t.name, + )) - if up == 0.0 { - return + if err != nil { + return out, err } matches := statusLineRegexp.FindAllStringSubmatch(string(body), -1) @@ -186,52 +252,45 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { var desc *prometheus.Desc var valueType prometheus.ValueType - labels := []string{} + labels := []string{t.name} switch key { case "accepted conn": - desc = c.acceptedConn + desc = t.collector.acceptedConn valueType = prometheus.CounterValue case "listen queue": - desc = c.listenQueue + desc = t.collector.listenQueue valueType = prometheus.GaugeValue case "max listen queue": - desc = c.maxListenQueue + desc = t.collector.maxListenQueue valueType = prometheus.CounterValue case "listen queue len": - desc = c.listenQueueLength + desc = t.collector.listenQueueLength valueType = prometheus.GaugeValue case "idle processes": - desc = c.phpProcesses + desc = t.collector.phpProcesses valueType = prometheus.GaugeValue labels = append(labels, "idle") case "active processes": - desc = c.phpProcesses + desc = t.collector.phpProcesses valueType = prometheus.GaugeValue labels = append(labels, "active") case "max active processes": - desc = c.maxActiveProcesses + desc = t.collector.maxActiveProcesses valueType = prometheus.CounterValue case "max children reached": - desc = c.maxChildrenReached + desc = t.collector.maxChildrenReached valueType = prometheus.CounterValue case "slow requests": - desc = c.slowRequests + desc = t.collector.slowRequests valueType = prometheus.CounterValue default: continue } - m, err := prometheus.NewConstMetric(desc, valueType, float64(value), labels...) - if err != nil { - c.exporter.logger.Error( - "failed to create metrics", - zap.String("key", key), - zap.Error(err), - ) - continue - } - - ch <- m + out = append(out, + prometheus.MustNewConstMetric(desc, valueType, float64(value), labels...)) } + + return out, nil } diff --git a/exporter.go b/exporter.go index 1bd48c9..49f240f 100644 --- a/exporter.go +++ b/exporter.go @@ -20,8 +20,7 @@ import ( // Exporter handles serving the metrics type Exporter struct { addr string - endpoint *url.URL - fcgiEndpoint *url.URL + targets map[string]*url.URL logger *zap.Logger metricsEndpoint string } @@ -32,7 +31,8 @@ type OptionsFunc func(*Exporter) error // New creates an exporter. func New(options ...OptionsFunc) (*Exporter, error) { e := &Exporter{ - addr: ":9090", + addr: ":9090", + targets: make(map[string]*url.URL), } for _, f := range options { @@ -49,9 +49,9 @@ func New(options ...OptionsFunc) (*Exporter, error) { e.logger = l } - if e.endpoint == nil && e.fcgiEndpoint == nil { + if len(e.targets) == 0 { u, _ := url.Parse("http://localhost:9000/status") - e.endpoint = u + e.targets["default"] = u } return e, nil } @@ -78,36 +78,19 @@ func SetAddress(addr string) func(*Exporter) error { } } -// SetEndpoint creates a function that will set the URL endpoint to contact -// php-fpm. +// SetTargets creates a function that will set the target URL's for scraping // Generally only used when create a new Exporter. -func SetEndpoint(rawurl string) func(*Exporter) error { +func SetTargets(in map[string]string) func(*Exporter) error { return func(e *Exporter) error { - if rawurl == "" { - return nil - } - u, err := url.Parse(rawurl) - if err != nil { - return errors.Wrap(err, "failed to parse url") - } - e.endpoint = u - return nil - } -} - -// SetFastcgi creates a function that will set the fastcgi URL endpoint to contact -// php-fpm. If this is set, then fastcgi is used rather than HTTP. -// Generally only used when create a new Exporter. -func SetFastcgi(rawurl string) func(*Exporter) error { - return func(e *Exporter) error { - if rawurl == "" { - return nil - } - u, err := url.Parse(rawurl) - if err != nil { - return errors.Wrap(err, "failed to parse url") + targets := make(map[string]*url.URL) + for k, v := range in { + u, err := url.Parse(v) + if err != nil { + return errors.Wrap(err, "failed to parse url") + } + targets[k] = u } - e.fcgiEndpoint = u + e.targets = targets return nil } } From 08c0eee2484b97f16ddba51a27b26a2df907ece6 Mon Sep 17 00:00:00 2001 From: Brian Akins Date: Tue, 16 Apr 2019 12:01:16 -0400 Subject: [PATCH 2/2] Initial support for multiple php-fpm targets --- collector.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/collector.go b/collector.go index 655b608..046bc0b 100644 --- a/collector.go +++ b/collector.go @@ -1,7 +1,6 @@ package exporter import ( - "container/list" "io/ioutil" "net/http" "net/url" @@ -153,7 +152,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { var wg sync.WaitGroup // simple "queue" of metrics - metrics := list.New() + metrics := make([]prometheus.Metric, 0, len(c.targets)*10) var mutex sync.Mutex for _, t := range c.targets { @@ -167,10 +166,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { } mutex.Lock() defer mutex.Unlock() - - for _, m := range out { - metrics.PushBack(m) - } + metrics = append(metrics, out...) }() } @@ -180,8 +176,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { mutex.Lock() defer mutex.Unlock() - for e := metrics.Front(); e != nil; e = e.Next() { - m := e.Value.(prometheus.Metric) + for _, m := range metrics { ch <- m } }