Skip to content

Commit 1c715d3

Browse files
authored
refact pkg/acquisition: split kinesis.go (#4032)
1 parent 0e88865 commit 1c715d3

File tree

7 files changed

+633
-596
lines changed

7 files changed

+633
-596
lines changed

pkg/acquisition/kinesis.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ import (
88

99
var (
1010
// verify interface compliance
11-
_ DataSource = (*kinesisacquisition.KinesisSource)(nil)
12-
_ Tailer = (*kinesisacquisition.KinesisSource)(nil)
13-
_ MetricsProvider = (*kinesisacquisition.KinesisSource)(nil)
11+
_ DataSource = (*kinesisacquisition.Source)(nil)
12+
_ Tailer = (*kinesisacquisition.Source)(nil)
13+
_ MetricsProvider = (*kinesisacquisition.Source)(nil)
1414
)
1515

1616
//nolint:gochecknoinits
1717
func init() {
18-
registerDataSource("kinesis", func() DataSource { return &kinesisacquisition.KinesisSource{} })
18+
registerDataSource("kinesis", func() DataSource { return &kinesisacquisition.Source{} })
1919
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package kinesisacquisition
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
"github.com/aws/aws-sdk-go-v2/aws"
9+
"github.com/aws/aws-sdk-go-v2/config"
10+
"github.com/aws/aws-sdk-go-v2/service/kinesis"
11+
12+
yaml "github.com/goccy/go-yaml"
13+
log "github.com/sirupsen/logrus"
14+
"gopkg.in/tomb.v2"
15+
16+
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
17+
"github.com/crowdsecurity/crowdsec/pkg/metrics"
18+
)
19+
20+
type Configuration struct {
21+
configuration.DataSourceCommonCfg `yaml:",inline"`
22+
23+
StreamName string `yaml:"stream_name"`
24+
StreamARN string `yaml:"stream_arn"`
25+
UseEnhancedFanOut bool `yaml:"use_enhanced_fanout"` // Use RegisterStreamConsumer and SubscribeToShard instead of GetRecords
26+
AwsProfile *string `yaml:"aws_profile"`
27+
AwsRegion string `yaml:"aws_region"`
28+
AwsEndpoint string `yaml:"aws_endpoint"`
29+
ConsumerName string `yaml:"consumer_name"`
30+
FromSubscription bool `yaml:"from_subscription"`
31+
MaxRetries int `yaml:"max_retries"`
32+
}
33+
34+
func (s *Source) UnmarshalConfig(yamlConfig []byte) error {
35+
s.Config = Configuration{}
36+
37+
err := yaml.UnmarshalWithOptions(yamlConfig, &s.Config, yaml.Strict())
38+
if err != nil {
39+
return fmt.Errorf("cannot parse kinesis datasource configuration: %s", yaml.FormatError(err, false, false))
40+
}
41+
42+
if s.Config.Mode == "" {
43+
s.Config.Mode = configuration.TAIL_MODE
44+
}
45+
46+
if s.Config.StreamName == "" && !s.Config.UseEnhancedFanOut {
47+
return errors.New("stream_name is mandatory when use_enhanced_fanout is false")
48+
}
49+
50+
if s.Config.StreamARN == "" && s.Config.UseEnhancedFanOut {
51+
return errors.New("stream_arn is mandatory when use_enhanced_fanout is true")
52+
}
53+
54+
if s.Config.ConsumerName == "" && s.Config.UseEnhancedFanOut {
55+
return errors.New("consumer_name is mandatory when use_enhanced_fanout is true")
56+
}
57+
58+
if s.Config.StreamARN != "" && s.Config.StreamName != "" {
59+
return errors.New("stream_arn and stream_name are mutually exclusive")
60+
}
61+
62+
if s.Config.MaxRetries <= 0 {
63+
s.Config.MaxRetries = 10
64+
}
65+
66+
return nil
67+
}
68+
69+
func (s *Source) Configure(ctx context.Context, yamlConfig []byte, logger *log.Entry, metricsLevel metrics.AcquisitionMetricsLevel) error {
70+
s.logger = logger
71+
s.metricsLevel = metricsLevel
72+
73+
err := s.UnmarshalConfig(yamlConfig)
74+
if err != nil {
75+
return err
76+
}
77+
78+
err = s.newClient(ctx)
79+
if err != nil {
80+
return fmt.Errorf("cannot create kinesis client: %w", err)
81+
}
82+
83+
s.shardReaderTomb = &tomb.Tomb{}
84+
85+
return nil
86+
}
87+
88+
func (s *Source) newClient(ctx context.Context) error {
89+
var loadOpts []func(*config.LoadOptions) error
90+
if s.Config.AwsProfile != nil && *s.Config.AwsProfile != "" {
91+
loadOpts = append(loadOpts, config.WithSharedConfigProfile(*s.Config.AwsProfile))
92+
}
93+
94+
region := s.Config.AwsRegion
95+
if region == "" {
96+
region = "us-east-1"
97+
}
98+
99+
loadOpts = append(loadOpts, config.WithRegion(region))
100+
101+
if c := defaultCreds(); c != nil {
102+
loadOpts = append(loadOpts, config.WithCredentialsProvider(c))
103+
}
104+
105+
cfg, err := config.LoadDefaultConfig(ctx, loadOpts...)
106+
if err != nil {
107+
return fmt.Errorf("failed to load aws config: %w", err)
108+
}
109+
110+
var clientOpts []func(*kinesis.Options)
111+
if s.Config.AwsEndpoint != "" {
112+
clientOpts = append(clientOpts, func(o *kinesis.Options) {
113+
o.BaseEndpoint = aws.String(s.Config.AwsEndpoint)
114+
})
115+
}
116+
117+
s.kClient = kinesis.NewFromConfig(cfg, clientOpts...)
118+
119+
return nil
120+
}

0 commit comments

Comments
 (0)