diff --git a/authorizer/notification_endpoint.go b/authorizer/notification_endpoint.go index d4f1e077413..c5e4e56a69c 100644 --- a/authorizer/notification_endpoint.go +++ b/authorizer/notification_endpoint.go @@ -122,7 +122,7 @@ func (s *NotificationEndpointService) PatchNotificationEndpoint(ctx context.Cont } // DeleteNotificationEndpoint checks to see if the authorizer on context has write access to the notification endpoint provided. -func (s *NotificationEndpointService) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error) { +func (s *NotificationEndpointService) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) ([]*influxdb.SecretField, influxdb.ID, error) { edp, err := s.FindNotificationEndpointByID(ctx, id) if err != nil { return nil, 0, err diff --git a/authorizer/notification_endpoint_test.go b/authorizer/notification_endpoint_test.go index a8c43a43922..8283d11482a 100644 --- a/authorizer/notification_endpoint_test.go +++ b/authorizer/notification_endpoint_test.go @@ -543,7 +543,7 @@ func TestNotificationEndpointService_DeleteNotificationEndpoint(t *testing.T) { }, }, nil }, - DeleteNotificationEndpointF: func(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error) { + DeleteNotificationEndpointF: func(ctx context.Context, id influxdb.ID) ([]*influxdb.SecretField, influxdb.ID, error) { return nil, 0, nil }, }, @@ -583,7 +583,7 @@ func TestNotificationEndpointService_DeleteNotificationEndpoint(t *testing.T) { }, }, nil }, - DeleteNotificationEndpointF: func(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error) { + DeleteNotificationEndpointF: func(ctx context.Context, id influxdb.ID) ([]*influxdb.SecretField, influxdb.ID, error) { return nil, 0, nil }, }, diff --git a/errors.go b/errors.go index c584ad6d47a..24a29366f2b 100644 --- a/errors.go +++ b/errors.go @@ -14,6 +14,7 @@ import ( // Any time this set of constants changes, you must also update the swagger for Error.properties.code.enum. const ( EInternal = "internal error" + EBadRequest = "bad request" ENotFound = "not found" EConflict = "conflict" // action cannot be performed EInvalid = "invalid" // validation failed diff --git a/go.sum b/go.sum index 62a22351eaa..be87a556694 100644 --- a/go.sum +++ b/go.sum @@ -271,6 +271,7 @@ github.com/jsternberg/zap-logfmt v1.2.0 h1:1v+PK4/B48cy8cfQbxL4FmmNZrjnIMr2BsnyE github.com/jsternberg/zap-logfmt v1.2.0/go.mod h1:kz+1CUmCutPWABnNkOu9hOHKdT2q3TDYCcsFy9hpqb0= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/julienschmidt/httprouter v1.2.0 h1:TDTW5Yz1mjftljbcKqRcrYhd4XeOoI98t+9HbQbYf7g= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef h1:2jNeR4YUziVtswNP9sEFAI913cVrzH85T+8Q6LpYbT0= github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef/go.mod h1:Ct9fl0F6iIOGgxJ5npU/IUOhOhqlVrGjyIZc8/MagT0= diff --git a/http/errors.go b/http/errors.go index 102eea48684..5dde84e9d22 100644 --- a/http/errors.go +++ b/http/errors.go @@ -148,6 +148,7 @@ func InactiveUserError(ctx context.Context, h platform.HTTPErrorHandler, w http. var statusCodePlatformError = map[string]int{ platform.EInternal: http.StatusInternalServerError, platform.EInvalid: http.StatusBadRequest, + platform.EBadRequest: http.StatusBadRequest, platform.EUnprocessableEntity: http.StatusUnprocessableEntity, platform.EEmptyValue: http.StatusBadRequest, platform.EConflict: http.StatusUnprocessableEntity, diff --git a/http/notification_endpoint.go b/http/notification_endpoint.go index 350e57203c6..9f2e596e4d1 100644 --- a/http/notification_endpoint.go +++ b/http/notification_endpoint.go @@ -6,11 +6,16 @@ import ( "encoding/json" "fmt" "net/http" + "time" + "github.com/influxdata/flux" + "github.com/influxdata/flux/csv" + "github.com/influxdata/flux/lang" "github.com/influxdata/httprouter" "github.com/influxdata/influxdb" pctx "github.com/influxdata/influxdb/context" "github.com/influxdata/influxdb/notification/endpoint" + "github.com/influxdata/influxdb/query" "go.uber.org/zap" ) @@ -26,6 +31,7 @@ type NotificationEndpointBackend struct { UserService influxdb.UserService OrganizationService influxdb.OrganizationService SecretService influxdb.SecretService + QueryService query.ProxyQueryService } // NewNotificationEndpointBackend returns a new instance of NotificationEndpointBackend. @@ -40,6 +46,7 @@ func NewNotificationEndpointBackend(b *APIBackend) *NotificationEndpointBackend UserService: b.UserService, OrganizationService: b.OrganizationService, SecretService: b.SecretService, + QueryService: b.FluxService, } } @@ -55,11 +62,14 @@ type NotificationEndpointHandler struct { UserService influxdb.UserService OrganizationService influxdb.OrganizationService SecretService influxdb.SecretService + QueryService query.ProxyQueryService } const ( notificationEndpointsPath = "/api/v2/notificationEndpoints" notificationEndpointsIDPath = "/api/v2/notificationEndpoints/:id" + notificationEndpointsIDTestPath = "/api/v2/notificationEndpoints/:id/test" + notificationEndpointsTestPath = "/api/v2/notificationEndpointsTest" notificationEndpointsIDMembersPath = "/api/v2/notificationEndpoints/:id/members" notificationEndpointsIDMembersIDPath = "/api/v2/notificationEndpoints/:id/members/:userID" notificationEndpointsIDOwnersPath = "/api/v2/notificationEndpoints/:id/owners" @@ -81,8 +91,10 @@ func NewNotificationEndpointHandler(b *NotificationEndpointBackend) *Notificatio UserService: b.UserService, OrganizationService: b.OrganizationService, SecretService: b.SecretService, + QueryService: b.QueryService, } h.HandlerFunc("POST", notificationEndpointsPath, h.handlePostNotificationEndpoint) + h.HandlerFunc("PUT", notificationEndpointsIDTestPath, h.handlePutNotificationEndpointTest) h.HandlerFunc("GET", notificationEndpointsPath, h.handleGetNotificationEndpoints) h.HandlerFunc("GET", notificationEndpointsIDPath, h.handleGetNotificationEndpoint) h.HandlerFunc("DELETE", notificationEndpointsIDPath, h.handleDeleteNotificationEndpoint) @@ -109,6 +121,7 @@ func NewNotificationEndpointHandler(b *NotificationEndpointBackend) *Notificatio UserResourceMappingService: b.UserResourceMappingService, UserService: b.UserService, } + h.HandlerFunc("POST", notificationEndpointsIDOwnersPath, newPostMemberHandler(ownerBackend)) h.HandlerFunc("GET", notificationEndpointsIDOwnersPath, newGetMembersHandler(ownerBackend)) h.HandlerFunc("DELETE", notificationEndpointsIDOwnersIDPath, newDeleteMemberHandler(ownerBackend)) @@ -229,7 +242,6 @@ func (h *NotificationEndpointHandler) handleGetNotificationEndpoints(w http.Resp h.HandleHTTPError(ctx, err, w) return } - h.Logger.Debug("notificationEndpoints retrieved", zap.String("notificationEndpoints", fmt.Sprint(edps))) if err := encodeResponse(ctx, w, http.StatusOK, newNotificationEndpointsResponse(ctx, edps, h.LabelService, filter, *opts)); err != nil { logEncodingError(h.Logger, r, err) @@ -249,7 +261,6 @@ func (h *NotificationEndpointHandler) handleGetNotificationEndpoint(w http.Respo h.HandleHTTPError(ctx, err, w) return } - h.Logger.Debug("notificationEndpoint retrieved", zap.String("notificationEndpoint", fmt.Sprint(edp))) labels, err := h.LabelService.FindResourceLabels(ctx, influxdb.LabelMappingFilter{ResourceID: edp.GetID()}) if err != nil { @@ -403,6 +414,134 @@ func decodePatchNotificationEndpointRequest(ctx context.Context, r *http.Request return req, nil } +// handlePostNotificationEndpointTest is the HTTP handler for the PUT /api/v2/notificationEndpoints/:id/test route. +func (h *NotificationEndpointHandler) handlePutNotificationEndpointTest(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + edp, err := decodePutNotificationEndpointRequest(ctx, r) + if err != nil { + h.Logger.Debug("failed to decode request", zap.Error(err)) + h.HandleHTTPError(ctx, err, w) + return + } + + auth, err := pctx.GetAuthorizer(ctx) + if err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + + var token *influxdb.Authorization + switch a := auth.(type) { + case *influxdb.Authorization: + token = a + case *influxdb.Session: + token = a.EphemeralAuth(edp.GetOrgID()) + default: + h.HandleHTTPError(ctx, influxdb.ErrAuthorizerNotSupported, w) + return + } + + for _, fld := range edp.SecretFields() { + if fld.Value == nil { + v, err := h.SecretService.LoadSecret(ctx, edp.GetOrgID(), fld.Key) + if err != nil { + h.HandleHTTPError(ctx, &influxdb.Error{ + Err: err, + }, w) + return + } + + fld.Value = &v + } + } + + q, err := edp.GenerateTestFlux() + if err != nil { + h.HandleHTTPError(ctx, err, w) + } + + compiler := lang.FluxCompiler{ + Now: time.Now(), + Query: q, + } + + req := query.Request{ + Compiler: compiler, + Authorization: token, + OrganizationID: edp.GetOrgID(), + } + + pr := &query.ProxyRequest{ + Request: req, + Dialect: csv.DefaultDialect(), + } + + b := bytes.NewBuffer(nil) + + if _, err := h.QueryService.Query(ctx, b, pr); err != nil { + h.Logger.Info("failed to execute query", zap.Error(err)) + h.HandleHTTPError(ctx, err, w) + return + } + + if err := encodeTestEndpointQueryResults(b, w); err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +func encodeTestEndpointQueryResults(b *bytes.Buffer, w http.ResponseWriter) error { + dec := csv.NewResultDecoder(csv.ResultDecoderConfig{}) + + res, err := dec.Decode(b) + if err != nil { + return err + } + + if err := res.Tables().Do(func(t flux.Table) error { + return t.Do(func(r flux.ColReader) error { + cols := r.Cols() + idx := -1 + for i, col := range cols { + if col.Label == "_sent" { + idx = i + break + } + } + + if idx == -1 { + return &influxdb.Error{ + Msg: "failed to send message", + Code: influxdb.EBadRequest, + } + } + + arr := r.Strings(idx) + sent := false + for i := 0; i < arr.Len(); i++ { + if bytes.Equal(arr.Value(i), []byte("true")) { + sent = true + } + } + + if !sent { + return &influxdb.Error{ + Msg: "failed to send message", + Code: influxdb.EBadRequest, + } + } + + return nil + }) + }); err != nil { + return err + } + + return nil +} + // handlePostNotificationEndpoint is the HTTP handler for the POST /api/v2/notificationEndpoints route. func (h *NotificationEndpointHandler) handlePostNotificationEndpoint(w http.ResponseWriter, r *http.Request) { ctx := r.Context() @@ -438,8 +577,6 @@ func (h *NotificationEndpointHandler) handlePostNotificationEndpoint(w http.Resp labels := h.mapNewNotificationEndpointLabels(ctx, edp.NotificationEndpoint, edp.Labels) - h.Logger.Debug("notificationEndpoint created", zap.String("notificationEndpoint", fmt.Sprint(edp))) - if err := encodeResponse(ctx, w, http.StatusCreated, newNotificationEndpointResponse(edp, labels)); err != nil { logEncodingError(h.Logger, r, err) return @@ -516,7 +653,6 @@ func (h *NotificationEndpointHandler) handlePutNotificationEndpoint(w http.Respo h.HandleHTTPError(ctx, err, w) return } - h.Logger.Debug("notificationEndpoint replaced", zap.String("notificationEndpoint", fmt.Sprint(edp))) if err := encodeResponse(ctx, w, http.StatusOK, newNotificationEndpointResponse(edp, labels)); err != nil { logEncodingError(h.Logger, r, err) @@ -545,7 +681,6 @@ func (h *NotificationEndpointHandler) handlePatchNotificationEndpoint(w http.Res h.HandleHTTPError(ctx, err, w) return } - h.Logger.Debug("notificationEndpoint patch", zap.String("notificationEndpoint", fmt.Sprint(edp))) if err := encodeResponse(ctx, w, http.StatusOK, newNotificationEndpointResponse(edp, labels)); err != nil { logEncodingError(h.Logger, r, err) @@ -584,7 +719,6 @@ func (h *NotificationEndpointHandler) handleDeleteNotificationEndpoint(w http.Re }, w) return } - h.Logger.Debug("notificationEndpoint deleted", zap.String("notificationEndpointID", fmt.Sprint(i))) w.WriteHeader(http.StatusNoContent) } diff --git a/http/notification_endpoint_test.go b/http/notification_endpoint_test.go index 87013b2aefc..10498f87cf5 100644 --- a/http/notification_endpoint_test.go +++ b/http/notification_endpoint_test.go @@ -32,6 +32,7 @@ func NewMockNotificationEndpointBackend() *NotificationEndpointBackend { UserService: mock.NewUserService(), OrganizationService: mock.NewOrganizationService(), SecretService: mock.NewSecretService(), + QueryService: mock.NewQueryService(), } } @@ -141,7 +142,8 @@ func TestService_handleGetNotificationEndpoints(t *testing.T) { }, "name": "hello", "orgID": "50f7ba1150f7ba11", - "status": "active", + "status": "active", + "testChannel": "", "type": "slack", "token": "", "updatedAt": "0001-01-01T00:00:00Z", @@ -579,9 +581,9 @@ func TestService_handleDeleteNotificationEndpoint(t *testing.T) { }, }, NotificationEndpointService: &mock.NotificationEndpointService{ - DeleteNotificationEndpointF: func(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error) { + DeleteNotificationEndpointF: func(ctx context.Context, id influxdb.ID) ([]*influxdb.SecretField, influxdb.ID, error) { if id == influxTesting.MustIDBase16("020f755c3c082000") { - return []influxdb.SecretField{ + return []*influxdb.SecretField{ {Key: "k1"}, }, influxTesting.MustIDBase16("020f755c3c082001"), nil } @@ -604,7 +606,7 @@ func TestService_handleDeleteNotificationEndpoint(t *testing.T) { name: "notification endpoint not found", fields: fields{ NotificationEndpointService: &mock.NotificationEndpointService{ - DeleteNotificationEndpointF: func(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error) { + DeleteNotificationEndpointF: func(ctx context.Context, id influxdb.ID) ([]*influxdb.SecretField, influxdb.ID, error) { return nil, 0, &influxdb.Error{ Code: influxdb.ENotFound, Msg: "notification endpoint not found", @@ -741,6 +743,7 @@ func TestService_handlePatchNotificationEndpoint(t *testing.T) { "url": "http://example.com", "name": "example", "status": "active", + "testChannel": "", "type": "slack", "token": "", "labels": [] @@ -900,6 +903,7 @@ func TestService_handleUpdateNotificationEndpoint(t *testing.T) { "url": "example.com", "type": "slack", "status": "active", + "testChannel": "", "token": "", "labels": [] } @@ -1174,3 +1178,114 @@ func TestService_handlePostNotificationEndpointOwner(t *testing.T) { }) } } + +func TestService_handleTestNotificationEndpoint(t *testing.T) { + type fields struct { + Secrets map[string]string + SecretService influxdb.SecretService + NotificationEndpointService influxdb.NotificationEndpointService + OrganizationService influxdb.OrganizationService + } + type args struct { + endpoint interface{} + } + type wants struct { + statusCode int + } + + var secrets map[string]string + + tests := []struct { + name string + fields fields + args args + wants wants + }{ + { + name: "test a notification endpoint", + fields: fields{ + Secrets: map[string]string{}, + SecretService: &mock.SecretService{ + PutSecretFn: func(ctx context.Context, orgID influxdb.ID, k string, v string) error { + secrets[orgID.String()+"-"+k] = v + return nil + }, + }, + NotificationEndpointService: &mock.NotificationEndpointService{ + TestNotificationEndpointF: func(ctx context.Context, edp influxdb.NotificationEndpoint, userID influxdb.ID) error { + edp.SetID(influxTesting.MustIDBase16("020f755c3c082000")) + edp.BackfillSecretKeys() + return nil + }, + }, + OrganizationService: &mock.OrganizationService{ + FindOrganizationF: func(ctx context.Context, f influxdb.OrganizationFilter) (*influxdb.Organization, error) { + return &influxdb.Organization{ID: influxTesting.MustIDBase16("6f626f7274697320")}, nil + }, + }, + }, + args: args{ + endpoint: map[string]interface{}{ + "name": "hello", + "type": "http", + "orgID": "6f626f7274697320", + "description": "desc1", + "status": "active", + "url": "example.com", + "username": "user1", + "password": "password1", + "authMethod": "basic", + "method": "POST", + "contentTemplate": "template", + }, + }, + wants: wants{ + statusCode: http.StatusNoContent, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + secrets = tt.fields.Secrets + notificationEndpointBackend := NewMockNotificationEndpointBackend() + notificationEndpointBackend.NotificationEndpointService = tt.fields.NotificationEndpointService + notificationEndpointBackend.OrganizationService = tt.fields.OrganizationService + notificationEndpointBackend.SecretService = tt.fields.SecretService + querySvc := mock.NewQueryService() + querySvc.Response = `#group,false,false,true,true,false,false,false,false,false,false +#datatype,string,long,string,string,long,dateTime:RFC3339,string,string,long,string +#default,experimental-to6,,,,,,,,, +,result,table,_measurement,_sent,_status_timestamp,_time,id,name,retentionPeriod,retentionPolicy +,,0,notifications,true,,2019-11-22T22:10:44.028232Z,id1,telegraf,0,` + notificationEndpointBackend.QueryService = querySvc + h := NewNotificationEndpointHandler(notificationEndpointBackend) + + b, err := json.Marshal(tt.args.endpoint) + if err != nil { + t.Fatalf("failed to unmarshal endpoint: %v", err) + } + r := httptest.NewRequest("GET", "http://any.url?org=30", bytes.NewReader(b)) + c := context.WithValue( + context.Background(), + httprouter.ParamsKey, + httprouter.Params{ + { + Key: "id", + Value: "0000000000000001", + }, + }) + r = r.WithContext(pcontext.SetAuthorizer(c, &influxdb.Session{UserID: 1})) + + w := httptest.NewRecorder() + + h.handlePutNotificationEndpointTest(w, r) + + res := w.Result() + + if res.StatusCode != tt.wants.statusCode { + t.Errorf("%q. handleTestNotificationEndpoint() = %v, want %v", tt.name, res.StatusCode, tt.wants.statusCode) + } + }) + } +} diff --git a/http/swagger.yml b/http/swagger.yml index 636b9b94eea..4ccf381b893 100644 --- a/http/swagger.yml +++ b/http/swagger.yml @@ -5935,6 +5935,42 @@ paths: application/json: schema: $ref: "#/components/schemas/Error" + '/notificationEndpoints/{endpointID}/test': + put: + operationId: PutNotificationEndpointsIDTest + tags: + - NotificationEndpoints + summary: Tests an existing notification endpoint + requestBody: + description: A notification endpoint to test + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/NotificationEndpoint" + parameters: + - $ref: '#/components/parameters/TraceSpan' + - in: path + name: endpointID + schema: + type: string + required: true + description: The notification endpoint ID. + responses: + '204': + description: Endpoint test sent + '400': + description: The notification endpoint test was not sent + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + default: + description: Unexpected error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" '/notificationEndpoints/{endpointID}': get: operationId: GetNotificationEndpointsID @@ -6068,6 +6104,28 @@ paths: application/json: schema: $ref: "#/components/schemas/Error" + '/notificationEndpointsTest': + post: + operationId: NotificationEndpointTest + tags: + - NotificationEndpoints + summary: Test a connection to a notification endpoint + requestBody: + description: Notification endpoint to test + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/PostNotificationEndpoint" + responses: + '204': + description: Notification endpoint tested + default: + description: Unexpected error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" '/notificationEndpoints/{endpointID}/labels': get: operationId: GetNotificationEndpointsIDLabels @@ -10340,6 +10398,9 @@ components: token: description: Specifies the API token string. Specify either `URL` or `Token`. type: string + testChannel: + description: Specifies the channel to send test notification to Slack. + type: string PagerDutyNotificationEndpoint: type: object allOf: diff --git a/kv/notification_endpoint.go b/kv/notification_endpoint.go index 057e25b880f..ca4d234b016 100644 --- a/kv/notification_endpoint.go +++ b/kv/notification_endpoint.go @@ -131,6 +131,36 @@ func (s *Service) createNotificationEndpoint(ctx context.Context, tx Tx, edp inf return s.createUserResourceMapping(ctx, tx, urm) } +// TestNotificationEndpoint tests a notification endpoint +func (s *Service) TestNotificationEndpoint(ctx context.Context, edp influxdb.NotificationEndpoint, userID influxdb.ID) error { + return s.kv.Update(ctx, func(tx Tx) error { + return s.testNotificationEndpoint(ctx, tx, edp, userID) + }) +} + +func (s *Service) testNotificationEndpoint(ctx context.Context, tx Tx, edp influxdb.NotificationEndpoint, userID influxdb.ID) error { + if edp.GetOrgID().Valid() { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + if _, err := s.findOrganizationByID(ctx, tx, edp.GetOrgID()); err != nil { + return err + } + } + + // notification endpoint name unique + if _, err := s.findNotificationEndpointByName(ctx, tx, edp.GetOrgID(), edp.GetName()); err == nil { + if err == nil { + return &influxdb.Error{ + Code: influxdb.EConflict, + Msg: fmt.Sprintf("notification endpoint with name %s already exists", edp.GetName()), + } + } + } + + return nil +} + func (s *Service) findNotificationEndpointByName(ctx context.Context, tx Tx, orgID influxdb.ID, n string) (influxdb.NotificationEndpoint, error) { span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() @@ -502,7 +532,7 @@ func filterNotificationEndpointsFn(idMap map[influxdb.ID]bool, filter influxdb.N } // DeleteNotificationEndpoint removes a notification endpoint by ID. -func (s *Service) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) (flds []influxdb.SecretField, orgID influxdb.ID, err error) { +func (s *Service) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) (flds []*influxdb.SecretField, orgID influxdb.ID, err error) { err = s.kv.Update(ctx, func(tx Tx) error { flds, orgID, err = s.deleteNotificationEndpoint(ctx, tx, id) return err @@ -510,7 +540,7 @@ func (s *Service) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID return flds, orgID, err } -func (s *Service) deleteNotificationEndpoint(ctx context.Context, tx Tx, id influxdb.ID) (flds []influxdb.SecretField, orgID influxdb.ID, err error) { +func (s *Service) deleteNotificationEndpoint(ctx context.Context, tx Tx, id influxdb.ID) (flds []*influxdb.SecretField, orgID influxdb.ID, err error) { edp, encID, bucket, err := s.findNotificationEndpointByID(ctx, tx, id) if err != nil { return nil, 0, err diff --git a/mock/notification_endpoint_service.go b/mock/notification_endpoint_service.go index a8aeb7f52ca..f9dd725f823 100644 --- a/mock/notification_endpoint_service.go +++ b/mock/notification_endpoint_service.go @@ -17,7 +17,8 @@ type NotificationEndpointService struct { CreateNotificationEndpointF func(ctx context.Context, nr influxdb.NotificationEndpoint, userID influxdb.ID) error UpdateNotificationEndpointF func(ctx context.Context, id influxdb.ID, nr influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) PatchNotificationEndpointF func(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) - DeleteNotificationEndpointF func(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error) + DeleteNotificationEndpointF func(ctx context.Context, id influxdb.ID) ([]*influxdb.SecretField, influxdb.ID, error) + TestNotificationEndpointF func(ctx context.Context, nr influxdb.NotificationEndpoint, id influxdb.ID) error } // FindNotificationEndpointByID returns a single telegraf config by ID. @@ -49,6 +50,11 @@ func (s *NotificationEndpointService) PatchNotificationEndpoint(ctx context.Cont } // DeleteNotificationEndpoint removes a notification rule by ID. -func (s *NotificationEndpointService) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error) { +func (s *NotificationEndpointService) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) ([]*influxdb.SecretField, influxdb.ID, error) { return s.DeleteNotificationEndpointF(ctx, id) } + +// TestNotificationEndpoint tests the an endpoints connection to a 3rd party service i.e. Slack, PagerDuty etc. +func (s *NotificationEndpointService) TestNotificationEndpoint(ctx context.Context, ne influxdb.NotificationEndpoint, id influxdb.ID) error { + return s.TestNotificationEndpointF(ctx, ne, id) +} diff --git a/mock/query.go b/mock/query.go new file mode 100644 index 00000000000..6de73fd47c2 --- /dev/null +++ b/mock/query.go @@ -0,0 +1,49 @@ +package mock + +import ( + "context" + "io" + + "github.com/influxdata/flux" + "github.com/influxdata/influxdb/kit/check" + "github.com/influxdata/influxdb/query" +) + +// ProxyQueryService performs queries and encodes the result into a writer. +// The results are opaque to a ProxyQueryService. +type ProxyQueryService struct { + CheckFn func(ctx context.Context) check.Response + + // Query performs the requested query and encodes the results into w. + // The number of bytes written to w is returned __independent__ of any error. + QueryFn func(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) + + // Value that will be written to the io writer that QueryFn is provided. + Response string +} + +// NewQueryService returns a mock ProxyQueryService +func NewQueryService() *ProxyQueryService { + return &ProxyQueryService{ + CheckFn: func(ctx context.Context) check.Response { + return check.Response{} + }, + QueryFn: func(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) { + return flux.Statistics{}, nil + }, + } +} + +// Check indicates a service whose health can be checked. +func (s *ProxyQueryService) Check(ctx context.Context) check.Response { + return s.CheckFn(ctx) +} + +// Query submits a query for execution returning a results iterator. +// Cancel must be called on any returned results to free resources. +func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) { + if _, err := w.Write([]byte(s.Response)); err != nil { + return flux.Statistics{}, nil + } + return s.QueryFn(ctx, w, req) +} diff --git a/notification/endpoint/http.go b/notification/endpoint/http.go index bfb530cf8f6..988e24ca2a5 100644 --- a/notification/endpoint/http.go +++ b/notification/endpoint/http.go @@ -7,7 +7,9 @@ import ( "net/http" "net/url" + "github.com/influxdata/flux/ast" "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/notification/flux" ) var _ influxdb.NotificationEndpoint = &HTTP{} @@ -33,6 +35,170 @@ type HTTP struct { ContentTemplate string `json:"contentTemplate"` } +// GenerateTestFlux generates the flux that tests PagerDuty endpoints +func (h *HTTP) GenerateTestFlux() (string, error) { + f, err := h.GenerateTestFluxAST() + if err != nil { + return "", err + } + + // used string interpolation here because flux's ast.Format returns malformed + // multi-line strings 😿 + data := `#group,false,false,false,false,true,false,false +#datatype,string,long,string,string,string,string,long +#default,_result,,,,,, +,result,table,name,id,_measurement,retentionPolicy,retentionPeriod +,,0,telegraf,id1,m1,,0` + + return fmt.Sprintf(ast.Format(f), data), nil +} + +// GenerateTestFluxAST generates a flux AST for the slack notification rule. +func (h *HTTP) GenerateTestFluxAST() (*ast.Package, error) { + f := flux.File( + h.Name, + h.imports(), + h.generateTestFluxASTBody(), + ) + + return &ast.Package{Package: "main", Files: []*ast.File{f}}, nil +} + +func (h *HTTP) imports() []*ast.ImportDeclaration { + packages := []string{ + "influxdata/influxdb/monitor", + "http", + "csv", + "json", + } + + if h.AuthMethod == "bearer" || h.AuthMethod == "basic" { + packages = append(packages, "influxdata/influxdb/secrets") + } + + return flux.Imports(packages...) +} + +func (h *HTTP) generateTestFluxASTBody() []ast.Statement { + var statements []ast.Statement + statements = append(statements, h.generateFluxASTHeaders()) + statements = append(statements, h.generateFluxASTEndpoint()) + statements = append(statements, h.generateFluxASTCSVTable()...) + statements = append(statements, h.generateFluxASTNotifyPipe()) + + return statements +} + +func (h *HTTP) generateFluxASTHeaders() ast.Statement { + props := []*ast.Property{ + flux.Dictionary("Content-Type", flux.String("application/json")), + } + + switch h.AuthMethod { + case "bearer": + token := flux.Call( + flux.Member("secrets", "get"), + flux.Object( + flux.Property("key", flux.String(h.Token.Key)), + ), + ) + bearer := flux.Add( + flux.String("Bearer "), + token, + ) + auth := flux.Dictionary("Authorization", bearer) + props = append(props, auth) + case "basic": + username := flux.Call( + flux.Member("secrets", "get"), + flux.Object( + flux.Property("key", flux.String(h.Username.Key)), + ), + ) + passwd := flux.Call( + flux.Member("secrets", "get"), + flux.Object( + flux.Property("key", flux.String(h.Password.Key)), + ), + ) + + basic := flux.Call( + flux.Member("http", "basicAuth"), + flux.Object( + flux.Property("u", username), + flux.Property("p", passwd), + ), + ) + + auth := flux.Dictionary("Authorization", basic) + props = append(props, auth) + } + + return flux.DefineVariable("headers", flux.Object(props...)) +} + +func (h *HTTP) generateFluxASTEndpoint() ast.Statement { + props := []*ast.Property{} + props = append(props, flux.Property("url", flux.String(h.URL))) + call := flux.Call(flux.Member("http", "endpoint"), flux.Object(props...)) + + return flux.DefineVariable("http_endpoint", call) +} + +func (h *HTTP) generateFluxASTCSVTable() []ast.Statement { + props := []*ast.Property{} + // used string interpolation here because flux's ast.Format returns + // malformed multi-line strings 😿 + data := flux.DefineVariable("data", flux.String("%s")) + props = append(props, flux.Property("csv", flux.Identifier("data"))) + call := flux.Call(flux.Member("csv", "from"), flux.Object(props...)) + table := flux.DefineVariable("csvTable", call) + + return []ast.Statement{ + data, + table, + } +} + +func (s *HTTP) generateFluxASTNotifyPipe() ast.Statement { + endpointBody := flux.Call( + flux.Member("json", "encode"), + flux.Object(flux.Property("v", flux.Identifier("body"))), + ) + headers := flux.Property("headers", flux.Identifier("headers")) + + endpointProps := []*ast.Property{ + headers, + flux.Property("data", endpointBody), + } + endpointFn := flux.FuncBlock(flux.FunctionParams("r"), + s.generateBody(), + &ast.ReturnStatement{ + Argument: flux.Object(endpointProps...), + }, + ) + + props := []*ast.Property{} + props = append(props, flux.Property("endpoint", + flux.Call(flux.Identifier("http_endpoint"), flux.Object(flux.Property("mapFn", endpointFn))))) + + call := flux.Call(flux.Member("monitor", "notify"), flux.Object(props...)) + + return flux.ExpressionStatement(flux.Pipe(flux.Identifier("csvTable"), call)) +} + +func (s *HTTP) generateBody() ast.Statement { + // {r with "_version": 1} + props := []*ast.Property{ + flux.Property( + "_version", flux.Integer(1), + ), + } + + body := flux.ObjectWith("r", props...) + return flux.DefineVariable("body", body) +} + // BackfillSecretKeys fill back fill the secret field key during the unmarshalling // if value of that secret field is not nil. func (s *HTTP) BackfillSecretKeys() { @@ -48,16 +214,16 @@ func (s *HTTP) BackfillSecretKeys() { } // SecretFields return available secret fields. -func (s HTTP) SecretFields() []influxdb.SecretField { - arr := make([]influxdb.SecretField, 0) +func (s *HTTP) SecretFields() []*influxdb.SecretField { + arr := make([]*influxdb.SecretField, 0) if s.Token.Key != "" { - arr = append(arr, s.Token) + arr = append(arr, &s.Token) } if s.Username.Key != "" { - arr = append(arr, s.Username) + arr = append(arr, &s.Username) } if s.Password.Key != "" { - arr = append(arr, s.Password) + arr = append(arr, &s.Password) } return arr } diff --git a/notification/endpoint/http_test.go b/notification/endpoint/http_test.go new file mode 100644 index 00000000000..7b6d6e99d0e --- /dev/null +++ b/notification/endpoint/http_test.go @@ -0,0 +1,149 @@ +package endpoint_test + +import ( + "testing" + + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/notification/endpoint" +) + +func TestHTTP_GenerateFlux(t *testing.T) { + want := `package main +// http_test +import "influxdata/influxdb/monitor" +import "http" +import "csv" +import "json" + +headers = {"Content-Type": "application/json"} +http_endpoint = http.endpoint(url: "http://localhost:7777") +data = "#group,false,false,false,false,true,false,false +#datatype,string,long,string,string,string,string,long +#default,_result,,,,,, +,result,table,name,id,_measurement,retentionPolicy,retentionPeriod +,,0,telegraf,id1,m1,,0" +csvTable = csv.from(csv: data) + +csvTable + |> monitor.notify(endpoint: http_endpoint(mapFn: (r) => { + body = {r with _version: 1} + + return {headers: headers, data: json.encode(v: body)} + }))` + + ne := &endpoint.HTTP{ + Base: endpoint.Base{ + Name: "http_test", + }, + URL: "http://localhost:7777", + } + + got, err := ne.GenerateTestFlux() + + if err != nil { + t.Fatal(err) + } + + if got != want { + t.Errorf("\nExpected:\n\n%s\n\nGot:\n\n%s\n\n", want, got) + } +} + +func TestHTTP_GenerateFlux_basicAuth(t *testing.T) { + want := `package main +// http_test_basic_auth +import "influxdata/influxdb/monitor" +import "http" +import "csv" +import "json" +import "influxdata/influxdb/secrets" + +headers = {"Content-Type": "application/json", "Authorization": http.basicAuth(u: secrets.get(key: "000000000000000e-username"), p: secrets.get(key: "000000000000000e-password"))} +http_endpoint = http.endpoint(url: "http://localhost:7777") +data = "#group,false,false,false,false,true,false,false +#datatype,string,long,string,string,string,string,long +#default,_result,,,,,, +,result,table,name,id,_measurement,retentionPolicy,retentionPeriod +,,0,telegraf,id1,m1,,0" +csvTable = csv.from(csv: data) + +csvTable + |> monitor.notify(endpoint: http_endpoint(mapFn: (r) => { + body = {r with _version: 1} + + return {headers: headers, data: json.encode(v: body)} + }))` + + ne := &endpoint.HTTP{ + Base: endpoint.Base{ + ID: 2, + Name: "http_test_basic_auth", + }, + URL: "http://localhost:7777", + AuthMethod: "basic", + Username: influxdb.SecretField{ + Key: "000000000000000e-username", + }, + Password: influxdb.SecretField{ + Key: "000000000000000e-password", + }, + } + + got, err := ne.GenerateTestFlux() + + if err != nil { + t.Fatal(err) + } + + if got != want { + t.Errorf("\nExpected:\n\n%s\n\nGot:\n\n%s\n\n", want, got) + } +} + +func TestHTTP_GenerateFlux_bearer(t *testing.T) { + want := `package main +// http_test_bearer +import "influxdata/influxdb/monitor" +import "http" +import "csv" +import "json" +import "influxdata/influxdb/secrets" + +headers = {"Content-Type": "application/json", "Authorization": "Bearer " + secrets.get(key: "000000000000000e-token")} +http_endpoint = http.endpoint(url: "http://localhost:7777") +data = "#group,false,false,false,false,true,false,false +#datatype,string,long,string,string,string,string,long +#default,_result,,,,,, +,result,table,name,id,_measurement,retentionPolicy,retentionPeriod +,,0,telegraf,id1,m1,,0" +csvTable = csv.from(csv: data) + +csvTable + |> monitor.notify(endpoint: http_endpoint(mapFn: (r) => { + body = {r with _version: 1} + + return {headers: headers, data: json.encode(v: body)} + }))` + + ne := &endpoint.HTTP{ + Base: endpoint.Base{ + ID: 2, + Name: "http_test_bearer", + }, + URL: "http://localhost:7777", + AuthMethod: "bearer", + Token: influxdb.SecretField{ + Key: "000000000000000e-token", + }, + } + + got, err := ne.GenerateTestFlux() + + if err != nil { + t.Fatal(err) + } + + if got != want { + t.Errorf("\nExpected:\n\n%s\n\nGot:\n\n%s\n\n", want, got) + } +} diff --git a/notification/endpoint/pagerduty.go b/notification/endpoint/pagerduty.go index ca304e46646..63287cd3ea0 100644 --- a/notification/endpoint/pagerduty.go +++ b/notification/endpoint/pagerduty.go @@ -2,8 +2,11 @@ package endpoint import ( "encoding/json" + "fmt" + "github.com/influxdata/flux/ast" "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/notification/flux" ) var _ influxdb.NotificationEndpoint = &PagerDuty{} @@ -20,6 +23,116 @@ type PagerDuty struct { RoutingKey influxdb.SecretField `json:"routingKey"` } +// GenerateTestFlux generates a flux script for the pagerduty notification rule. +func (p *PagerDuty) GenerateTestFlux() (string, error) { + f, err := p.GenerateFluxAST() + if err != nil { + return "", err + } + + // used string interpolation here because flux's ast.Format returns malformed + // multi-line strings 😿 + data := `#group,false,false,false,false,true,false,false +#datatype,string,long,string,string,string,string,long +#default,_result,,,,,, +,result,table,name,id,_measurement,retentionPolicy,retentionPeriod +,,0,telegraf,id1,m1,,0` + + return fmt.Sprintf(ast.Format(f), data), nil +} + +// GenerateFluxAST generates a flux AST for the pagerduty notification rule. +func (p *PagerDuty) GenerateFluxAST() (*ast.Package, error) { + f := flux.File( + p.Name, + flux.Imports("influxdata/influxdb/monitor", "pagerduty", "csv", "influxdata/influxdb/secrets"), + p.generateFluxASTBody(), + ) + return &ast.Package{Package: "main", Files: []*ast.File{f}}, nil +} + +func (p *PagerDuty) generateFluxASTBody() []ast.Statement { + var statements []ast.Statement + statements = append(statements, p.generateFluxASTEndpoint()) + statements = append(statements, p.generateFluxASTCSVTable()...) + statements = append(statements, p.generateFluxASTNotifyPipe()) + + return statements +} + +func (p *PagerDuty) generateFluxASTEndpoint() ast.Statement { + call := flux.Call(flux.Member("pagerduty", "endpoint"), + flux.Object(), + ) + + return flux.DefineVariable("pagerduty_endpoint", call) +} + +func (p *PagerDuty) generateFluxASTCSVTable() []ast.Statement { + props := []*ast.Property{} + // used string interpolation here because flux's ast.Format returns + // malformed multi-line strings 😿 + data := flux.DefineVariable("data", flux.String("%s")) + props = append(props, flux.Property("csv", flux.Identifier("data"))) + call := flux.Call(flux.Member("csv", "from"), flux.Object(props...)) + table := flux.DefineVariable("csvTable", call) + + return []ast.Statement{ + data, + table, + } +} + +func (p *PagerDuty) generateFluxASTNotifyPipe() ast.Statement { + endpointProps := []*ast.Property{} + + // routing_key: + // required + // string + // A version 4 UUID expressed as a 32-digit hexadecimal number. This is the Integration Key for an integration on any given service. + endpointProps = append(endpointProps, flux.Property("routingKey", flux.String(*p.RoutingKey.Value))) + + // clientURL + // optional + // string + // url of the client sending the alert. + endpointProps = append(endpointProps, flux.Property("clientURL", flux.String(p.ClientURL))) + + // severity: + // required + // string + // The perceived severity of the status the event is describing with respect to the affected system. This can be critical, error, warning or info. + endpointProps = append(endpointProps, flux.Property("severity", flux.String("info"))) + + // event_action: + // required + // string trigger + // The type of event. Can be trigger, acknowledge or resolve. See Event Action. + endpointProps = append(endpointProps, flux.Property("eventAction", flux.String("trigger"))) + + // source: + // required + // string + // The unique location of the affected system, preferably a hostname or FQDN + endpointProps = append(endpointProps, flux.Property("source", flux.String("pager_duty_test"))) + + // summary: + // required + // string + // A brief text summary of the event, used to generate the summaries/titles of any associated alerts. The maximum permitted length of this property is 1024 characters. + endpointProps = append(endpointProps, flux.Property("summary", flux.String("PagerDuty connection successful"))) + + endpointFn := flux.Function(flux.FunctionParams("r"), flux.Object(endpointProps...)) + + props := []*ast.Property{} + props = append(props, flux.Property("endpoint", + flux.Call(flux.Identifier("pagerduty_endpoint"), flux.Object(flux.Property("mapFn", endpointFn))))) + + call := flux.Call(flux.Member("monitor", "notify"), flux.Object(props...)) + + return flux.ExpressionStatement(flux.Pipe(flux.Identifier("csvTable"), call)) +} + // BackfillSecretKeys fill back fill the secret field key during the unmarshalling // if value of that secret field is not nil. func (s *PagerDuty) BackfillSecretKeys() { @@ -29,9 +142,9 @@ func (s *PagerDuty) BackfillSecretKeys() { } // SecretFields return available secret fields. -func (s PagerDuty) SecretFields() []influxdb.SecretField { - return []influxdb.SecretField{ - s.RoutingKey, +func (s *PagerDuty) SecretFields() []*influxdb.SecretField { + return []*influxdb.SecretField{ + &s.RoutingKey, } } diff --git a/notification/endpoint/pagerduty_test.go b/notification/endpoint/pagerduty_test.go new file mode 100644 index 00000000000..b39444641aa --- /dev/null +++ b/notification/endpoint/pagerduty_test.go @@ -0,0 +1,61 @@ +package endpoint_test + +import ( + "testing" + + "github.com/andreyvit/diff" + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/notification/endpoint" +) + +func TestPagerDuty_GenerateFlux(t *testing.T) { + want := `package main +// pager_duty_test +import "influxdata/influxdb/monitor" +import "pagerduty" +import "csv" +import "influxdata/influxdb/secrets" + +pagerduty_endpoint = pagerduty.endpoint() +data = "#group,false,false,false,false,true,false,false +#datatype,string,long,string,string,string,string,long +#default,_result,,,,,, +,result,table,name,id,_measurement,retentionPolicy,retentionPeriod +,,0,telegraf,id1,m1,,0" +csvTable = csv.from(csv: data) + +csvTable + |> monitor.notify(endpoint: pagerduty_endpoint(mapFn: (r) => + ({ + routingKey: "pagerduty_token", + clientURL: "dummyURL", + severity: "info", + eventAction: "trigger", + source: "pager_duty_test", + summary: "PagerDuty connection successful", + })))` + + e := &endpoint.PagerDuty{ + Base: endpoint.Base{ + ID: 2, + Name: "pager_duty_test", + }, + ClientURL: "dummyURL", + RoutingKey: influxdb.SecretField{ + Key: "pagerduty_token", + }, + } + + value := "pagerduty_token" + e.RoutingKey.Value = &value + + got, err := e.GenerateTestFlux() + + if err != nil { + t.Fatal(err) + } + + if got != want { + t.Errorf("\n\nStrings do not match:\n\n%s", diff.LineDiff(got, want)) + } +} diff --git a/notification/endpoint/slack.go b/notification/endpoint/slack.go index 2dbf99b4469..f9d8866f73a 100644 --- a/notification/endpoint/slack.go +++ b/notification/endpoint/slack.go @@ -5,6 +5,9 @@ import ( "fmt" "net/url" + "github.com/influxdata/flux/ast" + "github.com/influxdata/influxdb/notification/flux" + "github.com/influxdata/influxdb" ) @@ -21,6 +24,86 @@ type Slack struct { URL string `json:"url"` // Token is the bearer token for authorization Token influxdb.SecretField `json:"token"` + // TestChannel is the channel to test the connection between flux and slack + TestChannel string `json:"testChannel"` +} + +// GenerateTestFlux generates a flux script for the slack notification rule. +func (s *Slack) GenerateTestFlux() (string, error) { + f, err := s.GenerateTestFluxAST() + if err != nil { + return "", err + } + + // used string interpolation here because flux's ast.Format returns malformed + // multi-line strings 😿 + data := `#group,false,false,false,false,true,false,false +#datatype,string,long,string,string,string,string,long +#default,_result,,,,,, +,result,table,name,id,_measurement,retentionPolicy,retentionPeriod +,,0,telegraf,id1,m1,,0` + + return fmt.Sprintf(ast.Format(f), data), nil +} + +// GenerateTestFluxAST generates a flux AST for the slack notification rule. +func (s *Slack) GenerateTestFluxAST() (*ast.Package, error) { + f := flux.File( + s.Name, + flux.Imports("influxdata/influxdb/monitor", "slack", "csv"), + s.generateTestFluxASTBody(), + ) + return &ast.Package{Package: "main", Files: []*ast.File{f}}, nil +} + +func (s *Slack) generateTestFluxASTBody() []ast.Statement { + var statements []ast.Statement + statements = append(statements, s.generateFluxASTEndpoint()) + statements = append(statements, s.generateFluxASTCSVTable()...) + statements = append(statements, s.generateFluxASTNotifyPipe()) + + return statements +} + +func (s *Slack) generateFluxASTEndpoint() ast.Statement { + props := []*ast.Property{} + props = append(props, flux.Property("url", flux.String(s.URL))) + call := flux.Call(flux.Member("slack", "endpoint"), flux.Object(props...)) + + return flux.DefineVariable("slack_endpoint", call) +} + +func (s *Slack) generateFluxASTCSVTable() []ast.Statement { + props := []*ast.Property{} + // used string interpolation here because flux's ast.Format returns + // malformed multi-line strings 😿 + data := flux.DefineVariable("data", flux.String("%s")) + props = append(props, flux.Property("csv", flux.Identifier("data"))) + call := flux.Call(flux.Member("csv", "from"), flux.Object(props...)) + table := flux.DefineVariable("csvTable", call) + + return []ast.Statement{ + data, + table, + } +} + +func (s *Slack) generateFluxASTNotifyPipe() ast.Statement { + endpointProps := []*ast.Property{} + messageText := fmt.Sprintf("Your endpoint named %q is working", s.Name) + endpointProps = append(endpointProps, flux.Property("channel", flux.String(s.TestChannel))) + endpointProps = append(endpointProps, flux.Property("text", flux.String(messageText))) + endpointProps = append(endpointProps, flux.Property("color", flux.String("good"))) + endpointFn := flux.Function(flux.FunctionParams("r"), flux.Object(endpointProps...)) + + props := []*ast.Property{} + props = append(props, flux.Property("data", flux.Object())) + props = append(props, flux.Property("endpoint", + flux.Call(flux.Identifier("slack_endpoint"), flux.Object(flux.Property("mapFn", endpointFn))))) + + call := flux.Call(flux.Member("monitor", "notify"), flux.Object(props...)) + + return flux.ExpressionStatement(flux.Pipe(flux.Identifier("csvTable"), call)) } // BackfillSecretKeys fill back fill the secret field key during the unmarshalling @@ -32,10 +115,10 @@ func (s *Slack) BackfillSecretKeys() { } // SecretFields return available secret fields. -func (s Slack) SecretFields() []influxdb.SecretField { - arr := []influxdb.SecretField{} +func (s *Slack) SecretFields() []*influxdb.SecretField { + arr := []*influxdb.SecretField{} if s.Token.Key != "" { - arr = append(arr, s.Token) + arr = append(arr, &s.Token) } return arr } diff --git a/notification/endpoint/slack_test.go b/notification/endpoint/slack_test.go new file mode 100644 index 00000000000..9f31918b6ac --- /dev/null +++ b/notification/endpoint/slack_test.go @@ -0,0 +1,46 @@ +package endpoint_test + +import ( + "testing" + + "github.com/andreyvit/diff" + "github.com/influxdata/influxdb/notification/endpoint" +) + +func TestSlack_GenerateFlux(t *testing.T) { + want := `package main +// test +import "influxdata/influxdb/monitor" +import "slack" +import "csv" + +slack_endpoint = slack.endpoint(url: "dummyURL") +data = "#group,false,false,false,false,true,false,false +#datatype,string,long,string,string,string,string,long +#default,_result,,,,,, +,result,table,name,id,_measurement,retentionPolicy,retentionPeriod +,,0,telegraf,id1,m1,,0" +csvTable = csv.from(csv: data) + +csvTable + |> monitor.notify(data: {}, endpoint: slack_endpoint(mapFn: (r) => + ({channel: "slack-test", text: "Your endpoint named \"test\" is working", color: "good"})))` + + ne := &endpoint.Slack{ + Base: endpoint.Base{ + Name: "test", + }, + URL: "dummyURL", + TestChannel: "slack-test", + } + + got, err := ne.GenerateTestFlux() + + if err != nil { + t.Fatal(err) + } + + if got != want { + t.Errorf("\n\nStrings do not match:\n\n%s", diff.LineDiff(got, want)) + } +} diff --git a/notification_endpoint.go b/notification_endpoint.go index f73eac1f25d..f0d82dec034 100644 --- a/notification_endpoint.go +++ b/notification_endpoint.go @@ -31,10 +31,11 @@ type NotificationEndpoint interface { GetDescription() string GetStatus() Status // SecretFields return available secret fields. - SecretFields() []SecretField + SecretFields() []*SecretField // BackfillSecretKeys fill back fill the secret field key during the unmarshalling // if value of that secret field is not nil. BackfillSecretKeys() + GenerateTestFlux() (string, error) } // ops for checks error @@ -45,6 +46,7 @@ var ( OpCreateNotificationEndpoint = "CreateNotificationEndpoint" OpUpdateNotificationEndpoint = "UpdateNotificationEndpoint" OpDeleteNotificationEndpoint = "DeleteNotificationEndpoint" + OpTestNotificationEndpoint = "TestNotificationEndpoint" ) // NotificationEndpointFilter represents a set of filter that restrict the returned notification endpoints. @@ -129,5 +131,5 @@ type NotificationEndpointService interface { PatchNotificationEndpoint(ctx context.Context, id ID, upd NotificationEndpointUpdate) (NotificationEndpoint, error) // DeleteNotificationEndpoint removes a notification endpoint by ID, returns secret fields, orgID for further deletion. - DeleteNotificationEndpoint(ctx context.Context, id ID) (flds []SecretField, orgID ID, err error) + DeleteNotificationEndpoint(ctx context.Context, id ID) (flds []*SecretField, orgID ID, err error) } diff --git a/testing/notification_endpoint.go b/testing/notification_endpoint.go index a056684baec..bd359ce0918 100644 --- a/testing/notification_endpoint.go +++ b/testing/notification_endpoint.go @@ -1690,7 +1690,7 @@ func DeleteNotificationEndpoint( type wants struct { notificationEndpoints []influxdb.NotificationEndpoint userResourceMappings []*influxdb.UserResourceMapping - secretFlds []influxdb.SecretField + secretFlds []*influxdb.SecretField orgID influxdb.ID err error } @@ -1960,7 +1960,7 @@ func DeleteNotificationEndpoint( userID: MustIDBase16(sixID), }, wants: wants{ - secretFlds: []influxdb.SecretField{ + secretFlds: []*influxdb.SecretField{ {Key: twoID + "-routing-key"}, }, orgID: MustIDBase16(fourID), diff --git a/ui/cypress/e2e/notificationEndpoints.test.ts b/ui/cypress/e2e/notificationEndpoints.test.ts index f6ea7c0317b..9c88bb35152 100644 --- a/ui/cypress/e2e/notificationEndpoints.test.ts +++ b/ui/cypress/e2e/notificationEndpoints.test.ts @@ -90,6 +90,8 @@ describe('Notification Endpoints', () => { cy.getByTestID('endpoint-save--button').click() + cy.getByTestID('notification-success').should('have.length', 1) + cy.getByTestID(`endpoint-card ${name}`).should('exist') cy.getByTestID('endpoint--overlay').should('not.be.visible') }) diff --git a/ui/src/alerting/actions/notifications/endpoints.ts b/ui/src/alerting/actions/notifications/endpoints.ts index 1e935070fab..7f0989422c2 100644 --- a/ui/src/alerting/actions/notifications/endpoints.ts +++ b/ui/src/alerting/actions/notifications/endpoints.ts @@ -1,5 +1,6 @@ // Libraries import {Dispatch} from 'react' +import {ThunkAction} from 'redux-thunk' // Actions import { @@ -79,7 +80,9 @@ export const getEndpoints = () => async ( } } -export const createEndpoint = (endpoint: NotificationEndpoint) => async ( +export const createEndpoint = ( + endpoint: NotificationEndpoint +): ThunkAction, GetState> => async ( dispatch: Dispatch< Action | NotificationAction | ReturnType > @@ -101,10 +104,47 @@ export const createEndpoint = (endpoint: NotificationEndpoint) => async ( type: 'SET_ENDPOINT', endpoint: resp.data, }) + + dispatch(notify(copy.createEndpointSuccess())) dispatch(checkEndpointsLimits()) } -export const updateEndpoint = (endpoint: NotificationEndpoint) => async ( +export const testExistingEndpoint = async (endpoint: NotificationEndpoint) => { + const labels = endpoint.labels || [] + + const data = { + ...endpoint, + labels: labels.map(l => l.id), + } as PostNotificationEndpoint + + const resp = await api.putNotificationEndpointsTest({ + endpointID: endpoint.id, + data, + }) + + if (resp.status !== 204) { + throw new Error('Test unsuccessful. Check endpoint parameters') + } +} + +export const testEndpoint = (endpoint: NotificationEndpoint) => async () => { + const labels = endpoint.labels || [] + + const data = { + ...endpoint, + labels: labels.map(l => l.id), + } as PostNotificationEndpoint + + const resp = await api.postNotificationEndpointsTest({data}) + + if (resp.status !== 204) { + throw new Error(resp.data.message) + } +} + +export const updateEndpoint = ( + endpoint: NotificationEndpoint +): ThunkAction, GetState> => async ( dispatch: Dispatch ) => { const resp = await api.putNotificationEndpoint({ diff --git a/ui/src/alerting/components/endpoints/EditEndpointOverlay.tsx b/ui/src/alerting/components/endpoints/EditEndpointOverlay.tsx index 96bb7f05a40..f167960ddf9 100644 --- a/ui/src/alerting/components/endpoints/EditEndpointOverlay.tsx +++ b/ui/src/alerting/components/endpoints/EditEndpointOverlay.tsx @@ -7,7 +7,10 @@ import {withRouter, WithRouterProps} from 'react-router' import {getEndpointFailed} from 'src/shared/copy/notifications' // Actions -import {updateEndpoint} from 'src/alerting/actions/notifications/endpoints' +import { + updateEndpoint, + testExistingEndpoint, +} from 'src/alerting/actions/notifications/endpoints' import {notify} from 'src/shared/actions/notifications' // Components @@ -21,6 +24,7 @@ import {NotificationEndpoint, AppState} from 'src/types' interface DispatchProps { onUpdateEndpoint: typeof updateEndpoint onNotify: typeof notify + onTestEndpoint: typeof testExistingEndpoint } interface StateProps { @@ -32,6 +36,7 @@ type Props = WithRouterProps & DispatchProps & StateProps const EditEndpointOverlay: FC = ({ params, router, + onTestEndpoint, onUpdateEndpoint, onNotify, endpoint, @@ -46,12 +51,16 @@ const EditEndpointOverlay: FC = ({ return null } - const handleEditEndpoint = (endpoint: NotificationEndpoint) => { - onUpdateEndpoint(endpoint) + const handleEditEndpoint = async (endpoint: NotificationEndpoint) => { + await onUpdateEndpoint(endpoint) handleDismiss() } + const handleTest = async (endpoint: NotificationEndpoint) => { + await onTestEndpoint(endpoint) + } + return ( @@ -62,6 +71,7 @@ const EditEndpointOverlay: FC = ({ /> = ({ const mdtp = { onUpdateEndpoint: updateEndpoint, onNotify: notify, + onTestEndpoint: testExistingEndpoint, } const mstp = ({endpoints}: AppState, {params}: Props): StateProps => { diff --git a/ui/src/alerting/components/endpoints/EndpointOptions.tsx b/ui/src/alerting/components/endpoints/EndpointOptions.tsx index fedcb54b40a..b41960bc93d 100644 --- a/ui/src/alerting/components/endpoints/EndpointOptions.tsx +++ b/ui/src/alerting/components/endpoints/EndpointOptions.tsx @@ -27,8 +27,14 @@ const EndpointOptions: FC = ({ }) => { switch (endpoint.type) { case 'slack': { - const {url} = endpoint as SlackNotificationEndpoint - return + const {url, testChannel} = endpoint as SlackNotificationEndpoint + return ( + + ) } case 'pagerduty': { const {clientURL, routingKey} = endpoint as PagerDutyNotificationEndpoint diff --git a/ui/src/alerting/components/endpoints/EndpointOptionsSlack.tsx b/ui/src/alerting/components/endpoints/EndpointOptionsSlack.tsx index a215dd27353..f506a2dadd9 100644 --- a/ui/src/alerting/components/endpoints/EndpointOptionsSlack.tsx +++ b/ui/src/alerting/components/endpoints/EndpointOptionsSlack.tsx @@ -6,10 +6,12 @@ import {Input, FormElement, Panel, Grid, Columns} from '@influxdata/clockface' interface Props { url: string + testChannel: string onChange: (e: ChangeEvent) => void } -const EndpointOptionsSlack: FC = ({url, onChange}) => { +const EndpointOptionsSlack: FC = ({url, testChannel, onChange}) => { + const {pathname} = window.location return ( @@ -27,6 +29,16 @@ const EndpointOptionsSlack: FC = ({url, onChange}) => { onChange={onChange} /> + {pathname.includes('/edit') && ( + + + + )} diff --git a/ui/src/alerting/components/endpoints/EndpointOverlayContents.tsx b/ui/src/alerting/components/endpoints/EndpointOverlayContents.tsx index ee716631b25..693eb428c31 100644 --- a/ui/src/alerting/components/endpoints/EndpointOverlayContents.tsx +++ b/ui/src/alerting/components/endpoints/EndpointOverlayContents.tsx @@ -15,7 +15,9 @@ import { } from '@influxdata/clockface' import EndpointOptions from 'src/alerting/components/endpoints/EndpointOptions' import EndpointTypeDropdown from 'src/alerting/components/endpoints/EndpointTypeDropdown' -import EndpointOverlayFooter from 'src/alerting/components/endpoints/EndpointOverlayFooter' +import EndpointOverlayFooter, { + Message, +} from 'src/alerting/components/endpoints/EndpointOverlayFooter' // Hooks import {useEndpointReducer} from './EndpointOverlayProvider' @@ -24,18 +26,20 @@ import {useEndpointReducer} from './EndpointOverlayProvider' import {NotificationEndpointType, NotificationEndpoint} from 'src/types' interface Props { - onSave: (endpoint: NotificationEndpoint) => void - onCancel: () => void saveButtonText: string + onSave: (endpoint: NotificationEndpoint) => Promise + onTest: (endpoint: NotificationEndpoint) => Promise + onCancel: () => void } const EndpointOverlayContents: FC = ({ onSave, + onTest, saveButtonText, onCancel, }) => { const [endpoint, dispatch] = useEndpointReducer() - const [errorMessage, setErrorMessage] = useState(null) + const [message, setMessage] = useState(null) const handleChange = ( e: ChangeEvent @@ -111,13 +115,21 @@ const EndpointOverlayContents: FC = ({ minHeight: '43px', }} > - {errorMessage && ( + {message && ( - {errorMessage} + {message.text} )} @@ -126,9 +138,10 @@ const EndpointOverlayContents: FC = ({ ) diff --git a/ui/src/alerting/components/endpoints/EndpointOverlayFooter.tsx b/ui/src/alerting/components/endpoints/EndpointOverlayFooter.tsx index aa7415c6dc8..0b75a80349a 100644 --- a/ui/src/alerting/components/endpoints/EndpointOverlayFooter.tsx +++ b/ui/src/alerting/components/endpoints/EndpointOverlayFooter.tsx @@ -12,39 +12,71 @@ import { // Hooks import {useEndpointState} from './EndpointOverlayProvider' +// API +import {testExistingEndpoint} from 'src/alerting/actions/notifications/endpoints' + // Types import {NotificationEndpoint, RemoteDataState} from 'src/types' -interface Props { +export interface Message { + text: string + remoteState: RemoteDataState +} +interface OwnProps { saveButtonText: string - onSave: (endpoint: NotificationEndpoint) => void + onSave: (endpoint: NotificationEndpoint) => Promise + onTest: (endpoint: NotificationEndpoint) => Promise onCancel: () => void - onSetErrorMessage: (error: string) => void + onSetMessage: (message: Message) => void } +type Props = OwnProps + const EndpointOverlayFooter: FC = ({ saveButtonText, onSave, onCancel, - onSetErrorMessage, + onSetMessage, }) => { const endpoint = useEndpointState() + const {pathname} = window.location const [saveStatus, setSaveStatus] = useState(RemoteDataState.NotStarted) + const [testStatus, setTestStatus] = useState(RemoteDataState.NotStarted) - const handleSave = () => { + const handleSave = async () => { if (saveStatus === RemoteDataState.Loading) { return } try { setSaveStatus(RemoteDataState.Loading) - onSetErrorMessage(null) + onSetMessage(null) - onSave(endpoint) + await onSave(endpoint) } catch (e) { setSaveStatus(RemoteDataState.Error) - onSetErrorMessage(e.message) + onSetMessage({remoteState: RemoteDataState.Error, text: e.message}) + } + } + + const handleTest = async () => { + if (testStatus === RemoteDataState.Loading) { + return + } + + try { + setTestStatus(RemoteDataState.Loading) + onSetMessage(null) + await testExistingEndpoint(endpoint) + setTestStatus(RemoteDataState.Done) + onSetMessage({ + text: `Test sent to ${endpoint.type}`, + remoteState: RemoteDataState.Done, + }) + } catch (e) { + onSetMessage({text: e.message, remoteState: RemoteDataState.Error}) + setTestStatus(RemoteDataState.Error) } } @@ -53,6 +85,11 @@ const EndpointOverlayFooter: FC = ({ ? ComponentStatus.Loading : ComponentStatus.Default + const testButtonStatus = + testStatus === RemoteDataState.Loading + ? ComponentStatus.Loading + : ComponentStatus.Default + return (