-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathservice.go
95 lines (80 loc) · 2.49 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package cwatch
import (
"context"
"log/slog"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
awsx "github.com/nyaruka/gocommon/aws"
"github.com/nyaruka/gocommon/syncx"
)
// Service queues and sends metric data to Cloudwatch (or to a logging-only client in dev/test deployments).
type Service struct {
	// Client is what PutMetricData is called on; NewService sets it to a DevClient for "dev" and "test"
	// deployments and to a real Cloudwatch client otherwise.
	Client Client
	// namespace is set as the Namespace of every PutMetricData request.
	namespace string
	// deployment selects dev/test behaviour and is added to every metric as its "Deployment" dimension.
	deployment string
	// batcher is nil until StartQueue creates it.
	batcher *syncx.Batcher[types.MetricDatum]
	// batcherWG is handed to the batcher in StartQueue and waited on in StopQueue.
	batcherWG *sync.WaitGroup
}
// NewService builds a Cloudwatch service for the given namespace and deployment. The deployment value selects
// which client is used and how Queue(..) behaves:
//   - "test": metrics are only logged and Queue(..) sends synchronously
//   - "dev": metrics are only logged and Queue(..) adds to the batcher
//   - anything else: metrics are sent to Cloudwatch and Queue(..) adds to the batcher
//
// The credentials and region are only used for non-dev/test deployments, where an error creating the AWS
// config is returned to the caller.
func NewService(accessKey, secretKey, region, namespace, deployment string) (*Service, error) {
	svc := &Service{namespace: namespace, deployment: deployment}

	switch deployment {
	case "dev", "test":
		svc.Client = &DevClient{}
	default:
		cfg, err := awsx.NewConfig(accessKey, secretKey, region)
		if err != nil {
			return nil, err
		}
		svc.Client = cloudwatch.NewFromConfig(cfg)
	}

	return svc, nil
}
// StartQueue creates and starts the batcher that Queue(..) feeds in non-test deployments, with processBatch as
// its callback. It panics if the queue has already been started.
//
// NOTE(review): maxAge is presumably the longest a queued metric waits before its batch is flushed, and 100 / 1000
// the max batch size and buffer capacity - confirm against syncx.NewBatcher.
func (s *Service) StartQueue(maxAge time.Duration) {
	if s.batcher != nil {
		panic("queue already started")
	}

	wg := new(sync.WaitGroup)
	s.batcherWG = wg
	s.batcher = syncx.NewBatcher(s.processBatch, 100, maxAge, 1000, wg)
	s.batcher.Start()
}
// StopQueue stops the batcher and then blocks on its wait group. It panics if StartQueue was never called.
func (s *Service) StopQueue() {
	b := s.batcher
	if b == nil {
		panic("queue wasn't started")
	}

	b.Stop()
	s.batcherWG.Wait()
}
// Queue submits the given metrics for sending. In the "test" deployment they are sent synchronously via Send,
// otherwise each one is added to the batcher, which is only created by StartQueue and so must have been started
// first.
//
// Queue has no error return, so a failed synchronous send is logged rather than silently dropped.
func (s *Service) Queue(data ...types.MetricDatum) {
	if s.deployment == "test" {
		// report failures the same way batch sends do instead of discarding them
		if err := s.Send(context.TODO(), data...); err != nil {
			slog.Error("error sending metric data", "error", err, "count", len(data))
		}
		return
	}

	for _, d := range data {
		s.batcher.Queue(d)
	}
}
// Send builds a single PutMetricData request from the given metrics (see prepare) and submits it via the
// service's client, returning any error from that call.
func (s *Service) Send(ctx context.Context, data ...types.MetricDatum) error {
	input := s.prepare(data)

	if _, err := s.Client.PutMetricData(ctx, input); err != nil {
		return err
	}
	return nil
}
// prepare builds the PutMetricData request for the given metrics in the service's namespace, with the service's
// deployment added as the first dimension of each metric.
//
// The given slice is not modified: the dimension is added to copies of the metrics, so a caller that passes the
// same slice more than once (e.g. to retry a send) doesn't accumulate duplicate Deployment dimensions.
func (s *Service) prepare(data []types.MetricDatum) *cloudwatch.PutMetricDataInput {
	// the zero-capacity slice expression forces append to allocate a new backing array, and keeps nil as nil
	prepared := append(data[:0:0], data...)

	// add deployment as the first dimension to all metrics
	for i := range prepared {
		dims := make([]types.Dimension, 0, len(prepared[i].Dimensions)+1)
		dims = append(dims, types.Dimension{Name: aws.String("Deployment"), Value: aws.String(s.deployment)})
		dims = append(dims, prepared[i].Dimensions...)

		prepared[i].Dimensions = dims
	}

	return &cloudwatch.PutMetricDataInput{
		Namespace:  aws.String(s.namespace),
		MetricData: prepared,
	}
}
// processBatch is the callback given to the batcher in StartQueue: it sends the batch in one request and logs
// any failure, since there is no caller to return the error to.
func (s *Service) processBatch(batch []types.MetricDatum) {
	err := s.Send(context.TODO(), batch...)
	if err == nil {
		return
	}

	slog.Error("error sending metric data batch", "error", err, "count", len(batch))
}