Skip to content

Commit 9f63c1c

Browse files
authored
refact pkg/acquisition: split loki.go (#4034)
1 parent 77e4360 commit 9f63c1c

File tree

9 files changed

+436
-387
lines changed

9 files changed

+436
-387
lines changed

pkg/acquisition/loki.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ import (
88

99
var (
1010
// verify interface compliance
11-
_ DataSource = (*loki.LokiSource)(nil)
12-
_ DSNConfigurer = (*loki.LokiSource)(nil)
13-
_ Fetcher = (*loki.LokiSource)(nil)
14-
_ Tailer = (*loki.LokiSource)(nil)
15-
_ MetricsProvider = (*loki.LokiSource)(nil)
11+
_ DataSource = (*loki.Source)(nil)
12+
_ DSNConfigurer = (*loki.Source)(nil)
13+
_ Fetcher = (*loki.Source)(nil)
14+
_ Tailer = (*loki.Source)(nil)
15+
_ MetricsProvider = (*loki.Source)(nil)
1616
)
1717

1818
//nolint:gochecknoinits
1919
func init() {
20-
registerDataSource("loki", func() DataSource { return &loki.LokiSource{} })
20+
registerDataSource("loki", func() DataSource { return &loki.Source{} })
2121
}
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
package loki
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net/url"
8+
"strconv"
9+
"strings"
10+
"time"
11+
12+
yaml "github.com/goccy/go-yaml"
13+
log "github.com/sirupsen/logrus"
14+
15+
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
16+
"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki/internal/lokiclient"
17+
"github.com/crowdsecurity/crowdsec/pkg/metrics"
18+
)
19+
20+
const lokiLimit int = 100
21+
22+
type AuthConfiguration struct {
23+
Username string `yaml:"username"`
24+
Password string `yaml:"password"`
25+
}
26+
27+
type Configuration struct {
28+
URL string `yaml:"url"` // Loki url
29+
Prefix string `yaml:"prefix"` // Loki prefix
30+
Query string `yaml:"query"` // LogQL query
31+
Limit int `yaml:"limit"` // Limit of logs to read
32+
DelayFor time.Duration `yaml:"delay_for"`
33+
Since time.Duration `yaml:"since"`
34+
Headers map[string]string `yaml:"headers"` // HTTP headers for talking to Loki
35+
WaitForReady time.Duration `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds
36+
Auth AuthConfiguration `yaml:"auth"`
37+
MaxFailureDuration time.Duration `yaml:"max_failure_duration"` // Max duration of failure before stopping the source
38+
NoReadyCheck bool `yaml:"no_ready_check"` // Bypass /ready check before starting
39+
configuration.DataSourceCommonCfg `yaml:",inline"`
40+
}
41+
42+
func (l *Source) UnmarshalConfig(yamlConfig []byte) error {
43+
err := yaml.UnmarshalWithOptions(yamlConfig, &l.Config, yaml.Strict())
44+
if err != nil {
45+
return fmt.Errorf("cannot parse loki acquisition configuration: %s", yaml.FormatError(err, false, false))
46+
}
47+
48+
if l.Config.Query == "" {
49+
return errors.New("loki query is mandatory")
50+
}
51+
52+
if l.Config.WaitForReady == 0 {
53+
l.Config.WaitForReady = 10 * time.Second
54+
}
55+
56+
if l.Config.DelayFor < 0*time.Second || l.Config.DelayFor > 5*time.Second {
57+
return errors.New("delay_for should be a value between 1s and 5s")
58+
}
59+
60+
if l.Config.Mode == "" {
61+
l.Config.Mode = configuration.TAIL_MODE
62+
}
63+
64+
if l.Config.Prefix == "" {
65+
l.Config.Prefix = "/"
66+
}
67+
68+
if !strings.HasSuffix(l.Config.Prefix, "/") {
69+
l.Config.Prefix += "/"
70+
}
71+
72+
if l.Config.Limit == 0 {
73+
l.Config.Limit = lokiLimit
74+
}
75+
76+
if l.Config.Mode == configuration.TAIL_MODE {
77+
l.logger.Infof("Resetting since")
78+
l.Config.Since = 0
79+
}
80+
81+
if l.Config.MaxFailureDuration == 0 {
82+
l.Config.MaxFailureDuration = 30 * time.Second
83+
}
84+
85+
return nil
86+
}
87+
88+
func (l *Source) Configure(_ context.Context, config []byte, logger *log.Entry, metricsLevel metrics.AcquisitionMetricsLevel) error {
89+
l.Config = Configuration{}
90+
l.logger = logger
91+
l.metricsLevel = metricsLevel
92+
93+
if err := l.UnmarshalConfig(config); err != nil {
94+
return err
95+
}
96+
97+
l.logger.Infof("Since value: %s", l.Config.Since.String())
98+
99+
clientConfig := lokiclient.Config{
100+
LokiURL: l.Config.URL,
101+
Headers: l.Config.Headers,
102+
Limit: l.Config.Limit,
103+
Query: l.Config.Query,
104+
Since: l.Config.Since,
105+
Username: l.Config.Auth.Username,
106+
Password: l.Config.Auth.Password,
107+
FailMaxDuration: l.Config.MaxFailureDuration,
108+
}
109+
110+
l.Client = lokiclient.NewLokiClient(clientConfig)
111+
l.Client.Logger = logger.WithFields(log.Fields{"component": "lokiclient", "source": l.Config.URL})
112+
113+
return nil
114+
}
115+
116+
func (l *Source) ConfigureByDSN(_ context.Context, dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
117+
l.logger = logger
118+
l.Config = Configuration{}
119+
l.Config.Mode = configuration.CAT_MODE
120+
l.Config.Labels = labels
121+
l.Config.UniqueId = uuid
122+
123+
u, err := url.Parse(dsn)
124+
if err != nil {
125+
return fmt.Errorf("while parsing dsn '%s': %w", dsn, err)
126+
}
127+
128+
if u.Scheme != "loki" {
129+
return fmt.Errorf("invalid DSN %s for loki source, must start with loki://", dsn)
130+
}
131+
132+
if u.Host == "" {
133+
return errors.New("empty loki host")
134+
}
135+
136+
scheme := "http"
137+
138+
params := u.Query()
139+
if q := params.Get("ssl"); q != "" {
140+
scheme = "https"
141+
}
142+
143+
if q := params.Get("query"); q != "" {
144+
l.Config.Query = q
145+
}
146+
147+
if w := params.Get("wait_for_ready"); w != "" {
148+
l.Config.WaitForReady, err = time.ParseDuration(w)
149+
if err != nil {
150+
return err
151+
}
152+
} else {
153+
l.Config.WaitForReady = 10 * time.Second
154+
}
155+
156+
if d := params.Get("delay_for"); d != "" {
157+
l.Config.DelayFor, err = time.ParseDuration(d)
158+
if err != nil {
159+
return fmt.Errorf("invalid duration: %w", err)
160+
}
161+
162+
if l.Config.DelayFor < 0*time.Second || l.Config.DelayFor > 5*time.Second {
163+
return errors.New("delay_for should be a value between 1s and 5s")
164+
}
165+
} else {
166+
l.Config.DelayFor = 0 * time.Second
167+
}
168+
169+
if s := params.Get("since"); s != "" {
170+
l.Config.Since, err = time.ParseDuration(s)
171+
if err != nil {
172+
return fmt.Errorf("invalid since in dsn: %w", err)
173+
}
174+
}
175+
176+
if maxFailureDuration := params.Get("max_failure_duration"); maxFailureDuration != "" {
177+
duration, err := time.ParseDuration(maxFailureDuration)
178+
if err != nil {
179+
return fmt.Errorf("invalid max_failure_duration in dsn: %w", err)
180+
}
181+
182+
l.Config.MaxFailureDuration = duration
183+
} else {
184+
l.Config.MaxFailureDuration = 5 * time.Second // for OneShot mode it doesn't make sense to have longer duration
185+
}
186+
187+
if limit := params.Get("limit"); limit != "" {
188+
limit, err := strconv.Atoi(limit)
189+
if err != nil {
190+
return fmt.Errorf("invalid limit in dsn: %w", err)
191+
}
192+
193+
l.Config.Limit = limit
194+
} else {
195+
l.Config.Limit = 5000 // max limit allowed by loki
196+
}
197+
198+
if logLevel := params.Get("log_level"); logLevel != "" {
199+
level, err := log.ParseLevel(logLevel)
200+
if err != nil {
201+
return fmt.Errorf("invalid log_level in dsn: %w", err)
202+
}
203+
204+
l.Config.LogLevel = level
205+
l.logger.Logger.SetLevel(level)
206+
}
207+
208+
if noReadyCheck := params.Get("no_ready_check"); noReadyCheck != "" {
209+
noReadyCheck, err := strconv.ParseBool(noReadyCheck)
210+
if err != nil {
211+
return fmt.Errorf("invalid no_ready_check in dsn: %w", err)
212+
}
213+
214+
l.Config.NoReadyCheck = noReadyCheck
215+
}
216+
217+
l.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host)
218+
if u.User != nil {
219+
l.Config.Auth.Username = u.User.Username()
220+
l.Config.Auth.Password, _ = u.User.Password()
221+
}
222+
223+
clientConfig := lokiclient.Config{
224+
LokiURL: l.Config.URL,
225+
Headers: l.Config.Headers,
226+
Limit: l.Config.Limit,
227+
Query: l.Config.Query,
228+
Since: l.Config.Since,
229+
Username: l.Config.Auth.Username,
230+
Password: l.Config.Auth.Password,
231+
DelayFor: int(l.Config.DelayFor / time.Second),
232+
}
233+
234+
l.Client = lokiclient.NewLokiClient(clientConfig)
235+
l.Client.Logger = logger.WithFields(log.Fields{"component": "lokiclient", "source": l.Config.URL})
236+
237+
return nil
238+
}

pkg/acquisition/modules/loki/entry.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ type Entry struct {
1414

1515
func (e *Entry) UnmarshalJSON(b []byte) error {
1616
var values []string
17-
err := json.Unmarshal(b, &values)
18-
if err != nil {
17+
18+
if err := json.Unmarshal(b, &values); err != nil {
1919
return err
2020
}
2121

@@ -27,8 +27,10 @@ func (e *Entry) UnmarshalJSON(b []byte) error {
2727
if err != nil {
2828
return err
2929
}
30+
3031
e.Timestamp = time.Unix(int64(t), 0)
3132
e.Line = values[1]
33+
3234
return nil
3335
}
3436

@@ -57,7 +59,7 @@ type LokiQuery struct {
5759
type Data struct {
5860
ResultType string `json:"resultType"`
5961
Result []StreamResult `json:"result"` // Warning, just stream value is handled
60-
Stats interface{} `json:"stats"` // Stats is boring, just ignore it
62+
Stats any `json:"stats"` // Stats is boring, just ignore it
6163
}
6264

6365
type StreamResult struct {

0 commit comments

Comments
 (0)