Skip to content

Commit a004601

Browse files
authored
refact pkg/acquisition: split syslog.go (#4028)
1 parent 9e5283a commit a004601

File tree

6 files changed

+178
-108
lines changed

6 files changed

+178
-108
lines changed
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package syslogacquisition
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
8+
yaml "github.com/goccy/go-yaml"
9+
log "github.com/sirupsen/logrus"
10+
11+
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
12+
"github.com/crowdsecurity/crowdsec/pkg/metrics"
13+
)
14+
15+
type Configuration struct {
16+
Proto string `yaml:"protocol,omitempty"`
17+
Port int `yaml:"listen_port,omitempty"`
18+
Addr string `yaml:"listen_addr,omitempty"`
19+
MaxMessageLen int `yaml:"max_message_len,omitempty"`
20+
DisableRFCParser bool `yaml:"disable_rfc_parser,omitempty"` // if true, we don't try to be smart and just remove the PRI
21+
configuration.DataSourceCommonCfg `yaml:",inline"`
22+
}
23+
24+
func validatePort(port int) bool {
25+
return port > 0 && port <= 65535
26+
}
27+
28+
func validateAddr(addr string) bool {
29+
return net.ParseIP(addr) != nil
30+
}
31+
32+
func (s *Source) UnmarshalConfig(yamlConfig []byte) error {
33+
s.config = Configuration{}
34+
s.config.Mode = configuration.TAIL_MODE
35+
36+
err := yaml.UnmarshalWithOptions(yamlConfig, &s.config, yaml.Strict())
37+
if err != nil {
38+
return fmt.Errorf("cannot parse syslog configuration: %s", yaml.FormatError(err, false, false))
39+
}
40+
41+
if s.config.Addr == "" {
42+
s.config.Addr = "127.0.0.1" // do we want a usable or secure default ?
43+
}
44+
45+
if s.config.Port == 0 {
46+
s.config.Port = 514
47+
}
48+
49+
if s.config.MaxMessageLen == 0 {
50+
s.config.MaxMessageLen = 2048
51+
}
52+
53+
if !validatePort(s.config.Port) {
54+
return fmt.Errorf("invalid port %d", s.config.Port)
55+
}
56+
57+
if !validateAddr(s.config.Addr) {
58+
return fmt.Errorf("invalid listen IP %s", s.config.Addr)
59+
}
60+
61+
return nil
62+
}
63+
64+
func (s *Source) Configure(_ context.Context, yamlConfig []byte, logger *log.Entry, metricsLevel metrics.AcquisitionMetricsLevel) error {
65+
s.logger = logger
66+
s.logger.Infof("Starting syslog datasource configuration")
67+
s.metricsLevel = metricsLevel
68+
69+
err := s.UnmarshalConfig(yamlConfig)
70+
if err != nil {
71+
return err
72+
}
73+
74+
return nil
75+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package syslogacquisition
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
6+
"github.com/crowdsecurity/crowdsec/pkg/metrics"
7+
)
8+
9+
func (*Source) GetMetrics() []prometheus.Collector {
10+
return []prometheus.Collector{
11+
metrics.SyslogDataSourceLinesReceived,
12+
metrics.SyslogDataSourceLinesParsed,
13+
}
14+
}
15+
16+
func (*Source) GetAggregMetrics() []prometheus.Collector {
17+
return []prometheus.Collector{
18+
metrics.SyslogDataSourceLinesReceived,
19+
metrics.SyslogDataSourceLinesParsed,
20+
}
21+
}
Lines changed: 28 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -4,135 +4,42 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7-
"net"
87
"strings"
98
"time"
109

11-
yaml "github.com/goccy/go-yaml"
1210
"github.com/prometheus/client_golang/prometheus"
13-
log "github.com/sirupsen/logrus"
1411
"gopkg.in/tomb.v2"
1512

1613
"github.com/crowdsecurity/go-cs-lib/trace"
1714

18-
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
1915
"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal/parser/rfc3164"
2016
"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal/parser/rfc5424"
2117
syslogserver "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal/server"
2218
"github.com/crowdsecurity/crowdsec/pkg/metrics"
2319
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
2420
)
2521

26-
type SyslogConfiguration struct {
27-
Proto string `yaml:"protocol,omitempty"`
28-
Port int `yaml:"listen_port,omitempty"`
29-
Addr string `yaml:"listen_addr,omitempty"`
30-
MaxMessageLen int `yaml:"max_message_len,omitempty"`
31-
DisableRFCParser bool `yaml:"disable_rfc_parser,omitempty"` // if true, we don't try to be smart and just remove the PRI
32-
configuration.DataSourceCommonCfg `yaml:",inline"`
33-
}
34-
35-
type SyslogSource struct {
36-
metricsLevel metrics.AcquisitionMetricsLevel
37-
config SyslogConfiguration
38-
logger *log.Entry
39-
server *syslogserver.SyslogServer
40-
serverTomb *tomb.Tomb
41-
}
42-
43-
func (s *SyslogSource) GetUuid() string {
44-
return s.config.UniqueId
45-
}
46-
47-
func (*SyslogSource) GetName() string {
48-
return "syslog"
49-
}
50-
51-
func (s *SyslogSource) GetMode() string {
52-
return s.config.Mode
53-
}
54-
55-
func (s *SyslogSource) Dump() any {
56-
return s
57-
}
58-
59-
func (*SyslogSource) CanRun() error {
60-
return nil
61-
}
62-
63-
func (*SyslogSource) GetMetrics() []prometheus.Collector {
64-
return []prometheus.Collector{metrics.SyslogDataSourceLinesReceived, metrics.SyslogDataSourceLinesParsed}
65-
}
66-
67-
func (*SyslogSource) GetAggregMetrics() []prometheus.Collector {
68-
return []prometheus.Collector{metrics.SyslogDataSourceLinesReceived, metrics.SyslogDataSourceLinesParsed}
69-
}
70-
71-
func validatePort(port int) bool {
72-
return port > 0 && port <= 65535
73-
}
74-
75-
func validateAddr(addr string) bool {
76-
return net.ParseIP(addr) != nil
77-
}
78-
79-
func (s *SyslogSource) UnmarshalConfig(yamlConfig []byte) error {
80-
s.config = SyslogConfiguration{}
81-
s.config.Mode = configuration.TAIL_MODE
82-
83-
err := yaml.UnmarshalWithOptions(yamlConfig, &s.config, yaml.Strict())
84-
if err != nil {
85-
return fmt.Errorf("cannot parse syslog configuration: %s", yaml.FormatError(err, false, false))
86-
}
87-
88-
if s.config.Addr == "" {
89-
s.config.Addr = "127.0.0.1" // do we want a usable or secure default ?
90-
}
91-
if s.config.Port == 0 {
92-
s.config.Port = 514
93-
}
94-
if s.config.MaxMessageLen == 0 {
95-
s.config.MaxMessageLen = 2048
96-
}
97-
if !validatePort(s.config.Port) {
98-
return fmt.Errorf("invalid port %d", s.config.Port)
99-
}
100-
if !validateAddr(s.config.Addr) {
101-
return fmt.Errorf("invalid listen IP %s", s.config.Addr)
102-
}
103-
104-
return nil
105-
}
106-
107-
func (s *SyslogSource) Configure(_ context.Context, yamlConfig []byte, logger *log.Entry, metricsLevel metrics.AcquisitionMetricsLevel) error {
108-
s.logger = logger
109-
s.logger.Infof("Starting syslog datasource configuration")
110-
s.metricsLevel = metricsLevel
111-
err := s.UnmarshalConfig(yamlConfig)
112-
if err != nil {
113-
return err
114-
}
115-
116-
return nil
117-
}
118-
119-
func (s *SyslogSource) StreamingAcquisition(_ context.Context, out chan pipeline.Event, t *tomb.Tomb) error {
22+
func (s *Source) StreamingAcquisition(_ context.Context, out chan pipeline.Event, t *tomb.Tomb) error {
12023
c := make(chan syslogserver.SyslogMessage)
12124
s.server = &syslogserver.SyslogServer{Logger: s.logger.WithField("syslog", "internal"), MaxMessageLen: s.config.MaxMessageLen}
12225
s.server.SetChannel(c)
26+
12327
err := s.server.Listen(s.config.Addr, s.config.Port)
12428
if err != nil {
12529
return fmt.Errorf("could not start syslog server: %w", err)
12630
}
31+
12732
s.serverTomb = s.server.StartServer()
33+
12834
t.Go(func() error {
12935
defer trace.CatchPanic("crowdsec/acquis/syslog/live")
13036
return s.handleSyslogMsg(out, t, c)
13137
})
38+
13239
return nil
13340
}
13441

135-
func (s *SyslogSource) buildLogFromSyslog(ts time.Time, hostname string,
42+
func (s *Source) buildLogFromSyslog(ts time.Time, hostname string,
13643
appname string, pid string, msg string,
13744
) string {
13845
ret := ""
@@ -142,46 +49,58 @@ func (s *SyslogSource) buildLogFromSyslog(ts time.Time, hostname string,
14249
s.logger.Tracef("%s - missing TS", msg)
14350
ret += time.Now().UTC().Format("Jan 2 15:04:05")
14451
}
52+
14553
if hostname != "" {
14654
ret += " " + hostname
14755
} else {
14856
s.logger.Tracef("%s - missing host", msg)
14957
ret += " unknownhost"
15058
}
59+
15160
if appname != "" {
15261
ret += " " + appname
15362
}
63+
15464
if pid != "" {
15565
ret += "[" + pid + "]: "
15666
} else {
15767
ret += ": "
15868
}
69+
15970
if msg != "" {
16071
ret += msg
16172
}
73+
16274
return ret
16375
}
16476

165-
func (s *SyslogSource) parseLine(syslogLine syslogserver.SyslogMessage) string {
77+
func (s *Source) parseLine(syslogLine syslogserver.SyslogMessage) string {
16678
var line string
16779

16880
logger := s.logger.WithField("client", syslogLine.Client)
16981
logger.Tracef("raw: %s", syslogLine)
82+
17083
if s.metricsLevel != metrics.AcquisitionMetricsLevelNone {
17184
metrics.SyslogDataSourceLinesReceived.With(prometheus.Labels{"source": syslogLine.Client, "datasource_type": "syslog", "acquis_type": s.config.Labels["type"]}).Inc()
17285
}
86+
17387
if !s.config.DisableRFCParser {
17488
p := rfc3164.NewRFC3164Parser(rfc3164.WithCurrentYear())
89+
17590
err := p.Parse(syslogLine.Message)
17691
if err != nil {
17792
logger.Debugf("could not parse as RFC3164 (%s)", err)
93+
17894
p2 := rfc5424.NewRFC5424Parser()
95+
17996
err = p2.Parse(syslogLine.Message)
18097
if err != nil {
18198
logger.Errorf("could not parse message: %s", err)
18299
logger.Debugf("could not parse as RFC5424 (%s) : %s", err, syslogLine.Message)
100+
183101
return ""
184102
}
103+
185104
line = s.buildLogFromSyslog(p2.Timestamp, p2.Hostname, p2.Tag, p2.PID, p2.Message)
186105
if s.metricsLevel != metrics.AcquisitionMetricsLevelNone {
187106
metrics.SyslogDataSourceLinesParsed.With(prometheus.Labels{"source": syslogLine.Client, "type": "rfc5424", "datasource_type": "syslog", "acquis_type": s.config.Labels["type"]}).Inc()
@@ -197,39 +116,46 @@ func (s *SyslogSource) parseLine(syslogLine syslogserver.SyslogMessage) string {
197116
logger.Errorf("malformated message, missing PRI (message too short)")
198117
return ""
199118
}
119+
200120
if syslogLine.Message[0] != '<' {
201121
logger.Errorf("malformated message, missing PRI beginning")
202122
return ""
203123
}
124+
204125
priEnd := bytes.Index(syslogLine.Message, []byte(">"))
205126
if priEnd == -1 {
206127
logger.Errorf("malformated message, missing PRI end")
207128
return ""
208129
}
130+
209131
if priEnd > 4 {
210132
logger.Errorf("malformated message, PRI too long")
211133
return ""
212134
}
135+
213136
for i := 1; i < priEnd; i++ {
214137
if syslogLine.Message[i] < '0' || syslogLine.Message[i] > '9' {
215138
logger.Errorf("malformated message, PRI not a number")
216139
return ""
217140
}
218141
}
142+
219143
line = string(syslogLine.Message[priEnd+1:])
220144
}
221145

222146
return strings.TrimSuffix(line, "\n")
223147
}
224148

225-
func (s *SyslogSource) handleSyslogMsg(out chan pipeline.Event, t *tomb.Tomb, c chan syslogserver.SyslogMessage) error {
149+
func (s *Source) handleSyslogMsg(out chan pipeline.Event, t *tomb.Tomb, c chan syslogserver.SyslogMessage) error {
226150
killed := false
151+
227152
for {
228153
select {
229154
case <-t.Dying():
230155
if !killed {
231156
s.logger.Info("Syslog datasource is dying")
232157
s.serverTomb.Kill(nil)
158+
233159
killed = true
234160
}
235161
case <-s.serverTomb.Dead():
@@ -252,6 +178,7 @@ func (s *SyslogSource) handleSyslogMsg(out chan pipeline.Event, t *tomb.Tomb, c
252178
l.Process = true
253179
evt := pipeline.MakeEvent(s.config.UseTimeMachine, pipeline.LOG, true)
254180
evt.Line = l
181+
255182
out <- evt
256183
}
257184
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package syslogacquisition
2+
3+
import (
4+
log "github.com/sirupsen/logrus"
5+
"gopkg.in/tomb.v2"
6+
7+
syslogserver "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal/server"
8+
"github.com/crowdsecurity/crowdsec/pkg/metrics"
9+
)
10+
11+
type Source struct {
12+
metricsLevel metrics.AcquisitionMetricsLevel
13+
config Configuration
14+
logger *log.Entry
15+
server *syslogserver.SyslogServer
16+
serverTomb *tomb.Tomb
17+
}
18+
19+
func (s *Source) GetUuid() string {
20+
return s.config.UniqueId
21+
}
22+
23+
func (*Source) GetName() string {
24+
return "syslog"
25+
}
26+
27+
func (s *Source) GetMode() string {
28+
return s.config.Mode
29+
}
30+
31+
func (s *Source) Dump() any {
32+
return s
33+
}
34+
35+
func (*Source) CanRun() error {
36+
return nil
37+
}

0 commit comments

Comments
 (0)