Skip to content

Commit 2c60ffe

Browse files
authored
refact pkg/acquisition: split appsec.go (#4043)
1 parent c9105fa commit 2c60ffe

File tree

5 files changed

+271
-241
lines changed

5 files changed

+271
-241
lines changed

pkg/acquisition/appsec.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ import (
88

99
var (
1010
// verify interface compliance
11-
_ DataSource = (*appsecacquisition.AppsecSource)(nil)
12-
_ Tailer = (*appsecacquisition.AppsecSource)(nil)
13-
_ MetricsProvider = (*appsecacquisition.AppsecSource)(nil)
11+
_ DataSource = (*appsecacquisition.Source)(nil)
12+
_ Tailer = (*appsecacquisition.Source)(nil)
13+
_ MetricsProvider = (*appsecacquisition.Source)(nil)
1414
)
1515

1616
//nolint:gochecknoinits
1717
func init() {
18-
registerDataSource("appsec", func() DataSource { return &appsecacquisition.AppsecSource{} })
18+
registerDataSource("appsec", func() DataSource { return &appsecacquisition.Source{} })
1919
}
Lines changed: 6 additions & 237 deletions
Original file line numberDiff line numberDiff line change
@@ -7,35 +7,21 @@ import (
77
"encoding/json"
88
"errors"
99
"fmt"
10-
"io/fs"
11-
"net"
1210
"net/http"
1311
"os"
14-
"sync"
1512
"time"
1613

1714
yaml "github.com/goccy/go-yaml"
1815
"github.com/google/uuid"
1916
"github.com/prometheus/client_golang/prometheus"
2017
log "github.com/sirupsen/logrus"
21-
"gopkg.in/tomb.v2"
22-
23-
"github.com/crowdsecurity/go-cs-lib/trace"
2418

2519
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
26-
"github.com/crowdsecurity/crowdsec/pkg/apiclient"
2720
"github.com/crowdsecurity/crowdsec/pkg/apiclient/useragent"
2821
"github.com/crowdsecurity/crowdsec/pkg/appsec"
2922
"github.com/crowdsecurity/crowdsec/pkg/appsec/allowlists"
3023
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
31-
"github.com/crowdsecurity/crowdsec/pkg/csnet"
3224
"github.com/crowdsecurity/crowdsec/pkg/metrics"
33-
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
34-
)
35-
36-
const (
37-
InBand = "inband"
38-
OutOfBand = "outofband"
3925
)
4026

4127
var (
@@ -46,7 +32,7 @@ var (
4632
var DefaultAuthCacheDuration = (1 * time.Minute)
4733

4834
// configuration structure of the acquis for the application security engine
49-
type AppsecSourceConfig struct {
35+
type Configuration struct {
5036
ListenAddr string `yaml:"listen_addr"`
5137
ListenSocket string `yaml:"listen_socket"`
5238
CertFilePath string `yaml:"cert_file"`
@@ -60,63 +46,7 @@ type AppsecSourceConfig struct {
6046
configuration.DataSourceCommonCfg `yaml:",inline"`
6147
}
6248

63-
// runtime structure of AppsecSourceConfig
64-
type AppsecSource struct {
65-
config AppsecSourceConfig
66-
logger *log.Entry
67-
mux *http.ServeMux
68-
server *http.Server
69-
InChan chan appsec.ParsedRequest
70-
AppsecRuntime *appsec.AppsecRuntimeConfig
71-
AppsecConfigs map[string]appsec.AppsecConfig
72-
lapiURL string
73-
AuthCache AuthCache
74-
AppsecRunners []AppsecRunner // one for each go-routine
75-
appsecAllowlistClient *allowlists.AppsecAllowlist
76-
lapiCACertPool *x509.CertPool
77-
authMutex sync.Mutex
78-
httpClient *http.Client
79-
}
80-
81-
// Struct to handle cache of authentication
82-
type AuthCache struct {
83-
APIKeys map[string]time.Time
84-
mu sync.RWMutex
85-
}
86-
87-
func NewAuthCache() AuthCache {
88-
return AuthCache{
89-
APIKeys: make(map[string]time.Time, 0),
90-
mu: sync.RWMutex{},
91-
}
92-
}
93-
94-
func (ac *AuthCache) Set(apiKey string, expiration time.Time) {
95-
ac.mu.Lock()
96-
ac.APIKeys[apiKey] = expiration
97-
ac.mu.Unlock()
98-
}
99-
100-
func (ac *AuthCache) Get(apiKey string) (time.Time, bool) {
101-
ac.mu.RLock()
102-
expiration, exists := ac.APIKeys[apiKey]
103-
ac.mu.RUnlock()
104-
105-
return expiration, exists
106-
}
107-
108-
func (ac *AuthCache) Delete(apiKey string) {
109-
ac.mu.Lock()
110-
delete(ac.APIKeys, apiKey)
111-
ac.mu.Unlock()
112-
}
113-
114-
// @tko + @sbl : we might want to get rid of that or improve it
115-
type BodyResponse struct {
116-
Action string `json:"action"`
117-
}
118-
119-
func (w *AppsecSource) UnmarshalConfig(yamlConfig []byte) error {
49+
func (w *Source) UnmarshalConfig(yamlConfig []byte) error {
12050
err := yaml.UnmarshalWithOptions(yamlConfig, &w.config, yaml.Strict())
12151
if err != nil {
12252
return fmt.Errorf("cannot parse appsec configuration: %s", yaml.FormatError(err, false, false))
@@ -168,16 +98,6 @@ func (w *AppsecSource) UnmarshalConfig(yamlConfig []byte) error {
16898
return nil
16999
}
170100

171-
func (*AppsecSource) GetMetrics() []prometheus.Collector {
172-
return []prometheus.Collector{metrics.AppsecReqCounter, metrics.AppsecBlockCounter, metrics.AppsecRuleHits,
173-
metrics.AppsecOutbandParsingHistogram, metrics.AppsecInbandParsingHistogram, metrics.AppsecGlobalParsingHistogram}
174-
}
175-
176-
func (*AppsecSource) GetAggregMetrics() []prometheus.Collector {
177-
return []prometheus.Collector{metrics.AppsecReqCounter, metrics.AppsecBlockCounter, metrics.AppsecRuleHits,
178-
metrics.AppsecOutbandParsingHistogram, metrics.AppsecInbandParsingHistogram, metrics.AppsecGlobalParsingHistogram}
179-
}
180-
181101
func loadCertPool(caCertPath string, logger log.FieldLogger) (*x509.CertPool, error) {
182102
caCertPool, err := x509.SystemCertPool()
183103
if err != nil {
@@ -200,7 +120,7 @@ func loadCertPool(caCertPath string, logger log.FieldLogger) (*x509.CertPool, er
200120
return caCertPool, nil
201121
}
202122

203-
func (w *AppsecSource) Configure(_ context.Context, yamlConfig []byte, logger *log.Entry, _ metrics.AcquisitionMetricsLevel) error {
123+
func (w *Source) Configure(_ context.Context, yamlConfig []byte, logger *log.Entry, _ metrics.AcquisitionMetricsLevel) error {
204124
err := w.UnmarshalConfig(yamlConfig)
205125
if err != nil {
206126
return fmt.Errorf("unable to parse appsec configuration: %w", err)
@@ -321,158 +241,7 @@ func (w *AppsecSource) Configure(_ context.Context, yamlConfig []byte, logger *l
321241
return nil
322242
}
323243

324-
func (w *AppsecSource) GetMode() string {
325-
return w.config.Mode
326-
}
327-
328-
func (*AppsecSource) GetName() string {
329-
return "appsec"
330-
}
331-
332-
func (w *AppsecSource) listenAndServe(ctx context.Context, t *tomb.Tomb) error {
333-
defer trace.CatchPanic("crowdsec/acquis/appsec/listenAndServe")
334-
335-
w.logger.Infof("%d appsec runner to start", len(w.AppsecRunners))
336-
337-
serverError := make(chan error, 2)
338-
339-
startServer := func(listener net.Listener, canTLS bool) {
340-
var err error
341-
342-
if canTLS && (w.config.CertFilePath != "" || w.config.KeyFilePath != "") {
343-
if w.config.KeyFilePath == "" {
344-
serverError <- errors.New("missing TLS key file")
345-
return
346-
}
347-
348-
if w.config.CertFilePath == "" {
349-
serverError <- errors.New("missing TLS cert file")
350-
return
351-
}
352-
353-
err = w.server.ServeTLS(listener, w.config.CertFilePath, w.config.KeyFilePath)
354-
} else {
355-
err = w.server.Serve(listener)
356-
}
357-
358-
switch {
359-
case errors.Is(err, http.ErrServerClosed):
360-
break
361-
case err != nil:
362-
serverError <- err
363-
}
364-
}
365-
366-
listenConfig := &net.ListenConfig{}
367-
368-
// Starting Unix socket listener
369-
go func(socket string) {
370-
if socket == "" {
371-
return
372-
}
373-
374-
if err := os.Remove(w.config.ListenSocket); err != nil {
375-
if !errors.Is(err, fs.ErrNotExist) {
376-
w.logger.Errorf("can't remove socket %s: %s", socket, err)
377-
}
378-
}
379-
380-
w.logger.Infof("creating unix socket %s", socket)
381-
382-
listener, err := listenConfig.Listen(ctx, "unix", socket)
383-
if err != nil {
384-
serverError <- csnet.WrapSockErr(err, socket)
385-
return
386-
}
387-
388-
w.logger.Infof("Appsec listening on Unix socket %s", socket)
389-
startServer(listener, false)
390-
}(w.config.ListenSocket)
391-
392-
// Starting TCP listener
393-
go func(url string) {
394-
if url == "" {
395-
return
396-
}
397-
398-
listener, err := listenConfig.Listen(ctx, "tcp", url)
399-
if err != nil {
400-
serverError <- fmt.Errorf("listening on %s: %w", url, err)
401-
return
402-
}
403-
404-
w.logger.Infof("Appsec listening on %s", url)
405-
startServer(listener, true)
406-
}(w.config.ListenAddr)
407-
408-
select {
409-
case err := <-serverError:
410-
return err
411-
case <-t.Dying():
412-
w.logger.Info("Shutting down Appsec server")
413-
// xx let's clean up the appsec runners :)
414-
appsec.AppsecRulesDetails = make(map[int]appsec.RulesDetails)
415-
416-
if err := w.server.Shutdown(ctx); err != nil {
417-
w.logger.Errorf("Error shutting down Appsec server: %s", err.Error())
418-
}
419-
420-
if w.config.ListenSocket != "" {
421-
if err := os.Remove(w.config.ListenSocket); err != nil {
422-
if !errors.Is(err, fs.ErrNotExist) {
423-
w.logger.Errorf("can't remove socket %s: %s", w.config.ListenSocket, err)
424-
}
425-
}
426-
}
427-
}
428-
429-
return nil
430-
}
431-
432-
func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan pipeline.Event, t *tomb.Tomb) error {
433-
apiClient, err := apiclient.GetLAPIClient()
434-
if err != nil {
435-
return fmt.Errorf("unable to get authenticated LAPI client: %w", err)
436-
}
437-
438-
err = w.appsecAllowlistClient.Start(ctx, apiClient)
439-
if err != nil {
440-
w.logger.Errorf("failed to fetch allowlists for appsec, disabling them: %s", err)
441-
} else {
442-
w.appsecAllowlistClient.StartRefresh(ctx, t)
443-
}
444-
445-
t.Go(func() error {
446-
defer trace.CatchPanic("crowdsec/acquis/appsec/live")
447-
448-
for _, runner := range w.AppsecRunners {
449-
runner.outChan = out
450-
451-
t.Go(func() error {
452-
defer trace.CatchPanic("crowdsec/acquis/appsec/live/runner")
453-
return runner.Run(t)
454-
})
455-
}
456-
457-
return w.listenAndServe(ctx, t)
458-
})
459-
460-
return nil
461-
}
462-
463-
func (*AppsecSource) CanRun() error {
464-
return nil
465-
}
466-
467-
func (w *AppsecSource) GetUuid() string {
468-
return w.config.UniqueId
469-
}
470-
471-
func (w *AppsecSource) Dump() any {
472-
return w
473-
}
474-
475-
func (w *AppsecSource) isValidKey(ctx context.Context, apiKey string) (bool, error) {
244+
func (w *Source) isValidKey(ctx context.Context, apiKey string) (bool, error) {
476245
req, err := http.NewRequestWithContext(ctx, http.MethodHead, w.lapiURL, http.NoBody)
477246
if err != nil {
478247
w.logger.Errorf("Error creating request: %s", err)
@@ -493,7 +262,7 @@ func (w *AppsecSource) isValidKey(ctx context.Context, apiKey string) (bool, err
493262
return resp.StatusCode == http.StatusOK, nil
494263
}
495264

496-
func (w *AppsecSource) checkAuth(ctx context.Context, apiKey string) error {
265+
func (w *Source) checkAuth(ctx context.Context, apiKey string) error {
497266
if apiKey == "" {
498267
return errMissingAPIKey
499268
}
@@ -536,7 +305,7 @@ func (w *AppsecSource) checkAuth(ctx context.Context, apiKey string) error {
536305
}
537306

538307
// should this be in the runner ?
539-
func (w *AppsecSource) appsecHandler(rw http.ResponseWriter, r *http.Request) {
308+
func (w *Source) appsecHandler(rw http.ResponseWriter, r *http.Request) {
540309
ctx := r.Context()
541310
w.logger.Debugf("Received request from '%s' on %s", r.RemoteAddr, r.URL.Path)
542311

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,29 @@
11
package appsecacquisition
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
6+
"github.com/crowdsecurity/crowdsec/pkg/metrics"
7+
)
8+
9+
func (*Source) GetMetrics() []prometheus.Collector {
10+
return []prometheus.Collector{
11+
metrics.AppsecReqCounter,
12+
metrics.AppsecBlockCounter,
13+
metrics.AppsecRuleHits,
14+
metrics.AppsecOutbandParsingHistogram,
15+
metrics.AppsecInbandParsingHistogram,
16+
metrics.AppsecGlobalParsingHistogram,
17+
}
18+
}
19+
20+
func (*Source) GetAggregMetrics() []prometheus.Collector {
21+
return []prometheus.Collector{
22+
metrics.AppsecReqCounter,
23+
metrics.AppsecBlockCounter,
24+
metrics.AppsecRuleHits,
25+
metrics.AppsecOutbandParsingHistogram,
26+
metrics.AppsecInbandParsingHistogram,
27+
metrics.AppsecGlobalParsingHistogram,
28+
}
29+
}

0 commit comments

Comments
 (0)