Skip to content

Commit 409c15a

Browse files
authored
refactg pkg/acquisition: split file.go (#4038)
1 parent a004601 commit 409c15a

File tree

8 files changed

+775
-717
lines changed

8 files changed

+775
-717
lines changed

pkg/acquisition/file.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ import (
88

99
var (
1010
// verify interface compliance
11-
_ DataSource = (*fileacquisition.FileSource)(nil)
12-
_ DSNConfigurer = (*fileacquisition.FileSource)(nil)
13-
_ Fetcher = (*fileacquisition.FileSource)(nil)
14-
_ Tailer = (*fileacquisition.FileSource)(nil)
15-
_ MetricsProvider = (*fileacquisition.FileSource)(nil)
11+
_ DataSource = (*fileacquisition.Source)(nil)
12+
_ DSNConfigurer = (*fileacquisition.Source)(nil)
13+
_ Fetcher = (*fileacquisition.Source)(nil)
14+
_ Tailer = (*fileacquisition.Source)(nil)
15+
_ MetricsProvider = (*fileacquisition.Source)(nil)
1616
)
1717

1818
//nolint:gochecknoinits
1919
func init() {
20-
registerDataSource("file", func() DataSource { return &fileacquisition.FileSource{} })
20+
registerDataSource("file", func() DataSource { return &fileacquisition.Source{} })
2121
}
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
package fileacquisition
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net/url"
8+
"path/filepath"
9+
"regexp"
10+
"strconv"
11+
"strings"
12+
"sync"
13+
"time"
14+
15+
"github.com/fsnotify/fsnotify"
16+
yaml "github.com/goccy/go-yaml"
17+
log "github.com/sirupsen/logrus"
18+
19+
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
20+
"github.com/crowdsecurity/crowdsec/pkg/metrics"
21+
)
22+
23+
type Configuration struct {
24+
Filenames []string
25+
ExcludeRegexps []string `yaml:"exclude_regexps"`
26+
Filename string
27+
ForceInotify bool `yaml:"force_inotify"`
28+
MaxBufferSize int `yaml:"max_buffer_size"`
29+
PollWithoutInotify *bool `yaml:"poll_without_inotify"`
30+
DiscoveryPollEnable bool `yaml:"discovery_poll_enable"`
31+
DiscoveryPollInterval time.Duration `yaml:"discovery_poll_interval"`
32+
configuration.DataSourceCommonCfg `yaml:",inline"`
33+
}
34+
35+
func (s *Source) UnmarshalConfig(yamlConfig []byte) error {
36+
s.config = Configuration{}
37+
38+
err := yaml.UnmarshalWithOptions(yamlConfig, &s.config, yaml.Strict())
39+
if err != nil {
40+
return fmt.Errorf("cannot parse FileAcquisition configuration: %s", yaml.FormatError(err, false, false))
41+
}
42+
43+
if s.logger != nil {
44+
s.logger.Tracef("FileAcquisition configuration: %+v", s.config)
45+
}
46+
47+
if s.config.Filename != "" {
48+
s.config.Filenames = append(s.config.Filenames, s.config.Filename)
49+
}
50+
51+
if len(s.config.Filenames) == 0 {
52+
return errors.New("no filename or filenames configuration provided")
53+
}
54+
55+
if s.config.Mode == "" {
56+
s.config.Mode = configuration.TAIL_MODE
57+
}
58+
59+
if s.config.Mode != configuration.CAT_MODE && s.config.Mode != configuration.TAIL_MODE {
60+
return fmt.Errorf("unsupported mode %s for file source", s.config.Mode)
61+
}
62+
63+
for _, exclude := range s.config.ExcludeRegexps {
64+
re, err := regexp.Compile(exclude)
65+
if err != nil {
66+
return fmt.Errorf("could not compile regexp %s: %w", exclude, err)
67+
}
68+
69+
s.exclude_regexps = append(s.exclude_regexps, re)
70+
}
71+
72+
return nil
73+
}
74+
75+
func (s *Source) Configure(_ context.Context, yamlConfig []byte, logger *log.Entry, metricsLevel metrics.AcquisitionMetricsLevel) error {
76+
s.logger = logger
77+
s.metricsLevel = metricsLevel
78+
79+
err := s.UnmarshalConfig(yamlConfig)
80+
if err != nil {
81+
return err
82+
}
83+
84+
s.watchedDirectories = make(map[string]bool)
85+
s.tailMapMutex = &sync.RWMutex{}
86+
s.tails = make(map[string]bool)
87+
88+
s.watcher, err = fsnotify.NewWatcher()
89+
if err != nil {
90+
return fmt.Errorf("could not create fsnotify watcher: %w", err)
91+
}
92+
93+
s.logger.Tracef("Actual FileAcquisition Configuration %+v", s.config)
94+
95+
for _, pattern := range s.config.Filenames {
96+
if s.config.ForceInotify {
97+
directory := filepath.Dir(pattern)
98+
s.logger.Infof("Force add watch on %s", directory)
99+
100+
if !s.watchedDirectories[directory] {
101+
err = s.watcher.Add(directory)
102+
if err != nil {
103+
s.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
104+
continue
105+
}
106+
107+
s.watchedDirectories[directory] = true
108+
}
109+
}
110+
111+
files, err := filepath.Glob(pattern)
112+
if err != nil {
113+
return fmt.Errorf("glob failure: %w", err)
114+
}
115+
116+
if len(files) == 0 {
117+
s.logger.Warnf("No matching files for pattern %s", pattern)
118+
continue
119+
}
120+
121+
for _, file := range files {
122+
if s.isExcluded(file) {
123+
continue
124+
}
125+
126+
if files[0] != pattern && s.config.Mode == configuration.TAIL_MODE { // we have a glob pattern
127+
directory := filepath.Dir(file)
128+
s.logger.Debugf("Will add watch to directory: %s", directory)
129+
130+
if !s.watchedDirectories[directory] {
131+
err = s.watcher.Add(directory)
132+
if err != nil {
133+
s.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
134+
continue
135+
}
136+
137+
s.watchedDirectories[directory] = true
138+
} else {
139+
s.logger.Debugf("Watch for directory %s already exists", directory)
140+
}
141+
}
142+
143+
s.logger.Infof("Adding file %s to datasources", file)
144+
s.files = append(s.files, file)
145+
}
146+
}
147+
148+
return nil
149+
}
150+
151+
func (s *Source) ConfigureByDSN(_ context.Context, dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
152+
if !strings.HasPrefix(dsn, "file://") {
153+
return fmt.Errorf("invalid DSN %s for file source, must start with file://", dsn)
154+
}
155+
156+
s.logger = logger
157+
s.config = Configuration{}
158+
159+
dsn = strings.TrimPrefix(dsn, "file://")
160+
161+
args := strings.Split(dsn, "?")
162+
163+
if args[0] == "" {
164+
return errors.New("empty file:// DSN")
165+
}
166+
167+
if len(args) == 2 && args[1] != "" {
168+
params, err := url.ParseQuery(args[1])
169+
if err != nil {
170+
return fmt.Errorf("could not parse file args: %w", err)
171+
}
172+
173+
for key, value := range params {
174+
switch key {
175+
case "log_level":
176+
if len(value) != 1 {
177+
return errors.New("expected zero or one value for 'log_level'")
178+
}
179+
180+
lvl, err := log.ParseLevel(value[0])
181+
if err != nil {
182+
return fmt.Errorf("unknown level %s: %w", value[0], err)
183+
}
184+
185+
s.logger.Logger.SetLevel(lvl)
186+
case "max_buffer_size":
187+
if len(value) != 1 {
188+
return errors.New("expected zero or one value for 'max_buffer_size'")
189+
}
190+
191+
maxBufferSize, err := strconv.Atoi(value[0])
192+
if err != nil {
193+
return fmt.Errorf("could not parse max_buffer_size %s: %w", value[0], err)
194+
}
195+
196+
s.config.MaxBufferSize = maxBufferSize
197+
default:
198+
return fmt.Errorf("unknown parameter %s", key)
199+
}
200+
}
201+
}
202+
203+
s.config.Labels = labels
204+
s.config.Mode = configuration.CAT_MODE
205+
s.config.UniqueId = uuid
206+
207+
s.logger.Debugf("Will try pattern %s", args[0])
208+
209+
files, err := filepath.Glob(args[0])
210+
if err != nil {
211+
return fmt.Errorf("glob failure: %w", err)
212+
}
213+
214+
if len(files) == 0 {
215+
return fmt.Errorf("no matching files for pattern %s", args[0])
216+
}
217+
218+
if len(files) > 1 {
219+
s.logger.Infof("Will read %d files", len(files))
220+
}
221+
222+
for _, file := range files {
223+
s.logger.Infof("Adding file %s to filelist", file)
224+
s.files = append(s.files, file)
225+
}
226+
227+
return nil
228+
}

0 commit comments

Comments
 (0)