@@ -6,11 +6,16 @@ import (
6
6
"encoding/json"
7
7
"fmt"
8
8
"net/http"
9
+ "time"
9
10
11
+ "github.com/influxdata/flux"
12
+ "github.com/influxdata/flux/csv"
13
+ "github.com/influxdata/flux/lang"
10
14
"github.com/influxdata/httprouter"
11
15
"github.com/influxdata/influxdb"
12
16
pctx "github.com/influxdata/influxdb/context"
13
17
"github.com/influxdata/influxdb/notification/endpoint"
18
+ "github.com/influxdata/influxdb/query"
14
19
"go.uber.org/zap"
15
20
)
16
21
@@ -26,6 +31,7 @@ type NotificationEndpointBackend struct {
26
31
UserService influxdb.UserService
27
32
OrganizationService influxdb.OrganizationService
28
33
SecretService influxdb.SecretService
34
+ QueryService query.ProxyQueryService
29
35
}
30
36
31
37
// NewNotificationEndpointBackend returns a new instance of NotificationEndpointBackend.
@@ -40,6 +46,7 @@ func NewNotificationEndpointBackend(b *APIBackend) *NotificationEndpointBackend
40
46
UserService : b .UserService ,
41
47
OrganizationService : b .OrganizationService ,
42
48
SecretService : b .SecretService ,
49
+ QueryService : b .FluxService ,
43
50
}
44
51
}
45
52
@@ -55,11 +62,14 @@ type NotificationEndpointHandler struct {
55
62
UserService influxdb.UserService
56
63
OrganizationService influxdb.OrganizationService
57
64
SecretService influxdb.SecretService
65
+ QueryService query.ProxyQueryService
58
66
}
59
67
60
68
const (
61
69
notificationEndpointsPath = "/api/v2/notificationEndpoints"
62
70
notificationEndpointsIDPath = "/api/v2/notificationEndpoints/:id"
71
+ notificationEndpointsIDTestPath = "/api/v2/notificationEndpoints/:id/test"
72
+ notificationEndpointsTestPath = "/api/v2/notificationEndpointsTest"
63
73
notificationEndpointsIDMembersPath = "/api/v2/notificationEndpoints/:id/members"
64
74
notificationEndpointsIDMembersIDPath = "/api/v2/notificationEndpoints/:id/members/:userID"
65
75
notificationEndpointsIDOwnersPath = "/api/v2/notificationEndpoints/:id/owners"
@@ -81,8 +91,10 @@ func NewNotificationEndpointHandler(b *NotificationEndpointBackend) *Notificatio
81
91
UserService : b .UserService ,
82
92
OrganizationService : b .OrganizationService ,
83
93
SecretService : b .SecretService ,
94
+ QueryService : b .QueryService ,
84
95
}
85
96
h .HandlerFunc ("POST" , notificationEndpointsPath , h .handlePostNotificationEndpoint )
97
+ h .HandlerFunc ("PUT" , notificationEndpointsIDTestPath , h .handlePutNotificationEndpointTest )
86
98
h .HandlerFunc ("GET" , notificationEndpointsPath , h .handleGetNotificationEndpoints )
87
99
h .HandlerFunc ("GET" , notificationEndpointsIDPath , h .handleGetNotificationEndpoint )
88
100
h .HandlerFunc ("DELETE" , notificationEndpointsIDPath , h .handleDeleteNotificationEndpoint )
@@ -109,6 +121,7 @@ func NewNotificationEndpointHandler(b *NotificationEndpointBackend) *Notificatio
109
121
UserResourceMappingService : b .UserResourceMappingService ,
110
122
UserService : b .UserService ,
111
123
}
124
+
112
125
h .HandlerFunc ("POST" , notificationEndpointsIDOwnersPath , newPostMemberHandler (ownerBackend ))
113
126
h .HandlerFunc ("GET" , notificationEndpointsIDOwnersPath , newGetMembersHandler (ownerBackend ))
114
127
h .HandlerFunc ("DELETE" , notificationEndpointsIDOwnersIDPath , newDeleteMemberHandler (ownerBackend ))
@@ -229,7 +242,6 @@ func (h *NotificationEndpointHandler) handleGetNotificationEndpoints(w http.Resp
229
242
h .HandleHTTPError (ctx , err , w )
230
243
return
231
244
}
232
- h .Logger .Debug ("notificationEndpoints retrieved" , zap .String ("notificationEndpoints" , fmt .Sprint (edps )))
233
245
234
246
if err := encodeResponse (ctx , w , http .StatusOK , newNotificationEndpointsResponse (ctx , edps , h .LabelService , filter , * opts )); err != nil {
235
247
logEncodingError (h .Logger , r , err )
@@ -249,7 +261,6 @@ func (h *NotificationEndpointHandler) handleGetNotificationEndpoint(w http.Respo
249
261
h .HandleHTTPError (ctx , err , w )
250
262
return
251
263
}
252
- h .Logger .Debug ("notificationEndpoint retrieved" , zap .String ("notificationEndpoint" , fmt .Sprint (edp )))
253
264
254
265
labels , err := h .LabelService .FindResourceLabels (ctx , influxdb.LabelMappingFilter {ResourceID : edp .GetID ()})
255
266
if err != nil {
@@ -403,6 +414,134 @@ func decodePatchNotificationEndpointRequest(ctx context.Context, r *http.Request
403
414
return req , nil
404
415
}
405
416
417
+ // handlePostNotificationEndpointTest is the HTTP handler for the PUT /api/v2/notificationEndpoints/:id/test route.
418
+ func (h * NotificationEndpointHandler ) handlePutNotificationEndpointTest (w http.ResponseWriter , r * http.Request ) {
419
+ ctx := r .Context ()
420
+ edp , err := decodePutNotificationEndpointRequest (ctx , r )
421
+ if err != nil {
422
+ h .Logger .Debug ("failed to decode request" , zap .Error (err ))
423
+ h .HandleHTTPError (ctx , err , w )
424
+ return
425
+ }
426
+
427
+ auth , err := pctx .GetAuthorizer (ctx )
428
+ if err != nil {
429
+ h .HandleHTTPError (ctx , err , w )
430
+ return
431
+ }
432
+
433
+ var token * influxdb.Authorization
434
+ switch a := auth .(type ) {
435
+ case * influxdb.Authorization :
436
+ token = a
437
+ case * influxdb.Session :
438
+ token = a .EphemeralAuth (edp .GetOrgID ())
439
+ default :
440
+ h .HandleHTTPError (ctx , influxdb .ErrAuthorizerNotSupported , w )
441
+ return
442
+ }
443
+
444
+ for _ , fld := range edp .SecretFields () {
445
+ if fld .Value == nil {
446
+ v , err := h .SecretService .LoadSecret (ctx , edp .GetOrgID (), fld .Key )
447
+ if err != nil {
448
+ h .HandleHTTPError (ctx , & influxdb.Error {
449
+ Err : err ,
450
+ }, w )
451
+ return
452
+ }
453
+
454
+ fld .Value = & v
455
+ }
456
+ }
457
+
458
+ q , err := edp .GenerateTestFlux ()
459
+ if err != nil {
460
+ h .HandleHTTPError (ctx , err , w )
461
+ }
462
+
463
+ compiler := lang.FluxCompiler {
464
+ Now : time .Now (),
465
+ Query : q ,
466
+ }
467
+
468
+ req := query.Request {
469
+ Compiler : compiler ,
470
+ Authorization : token ,
471
+ OrganizationID : edp .GetOrgID (),
472
+ }
473
+
474
+ pr := & query.ProxyRequest {
475
+ Request : req ,
476
+ Dialect : csv .DefaultDialect (),
477
+ }
478
+
479
+ b := bytes .NewBuffer (nil )
480
+
481
+ if _ , err := h .QueryService .Query (ctx , b , pr ); err != nil {
482
+ h .Logger .Info ("failed to execute query" , zap .Error (err ))
483
+ h .HandleHTTPError (ctx , err , w )
484
+ return
485
+ }
486
+
487
+ if err := encodeTestEndpointQueryResults (b , w ); err != nil {
488
+ h .HandleHTTPError (ctx , err , w )
489
+ return
490
+ }
491
+
492
+ w .WriteHeader (http .StatusNoContent )
493
+ }
494
+
495
+ func encodeTestEndpointQueryResults (b * bytes.Buffer , w http.ResponseWriter ) error {
496
+ dec := csv .NewResultDecoder (csv.ResultDecoderConfig {})
497
+
498
+ res , err := dec .Decode (b )
499
+ if err != nil {
500
+ return err
501
+ }
502
+
503
+ if err := res .Tables ().Do (func (t flux.Table ) error {
504
+ return t .Do (func (r flux.ColReader ) error {
505
+ cols := r .Cols ()
506
+ idx := - 1
507
+ for i , col := range cols {
508
+ if col .Label == "_sent" {
509
+ idx = i
510
+ break
511
+ }
512
+ }
513
+
514
+ if idx == - 1 {
515
+ return & influxdb.Error {
516
+ Msg : "failed to send message" ,
517
+ Code : influxdb .EBadRequest ,
518
+ }
519
+ }
520
+
521
+ arr := r .Strings (idx )
522
+ sent := false
523
+ for i := 0 ; i < arr .Len (); i ++ {
524
+ if bytes .Equal (arr .Value (i ), []byte ("true" )) {
525
+ sent = true
526
+ }
527
+ }
528
+
529
+ if ! sent {
530
+ return & influxdb.Error {
531
+ Msg : "failed to send message" ,
532
+ Code : influxdb .EBadRequest ,
533
+ }
534
+ }
535
+
536
+ return nil
537
+ })
538
+ }); err != nil {
539
+ return err
540
+ }
541
+
542
+ return nil
543
+ }
544
+
406
545
// handlePostNotificationEndpoint is the HTTP handler for the POST /api/v2/notificationEndpoints route.
407
546
func (h * NotificationEndpointHandler ) handlePostNotificationEndpoint (w http.ResponseWriter , r * http.Request ) {
408
547
ctx := r .Context ()
@@ -438,8 +577,6 @@ func (h *NotificationEndpointHandler) handlePostNotificationEndpoint(w http.Resp
438
577
439
578
labels := h .mapNewNotificationEndpointLabels (ctx , edp .NotificationEndpoint , edp .Labels )
440
579
441
- h .Logger .Debug ("notificationEndpoint created" , zap .String ("notificationEndpoint" , fmt .Sprint (edp )))
442
-
443
580
if err := encodeResponse (ctx , w , http .StatusCreated , newNotificationEndpointResponse (edp , labels )); err != nil {
444
581
logEncodingError (h .Logger , r , err )
445
582
return
@@ -516,7 +653,6 @@ func (h *NotificationEndpointHandler) handlePutNotificationEndpoint(w http.Respo
516
653
h .HandleHTTPError (ctx , err , w )
517
654
return
518
655
}
519
- h .Logger .Debug ("notificationEndpoint replaced" , zap .String ("notificationEndpoint" , fmt .Sprint (edp )))
520
656
521
657
if err := encodeResponse (ctx , w , http .StatusOK , newNotificationEndpointResponse (edp , labels )); err != nil {
522
658
logEncodingError (h .Logger , r , err )
@@ -545,7 +681,6 @@ func (h *NotificationEndpointHandler) handlePatchNotificationEndpoint(w http.Res
545
681
h .HandleHTTPError (ctx , err , w )
546
682
return
547
683
}
548
- h .Logger .Debug ("notificationEndpoint patch" , zap .String ("notificationEndpoint" , fmt .Sprint (edp )))
549
684
550
685
if err := encodeResponse (ctx , w , http .StatusOK , newNotificationEndpointResponse (edp , labels )); err != nil {
551
686
logEncodingError (h .Logger , r , err )
@@ -584,7 +719,6 @@ func (h *NotificationEndpointHandler) handleDeleteNotificationEndpoint(w http.Re
584
719
}, w )
585
720
return
586
721
}
587
- h .Logger .Debug ("notificationEndpoint deleted" , zap .String ("notificationEndpointID" , fmt .Sprint (i )))
588
722
589
723
w .WriteHeader (http .StatusNoContent )
590
724
}
0 commit comments