Skip to content

Commit 58d6821

Browse files
authored
refact: drop parserTomb, lpMetricsTomb (#4138)
1 parent 2302ec5 commit 58d6821

File tree

6 files changed

+61
-92
lines changed

6 files changed

+61
-92
lines changed

cmd/crowdsec/crowdsec.go

Lines changed: 31 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ import (
55
"fmt"
66
"os"
77
"strconv"
8-
"sync"
98
"time"
109

1110
log "github.com/sirupsen/logrus"
11+
"golang.org/x/sync/errgroup"
1212

1313
"github.com/crowdsecurity/go-cs-lib/trace"
1414

@@ -69,40 +69,25 @@ func initCrowdsec(ctx context.Context, cConfig *csconfig.Config, hub *cwhub.Hub,
6969
return csParsers, datasources, nil
7070
}
7171

72-
func startParserRoutines(cConfig *csconfig.Config, parsers *parser.Parsers) {
73-
// start go-routines for parsing, buckets pour and outputs.
74-
parserWg := &sync.WaitGroup{}
75-
76-
parsersTomb.Go(func() error {
77-
parserWg.Add(1)
78-
79-
for range cConfig.Crowdsec.ParserRoutinesCount {
80-
parsersTomb.Go(func() error {
81-
defer trace.CatchPanic("crowdsec/runParse")
82-
83-
if err := runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes); err != nil {
84-
// this error will never happen as parser.Parse is not able to return errors
85-
return err
86-
}
87-
88-
return nil
89-
})
90-
}
91-
92-
parserWg.Done()
93-
94-
return nil
95-
})
96-
parserWg.Wait()
72+
func startParserRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, parsers *parser.Parsers) {
73+
for idx := range cConfig.Crowdsec.ParserRoutinesCount {
74+
log.WithField("idx", idx).Info("Starting parser routine")
75+
g.Go(func() error {
76+
defer trace.CatchPanic("crowdsec/runParse/"+strconv.Itoa(idx))
77+
runParse(ctx, inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes)
78+
return nil
79+
})
80+
}
9781
}
9882

99-
func startBucketRoutines(ctx context.Context, cConfig *csconfig.Config) {
83+
func startBucketRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config) {
10084
for idx := range cConfig.Crowdsec.BucketsRoutinesCount {
101-
log.Infof("Starting bucket routine %d", idx)
102-
go func() {
85+
log.WithField("idx", idx).Info("Starting bucket routine")
86+
g.Go(func() error {
10387
defer trace.CatchPanic("crowdsec/runPour/"+strconv.Itoa(idx))
10488
runPour(ctx, inputEventChan, holders, buckets, cConfig)
105-
}()
89+
return nil
90+
})
10691
}
10792
}
10893

@@ -112,24 +97,13 @@ func startHeartBeat(ctx context.Context, _ *csconfig.Config, apiClient *apiclien
11297
}
11398

11499
func startOutputRoutines(ctx context.Context, cConfig *csconfig.Config, parsers *parser.Parsers, apiClient *apiclient.ApiClient) {
115-
outputWg := &sync.WaitGroup{}
116-
117-
outputsTomb.Go(func() error {
118-
outputWg.Add(1)
119-
120-
for range cConfig.Crowdsec.OutputRoutinesCount {
121-
outputsTomb.Go(func() error {
122-
defer trace.CatchPanic("crowdsec/runOutput")
123-
124-
return runOutput(ctx, inputEventChan, outputEventChan, buckets, *parsers.PovfwCtx, parsers.Povfwnodes, apiClient)
125-
})
126-
}
127-
128-
outputWg.Done()
129-
130-
return nil
131-
})
132-
outputWg.Wait()
100+
for idx := range cConfig.Crowdsec.OutputRoutinesCount {
101+
log.WithField("idx", idx).Info("Starting output routine")
102+
outputsTomb.Go(func() error {
103+
defer trace.CatchPanic("crowdsec/runOutput/"+strconv.Itoa(idx))
104+
return runOutput(ctx, inputEventChan, outputEventChan, buckets, *parsers.PovfwCtx, parsers.Povfwnodes, apiClient)
105+
})
106+
}
133107
}
134108

135109
func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *apiclient.ApiClient, hub *cwhub.Hub, datasources []acquisition.DataSource) error {
@@ -142,9 +116,9 @@ func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *ap
142116
hub,
143117
)
144118

145-
lpMetricsTomb.Go(func() error {
146-
return mp.Run(ctx, &lpMetricsTomb)
147-
})
119+
go func() {
120+
mp.Run(ctx)
121+
}()
148122

149123
if cConfig.Prometheus != nil && cConfig.Prometheus.Enabled {
150124
aggregated := false
@@ -161,13 +135,12 @@ func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *ap
161135
}
162136

163137
// runCrowdsec starts the log processor service
164-
func runCrowdsec(ctx context.Context, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisition.DataSource) error {
138+
func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisition.DataSource) error {
165139
inputEventChan = make(chan pipeline.Event)
166140
inputLineChan = make(chan pipeline.Event)
167141

168-
startParserRoutines(cConfig, parsers)
169-
170-
startBucketRoutines(ctx, cConfig)
142+
startParserRoutines(ctx, g, cConfig, parsers)
143+
startBucketRoutines(ctx, g, cConfig)
171144

172145
apiClient, err := apiclient.GetLAPIClient()
173146
if err != nil {
@@ -195,6 +168,7 @@ func runCrowdsec(ctx context.Context, cConfig *csconfig.Config, parsers *parser.
195168
func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisition.DataSource, agentReady chan bool) {
196169
cctx, cancel := context.WithCancel(ctx)
197170

171+
var g errgroup.Group
198172

199173
crowdsecTomb.Go(func() error {
200174
defer trace.CatchPanic("crowdsec/serveCrowdsec")
@@ -206,7 +180,7 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf
206180

207181
agentReady <- true
208182

209-
if err := runCrowdsec(cctx, cConfig, parsers, hub, datasources); err != nil {
183+
if err := runCrowdsec(cctx, &g, cConfig, parsers, hub, datasources); err != nil {
210184
log.Fatalf("unable to start crowdsec routines: %s", err)
211185
}
212186
}()
@@ -218,7 +192,7 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf
218192
waitOnTomb()
219193
log.Debugf("Shutting down crowdsec routines")
220194

221-
if err := ShutdownCrowdsecRoutines(cancel); err != nil {
195+
if err := ShutdownCrowdsecRoutines(cancel, &g); err != nil {
222196
return fmt.Errorf("unable to shutdown crowdsec routines: %w", err)
223197
}
224198

cmd/crowdsec/lpmetrics.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
io_prometheus_client "github.com/prometheus/client_model/go"
1414

1515
"github.com/sirupsen/logrus"
16-
"gopkg.in/tomb.v2"
1716

1817
"github.com/crowdsecurity/go-cs-lib/ptr"
1918
"github.com/crowdsecurity/go-cs-lib/trace"
@@ -403,11 +402,11 @@ func (m *MetricsProvider) sendMetrics(ctx context.Context, met *models.AllMetric
403402
}
404403
}
405404

406-
func (m *MetricsProvider) Run(ctx context.Context, myTomb *tomb.Tomb) error {
405+
func (m *MetricsProvider) Run(ctx context.Context) {
407406
defer trace.CatchPanic("crowdsec/MetricsProvider.Run")
408407

409408
if m.interval == time.Duration(0) {
410-
return nil
409+
return
411410
}
412411

413412
ticker := time.NewTicker(1) // Send on start
@@ -418,9 +417,9 @@ func (m *MetricsProvider) Run(ctx context.Context, myTomb *tomb.Tomb) error {
418417
met := m.metricsPayload()
419418
m.sendMetrics(ctx, met)
420419
ticker.Reset(m.interval)
421-
case <-myTomb.Dying():
420+
case <-ctx.Done():
422421
ticker.Stop()
423-
return nil
422+
return
424423
}
425424
}
426425
}

cmd/crowdsec/main.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,10 @@ import (
3131
var (
3232
// tombs for the parser, buckets and outputs.
3333
acquisTomb tomb.Tomb
34-
parsersTomb tomb.Tomb
3534
outputsTomb tomb.Tomb
3635
apiTomb tomb.Tomb
3736
crowdsecTomb tomb.Tomb
3837
pluginTomb tomb.Tomb
39-
lpMetricsTomb tomb.Tomb
4038

4139
flags Flags
4240

cmd/crowdsec/output.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,11 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip
132132
}
133133

134134
if event.Overflow.Reprocess {
135-
log.Debugf("Overflow being reprocessed.")
136135
select {
137136
case input <- event:
138-
log.Debugf("reprocessing overflow event")
139-
case <-parsersTomb.Dead():
140-
log.Debugf("parsing is dead, skipping")
137+
log.Debug("Reprocessing overflow event")
138+
case <-ctx.Done():
139+
log.Debug("Reprocessing overflow event: parsing is dead, skipping")
141140
}
142141
}
143142
if dumpStates {

cmd/crowdsec/parse.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/prometheus/client_golang/prometheus"
@@ -11,12 +12,12 @@ import (
1112
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
1213
)
1314

14-
func runParse(input chan pipeline.Event, output chan pipeline.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node) error {
15+
func runParse(ctx context.Context, input chan pipeline.Event, output chan pipeline.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node) {
1516
for {
1617
select {
17-
case <-parsersTomb.Dying():
18+
case <-ctx.Done():
1819
log.Infof("Killing parser routines")
19-
return nil
20+
return
2021
case event := <-input:
2122
if !event.Process {
2223
continue

cmd/crowdsec/serve.go

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"time"
1111

1212
log "github.com/sirupsen/logrus"
13+
"golang.org/x/sync/errgroup"
1314
"gopkg.in/tomb.v2"
1415

1516
"github.com/crowdsecurity/go-cs-lib/csdaemon"
@@ -27,12 +28,10 @@ import (
2728
func reloadHandler(ctx context.Context, _ os.Signal) (*csconfig.Config, error) {
2829
// re-initialize tombs
2930
acquisTomb = tomb.Tomb{}
30-
parsersTomb = tomb.Tomb{}
3131
outputsTomb = tomb.Tomb{}
3232
apiTomb = tomb.Tomb{}
3333
crowdsecTomb = tomb.Tomb{}
3434
pluginTomb = tomb.Tomb{}
35-
lpMetricsTomb = tomb.Tomb{}
3635

3736
cConfig, err := LoadConfig(flags.ConfigFile, flags.DisableAgent, flags.DisableAPI, false)
3837
if err != nil {
@@ -86,7 +85,19 @@ func reloadHandler(ctx context.Context, _ os.Signal) (*csconfig.Config, error) {
8685
return cConfig, nil
8786
}
8887

89-
func ShutdownCrowdsecRoutines(cancel context.CancelFunc) error {
88+
func waitErrGroup(g *errgroup.Group, timeout time.Duration) error {
89+
done := make(chan error, 1)
90+
go func() { done <- g.Wait() }()
91+
92+
select {
93+
case err := <-done:
94+
return err
95+
case <-time.After(timeout):
96+
return context.DeadlineExceeded
97+
}
98+
}
99+
100+
func ShutdownCrowdsecRoutines(cancel context.CancelFunc, g *errgroup.Group) error {
90101
var reterr error
91102

92103
log.Debugf("Shutting down crowdsec sub-routines")
@@ -103,20 +114,8 @@ func ShutdownCrowdsecRoutines(cancel context.CancelFunc) error {
103114
}
104115

105116
log.Debugf("acquisition is finished, wait for parser/bucket/ouputs.")
106-
parsersTomb.Kill(nil)
107117
drainChan(inputEventChan)
108118

109-
if err := parsersTomb.Wait(); err != nil {
110-
log.Warningf("Parsers returned error : %s", err)
111-
reterr = err
112-
}
113-
114-
log.Debugf("parsers is done")
115-
time.Sleep(1 * time.Second) // ugly workaround for now to ensure PourItemtoholders are finished
116-
cancel()
117-
118-
log.Debugf("buckets is done")
119-
time.Sleep(1 * time.Second) // ugly workaround for now
120119
outputsTomb.Kill(nil)
121120

122121
done := make(chan error, 1)
@@ -139,13 +138,14 @@ func ShutdownCrowdsecRoutines(cancel context.CancelFunc) error {
139138
log.Warningf("Outputs didn't finish in time, some events may have not been flushed")
140139
}
141140

142-
lpMetricsTomb.Kill(nil)
141+
cancel()
143142

144-
if err := lpMetricsTomb.Wait(); err != nil {
145-
log.Warningf("Metrics returned error : %s", err)
146-
reterr = err
143+
if err := waitErrGroup(g, 3 * time.Second); err != nil {
144+
log.WithError(err).Warn("timeout waiting for parser/bucket routines")
147145
}
148146

147+
log.Debugf("parsers are done")
148+
log.Debugf("buckets are done")
149149
log.Debugf("metrics are done")
150150

151151
// He's dead, Jim.
@@ -304,12 +304,10 @@ func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error {
304304

305305
func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool) error {
306306
acquisTomb = tomb.Tomb{}
307-
parsersTomb = tomb.Tomb{}
308307
outputsTomb = tomb.Tomb{}
309308
apiTomb = tomb.Tomb{}
310309
crowdsecTomb = tomb.Tomb{}
311310
pluginTomb = tomb.Tomb{}
312-
lpMetricsTomb = tomb.Tomb{}
313311

314312
if cConfig.API.Server != nil && cConfig.API.Server.DbConfig != nil {
315313
dbCfg := cConfig.API.Server.DbConfig

0 commit comments

Comments
 (0)