Skip to content

Commit f241b96

Browse files
authored
Merge pull request #1 from AnyVisionltd/move-client-pkg
move client logic from main to client package
2 parents df80032 + 33cdef0 commit f241b96

File tree

3 files changed

+249
-191
lines changed

3 files changed

+249
-191
lines changed

Diff for: client/client.go

+227
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
package client
2+
3+
import (
4+
"bufio"
5+
"bytes"
6+
"context"
7+
"errors"
8+
"fmt"
9+
"io"
10+
"log/slog"
11+
"net/http"
12+
"net/url"
13+
"strings"
14+
"time"
15+
16+
"github.com/cenkalti/backoff/v4"
17+
"github.com/prometheus-community/pushprox/util"
18+
"github.com/prometheus/client_golang/prometheus"
19+
)
20+
21+
var (
22+
scrapeErrorCounter = prometheus.NewCounter(
23+
prometheus.CounterOpts{
24+
Name: "pushprox_client_scrape_errors_total",
25+
Help: "Number of scrape errors",
26+
},
27+
)
28+
pushErrorCounter = prometheus.NewCounter(
29+
prometheus.CounterOpts{
30+
Name: "pushprox_client_push_errors_total",
31+
Help: "Number of push errors",
32+
},
33+
)
34+
pollErrorCounter = prometheus.NewCounter(
35+
prometheus.CounterOpts{
36+
Name: "pushprox_client_poll_errors_total",
37+
Help: "Number of poll errors",
38+
},
39+
)
40+
)
41+
42+
func init() {
43+
prometheus.MustRegister(pushErrorCounter, pollErrorCounter, scrapeErrorCounter)
44+
}
45+
46+
func DefaultBackoff() backoff.BackOff {
47+
b := backoff.NewExponentialBackOff()
48+
b.InitialInterval = 1 * time.Second
49+
b.Multiplier = 1.5
50+
b.MaxInterval = 5 * time.Second
51+
b.MaxElapsedTime = time.Duration(0)
52+
return b
53+
}
54+
55+
// Coordinator for scrape requests and responses
56+
type Coordinator struct {
57+
logger *slog.Logger
58+
client *http.Client
59+
bo backoff.BackOff
60+
fqdn string
61+
proxyUrl string
62+
}
63+
64+
func NewCoordinator(logger *slog.Logger, bo backoff.BackOff, client *http.Client, fqdn, proxyURL string) (*Coordinator, error) {
65+
if fqdn == "" {
66+
return nil, errors.New("fqdn must be specified")
67+
}
68+
if proxyURL == "" {
69+
return nil, errors.New("proxyURL must be specified")
70+
}
71+
if bo == nil {
72+
logger.Warn("No backoff provided, using default")
73+
bo = DefaultBackoff()
74+
}
75+
c := &Coordinator{
76+
logger: logger,
77+
client: client,
78+
bo: bo,
79+
fqdn: fqdn,
80+
proxyUrl: proxyURL,
81+
}
82+
return c, nil
83+
}
84+
85+
func (c *Coordinator) Start(ctx context.Context) {
86+
c.loop(ctx)
87+
}
88+
89+
func (c *Coordinator) handleErr(request *http.Request, err error) {
90+
c.logger.Error("Coordinator error", "error", err)
91+
scrapeErrorCounter.Inc()
92+
resp := &http.Response{
93+
StatusCode: http.StatusInternalServerError,
94+
Body: io.NopCloser(strings.NewReader(err.Error())),
95+
Header: http.Header{},
96+
}
97+
if err = c.doPush(resp, request); err != nil {
98+
pushErrorCounter.Inc()
99+
c.logger.Warn("Failed to push failed scrape response:", "err", err)
100+
return
101+
}
102+
c.logger.Info("Pushed failed scrape response")
103+
}
104+
105+
func (c *Coordinator) doScrape(request *http.Request) {
106+
logger := c.logger.With("scrape_id", request.Header.Get("id"))
107+
timeout, err := util.GetHeaderTimeout(request.Header)
108+
if err != nil {
109+
c.handleErr(request, err)
110+
return
111+
}
112+
ctx, cancel := context.WithTimeout(request.Context(), timeout)
113+
defer cancel()
114+
request = request.WithContext(ctx)
115+
// We cannot handle https requests at the proxy, as we would only
116+
// see a CONNECT, so use a URL parameter to trigger it.
117+
params := request.URL.Query()
118+
if params.Get("_scheme") == "https" {
119+
request.URL.Scheme = "https"
120+
params.Del("_scheme")
121+
request.URL.RawQuery = params.Encode()
122+
}
123+
124+
if request.URL.Hostname() != c.fqdn {
125+
c.handleErr(request, errors.New("scrape target doesn't match client fqdn"))
126+
return
127+
}
128+
129+
scrapeResp, err := c.client.Do(request)
130+
if err != nil {
131+
c.handleErr(request, fmt.Errorf("failed to scrape %s: %w", request.URL.String(), err))
132+
return
133+
}
134+
logger.Info("Retrieved scrape response")
135+
if err = c.doPush(scrapeResp, request); err != nil {
136+
pushErrorCounter.Inc()
137+
logger.Warn("Failed to push scrape response:", "err", err)
138+
return
139+
}
140+
logger.Info("Pushed scrape result")
141+
}
142+
143+
// Report the result of the scrape back up to the proxy.
144+
func (c *Coordinator) doPush(resp *http.Response, origRequest *http.Request) error {
145+
resp.Header.Set("id", origRequest.Header.Get("id")) // Link the request and response
146+
// Remaining scrape deadline.
147+
deadline, _ := origRequest.Context().Deadline()
148+
resp.Header.Set("X-Prometheus-Scrape-Timeout", fmt.Sprintf("%f", float64(time.Until(deadline))/1e9))
149+
150+
base, err := url.Parse(c.proxyUrl)
151+
if err != nil {
152+
return err
153+
}
154+
u, err := url.Parse("push")
155+
if err != nil {
156+
return err
157+
}
158+
url := base.ResolveReference(u)
159+
160+
buf := &bytes.Buffer{}
161+
//nolint:errcheck // https://github.com/prometheus-community/PushProx/issues/111
162+
resp.Write(buf)
163+
request := &http.Request{
164+
Method: "POST",
165+
URL: url,
166+
Body: io.NopCloser(buf),
167+
ContentLength: int64(buf.Len()),
168+
}
169+
request = request.WithContext(origRequest.Context())
170+
if _, err = c.client.Do(request); err != nil {
171+
return err
172+
}
173+
return nil
174+
}
175+
176+
func (c *Coordinator) doPoll(ctx context.Context) error {
177+
base, err := url.Parse(c.proxyUrl)
178+
if err != nil {
179+
c.logger.Error("Error parsing url:", "err", err)
180+
return fmt.Errorf("error parsing url: %w", err)
181+
}
182+
u, err := url.Parse("poll")
183+
if err != nil {
184+
c.logger.Error("Error parsing url:", "err", err)
185+
return fmt.Errorf("error parsing url poll: %w", err)
186+
}
187+
pollUrl := base.ResolveReference(u)
188+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, pollUrl.String(), strings.NewReader(c.fqdn))
189+
if err != nil {
190+
c.logger.Error("Error creating request:", "err", err)
191+
}
192+
resp, err := c.client.Do(req)
193+
if err != nil {
194+
c.logger.Error("Error polling:", "err", err)
195+
return fmt.Errorf("error polling: %w", err)
196+
}
197+
defer resp.Body.Close()
198+
199+
request, err := http.ReadRequest(bufio.NewReader(resp.Body))
200+
if err != nil {
201+
c.logger.Error("Error reading request:", "err", err)
202+
return fmt.Errorf("error reading request: %w", err)
203+
}
204+
c.logger.Info("Got scrape request", "scrape_id", request.Header.Get("id"), "url", request.URL)
205+
206+
request.RequestURI = ""
207+
208+
go c.doScrape(request)
209+
210+
return nil
211+
}
212+
213+
func (c *Coordinator) loop(ctx context.Context) {
214+
ctx, cancel := context.WithCancel(ctx)
215+
defer cancel()
216+
op := func() error {
217+
return c.doPoll(ctx)
218+
}
219+
220+
for ctx.Err() == nil {
221+
if err := backoff.RetryNotify(op, c.bo, func(err error, _ time.Duration) {
222+
pollErrorCounter.Inc()
223+
}); err != nil {
224+
c.logger.Error("backoff returned error", "error", err)
225+
}
226+
}
227+
}

Diff for: cmd/client/main_test.go renamed to client/client_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14-
package main
14+
package client
1515

1616
import (
17+
"context"
1718
"errors"
1819
"fmt"
1920
"net/http"
@@ -23,13 +24,13 @@ import (
2324
"github.com/prometheus/common/promslog"
2425
)
2526

26-
func prepareTest() (*httptest.Server, Coordinator) {
27+
func prepareTest() (*httptest.Server, *Coordinator) {
2728
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2829
w.WriteHeader(http.StatusOK)
2930
fmt.Fprintln(w, "GET /index.html HTTP/1.0\n\nOK")
3031
}))
31-
c := Coordinator{logger: promslog.NewNopLogger()}
32-
*proxyURL = ts.URL
32+
c, _ := NewCoordinator(promslog.NewNopLogger(), nil, ts.Client(), ts.URL, ts.URL)
33+
3334
return ts, c
3435
}
3536

@@ -42,8 +43,7 @@ func TestDoScrape(t *testing.T) {
4243
t.Fatal(err)
4344
}
4445
req.Header.Add("X-Prometheus-Scrape-Timeout-Seconds", "10.0")
45-
*myFqdn = ts.URL
46-
c.doScrape(req, ts.Client())
46+
c.doScrape(req)
4747
}
4848

4949
func TestHandleErr(t *testing.T) {
@@ -54,13 +54,13 @@ func TestHandleErr(t *testing.T) {
5454
if err != nil {
5555
t.Fatal(err)
5656
}
57-
c.handleErr(req, ts.Client(), errors.New("test error"))
57+
c.handleErr(req, errors.New("test error"))
5858
}
5959

6060
func TestLoop(t *testing.T) {
6161
ts, c := prepareTest()
6262
defer ts.Close()
63-
if err := c.doPoll(ts.Client()); err != nil {
63+
if err := c.doPoll(context.Background()); err != nil {
6464
t.Fatal(err)
6565
}
6666
}

0 commit comments

Comments
 (0)