Skip to content

Commit 77e4360

Browse files
authored
refact pkg/acquisition: split victorialogs.go (#4037)
1 parent 9518184 commit 77e4360

File tree

7 files changed

+430
-379
lines changed

7 files changed

+430
-379
lines changed
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
package victorialogs
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net/url"
8+
"strconv"
9+
"strings"
10+
"time"
11+
12+
yaml "github.com/goccy/go-yaml"
13+
log "github.com/sirupsen/logrus"
14+
15+
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
16+
"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/victorialogs/internal/vlclient"
17+
"github.com/crowdsecurity/crowdsec/pkg/metrics"
18+
)
19+
20+
const (
21+
defaultLimit int = 100
22+
)
23+
24+
type AuthConfiguration struct {
25+
Username string `yaml:"username"`
26+
Password string `yaml:"password"`
27+
}
28+
29+
type Configuration struct {
30+
URL string `yaml:"url"` // VictoriaLogs url
31+
Prefix string `yaml:"prefix"` // VictoriaLogs prefix
32+
Query string `yaml:"query"` // LogsQL query
33+
Limit int `yaml:"limit"` // Limit of logs to read
34+
Since time.Duration `yaml:"since"`
35+
Headers map[string]string `yaml:"headers"` // HTTP headers for talking to VictoriaLogs
36+
WaitForReady time.Duration `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds
37+
Auth AuthConfiguration `yaml:"auth"`
38+
MaxFailureDuration time.Duration `yaml:"max_failure_duration"` // Max duration of failure before stopping the source
39+
configuration.DataSourceCommonCfg `yaml:",inline"`
40+
}
41+
42+
func (s *Source) UnmarshalConfig(yamlConfig []byte) error {
43+
err := yaml.UnmarshalWithOptions(yamlConfig, &s.Config, yaml.Strict())
44+
if err != nil {
45+
return fmt.Errorf("cannot parse VictoriaLogs acquisition configuration: %s", yaml.FormatError(err, false, false))
46+
}
47+
48+
if s.Config.URL == "" {
49+
return errors.New("VictoriaLogs url is mandatory")
50+
}
51+
52+
if s.Config.Query == "" {
53+
return errors.New("VictoriaLogs query is mandatory")
54+
}
55+
56+
if s.Config.WaitForReady == 0 {
57+
s.Config.WaitForReady = 10 * time.Second
58+
}
59+
60+
if s.Config.Mode == "" {
61+
s.Config.Mode = configuration.TAIL_MODE
62+
}
63+
64+
if s.Config.Prefix == "" {
65+
s.Config.Prefix = "/"
66+
}
67+
68+
if !strings.HasSuffix(s.Config.Prefix, "/") {
69+
s.Config.Prefix += "/"
70+
}
71+
72+
if s.Config.Limit == 0 {
73+
s.Config.Limit = defaultLimit
74+
}
75+
76+
if s.Config.Mode == configuration.TAIL_MODE {
77+
s.logger.Infof("Resetting since")
78+
s.Config.Since = 0
79+
}
80+
81+
if s.Config.MaxFailureDuration == 0 {
82+
s.Config.MaxFailureDuration = 30 * time.Second
83+
}
84+
85+
return nil
86+
}
87+
88+
func (s *Source) Configure(_ context.Context, config []byte, logger *log.Entry, metricsLevel metrics.AcquisitionMetricsLevel) error {
89+
s.Config = Configuration{}
90+
s.logger = logger
91+
s.metricsLevel = metricsLevel
92+
93+
err := s.UnmarshalConfig(config)
94+
if err != nil {
95+
return err
96+
}
97+
98+
s.logger.Infof("Since value: %s", s.Config.Since.String())
99+
100+
clientConfig := vlclient.Config{
101+
URL: s.Config.URL,
102+
Headers: s.Config.Headers,
103+
Limit: s.Config.Limit,
104+
Query: s.Config.Query,
105+
Since: s.Config.Since,
106+
Username: s.Config.Auth.Username,
107+
Password: s.Config.Auth.Password,
108+
FailMaxDuration: s.Config.MaxFailureDuration,
109+
}
110+
111+
s.Client = vlclient.NewVLClient(clientConfig)
112+
s.Client.Logger = logger.WithFields(log.Fields{"component": "victorialogs-client", "source": s.Config.URL})
113+
114+
return nil
115+
}
116+
117+
func (s *Source) ConfigureByDSN(_ context.Context, dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
118+
s.logger = logger
119+
s.Config = Configuration{}
120+
s.Config.Mode = configuration.CAT_MODE
121+
s.Config.Labels = labels
122+
s.Config.UniqueId = uuid
123+
124+
u, err := url.Parse(dsn)
125+
if err != nil {
126+
return fmt.Errorf("while parsing dsn '%s': %w", dsn, err)
127+
}
128+
129+
if u.Scheme != "victorialogs" {
130+
return fmt.Errorf("invalid DSN %s for VictoriaLogs source, must start with victorialogs://", dsn)
131+
}
132+
133+
if u.Host == "" {
134+
return errors.New("empty host")
135+
}
136+
137+
scheme := "http"
138+
139+
params := u.Query()
140+
141+
if q := params.Get("ssl"); q != "" {
142+
scheme = "https"
143+
}
144+
145+
if q := params.Get("query"); q != "" {
146+
s.Config.Query = q
147+
}
148+
149+
if w := params.Get("wait_for_ready"); w != "" {
150+
s.Config.WaitForReady, err = time.ParseDuration(w)
151+
if err != nil {
152+
return err
153+
}
154+
} else {
155+
s.Config.WaitForReady = 10 * time.Second
156+
}
157+
158+
if since := params.Get("since"); since != "" {
159+
s.Config.Since, err = time.ParseDuration(since)
160+
if err != nil {
161+
return fmt.Errorf("invalid since in dsn: %w", err)
162+
}
163+
}
164+
165+
if maxFailureDuration := params.Get("max_failure_duration"); maxFailureDuration != "" {
166+
duration, err := time.ParseDuration(maxFailureDuration)
167+
if err != nil {
168+
return fmt.Errorf("invalid max_failure_duration in dsn: %w", err)
169+
}
170+
171+
s.Config.MaxFailureDuration = duration
172+
} else {
173+
s.Config.MaxFailureDuration = 5 * time.Second // for OneShot mode it doesn't make sense to have longer duration
174+
}
175+
176+
if limit := params.Get("limit"); limit != "" {
177+
limit, err := strconv.Atoi(limit)
178+
if err != nil {
179+
return fmt.Errorf("invalid limit in dsn: %w", err)
180+
}
181+
182+
s.Config.Limit = limit
183+
}
184+
185+
if logLevel := params.Get("log_level"); logLevel != "" {
186+
level, err := log.ParseLevel(logLevel)
187+
if err != nil {
188+
return fmt.Errorf("invalid log_level in dsn: %w", err)
189+
}
190+
191+
s.Config.LogLevel = level
192+
s.logger.Logger.SetLevel(level)
193+
}
194+
195+
s.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host)
196+
if u.User != nil {
197+
s.Config.Auth.Username = u.User.Username()
198+
s.Config.Auth.Password, _ = u.User.Password()
199+
}
200+
201+
clientConfig := vlclient.Config{
202+
URL: s.Config.URL,
203+
Headers: s.Config.Headers,
204+
Limit: s.Config.Limit,
205+
Query: s.Config.Query,
206+
Since: s.Config.Since,
207+
Username: s.Config.Auth.Username,
208+
Password: s.Config.Auth.Password,
209+
}
210+
211+
s.Client = vlclient.NewVLClient(clientConfig)
212+
s.Client.Logger = logger.WithFields(log.Fields{"component": "victorialogs-client", "source": s.Config.URL})
213+
214+
return nil
215+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package victorialogs
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
6+
"github.com/crowdsecurity/crowdsec/pkg/metrics"
7+
)
8+
9+
func (*Source) GetMetrics() []prometheus.Collector {
10+
return []prometheus.Collector{
11+
metrics.VictorialogsDataSourceLinesRead,
12+
}
13+
}
14+
15+
func (*Source) GetAggregMetrics() []prometheus.Collector {
16+
return []prometheus.Collector{
17+
metrics.VictorialogsDataSourceLinesRead,
18+
}
19+
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package victorialogs
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
"gopkg.in/tomb.v2"
9+
10+
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
11+
"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/victorialogs/internal/vlclient"
12+
"github.com/crowdsecurity/crowdsec/pkg/metrics"
13+
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
14+
)
15+
16+
// OneShotAcquisition reads a set of file and returns when done
17+
func (s *Source) OneShotAcquisition(ctx context.Context, out chan pipeline.Event, t *tomb.Tomb) error {
18+
s.logger.Debug("VictoriaLogs one shot acquisition")
19+
s.Client.SetTomb(t)
20+
21+
readyCtx, cancel := context.WithTimeout(ctx, s.Config.WaitForReady)
22+
defer cancel()
23+
24+
err := s.Client.Ready(readyCtx)
25+
if err != nil {
26+
return fmt.Errorf("VictoriaLogs is not ready: %w", err)
27+
}
28+
29+
ctx, cancel = context.WithCancel(ctx)
30+
defer cancel()
31+
32+
respChan, err := s.getResponseChan(ctx, false)
33+
if err != nil {
34+
return fmt.Errorf("error when starting acquisition: %w", err)
35+
}
36+
37+
for {
38+
select {
39+
case <-t.Dying():
40+
s.logger.Debug("VictoriaLogs one shot acquisition stopped")
41+
return nil
42+
case resp, ok := <-respChan:
43+
if !ok {
44+
s.logger.Info("VictoriaLogs acquisition completed")
45+
return nil
46+
}
47+
48+
s.readOneEntry(resp, s.Config.Labels, out)
49+
}
50+
}
51+
}
52+
53+
func (s *Source) readOneEntry(entry *vlclient.Log, labels map[string]string, out chan pipeline.Event) {
54+
ll := pipeline.Line{}
55+
ll.Raw = entry.Message
56+
ll.Time = entry.Time
57+
ll.Src = s.Config.URL
58+
ll.Labels = labels
59+
ll.Process = true
60+
ll.Module = s.GetName()
61+
62+
if s.metricsLevel != metrics.AcquisitionMetricsLevelNone {
63+
metrics.VictorialogsDataSourceLinesRead.With(prometheus.Labels{"source": s.Config.URL, "datasource_type": "victorialogs", "acquis_type": s.Config.Labels["type"]}).Inc()
64+
}
65+
66+
expectMode := pipeline.LIVE
67+
if s.Config.UseTimeMachine {
68+
expectMode = pipeline.TIMEMACHINE
69+
}
70+
71+
out <- pipeline.Event{
72+
Line: ll,
73+
Process: true,
74+
Type: pipeline.LOG,
75+
ExpectMode: expectMode,
76+
}
77+
}
78+
79+
func (s *Source) StreamingAcquisition(ctx context.Context, out chan pipeline.Event, t *tomb.Tomb) error {
80+
s.Client.SetTomb(t)
81+
82+
readyCtx, cancel := context.WithTimeout(ctx, s.Config.WaitForReady)
83+
defer cancel()
84+
85+
err := s.Client.Ready(readyCtx)
86+
if err != nil {
87+
return fmt.Errorf("VictoriaLogs is not ready: %w", err)
88+
}
89+
90+
lctx, clientCancel := context.WithCancel(ctx)
91+
// Don't defer clientCancel(), the client outlives this function call
92+
93+
t.Go(func() error {
94+
<-t.Dying()
95+
clientCancel()
96+
97+
return nil
98+
})
99+
100+
t.Go(func() error {
101+
respChan, err := s.getResponseChan(lctx, true)
102+
if err != nil {
103+
clientCancel()
104+
s.logger.Errorf("could not start VictoriaLogs tail: %s", err)
105+
106+
return fmt.Errorf("while starting VictoriaLogs tail: %w", err)
107+
}
108+
109+
for {
110+
select {
111+
case resp, ok := <-respChan:
112+
if !ok {
113+
s.logger.Warnf("VictoriaLogs channel closed")
114+
clientCancel()
115+
116+
return err
117+
}
118+
119+
s.readOneEntry(resp, s.Config.Labels, out)
120+
case <-t.Dying():
121+
clientCancel()
122+
return nil
123+
}
124+
}
125+
})
126+
127+
return nil
128+
}
129+
130+
func (s *Source) getResponseChan(ctx context.Context, infinite bool) (chan *vlclient.Log, error) {
131+
var (
132+
respChan chan *vlclient.Log
133+
err error
134+
)
135+
136+
if s.Config.Mode == configuration.TAIL_MODE {
137+
respChan, err = s.Client.Tail(ctx)
138+
if err != nil {
139+
s.logger.Errorf("could not start VictoriaLogs tail: %s", err)
140+
return respChan, fmt.Errorf("while starting VictoriaLogs tail: %w", err)
141+
}
142+
} else {
143+
respChan = s.Client.QueryRange(ctx, infinite)
144+
}
145+
146+
return respChan, err
147+
}

0 commit comments

Comments
 (0)