Skip to content
This repository has been archived by the owner on Dec 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #9 from phsm/master
Browse files Browse the repository at this point in the history
Make exporters be polled concurrently
  • Loading branch information
svenwltr authored Apr 12, 2019
2 parents 61bf1db + 5ef26a7 commit 200a6a6
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 42 deletions.
57 changes: 36 additions & 21 deletions cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"io"
"net/http"
"sort"
"sync"
"time"

prom "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
Expand All @@ -19,30 +21,45 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
"RequestURI": r.RequestURI,
"UserAgent": r.UserAgent(),
}).Debug("handling new request")
err := h.Merge(w)
if err != nil {
log.Error(err)
w.WriteHeader(500)
}
h.Merge(w)
}

func (h Handler) Merge(w io.Writer) error {
func (h Handler) Merge(w io.Writer) {
mfs := map[string]*prom.MetricFamily{}
tp := new(expfmt.TextParser)

responses := make([]map[string]*prom.MetricFamily, 1024)
responsesMu := sync.Mutex{}

httpClientTimeout := time.Second * 10

wg := sync.WaitGroup{}
for _, url := range h.Exporters {
log.WithField("url", url).Debug("getting remote metrics")
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
wg.Add(1)
go func(u string) {
defer wg.Done()
log.WithField("url", u).Debug("getting remote metrics")
httpClient := http.Client{Timeout: httpClientTimeout}
resp, err := httpClient.Get(u)
if err != nil {
log.WithField("url", u).Errorf("HTTP connection failed: %v", err)
return
}
defer resp.Body.Close()

part, err := tp.TextToMetricFamilies(resp.Body)
if err != nil {
return err
}
tp := new(expfmt.TextParser)
part, err := tp.TextToMetricFamilies(resp.Body)
if err != nil {
log.WithField("url", u).Errorf("Parse response body to metrics: %v", err)
return
}
responsesMu.Lock()
responses = append(responses, part)
responsesMu.Unlock()
}(url)
}
wg.Wait()

for _, part := range responses {
for n, mf := range part {
mfo, ok := mfs[n]
if ok {
Expand All @@ -64,10 +81,8 @@ func (h Handler) Merge(w io.Writer) error {
for _, n := range names {
err := enc.Encode(mfs[n])
if err != nil {
return err
log.Error(err)
return
}
}

return nil

}
106 changes: 85 additions & 21 deletions cmd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,29 @@ package cmd_test

import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"reflect"
"sort"
"testing"

"github.com/prometheus/common/expfmt"
"github.com/rebuy-de/exporter-merger/cmd"
log "github.com/sirupsen/logrus"
)

func Equal(a, b []float64) bool {
if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
}
}
return true
}

func testExporter(t testing.TB, content string) (string, func()) {
t.Helper()

Expand Down Expand Up @@ -50,27 +64,77 @@ func TestHandler(t *testing.T) {
t.Fatalf("Received non-200 response: %d\n", resp.StatusCode)
}

want := `# TYPE bar untyped
bar 4
# TYPE conflict untyped
conflict 2
conflict 5
# TYPE foo untyped
foo 1
# TYPE shared untyped
shared{meh="a"} 3
shared{meh="b"} 6
`
have, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
// want := `# TYPE bar untyped
// bar 4
// # TYPE conflict untyped
// conflict 2
// conflict 5
// # TYPE foo untyped
// foo 1
// # TYPE shared untyped
// shared{meh="a"} 3
// shared{meh="b"} 6
// `
// have, err := ioutil.ReadAll(resp.Body)
// if err != nil {
// t.Fatal(err)
// }

eFmt := new(expfmt.TextParser)
part, err := eFmt.TextToMetricFamilies(resp.Body)

fooWanted := 1.0
var foo float64

barWanted := 4.0
var bar float64

var conflictWanted sort.Float64Slice = []float64{2.0, 5.0}
var conflict sort.Float64Slice = make([]float64, 0)

sharedWanted := map[string]float64{"a": 3.0, "b": 6.0}
shared := make(map[string]float64)

for n, mf := range part {
if n == "bar" {
bar = mf.GetMetric()[0].GetUntyped().GetValue()
}

if n == "foo" {
foo = mf.GetMetric()[0].GetUntyped().GetValue()
}

if n == "conflict" {
for _, metric := range mf.GetMetric() {
conflict = append(conflict, metric.GetUntyped().GetValue())
}
}

if n == "shared" {
for _, metric := range mf.GetMetric() {
label := metric.GetLabel()[0].GetValue()
value := metric.GetUntyped().GetValue()
shared[label] = value
}
}
}

if bar != barWanted {
t.Errorf("bar is %f but wanted %f", bar, barWanted)
}

if foo != 1.0 {
t.Errorf("foo is %f but wanted %f", foo, fooWanted)
}

conflictWanted.Sort()
conflict.Sort()

if !Equal(conflict, conflictWanted) {
t.Errorf("conflict is %v but wanted %v", conflict, conflictWanted)
}

if want != string(have) {
t.Error("Got wrong response.")
t.Error("Want:")
t.Error(want)
t.Error("Have:")
t.Error(string(have))
if !reflect.DeepEqual(shared, sharedWanted) {
t.Errorf("shared is %v but wanted %v", shared, sharedWanted)
}
}

0 comments on commit 200a6a6

Please sign in to comment.