Skip to content

Commit 5b1f5eb

Browse files
authored
cmd/crowdsec: refact output.go, pour.go, parse.go (#4157)
* output.go: light refact * pour.go: happy path * parse.go: extract parseEvent()
1 parent 58d6821 commit 5b1f5eb

File tree

3 files changed

+88
-68
lines changed

3 files changed

+88
-68
lines changed

cmd/crowdsec/output.go

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,15 @@ func dedupAlerts(alerts []pipeline.RuntimeAlert) []*models.Alert {
2020

2121
for idx, alert := range alerts {
2222
log.Tracef("alert %d/%d", idx, len(alerts))
23-
// if we have more than one source, we need to dedup
24-
if len(alert.Sources) == 0 || len(alert.Sources) == 1 {
23+
if len(alert.Sources) <= 1 {
2524
dedupCache = append(dedupCache, alert.Alert)
2625
continue
2726
}
2827

29-
for k := range alert.Sources {
30-
refsrc := *alert.Alert // copy
31-
28+
// if we have more than one source, we need to dedup
29+
for k, src := range alert.Sources {
3230
log.Tracef("source[%s]", k)
33-
34-
src := alert.Sources[k]
31+
refsrc := *alert.Alert // copy
3532
refsrc.Source = &src
3633
dedupCache = append(dedupCache, &refsrc)
3734
}
@@ -57,8 +54,15 @@ func PushAlerts(ctx context.Context, alerts []pipeline.RuntimeAlert, client *api
5754

5855
var bucketOverflows []pipeline.Event
5956

60-
func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pipeline.Event, buckets *leaky.Buckets, postOverflowCTX parser.UnixParserCtx,
61-
postOverflowNodes []parser.Node, client *apiclient.ApiClient) error {
57+
func runOutput(
58+
ctx context.Context,
59+
input chan pipeline.Event,
60+
overflow chan pipeline.Event,
61+
buckets *leaky.Buckets,
62+
postOverflowCTX parser.UnixParserCtx,
63+
postOverflowNodes []parser.Node,
64+
client *apiclient.ApiClient,
65+
) error {
6266
var (
6367
cache []pipeline.RuntimeAlert
6468
cacheMutex sync.Mutex
@@ -73,8 +77,7 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip
7377
if len(cache) > 0 {
7478
cacheMutex.Lock()
7579
cachecopy := cache
76-
newcache := make([]pipeline.RuntimeAlert, 0)
77-
cache = newcache
80+
cache = nil
7881
cacheMutex.Unlock()
7982
/*
8083
This loop needs to block as little as possible as scenarios directly write to the input chan
@@ -103,48 +106,47 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip
103106
}
104107
return nil
105108
case event := <-overflow:
109+
ov := event.Overflow
106110
// if alert is empty and mapKey is present, the overflow is just to cleanup bucket
107-
if event.Overflow.Alert == nil && event.Overflow.Mapkey != "" {
111+
if ov.Alert == nil && ov.Mapkey != "" {
108112
buckets.Bucket_map.Delete(event.Overflow.Mapkey)
109113
break
110114
}
115+
111116
/* process post overflow parser nodes */
112117
event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes)
113118
if err != nil {
114119
return fmt.Errorf("postoverflow failed: %w", err)
115120
}
116121

117-
log.Info(*event.Overflow.Alert.Message)
122+
log.Info(*ov.Alert.Message)
118123

119124
// if the Alert is nil, it's to signal bucket is ready for GC, don't track this
120125
// dump after postoverflow processing to avoid missing whitelist info
121-
if dumpStates && event.Overflow.Alert != nil {
122-
if bucketOverflows == nil {
123-
bucketOverflows = make([]pipeline.Event, 0)
124-
}
125-
126+
if dumpStates && ov.Alert != nil {
126127
bucketOverflows = append(bucketOverflows, event)
127128
}
128129

129-
if event.Overflow.Whitelisted {
130-
log.Infof("[%s] is whitelisted, skip.", *event.Overflow.Alert.Message)
130+
if ov.Whitelisted {
131+
log.Infof("[%s] is whitelisted, skip.", *ov.Alert.Message)
131132
continue
132133
}
133134

134-
if event.Overflow.Reprocess {
135+
if ov.Reprocess {
135136
select {
136137
case input <- event:
137138
log.Debug("Reprocessing overflow event")
138139
case <-ctx.Done():
139140
log.Debug("Reprocessing overflow event: parsing is dead, skipping")
140141
}
141142
}
143+
142144
if dumpStates {
143145
continue
144146
}
145147

146148
cacheMutex.Lock()
147-
cache = append(cache, event.Overflow)
149+
cache = append(cache, ov)
148150
cacheMutex.Unlock()
149151
}
150152
}

cmd/crowdsec/parse.go

Lines changed: 46 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,48 +12,61 @@ import (
1212
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
1313
)
1414

15+
func parseEvent(
16+
event pipeline.Event,
17+
parserCTX parser.UnixParserCtx,
18+
nodes []parser.Node,
19+
) *pipeline.Event {
20+
if !event.Process {
21+
return nil
22+
}
23+
/*Application security engine is going to generate 2 events:
24+
- one that is treated as a log and can go to scenarios
25+
- another one that will go directly to LAPI*/
26+
if event.Type == pipeline.APPSEC {
27+
outputEventChan <- event
28+
return nil
29+
}
30+
if event.Line.Module == "" {
31+
log.Errorf("empty event.Line.Module field, the acquisition module must set it ! : %+v", event.Line)
32+
return nil
33+
}
34+
metrics.GlobalParserHits.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Inc()
35+
36+
startParsing := time.Now()
37+
/* parse the log using magic */
38+
parsed, err := parser.Parse(parserCTX, event, nodes)
39+
if err != nil {
40+
log.Errorf("failed parsing: %v", err)
41+
}
42+
elapsed := time.Since(startParsing)
43+
metrics.GlobalParsingHistogram.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Observe(elapsed.Seconds())
44+
if !parsed.Process {
45+
metrics.GlobalParserHitsKo.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc()
46+
log.Debugf("Discarding line %+v", parsed)
47+
return nil
48+
}
49+
metrics.GlobalParserHitsOk.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc()
50+
if parsed.Whitelisted {
51+
log.Debugf("event whitelisted, discard")
52+
return nil
53+
}
54+
55+
return &parsed
56+
}
57+
1558
func runParse(ctx context.Context, input chan pipeline.Event, output chan pipeline.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node) {
1659
for {
1760
select {
1861
case <-ctx.Done():
1962
log.Infof("Killing parser routines")
2063
return
2164
case event := <-input:
22-
if !event.Process {
23-
continue
24-
}
25-
/*Application security engine is going to generate 2 events:
26-
- one that is treated as a log and can go to scenarios
27-
- another one that will go directly to LAPI*/
28-
if event.Type == pipeline.APPSEC {
29-
outputEventChan <- event
30-
continue
31-
}
32-
if event.Line.Module == "" {
33-
log.Errorf("empty event.Line.Module field, the acquisition module must set it ! : %+v", event.Line)
34-
continue
35-
}
36-
metrics.GlobalParserHits.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Inc()
37-
38-
startParsing := time.Now()
39-
/* parse the log using magic */
40-
parsed, err := parser.Parse(parserCTX, event, nodes)
41-
if err != nil {
42-
log.Errorf("failed parsing: %v", err)
43-
}
44-
elapsed := time.Since(startParsing)
45-
metrics.GlobalParsingHistogram.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Observe(elapsed.Seconds())
46-
if !parsed.Process {
47-
metrics.GlobalParserHitsKo.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc()
48-
log.Debugf("Discarding line %+v", parsed)
49-
continue
50-
}
51-
metrics.GlobalParserHitsOk.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc()
52-
if parsed.Whitelisted {
53-
log.Debugf("event whitelisted, discard")
65+
parsed := parseEvent(event, parserCTX, nodes)
66+
if parsed == nil {
5467
continue
5568
}
56-
output <- parsed
69+
output <- *parsed
5770
}
5871
}
5972
}

cmd/crowdsec/pour.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,26 @@ import (
1313
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
1414
)
1515

16-
func maybeGC(parsed pipeline.Event, buckets *leaky.Buckets, cConfig *csconfig.Config) {
16+
// shouldTriggerGC reports whether count is a multiple of the bucket
// garbage-collection interval, i.e. whether a collection attempt is due.
func shouldTriggerGC(count int) bool {
	// gcInterval is the step, in units of count, between two collection attempts.
	const gcInterval = 5000

	return count%gcInterval == 0
}
19+
20+
func triggerGC(parsed pipeline.Event, buckets *leaky.Buckets, cConfig *csconfig.Config) {
1721
log.Infof("%d existing buckets", leaky.LeakyRoutineCount)
1822
// when in forensics mode, garbage collect buckets
19-
if cConfig.Crowdsec.BucketsGCEnabled {
20-
if parsed.MarshaledTime != "" {
21-
z := &time.Time{}
22-
if err := z.UnmarshalText([]byte(parsed.MarshaledTime)); err != nil {
23-
log.Warningf("Failed to parse time from event '%s': %s", parsed.MarshaledTime, err)
24-
} else {
25-
log.Warning("Starting buckets garbage collection ...")
23+
if !cConfig.Crowdsec.BucketsGCEnabled || parsed.MarshaledTime == "" {
24+
return
25+
}
2626

27-
leaky.GarbageCollectBuckets(*z, buckets)
28-
}
29-
}
27+
z := &time.Time{}
28+
if err := z.UnmarshalText([]byte(parsed.MarshaledTime)); err != nil {
29+
log.Warningf("Failed to parse time from event '%s': %s", parsed.MarshaledTime, err)
30+
return
3031
}
32+
33+
log.Warning("Starting buckets garbage collection ...")
34+
35+
leaky.GarbageCollectBuckets(*z, buckets)
3136
}
3237

3338
func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config) {
@@ -43,8 +48,8 @@ func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.Buc
4348
startTime := time.Now()
4449

4550
count++
46-
if count%5000 == 0 {
47-
maybeGC(parsed, buckets, cConfig)
51+
if shouldTriggerGC(count) {
52+
triggerGC(parsed, buckets, cConfig)
4853
}
4954
// here we can bucketify with parsed
5055
poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets)

0 commit comments

Comments
 (0)