Skip to content

feat(ui): test notification endpoints #16031

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion authorizer/notification_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (s *NotificationEndpointService) PatchNotificationEndpoint(ctx context.Cont
}

// DeleteNotificationEndpoint checks to see if the authorizer on context has write access to the notification endpoint provided.
func (s *NotificationEndpointService) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error) {
func (s *NotificationEndpointService) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) ([]*influxdb.SecretField, influxdb.ID, error) {
edp, err := s.FindNotificationEndpointByID(ctx, id)
if err != nil {
return nil, 0, err
Expand Down
4 changes: 2 additions & 2 deletions authorizer/notification_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ func TestNotificationEndpointService_DeleteNotificationEndpoint(t *testing.T) {
},
}, nil
},
DeleteNotificationEndpointF: func(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error) {
DeleteNotificationEndpointF: func(ctx context.Context, id influxdb.ID) ([]*influxdb.SecretField, influxdb.ID, error) {
return nil, 0, nil
},
},
Expand Down Expand Up @@ -583,7 +583,7 @@ func TestNotificationEndpointService_DeleteNotificationEndpoint(t *testing.T) {
},
}, nil
},
DeleteNotificationEndpointF: func(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error) {
DeleteNotificationEndpointF: func(ctx context.Context, id influxdb.ID) ([]*influxdb.SecretField, influxdb.ID, error) {
return nil, 0, nil
},
},
Expand Down
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// Any time this set of constants changes, you must also update the swagger for Error.properties.code.enum.
const (
EInternal = "internal error"
EBadRequest = "bad request"
ENotFound = "not found"
EConflict = "conflict" // action cannot be performed
EInvalid = "invalid" // validation failed
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ github.com/jsternberg/zap-logfmt v1.2.0 h1:1v+PK4/B48cy8cfQbxL4FmmNZrjnIMr2BsnyE
github.com/jsternberg/zap-logfmt v1.2.0/go.mod h1:kz+1CUmCutPWABnNkOu9hOHKdT2q3TDYCcsFy9hpqb0=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0 h1:TDTW5Yz1mjftljbcKqRcrYhd4XeOoI98t+9HbQbYf7g=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef h1:2jNeR4YUziVtswNP9sEFAI913cVrzH85T+8Q6LpYbT0=
github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef/go.mod h1:Ct9fl0F6iIOGgxJ5npU/IUOhOhqlVrGjyIZc8/MagT0=
Expand Down
1 change: 1 addition & 0 deletions http/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func InactiveUserError(ctx context.Context, h platform.HTTPErrorHandler, w http.
var statusCodePlatformError = map[string]int{
platform.EInternal: http.StatusInternalServerError,
platform.EInvalid: http.StatusBadRequest,
platform.EBadRequest: http.StatusBadRequest,
platform.EUnprocessableEntity: http.StatusUnprocessableEntity,
platform.EEmptyValue: http.StatusBadRequest,
platform.EConflict: http.StatusUnprocessableEntity,
Expand Down
148 changes: 141 additions & 7 deletions http/notification_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@ import (
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/influxdata/flux"
"github.com/influxdata/flux/csv"
"github.com/influxdata/flux/lang"
"github.com/influxdata/httprouter"
"github.com/influxdata/influxdb"
pctx "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/notification/endpoint"
"github.com/influxdata/influxdb/query"
"go.uber.org/zap"
)

Expand All @@ -26,6 +31,7 @@ type NotificationEndpointBackend struct {
UserService influxdb.UserService
OrganizationService influxdb.OrganizationService
SecretService influxdb.SecretService
QueryService query.ProxyQueryService
}

// NewNotificationEndpointBackend returns a new instance of NotificationEndpointBackend.
Expand All @@ -40,6 +46,7 @@ func NewNotificationEndpointBackend(b *APIBackend) *NotificationEndpointBackend
UserService: b.UserService,
OrganizationService: b.OrganizationService,
SecretService: b.SecretService,
QueryService: b.FluxService,
}
}

Expand All @@ -55,11 +62,14 @@ type NotificationEndpointHandler struct {
UserService influxdb.UserService
OrganizationService influxdb.OrganizationService
SecretService influxdb.SecretService
QueryService query.ProxyQueryService
}

const (
notificationEndpointsPath = "/api/v2/notificationEndpoints"
notificationEndpointsIDPath = "/api/v2/notificationEndpoints/:id"
notificationEndpointsIDTestPath = "/api/v2/notificationEndpoints/:id/test"
notificationEndpointsTestPath = "/api/v2/notificationEndpointsTest"
notificationEndpointsIDMembersPath = "/api/v2/notificationEndpoints/:id/members"
notificationEndpointsIDMembersIDPath = "/api/v2/notificationEndpoints/:id/members/:userID"
notificationEndpointsIDOwnersPath = "/api/v2/notificationEndpoints/:id/owners"
Expand All @@ -81,8 +91,10 @@ func NewNotificationEndpointHandler(b *NotificationEndpointBackend) *Notificatio
UserService: b.UserService,
OrganizationService: b.OrganizationService,
SecretService: b.SecretService,
QueryService: b.QueryService,
}
h.HandlerFunc("POST", notificationEndpointsPath, h.handlePostNotificationEndpoint)
h.HandlerFunc("PUT", notificationEndpointsIDTestPath, h.handlePutNotificationEndpointTest)
h.HandlerFunc("GET", notificationEndpointsPath, h.handleGetNotificationEndpoints)
h.HandlerFunc("GET", notificationEndpointsIDPath, h.handleGetNotificationEndpoint)
h.HandlerFunc("DELETE", notificationEndpointsIDPath, h.handleDeleteNotificationEndpoint)
Expand All @@ -109,6 +121,7 @@ func NewNotificationEndpointHandler(b *NotificationEndpointBackend) *Notificatio
UserResourceMappingService: b.UserResourceMappingService,
UserService: b.UserService,
}

h.HandlerFunc("POST", notificationEndpointsIDOwnersPath, newPostMemberHandler(ownerBackend))
h.HandlerFunc("GET", notificationEndpointsIDOwnersPath, newGetMembersHandler(ownerBackend))
h.HandlerFunc("DELETE", notificationEndpointsIDOwnersIDPath, newDeleteMemberHandler(ownerBackend))
Expand Down Expand Up @@ -229,7 +242,6 @@ func (h *NotificationEndpointHandler) handleGetNotificationEndpoints(w http.Resp
h.HandleHTTPError(ctx, err, w)
return
}
h.Logger.Debug("notificationEndpoints retrieved", zap.String("notificationEndpoints", fmt.Sprint(edps)))

if err := encodeResponse(ctx, w, http.StatusOK, newNotificationEndpointsResponse(ctx, edps, h.LabelService, filter, *opts)); err != nil {
logEncodingError(h.Logger, r, err)
Expand All @@ -249,7 +261,6 @@ func (h *NotificationEndpointHandler) handleGetNotificationEndpoint(w http.Respo
h.HandleHTTPError(ctx, err, w)
return
}
h.Logger.Debug("notificationEndpoint retrieved", zap.String("notificationEndpoint", fmt.Sprint(edp)))

labels, err := h.LabelService.FindResourceLabels(ctx, influxdb.LabelMappingFilter{ResourceID: edp.GetID()})
if err != nil {
Expand Down Expand Up @@ -403,6 +414,134 @@ func decodePatchNotificationEndpointRequest(ctx context.Context, r *http.Request
return req, nil
}

// handlePostNotificationEndpointTest is the HTTP handler for the PUT /api/v2/notificationEndpoints/:id/test route.
func (h *NotificationEndpointHandler) handlePutNotificationEndpointTest(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
edp, err := decodePutNotificationEndpointRequest(ctx, r)
if err != nil {
h.Logger.Debug("failed to decode request", zap.Error(err))
h.HandleHTTPError(ctx, err, w)
return
}

auth, err := pctx.GetAuthorizer(ctx)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}

var token *influxdb.Authorization
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks too hacky. @influxdata/compute-team has cleaned this up already for task. We should make sure the query proxy server can handle the context.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kelwang this is what is currently done in the /api/v2/query endpoint. I agree it's hacky and that we need a long term solution, but I'm not sure exactly what that looks like.

switch a := auth.(type) {
case *influxdb.Authorization:
token = a
case *influxdb.Session:
token = a.EphemeralAuth(edp.GetOrgID())
default:
h.HandleHTTPError(ctx, influxdb.ErrAuthorizerNotSupported, w)
return
}

for _, fld := range edp.SecretFields() {
if fld.Value == nil {
v, err := h.SecretService.LoadSecret(ctx, edp.GetOrgID(), fld.Key)
if err != nil {
h.HandleHTTPError(ctx, &influxdb.Error{
Err: err,
}, w)
return
}

fld.Value = &v
}
}

q, err := edp.GenerateTestFlux()
if err != nil {
h.HandleHTTPError(ctx, err, w)
}

compiler := lang.FluxCompiler{
Now: time.Now(),
Query: q,
}

req := query.Request{
Compiler: compiler,
Authorization: token,
OrganizationID: edp.GetOrgID(),
}

pr := &query.ProxyRequest{
Request: req,
Dialect: csv.DefaultDialect(),
}

b := bytes.NewBuffer(nil)

if _, err := h.QueryService.Query(ctx, b, pr); err != nil {
h.Logger.Info("failed to execute query", zap.Error(err))
h.HandleHTTPError(ctx, err, w)
return
}

if err := encodeTestEndpointQueryResults(b, w); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}

w.WriteHeader(http.StatusNoContent)
}

func encodeTestEndpointQueryResults(b *bytes.Buffer, w http.ResponseWriter) error {
dec := csv.NewResultDecoder(csv.ResultDecoderConfig{})

res, err := dec.Decode(b)
if err != nil {
return err
}

if err := res.Tables().Do(func(t flux.Table) error {
return t.Do(func(r flux.ColReader) error {
cols := r.Cols()
idx := -1
for i, col := range cols {
if col.Label == "_sent" {
idx = i
break
}
}

if idx == -1 {
return &influxdb.Error{
Msg: "failed to send message",
Code: influxdb.EBadRequest,
}
}

arr := r.Strings(idx)
sent := false
for i := 0; i < arr.Len(); i++ {
if bytes.Equal(arr.Value(i), []byte("true")) {
sent = true
}
}

if !sent {
return &influxdb.Error{
Msg: "failed to send message",
Code: influxdb.EBadRequest,
}
}

return nil
})
}); err != nil {
return err
}

return nil
}

// handlePostNotificationEndpoint is the HTTP handler for the POST /api/v2/notificationEndpoints route.
func (h *NotificationEndpointHandler) handlePostNotificationEndpoint(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
Expand Down Expand Up @@ -438,8 +577,6 @@ func (h *NotificationEndpointHandler) handlePostNotificationEndpoint(w http.Resp

labels := h.mapNewNotificationEndpointLabels(ctx, edp.NotificationEndpoint, edp.Labels)

h.Logger.Debug("notificationEndpoint created", zap.String("notificationEndpoint", fmt.Sprint(edp)))

if err := encodeResponse(ctx, w, http.StatusCreated, newNotificationEndpointResponse(edp, labels)); err != nil {
logEncodingError(h.Logger, r, err)
return
Expand Down Expand Up @@ -516,7 +653,6 @@ func (h *NotificationEndpointHandler) handlePutNotificationEndpoint(w http.Respo
h.HandleHTTPError(ctx, err, w)
return
}
h.Logger.Debug("notificationEndpoint replaced", zap.String("notificationEndpoint", fmt.Sprint(edp)))

if err := encodeResponse(ctx, w, http.StatusOK, newNotificationEndpointResponse(edp, labels)); err != nil {
logEncodingError(h.Logger, r, err)
Expand Down Expand Up @@ -545,7 +681,6 @@ func (h *NotificationEndpointHandler) handlePatchNotificationEndpoint(w http.Res
h.HandleHTTPError(ctx, err, w)
return
}
h.Logger.Debug("notificationEndpoint patch", zap.String("notificationEndpoint", fmt.Sprint(edp)))

if err := encodeResponse(ctx, w, http.StatusOK, newNotificationEndpointResponse(edp, labels)); err != nil {
logEncodingError(h.Logger, r, err)
Expand Down Expand Up @@ -584,7 +719,6 @@ func (h *NotificationEndpointHandler) handleDeleteNotificationEndpoint(w http.Re
}, w)
return
}
h.Logger.Debug("notificationEndpoint deleted", zap.String("notificationEndpointID", fmt.Sprint(i)))

w.WriteHeader(http.StatusNoContent)
}
Loading