Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Slack chat.postMessage error handling #1086

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions internal/notifier/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,17 @@ func (s *Alertmanager) Post(ctx context.Context, event eventv1.Event) error {
},
}

var opts []requestOptFunc
opts := []messageOption{
withProxy(s.ProxyURL),
withCertPool(s.CertPool),
}
if s.Token != "" {
opts = append(opts, func(request *retryablehttp.Request) {
request.Header.Add("Authorization", "Bearer "+s.Token)
})
opts = append(opts,
withRequestOption(func(request *retryablehttp.Request) {
request.Header.Add("Authorization", "Bearer "+s.Token)
}))
}
err := postMessage(ctx, s.URL, s.ProxyURL, s.CertPool, payload, opts...)
err := postMessage(ctx, s.URL, payload, opts...)
if err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
Expand Down
135 changes: 97 additions & 38 deletions internal/notifier/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,117 @@ import (
"github.com/hashicorp/go-retryablehttp"
)

type messageOptions struct {
proxy string
certPool *x509.CertPool
reqOpts []requestOptFunc
validateResponse func(*http.Response) bool
}
type requestOptFunc func(*retryablehttp.Request)

func postMessage(ctx context.Context, address, proxy string, certPool *x509.CertPool, payload interface{}, reqOpts ...requestOptFunc) error {
type messageOption func(*messageOptions)

func postMessage(ctx context.Context, address string, payload interface{}, options ...messageOption) error {
opts := &messageOptions{
// Default validateResponse function varifies that the response status code is 200, 202 or 201.
validateResponse: func(resp *http.Response) bool {
return resp.StatusCode == http.StatusOK ||
resp.StatusCode == http.StatusAccepted ||
resp.StatusCode == http.StatusCreated
},
}

for _, o := range options {
o(opts)
}

httpClient, err := newHTTPClient(opts)
if err != nil {
return err
}

data, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshalling notification payload failed: %w", err)
}

req, err := retryablehttp.NewRequest(http.MethodPost, address, data)
if err != nil {
return fmt.Errorf("failed to create a new request: %w", err)
}

if ctx != nil {
req = req.WithContext(ctx)
}
req.Header.Set("Content-Type", "application/json")
for _, o := range opts.reqOpts {
o(req)
}

resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to execute request: %w", err)
}
defer resp.Body.Close()

if !opts.validateResponse(resp) {
b, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("unable to read response body, %s", err)
}
return fmt.Errorf("request failed with status code %d, %s", resp.StatusCode, string(b))
}

return nil
}

func withProxy(proxy string) messageOption {
return func(opts *messageOptions) {
opts.proxy = proxy
}
}

func withCertPool(certPool *x509.CertPool) messageOption {
return func(opts *messageOptions) {
opts.certPool = certPool
}
}

func withRequestOption(reqOpt requestOptFunc) messageOption {
return func(opts *messageOptions) {
opts.reqOpts = append(opts.reqOpts, reqOpt)
}
}

func withValidateResponse(validateResponse func(*http.Response) bool) messageOption {
return func(opts *messageOptions) {
opts.validateResponse = validateResponse
}
}

func newHTTPClient(opts *messageOptions) (*retryablehttp.Client, error) {
httpClient := retryablehttp.NewClient()
if certPool != nil {
if opts.certPool != nil {
httpClient.HTTPClient.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: certPool,
RootCAs: opts.certPool,
},
}
}

if proxy != "" {
proxyURL, err := url.Parse(proxy)
if opts.proxy != "" {
proxyURL, err := url.Parse(opts.proxy)
if err != nil {
return fmt.Errorf("unable to parse proxy URL '%s', error: %w", proxy, err)
return nil, fmt.Errorf("unable to parse proxy URL '%s', error: %w", opts.proxy, err)
}

var tlsConfig *tls.Config
if certPool != nil {
if opts.certPool != nil {
tlsConfig = &tls.Config{
RootCAs: certPool,
RootCAs: opts.certPool,
}
}

httpClient.HTTPClient.Transport = &http.Transport{
Proxy: http.ProxyURL(proxyURL),
TLSClientConfig: tlsConfig,
Expand All @@ -79,34 +167,5 @@ func postMessage(ctx context.Context, address, proxy string, certPool *x509.Cert
httpClient.RetryMax = 4
httpClient.Logger = nil

data, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshalling notification payload failed: %w", err)
}

req, err := retryablehttp.NewRequest(http.MethodPost, address, data)
if err != nil {
return fmt.Errorf("failed to create a new request: %w", err)
}
if ctx != nil {
req = req.WithContext(ctx)
}
req.Header.Set("Content-Type", "application/json")
for _, o := range reqOpts {
o(req)
}
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to execute request: %w", err)
}

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted && resp.StatusCode != http.StatusCreated {
b, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("unable to read response body, %s", err)
}
return fmt.Errorf("request failed with status code %d, %s", resp.StatusCode, string(b))
}

return nil
return httpClient, nil
}
6 changes: 3 additions & 3 deletions internal/notifier/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func Test_postMessage(t *testing.T) {
require.Equal(t, "success", payload["status"])
}))
defer ts.Close()
err := postMessage(context.Background(), ts.URL, "", nil, map[string]string{"status": "success"})
err := postMessage(context.Background(), ts.URL, map[string]string{"status": "success"})
require.NoError(t, err)
}

Expand All @@ -56,7 +56,7 @@ func Test_postMessage_timeout(t *testing.T) {
defer ts.Close()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err := postMessage(ctx, ts.URL, "", nil, map[string]string{"status": "success"})
err := postMessage(ctx, ts.URL, map[string]string{"status": "success"})
require.Error(t, err, "context deadline exceeded")
}

Expand All @@ -77,7 +77,7 @@ func Test_postSelfSignedCert(t *testing.T) {
require.NoError(t, err)
certpool := x509.NewCertPool()
certpool.AddCert(cert)
err = postMessage(context.Background(), ts.URL, "", certpool, map[string]string{"status": "success"})
err = postMessage(context.Background(), ts.URL, map[string]string{"status": "success"}, withCertPool(certpool))
require.NoError(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/notifier/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (s *Discord) Post(ctx context.Context, event eventv1.Event) error {

payload.Attachments = []SlackAttachment{a}

err := postMessage(ctx, s.URL, s.ProxyURL, nil, payload)
err := postMessage(ctx, s.URL, payload, withProxy(s.ProxyURL))
if err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
Expand Down
20 changes: 11 additions & 9 deletions internal/notifier/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,17 @@ func (f *Forwarder) Post(ctx context.Context, event eventv1.Event) error {
}
sig = fmt.Sprintf("sha256=%s", sign(eventJSON, f.HMACKey))
}
err := postMessage(ctx, f.URL, f.ProxyURL, f.CertPool, event, func(req *retryablehttp.Request) {
req.Header.Set(NotificationHeader, event.ReportingController)
for key, val := range f.Headers {
req.Header.Set(key, val)
}
if sig != "" {
req.Header.Set("X-Signature", sig)
}
})
err := postMessage(
ctx, f.URL, event, withProxy(f.ProxyURL), withCertPool(f.CertPool),
withRequestOption(func(req *retryablehttp.Request) {
req.Header.Set(NotificationHeader, event.ReportingController)
for key, val := range f.Headers {
req.Header.Set(key, val)
}
if sig != "" {
req.Header.Set("X-Signature", sig)
}
}))

if err != nil {
return fmt.Errorf("postMessage failed: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/notifier/google_chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (s *GoogleChat) Post(ctx context.Context, event eventv1.Event) error {
Cards: []GoogleChatCard{card},
}

err := postMessage(ctx, s.URL, s.ProxyURL, nil, payload)
err := postMessage(ctx, s.URL, payload, withProxy(s.ProxyURL))
if err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
Expand Down
19 changes: 11 additions & 8 deletions internal/notifier/grafana.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,17 @@ func (g *Grafana) Post(ctx context.Context, event eventv1.Event) error {
Tags: sfields,
}

err := postMessage(ctx, g.URL, g.ProxyURL, g.CertPool, payload, func(request *retryablehttp.Request) {
if (g.Username != "" && g.Password != "") && g.Token == "" {
request.Header.Add("Authorization", "Basic "+basicAuth(g.Username, g.Password))
}
if g.Token != "" {
request.Header.Add("Authorization", "Bearer "+g.Token)
}
})
err := postMessage(
ctx, g.URL, payload, withProxy(g.ProxyURL), withCertPool(g.CertPool),
withRequestOption(func(req *retryablehttp.Request) {
if (g.Username != "" && g.Password != "") && g.Token == "" {
req.Header.Add("Authorization", "Basic "+basicAuth(g.Username, g.Password))
}
if g.Token != "" {
req.Header.Add("Authorization", "Bearer "+g.Token)
}
}),
)
if err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/notifier/lark.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,5 @@ func (l *Lark) Post(ctx context.Context, event eventv1.Event) error {
Card: card,
}

return postMessage(ctx, l.URL, "", nil, payload)
return postMessage(ctx, l.URL, payload)
}
11 changes: 7 additions & 4 deletions internal/notifier/matrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ func (m *Matrix) Post(ctx context.Context, event eventv1.Event) error {
MsgType: "m.text",
}

err = postMessage(ctx, fullURL, "", m.CertPool, payload, func(request *retryablehttp.Request) {
request.Method = http.MethodPut
request.Header.Add("Authorization", "Bearer "+m.Token)
})
err = postMessage(
ctx, fullURL, payload, withCertPool(m.CertPool),
withRequestOption(func(request *retryablehttp.Request) {
request.Method = http.MethodPut
request.Header.Add("Authorization", "Bearer "+m.Token)
}),
)
if err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
Expand Down
9 changes: 6 additions & 3 deletions internal/notifier/opsgenie.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,12 @@ func (s *Opsgenie) Post(ctx context.Context, event eventv1.Event) error {
Details: details,
}

err := postMessage(ctx, s.URL, s.ProxyURL, s.CertPool, payload, func(req *retryablehttp.Request) {
req.Header.Set("Authorization", "GenieKey "+s.ApiKey)
})
err := postMessage(
ctx, s.URL, payload, withProxy(s.ProxyURL), withCertPool(s.CertPool),
withRequestOption(func(req *retryablehttp.Request) {
req.Header.Set("Authorization", "GenieKey "+s.ApiKey)
}),
)

if err != nil {
return fmt.Errorf("postMessage failed: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions internal/notifier/pagerduty.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ func (p *PagerDuty) Post(ctx context.Context, event eventv1.Event) error {
return nil
}
e := toPagerDutyV2Event(event, p.RoutingKey)
err := postMessage(ctx, p.Endpoint+"/v2/enqueue", p.ProxyURL, p.CertPool, e)
err := postMessage(ctx, p.Endpoint+"/v2/enqueue", e, withProxy(p.ProxyURL), withCertPool(p.CertPool))
if err != nil {
return fmt.Errorf("failed sending event: %w", err)
}
// Send a change event for info events
if event.Severity == eventv1.EventSeverityInfo {
ce := toPagerDutyChangeEvent(event, p.RoutingKey)
err = postMessage(ctx, p.Endpoint+"/v2/change/enqueue", p.ProxyURL, p.CertPool, ce)
err = postMessage(ctx, p.Endpoint+"/v2/change/enqueue", ce, withProxy(p.ProxyURL), withCertPool(p.CertPool))
if err != nil {
return fmt.Errorf("failed sending change event: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/notifier/rocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *Rocket) Post(ctx context.Context, event eventv1.Event) error {

payload.Attachments = []SlackAttachment{a}

err := postMessage(ctx, s.URL, s.ProxyURL, s.CertPool, payload)
err := postMessage(ctx, s.URL, payload, withProxy(s.ProxyURL), withCertPool(s.CertPool))
if err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
Expand Down
Loading