Skip to content

Commit df73ddb

Browse files
Add metrics which can be used to track our Notification Queue
1 parent 060a32b commit df73ddb

File tree

6 files changed

+407
-0
lines changed

6 files changed

+407
-0
lines changed

backend/Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ clean:
1818
test:
1919
go test ./...
2020
21+
test-integration:
22+
go test -v -tags integration
23+
2124
lint:
2225
go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.60.1
2326
golangci-lint run --timeout 5m

backend/pkg/api/api_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//go:build integration
2+
13
package api_test
24

35
import (

backend/pkg/commons/metrics/metrics.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,22 @@ var (
7878
Name: "counter",
7979
Help: "Generic counter of events with name in labels",
8080
}, []string{"name"})
81+
NotificationsQueue_Event_Size = promauto.NewGaugeVec(prometheus.GaugeOpts{
82+
Name: "notifications_queue_event_pending_size",
83+
Help: "Number of pending notifications in the queue by event type",
84+
}, []string{"event_type", "status"})
85+
NotificationsQueue_Channel_Size = promauto.NewGaugeVec(prometheus.GaugeOpts{
86+
Name: "notifications_queue_channel_pending_size",
87+
Help: "Number of pending notifications in the queue by channel",
88+
}, []string{"channel", "status"})
89+
NotificationsQueue_Pending_Time = promauto.NewHistogramVec(prometheus.HistogramOpts{
90+
Name: "notifications_queue_pending_duration_milliseconds",
91+
Help: "Duration of still pending notifications in the queue",
92+
}, []string{"channel", "event_type"})
93+
NotificationsQueue_Sent_Time = promauto.NewHistogramVec(prometheus.HistogramOpts{
94+
Name: "notifications_queue_sent_duration_milliseconds",
95+
Help: "Amount of time notification took to be successfully sent",
96+
}, []string{"channel", "event_type"})
8197
)
8298

8399
func init() {

backend/pkg/consapi/client_node_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//go:build integration
2+
13
package consapi_test
24

35
import (

backend/pkg/notification/sending.go

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/json"
77
"fmt"
88
"io"
9+
"math"
910
"net/http"
1011
"net/url"
1112
"strings"
@@ -62,6 +63,9 @@ func notificationSender() {
6263
log.Error(err, "error dispatching notifications", 0)
6364
}
6465

66+
// Record metrics related to Notification Queue like size of queue and duration of pending notifications
67+
collectNotificationQueueMetrics()
68+
6569
err = garbageCollectNotificationQueue()
6670
if err != nil {
6771
log.Error(err, "error garbage collecting notification queue", 0)
@@ -528,3 +532,188 @@ func SendTestWebhookNotification(ctx context.Context, userId types.UserId, webho
528532
}
529533
return nil
530534
}
535+
536+
// Notification mirrors the columns read from the notification_queue table
// when collecting queue metrics. Pointer fields are used for the columns that
// are scanned into nullable Go values.
type Notification struct {
	// Id is the value of the row's id column.
	Id *uint64 `db:"id"`

	// Created is the value of the row's created column.
	Created *time.Time `db:"created"`

	// Sent is the value of the row's sent column. It is NULL (nil) for
	// notifications that are still pending.
	Sent *time.Time `db:"sent"`

	// Channel is the value of the row's channel column and is used verbatim
	// as a metrics label.
	Channel string `db:"channel"`

	// Content is the notification payload. The event type is recovered from
	// it by searching for known event descriptions.
	Content string `db:"content"`
}
543+
544+
// NotificationStatus is the value used for the "status" metrics label; it
// distinguishes notifications that were delivered from those still waiting.
type NotificationStatus string

// Status label values.
const (
	Sent    NotificationStatus = "sent"
	Pending NotificationStatus = "pending"
)

// UnknownEvent is the metrics label used for the event type when a
// notification could not be mapped to any known event.
const UnknownEvent string = "unknown_event"
553+
554+
/**
555+
* Get all notifications which were marked as sent.
556+
*/
557+
func GetSentNotifications() ([]Notification, error) {
558+
notificationRecords := []Notification{}
559+
560+
err := db.ReaderDb.Select(&notificationRecords,
561+
`SELECT id, created, sent, channel, content
562+
FROM notification_queue
563+
WHERE sent IS NOT NULL`)
564+
if err != nil {
565+
return nil, fmt.Errorf("error querying sent notifications: %w", err)
566+
}
567+
568+
return notificationRecords, nil
569+
}
570+
571+
/**
572+
* Get all notifications which have not yet been sent.
573+
*/
574+
func GetPendingNotifications() ([]Notification, error) {
575+
notificationRecords := []Notification{}
576+
577+
err := db.ReaderDb.Select(&notificationRecords,
578+
`SELECT id, created, sent, channel, content
579+
FROM notification_queue
580+
WHERE sent IS NULL`)
581+
if err != nil {
582+
return nil, fmt.Errorf("error querying pending notifications: %w", err)
583+
}
584+
585+
return notificationRecords, nil
586+
}
587+
588+
/**
589+
* Collects metrics for the Notification system, specifically about its queue.
590+
* Provides metrics related to how many notifications are pending and sent, and of what event type they are.
591+
* Also provides metrics related to how long the notifications have been in the queue
592+
*/
593+
func collectNotificationQueueMetrics() {
594+
sentNotifications, err := GetSentNotifications()
595+
if err != nil {
596+
log.Error(err, "Error retrieving sent notifications. Will skip sending metrics", 0)
597+
return // Don't return an error, we don't want to disrupt actual notification sending simply because we couldn't record metrics
598+
}
599+
pendingNotifications, err := GetPendingNotifications()
600+
if err != nil {
601+
log.Error(err, "Error retrieving pending notifications. Will skip sending metrics", 0)
602+
return // Don't return an error, we don't want to disrupt actual notification sending simply because we couldn't record metrics
603+
}
604+
605+
now := time.Now() // Checking the time once so that it is consistent across all metrics for this collection attempt
606+
607+
// Record for each sent notification how long it took to send. Honestly, can probably remove this later since this metric can be sent once when the notification itself is delivered.
608+
for _, notification := range sentNotifications {
609+
eventType := GetEventLabelForNotification(notification)
610+
611+
// Record the amount of time records that were sent (and that still exist in the queue) took to sent
612+
metrics.NotificationsQueue_Sent_Time.WithLabelValues(notification.Channel, eventType).Observe(GetTimeDiffMilliseconds(*notification.Sent, *notification.Created))
613+
}
614+
615+
// Record for each pending notification how long it has been in the queue
616+
for _, notification := range pendingNotifications {
617+
eventType := GetEventLabelForNotification(notification)
618+
619+
// Record the amount of time these records have been waiting to been sent
620+
metrics.NotificationsQueue_Pending_Time.WithLabelValues(notification.Channel, eventType).Observe(GetTimeDiffMilliseconds(*notification.Created, now))
621+
}
622+
623+
// Count number of pending notifications in the queue by event type
624+
eventTypeCount := CountByEventType(pendingNotifications)
625+
for eventType, numNotifications := range eventTypeCount {
626+
metrics.NotificationsQueue_Event_Size.WithLabelValues(eventType, string(Pending)).Set(float64(numNotifications))
627+
}
628+
629+
// Count number of sent notifications in the queue by event type
630+
eventTypeCount = CountByEventType(sentNotifications)
631+
for eventType, numNotifications := range eventTypeCount {
632+
metrics.NotificationsQueue_Event_Size.WithLabelValues(eventType, string(Sent)).Set(float64(numNotifications))
633+
}
634+
635+
// Count number of pending notifications in the queue by channel
636+
channelCount := CountByChannel(pendingNotifications)
637+
for channelType, numNotifications := range channelCount {
638+
metrics.NotificationsQueue_Channel_Size.WithLabelValues(channelType, string(Pending)).Set(float64(numNotifications))
639+
}
640+
641+
// Count number of sent notifications in the queue by channel
642+
channelCount = CountByChannel(sentNotifications)
643+
for channelType, numNotifications := range channelCount {
644+
metrics.NotificationsQueue_Channel_Size.WithLabelValues(channelType, string(Sent)).Set(float64(numNotifications))
645+
}
646+
}
647+
648+
/**
649+
* Simple wrapper that enables submitting metrics for notifications with unknown event names.
650+
*/
651+
func GetEventLabelForNotification(notification Notification) string {
652+
eventName, err := ExtractEventNameFromNotification(notification)
653+
if err != nil {
654+
return UnknownEvent
655+
}
656+
657+
return string(*eventName)
658+
}
659+
660+
/**
661+
* Because we don't record the event type when recording notifications, we have to do some work to extract them from the
662+
* notification message that is eventually sent to the user.
663+
*/
664+
func ExtractEventNameFromNotification(notification Notification) (*types.EventName, error) {
665+
for eventName, eventDescription := range types.EventLabel {
666+
if strings.Contains(notification.Content, eventDescription) {
667+
return &eventName, nil
668+
}
669+
}
670+
671+
return nil, fmt.Errorf("no EventName found for notification %d matching any event descriptions", notification.Id)
672+
}
673+
674+
/**
675+
* Given a collection of notifications, count the number of notifications with each distinct event type.
676+
*/
677+
func CountByEventType(notifications []Notification) map[string]int {
678+
eventTypeCountMap := make(map[string]int, len(types.EventLabel)+1) // +1 to account for the "unknown" event type
679+
// Initialize the map with all EventLabel types, with the Count set to 0
680+
// Must be pre-initialized, because we still want to submit metrics for 0-count EventTypes, so they must exist in this map.
681+
for eventType := range types.EventLabel {
682+
eventTypeCountMap[string(eventType)] = 0
683+
}
684+
eventTypeCountMap[UnknownEvent] = 0 // include unknown, which indicates an EventType which we couldn't parse from the Notification's content field
685+
686+
// Now iterate over the list of events, and increment the value in the map
687+
for _, notification := range notifications {
688+
eventType := GetEventLabelForNotification(notification)
689+
eventTypeCountMap[eventType] = eventTypeCountMap[eventType] + 1
690+
}
691+
return eventTypeCountMap
692+
}
693+
694+
/**
695+
* Given a collection of notifications, count the number of notifications with each distinct channel type.
696+
*/
697+
func CountByChannel(notifications []Notification) map[string]int {
698+
channelCountMap := make(map[string]int, len(types.NotificationChannels))
699+
// Initialize the map with the Channel types, with the Count set to 0.
700+
// Must be pre-initialized, because we still want to submit metrics for 0-count Channels, so they must exist in this map.
701+
for _, channelType := range types.NotificationChannels {
702+
channelCountMap[string(channelType)] = 0
703+
}
704+
705+
// Now iterate over the list of notifications, and increment the value for each channel
706+
for _, notification := range notifications {
707+
channelCountMap[notification.Channel] = channelCountMap[notification.Channel] + 1
708+
}
709+
return channelCountMap
710+
}
711+
712+
// GetTimeDiffMilliseconds returns the absolute distance between time1 and
// time2 in whole milliseconds. The order of the arguments does not matter;
// the result is never negative. Fractions of a millisecond are truncated.
func GetTimeDiffMilliseconds(time1 time.Time, time2 time.Time) float64 {
	elapsedMs := time1.Sub(time2).Milliseconds()
	return math.Abs(float64(elapsedMs))
}

0 commit comments

Comments
 (0)