Skip to content

Commit b715124

Browse files
authored
Merge pull request #146 from nyaruka/cloudwatch
Add cloudwatch service with support for batching metric data
2 parents 2f44776 + 9b3775c commit b715124

File tree

7 files changed

+157
-32
lines changed

7 files changed

+157
-32
lines changed

aws/config.go

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package aws
2+
3+
import (
4+
"context"
5+
6+
"github.com/aws/aws-sdk-go-v2/aws"
7+
"github.com/aws/aws-sdk-go-v2/config"
8+
"github.com/aws/aws-sdk-go-v2/credentials"
9+
)
10+
11+
// NewConfig creates a new AWS config with the given credentials and region
12+
func NewConfig(accessKey, secretKey, region string) (aws.Config, error) {
13+
opts := []func(*config.LoadOptions) error{config.WithRegion(region)}
14+
15+
if accessKey != "" && secretKey != "" {
16+
opts = append(opts, config.WithCredentialsProvider(credentials.StaticCredentialsProvider{Value: aws.Credentials{
17+
AccessKeyID: accessKey, SecretAccessKey: secretKey,
18+
}}))
19+
}
20+
21+
return config.LoadDefaultConfig(context.TODO(), opts...)
22+
}

aws/cwatch/service.go

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package cwatch
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
"sync"
7+
"time"
8+
9+
"github.com/aws/aws-sdk-go-v2/aws"
10+
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
11+
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
12+
awsx "github.com/nyaruka/gocommon/aws"
13+
"github.com/nyaruka/gocommon/syncx"
14+
)
15+
16+
type Service struct {
17+
Client *cloudwatch.Client
18+
namespace string
19+
deployment types.Dimension
20+
batcher *syncx.Batcher[types.MetricDatum]
21+
}
22+
23+
// NewService creates a new Cloudwatch service with the given credentials and configuration
24+
func NewService(accessKey, secretKey, region, namespace, deployment string, wg *sync.WaitGroup) (*Service, error) {
25+
cfg, err := awsx.NewConfig(accessKey, secretKey, region)
26+
if err != nil {
27+
return nil, err
28+
}
29+
30+
s := &Service{
31+
Client: cloudwatch.NewFromConfig(cfg),
32+
namespace: namespace,
33+
deployment: types.Dimension{Name: aws.String("Deployment"), Value: aws.String(deployment)},
34+
}
35+
s.batcher = syncx.NewBatcher(s.processBatch, 100, time.Second*3, 1000, wg)
36+
37+
return s, nil
38+
}
39+
40+
func (s *Service) Start() {
41+
s.batcher.Start()
42+
}
43+
44+
func (s *Service) Stop() {
45+
s.batcher.Stop()
46+
}
47+
48+
func (s *Service) Queue(d types.MetricDatum) {
49+
s.batcher.Queue(d)
50+
}
51+
52+
func (s *Service) Prepare(data []types.MetricDatum) *cloudwatch.PutMetricDataInput {
53+
// add deployment as the first dimension to all metrics
54+
for i := range data {
55+
data[i].Dimensions = append([]types.Dimension{s.deployment}, data[i].Dimensions...)
56+
}
57+
58+
return &cloudwatch.PutMetricDataInput{
59+
Namespace: aws.String(s.namespace),
60+
MetricData: data,
61+
}
62+
}
63+
64+
func (s *Service) processBatch(batch []types.MetricDatum) {
65+
_, err := s.Client.PutMetricData(context.TODO(), s.Prepare(batch))
66+
if err != nil {
67+
slog.Error("error sending metrics to cloudwatch", "error", err, "count", len(batch))
68+
}
69+
}

aws/cwatch/service_test.go

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package cwatch_test
2+
3+
import (
4+
"sync"
5+
"testing"
6+
7+
"github.com/aws/aws-sdk-go-v2/aws"
8+
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
9+
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
10+
"github.com/nyaruka/gocommon/aws/cwatch"
11+
"github.com/stretchr/testify/assert"
12+
)
13+
14+
func TestService(t *testing.T) {
15+
wg := &sync.WaitGroup{}
16+
17+
svc, err := cwatch.NewService("root", "key", "us-east-1", "Foo", "testing", wg)
18+
assert.NoError(t, err)
19+
20+
assert.Equal(t, &cloudwatch.PutMetricDataInput{
21+
Namespace: aws.String("Foo"),
22+
MetricData: []types.MetricDatum{
23+
{
24+
MetricName: aws.String("NumGoats"),
25+
Dimensions: []types.Dimension{
26+
{Name: aws.String("Deployment"), Value: aws.String("testing")},
27+
},
28+
Value: aws.Float64(10),
29+
},
30+
{
31+
MetricName: aws.String("NumSheep"),
32+
Dimensions: []types.Dimension{
33+
{Name: aws.String("Deployment"), Value: aws.String("testing")},
34+
{Name: aws.String("Host"), Value: aws.String("foo1")},
35+
},
36+
Value: aws.Float64(20),
37+
},
38+
},
39+
}, svc.Prepare([]types.MetricDatum{
40+
{MetricName: aws.String("NumGoats"), Value: aws.Float64(10)},
41+
{MetricName: aws.String("NumSheep"), Dimensions: []types.Dimension{{Name: aws.String("Host"), Value: aws.String("foo1")}}, Value: aws.Float64(20)},
42+
}))
43+
44+
svc.Start()
45+
46+
svc.Stop()
47+
48+
wg.Wait()
49+
}

aws/dynamo/service.go

+3-12
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,9 @@ import (
55
"fmt"
66

77
"github.com/aws/aws-sdk-go-v2/aws"
8-
"github.com/aws/aws-sdk-go-v2/config"
9-
"github.com/aws/aws-sdk-go-v2/credentials"
108
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
119
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
10+
awsx "github.com/nyaruka/gocommon/aws"
1211
)
1312

1413
// Service is simple abstraction layer to work with a DynamoDB-compatible database
@@ -17,17 +16,9 @@ type Service struct {
1716
tablePrefix string
1817
}
1918

20-
// NewService creates a new S3 service with the given credentials and configuration
19+
// NewService creates a new dynamodb service with the given credentials and configuration
2120
func NewService(accessKey, secretKey, region, endpoint, tablePrefix string) (*Service, error) {
22-
opts := []func(*config.LoadOptions) error{config.WithRegion(region)}
23-
24-
if accessKey != "" && secretKey != "" {
25-
opts = append(opts, config.WithCredentialsProvider(credentials.StaticCredentialsProvider{Value: aws.Credentials{
26-
AccessKeyID: accessKey, SecretAccessKey: secretKey,
27-
}}))
28-
}
29-
30-
cfg, err := config.LoadDefaultConfig(context.TODO(), opts...)
21+
cfg, err := awsx.NewConfig(accessKey, secretKey, region)
3122
if err != nil {
3223
return nil, err
3324
}

aws/s3x/service.go

+2-11
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@ import (
99
"time"
1010

1111
"github.com/aws/aws-sdk-go-v2/aws"
12-
"github.com/aws/aws-sdk-go-v2/config"
13-
"github.com/aws/aws-sdk-go-v2/credentials"
1412
"github.com/aws/aws-sdk-go-v2/service/s3"
1513
"github.com/aws/aws-sdk-go-v2/service/s3/types"
14+
awsx "github.com/nyaruka/gocommon/aws"
1615
)
1716

1817
// Service is simple abstraction layer to work with a S3-compatible storage service
@@ -23,15 +22,7 @@ type Service struct {
2322

2423
// NewService creates a new S3 service with the given credentials and configuration
2524
func NewService(accessKey, secretKey, region, endpoint string, minio bool) (*Service, error) {
26-
opts := []func(*config.LoadOptions) error{config.WithRegion(region)}
27-
28-
if accessKey != "" && secretKey != "" {
29-
opts = append(opts, config.WithCredentialsProvider(credentials.StaticCredentialsProvider{Value: aws.Credentials{
30-
AccessKeyID: accessKey, SecretAccessKey: secretKey,
31-
}}))
32-
}
33-
34-
cfg, err := config.LoadDefaultConfig(context.TODO(), opts...)
25+
cfg, err := awsx.NewConfig(accessKey, secretKey, region)
3526
if err != nil {
3627
return nil, err
3728
}

go.mod

+4-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/nyaruka/gocommon
33
go 1.23
44

55
require (
6-
github.com/aws/aws-sdk-go-v2 v1.32.5
6+
github.com/aws/aws-sdk-go-v2 v1.32.6
77
github.com/aws/aws-sdk-go-v2/config v1.28.5
88
github.com/aws/aws-sdk-go-v2/credentials v1.17.46
99
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.17
@@ -30,10 +30,11 @@ require (
3030
require (
3131
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect
3232
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.20 // indirect
33-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.24 // indirect
34-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.24 // indirect
33+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 // indirect
34+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 // indirect
3535
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
3636
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.24 // indirect
37+
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3
3738
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.6 // indirect
3839
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect
3940
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.5 // indirect

go.sum

+8-6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
22
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
3-
github.com/aws/aws-sdk-go-v2 v1.32.5 h1:U8vdWJuY7ruAkzaOdD7guwJjD06YSKmnKCJs7s3IkIo=
4-
github.com/aws/aws-sdk-go-v2 v1.32.5/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U=
3+
github.com/aws/aws-sdk-go-v2 v1.32.6 h1:7BokKRgRPuGmKkFMhEg/jSul+tB9VvXhcViILtfG8b4=
4+
github.com/aws/aws-sdk-go-v2 v1.32.6/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U=
55
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 h1:lL7IfaFzngfx0ZwUGOZdsFFnQ5uLvR0hWqqhyE7Q9M8=
66
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7/go.mod h1:QraP0UcVlQJsmHfioCrveWOC1nbiWUl3ej08h4mXWoc=
77
github.com/aws/aws-sdk-go-v2/config v1.28.5 h1:Za41twdCXbuyyWv9LndXxZZv3QhTG1DinqlFsSuvtI0=
@@ -12,14 +12,16 @@ github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.17 h1:36xxDfD
1212
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.17/go.mod h1:A4XQVRy4yJ70Sk5Qz2tuCQX6J5kXcRa53nGP6wtgntM=
1313
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.20 h1:sDSXIrlsFSFJtWKLQS4PUWRvrT580rrnuLydJrCQ/yA=
1414
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.20/go.mod h1:WZ/c+w0ofps+/OUqMwWgnfrgzZH1DZO1RIkktICsqnY=
15-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.24 h1:4usbeaes3yJnCFC7kfeyhkdkPtoRYPa/hTmCqMpKpLI=
16-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.24/go.mod h1:5CI1JemjVwde8m2WG3cz23qHKPOxbpkq0HaoreEgLIY=
17-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.24 h1:N1zsICrQglfzaBnrfM0Ys00860C+QFwu6u/5+LomP+o=
18-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.24/go.mod h1:dCn9HbJ8+K31i8IQ8EWmWj0EiIk0+vKiHNMxTTYveAg=
15+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 h1:s/fF4+yDQDoElYhfIVvSNyeCydfbuTKzhxSXDXCPasU=
16+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25/go.mod h1:IgPfDv5jqFIzQSNbUEMoitNooSMXjRSDkhXv8jiROvU=
17+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 h1:ZntTCl5EsYnhN/IygQEUugpdwbhdkom9uHcbCftiGgA=
18+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25/go.mod h1:DBdPrgeocww+CSl1C8cEV8PN1mHMBhuCDLpXezyvWkE=
1919
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ=
2020
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
2121
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.24 h1:JX70yGKLj25+lMC5Yyh8wBtvB01GDilyRuJvXJ4piD0=
2222
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.24/go.mod h1:+Ln60j9SUTD0LEwnhEB0Xhg61DHqplBrbZpLgyjoEHg=
23+
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3 h1:nQLG9irjDGUFXVPDHzjCGEEwh0hZ6BcxTvHOod1YsP4=
24+
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3/go.mod h1:URs8sqsyaxiAZkKP6tOEmhcs9j2ynFIomqOKY/CAHJc=
2325
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.37.1 h1:vucMirlM6D+RDU8ncKaSZ/5dGrXNajozVwpmWNPn2gQ=
2426
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.37.1/go.mod h1:fceORfs010mNxZbQhfqUjUeHlTwANmIT4mvHamuUaUg=
2527
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.6 h1:hIl7Z1zcfdzsl5SiV32acFj4gY/cZ5Xr9wd6PpoNYGE=

0 commit comments

Comments
 (0)