Skip to content

Commit 6a5e3a0

Browse files
authored
pkg/acquisition: refact journalctl datasource and unified retry loop (#4023)
1 parent a6b61ed commit 6a5e3a0

File tree

18 files changed

+684
-489
lines changed

18 files changed

+684
-489
lines changed

cmd/crowdsec-cli/clisetup/setup/detect_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ detect:
390390
source: journalctl
391391
filename: /path/to/file.log`,
392392
want: nil,
393-
wantErr: `invalid acquisition spec for foobar: cannot parse JournalCtlSource configuration: [1:1] unknown field "filename"`,
393+
wantErr: `invalid acquisition spec for foobar: cannot parse journalctl acquisition config: [1:1] unknown field "filename"`,
394394
}, {
395395
name: "source file: required fields",
396396
config: `

cmd/crowdsec/main.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -111,21 +111,21 @@ func LoadBuckets(cConfig *csconfig.Config, hub *cwhub.Hub) error {
111111
}
112112

113113
func LoadAcquisition(ctx context.Context, cConfig *csconfig.Config) ([]acquisition.DataSource, error) {
114-
var err error
115-
116114
if flags.SingleFileType != "" && flags.OneShotDSN != "" {
117-
flags.Labels = labels
115+
flags.Labels = additionalLabels
118116
flags.Labels["type"] = flags.SingleFileType
119117

120-
dataSources, err = acquisition.LoadAcquisitionFromDSN(ctx, flags.OneShotDSN, flags.Labels, flags.Transform)
118+
ds, err := acquisition.LoadAcquisitionFromDSN(ctx, flags.OneShotDSN, flags.Labels, flags.Transform)
121119
if err != nil {
122-
return nil, fmt.Errorf("failed to configure datasource for %s: %w", flags.OneShotDSN, err)
120+
return nil, err
123121
}
122+
dataSources = append(dataSources, ds)
124123
} else {
125-
dataSources, err = acquisition.LoadAcquisitionFromFiles(ctx, cConfig.Crowdsec, cConfig.Prometheus)
124+
dss, err := acquisition.LoadAcquisitionFromFiles(ctx, cConfig.Crowdsec, cConfig.Prometheus)
126125
if err != nil {
127126
return nil, err
128127
}
128+
dataSources = dss
129129
}
130130

131131
if len(dataSources) == 0 {
@@ -136,9 +136,9 @@ func LoadAcquisition(ctx context.Context, cConfig *csconfig.Config) ([]acquisiti
136136
}
137137

138138
var (
139-
dumpFolder string
140-
dumpStates bool
141-
labels = make(labelsMap)
139+
dumpFolder string
140+
dumpStates bool
141+
additionalLabels = make(labelsMap)
142142
)
143143

144144
func (*labelsMap) String() string {
@@ -172,7 +172,7 @@ func (f *Flags) Parse() {
172172
flag.StringVar(&f.OneShotDSN, "dsn", "", "Process a single data source in time-machine")
173173
flag.StringVar(&f.Transform, "transform", "", "expr to apply on the event after acquisition")
174174
flag.StringVar(&f.SingleFileType, "type", "", "Labels.type for file in time-machine")
175-
flag.Var(&labels, "label", "Additional Labels for file in time-machine")
175+
flag.Var(&additionalLabels, "label", "Additional Labels for file in time-machine")
176176
flag.BoolVar(&f.TestMode, "t", false, "only test configs")
177177
flag.BoolVar(&f.DisableAgent, "no-cs", false, "disable crowdsec agent")
178178
flag.BoolVar(&f.DisableAPI, "no-api", false, "disable local API")

pkg/acquisition/acquisition.go

Lines changed: 85 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ import (
88
"io"
99
"maps"
1010
"os"
11+
"time"
1112
"slices"
1213
"strings"
1314

15+
"github.com/cenkalti/backoff/v5"
1416
"github.com/expr-lang/expr"
1517
"github.com/expr-lang/expr/vm"
1618
"github.com/goccy/go-yaml"
@@ -69,6 +71,11 @@ type Tailer interface {
6971
StreamingAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error
7072
}
7173

74+
// RestartableStreamer works Like Tailer but should return any error and leave the retry logic to the caller
75+
type RestartableStreamer interface {
76+
Stream(ctx context.Context, out chan pipeline.Event) error
77+
}
78+
7279
type MetricsProvider interface {
7380
// Returns pointers to metrics that are managed by the module
7481
GetMetrics() []prometheus.Collector
@@ -121,14 +128,14 @@ func registerDataSource(dataSourceType string, dsGetter func() DataSource) {
121128
}
122129

123130
// setupLogger creates a logger for the datasource to use at runtime.
124-
func setupLogger(source, name string, level log.Level) (*log.Entry, error) {
131+
func setupLogger(typ, name string, level log.Level) (*log.Entry, error) {
125132
clog := log.New()
126133
if err := logging.ConfigureLogger(clog, level); err != nil {
127-
return nil, fmt.Errorf("while configuring datasource logger: %w", err)
134+
return nil, fmt.Errorf("configuring datasource logger: %w", err)
128135
}
129136

130137
fields := log.Fields{
131-
"type": source,
138+
"type": typ,
132139
}
133140

134141
if name != "" {
@@ -167,18 +174,20 @@ func DataSourceConfigure(ctx context.Context, commonConfig configuration.DataSou
167174
return dataSrc, nil
168175
}
169176

170-
func LoadAcquisitionFromDSN(ctx context.Context, dsn string, labels map[string]string, transformExpr string) ([]DataSource, error) {
177+
func LoadAcquisitionFromDSN(ctx context.Context, dsn string, labels map[string]string, transformExpr string) (DataSource, error) {
171178
frags := strings.Split(dsn, ":")
172179
if len(frags) == 1 {
173-
return nil, fmt.Errorf("%s isn't valid dsn (no protocol)", dsn)
180+
return nil, fmt.Errorf("%s is not a valid dsn (no protocol)", dsn)
174181
}
175182

176183
dataSrc, err := GetDataSourceIface(frags[0])
177184
if err != nil {
178185
return nil, fmt.Errorf("no acquisition for protocol %s:// - %w", frags[0], err)
179186
}
180187

181-
subLogger, err := setupLogger(dsn, "", 0)
188+
typ := labels["type"]
189+
190+
subLogger, err := setupLogger(typ, "", 0)
182191
if err != nil {
183192
return nil, err
184193
}
@@ -200,10 +209,10 @@ func LoadAcquisitionFromDSN(ctx context.Context, dsn string, labels map[string]s
200209
}
201210

202211
if err = dsnConf.ConfigureByDSN(ctx, dsn, labels, subLogger, uniqueID); err != nil {
203-
return nil, fmt.Errorf("while configuration datasource for %s: %w", dsn, err)
212+
return nil, fmt.Errorf("configuring datasource for %q: %w", dsn, err)
204213
}
205214

206-
return []DataSource{dataSrc}, nil
215+
return dataSrc, nil
207216
}
208217

209218
func GetMetricsLevelFromPromCfg(prom *csconfig.PrometheusCfg) metrics.AcquisitionMetricsLevel {
@@ -349,7 +358,7 @@ func sourcesFromFile(ctx context.Context, acquisFile string, metricsLevel metric
349358
continue
350359
}
351360

352-
return nil, fmt.Errorf("while configuring datasource of type %s from %s (position %d): %w", sub.Source, acquisFile, idx, err)
361+
return nil, fmt.Errorf("configuring datasource of type %s from %s (position %d): %w", sub.Source, acquisFile, idx, err)
353362
}
354363

355364
if sub.TransformExpr != "" {
@@ -484,6 +493,72 @@ func transform(transformChan chan pipeline.Event, output chan pipeline.Event, ac
484493
}
485494
}
486495

496+
497+
func runRestartableStream(ctx context.Context, rs RestartableStreamer, name string, output chan pipeline.Event, acquisTomb *tomb.Tomb) error {
498+
// wrap tomb logic with context
499+
ctx, cancel := context.WithCancel(ctx)
500+
go func() {
501+
<-acquisTomb.Dying()
502+
cancel()
503+
}()
504+
505+
acquisTomb.Go(func() error {
506+
// TODO: check timing and exponential?
507+
bo := backoff.NewConstantBackOff(10 * time.Second)
508+
bo.Reset() // TODO: reset according to run time
509+
510+
for {
511+
select {
512+
case <-ctx.Done():
513+
return nil
514+
default:
515+
}
516+
517+
if err := rs.Stream(ctx, output); err != nil {
518+
log.Errorf("datasource %q: stream error: %v (retrying)", name, err)
519+
}
520+
521+
select {
522+
case <-ctx.Done():
523+
return nil
524+
default:
525+
}
526+
527+
d := bo.NextBackOff()
528+
log.Infof("datasource %q: restarting stream in %s", name, d)
529+
530+
select {
531+
case <-ctx.Done():
532+
return nil
533+
case <-time.After(d):
534+
}
535+
}
536+
})
537+
538+
return nil
539+
}
540+
541+
542+
func acquireSource(ctx context.Context, source DataSource, name string, output chan pipeline.Event, acquisTomb *tomb.Tomb) error {
543+
if source.GetMode() == configuration.CAT_MODE {
544+
if s, ok := source.(Fetcher); ok {
545+
return s.OneShotAcquisition(ctx, output, acquisTomb)
546+
}
547+
548+
return fmt.Errorf("%s: cat mode is set but OneShotAcquisition is not supported", source.GetName())
549+
}
550+
551+
if s, ok := source.(Tailer); ok {
552+
return s.StreamingAcquisition(ctx, output, acquisTomb)
553+
}
554+
555+
if s, ok := source.(RestartableStreamer); ok {
556+
return runRestartableStream(ctx, s, name, output, acquisTomb)
557+
}
558+
559+
return fmt.Errorf("%s: tail mode is set but the datasource does not support streaming acquisition", source.GetName())
560+
}
561+
487562
func StartAcquisition(ctx context.Context, sources []DataSource, output chan pipeline.Event, acquisTomb *tomb.Tomb) error {
488563
// Don't wait if we have no sources, as it will hang forever
489564
if len(sources) == 0 {
@@ -497,8 +572,6 @@ func StartAcquisition(ctx context.Context, sources []DataSource, output chan pip
497572
acquisTomb.Go(func() error {
498573
defer trace.CatchPanic("crowdsec/acquis")
499574

500-
var err error
501-
502575
outChan := output
503576

504577
log.Debugf("datasource %s UUID: %s", subsrc.GetName(), subsrc.GetUuid())
@@ -519,21 +592,7 @@ func StartAcquisition(ctx context.Context, sources []DataSource, output chan pip
519592
})
520593
}
521594

522-
if subsrc.GetMode() == configuration.TAIL_MODE {
523-
if s, ok := subsrc.(Tailer); ok {
524-
err = s.StreamingAcquisition(ctx, outChan, acquisTomb)
525-
} else {
526-
err = fmt.Errorf("%s: tail mode is set but StreamingAcquisition is not supported", subsrc.GetName())
527-
}
528-
} else {
529-
if s, ok := subsrc.(Fetcher); ok {
530-
err = s.OneShotAcquisition(ctx, outChan, acquisTomb)
531-
} else {
532-
err = fmt.Errorf("%s: cat mode is set but OneShotAcquisition is not supported", subsrc.GetName())
533-
}
534-
}
535-
536-
if err != nil {
595+
if err := acquireSource(ctx, subsrc, subsrc.GetName(), output, acquisTomb); err != nil {
537596
// if one of the acqusition returns an error, we kill the others to properly shutdown
538597
acquisTomb.Kill(err)
539598
}

pkg/acquisition/acquisition_test.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ func TestLoadAcquisitionFromFiles(t *testing.T) {
280280
Config: csconfig.CrowdsecServiceCfg{
281281
AcquisitionFiles: []string{"testdata/bad_filetype.yaml"},
282282
},
283-
ExpectedError: "while configuring datasource of type file from testdata/bad_filetype.yaml",
283+
ExpectedError: "configuring datasource of type file from testdata/bad_filetype.yaml",
284284
},
285285
{
286286
TestName: "from_env",
@@ -399,9 +399,8 @@ func TestStartAcquisitionCat(t *testing.T) {
399399
acquisTomb := tomb.Tomb{}
400400

401401
go func() {
402-
if err := StartAcquisition(ctx, sources, out, &acquisTomb); err != nil {
403-
t.Error("unexpected error")
404-
}
402+
err := StartAcquisition(ctx, sources, out, &acquisTomb)
403+
assert.NoError(t, err)
405404
}()
406405

407406
count := 0
@@ -534,15 +533,14 @@ func TestConfigureByDSN(t *testing.T) {
534533
}{
535534
{
536535
dsn: "baddsn",
537-
ExpectedError: "baddsn isn't valid dsn (no protocol)",
536+
ExpectedError: "baddsn is not a valid dsn (no protocol)",
538537
},
539538
{
540539
dsn: "foobar://toto",
541540
ExpectedError: "no acquisition for protocol foobar://",
542541
},
543542
{
544543
dsn: "mockdsn://test_expect",
545-
ExpectedResLen: 1,
546544
},
547545
{
548546
dsn: "mockdsn://bad",
@@ -554,10 +552,15 @@ func TestConfigureByDSN(t *testing.T) {
554552

555553
for _, tc := range tests {
556554
t.Run(tc.dsn, func(t *testing.T) {
557-
srcs, err := LoadAcquisitionFromDSN(ctx, tc.dsn, map[string]string{"type": "test_label"}, "")
555+
source, err := LoadAcquisitionFromDSN(ctx, tc.dsn, map[string]string{"type": "test_label"}, "")
558556
cstest.RequireErrorContains(t, err, tc.ExpectedError)
559557

560-
assert.Len(t, srcs, tc.ExpectedResLen)
558+
if tc.ExpectedError != "" {
559+
return
560+
}
561+
562+
assert.NotNil(t, source)
563+
assert.Equal(t, "mockdsn", source.GetName())
561564
})
562565
}
563566
}
@@ -582,7 +585,7 @@ func TestStartAcquisition_MissingTailer(t *testing.T) {
582585

583586
go func() { errCh <- StartAcquisition(ctx, []DataSource{&TailModeNoTailer{}}, out, &tb) }()
584587

585-
require.ErrorContains(t, <-errCh, "tail_no_tailer: tail mode is set but StreamingAcquisition is not supported")
588+
require.ErrorContains(t, <-errCh, "tail_no_tailer: tail mode is set but the datasource does not support streaming acquisition")
586589
}
587590

588591

pkg/acquisition/journalctl.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ import (
88

99
var (
1010
// verify interface compliance
11-
_ DataSource = (*journalctlacquisition.JournalCtlSource)(nil)
12-
_ DSNConfigurer = (*journalctlacquisition.JournalCtlSource)(nil)
13-
_ Fetcher = (*journalctlacquisition.JournalCtlSource)(nil)
14-
_ Tailer = (*journalctlacquisition.JournalCtlSource)(nil)
15-
_ MetricsProvider = (*journalctlacquisition.JournalCtlSource)(nil)
11+
_ DataSource = (*journalctlacquisition.Source)(nil)
12+
_ DSNConfigurer = (*journalctlacquisition.Source)(nil)
13+
_ Fetcher = (*journalctlacquisition.Source)(nil)
14+
_ RestartableStreamer = (*journalctlacquisition.Source)(nil)
15+
_ MetricsProvider = (*journalctlacquisition.Source)(nil)
1616
)
1717

1818
//nolint:gochecknoinits
1919
func init() {
20-
registerDataSource("journalctl", func() DataSource { return &journalctlacquisition.JournalCtlSource{} })
20+
registerDataSource("journalctl", func() DataSource { return &journalctlacquisition.Source{} })
2121
}

0 commit comments

Comments
 (0)