Skip to content

Commit 71cb6b1

Browse files
authored
refact pkg/acquisition: split k8s_audit.go (#4033)
1 parent 1c715d3 commit 71cb6b1

File tree

7 files changed

+263
-226
lines changed

7 files changed

+263
-226
lines changed

pkg/acquisition/k8s.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ import (
88

99
var (
1010
// verify interface compliance
11-
_ DataSource = (*k8sauditacquisition.KubernetesAuditSource)(nil)
12-
_ Tailer = (*k8sauditacquisition.KubernetesAuditSource)(nil)
13-
_ MetricsProvider = (*k8sauditacquisition.KubernetesAuditSource)(nil)
11+
_ DataSource = (*k8sauditacquisition.Source)(nil)
12+
_ Tailer = (*k8sauditacquisition.Source)(nil)
13+
_ MetricsProvider = (*k8sauditacquisition.Source)(nil)
1414
)
1515

1616
//nolint:gochecknoinits
1717
func init() {
18-
registerDataSource("k8s-audit", func() DataSource { return &k8sauditacquisition.KubernetesAuditSource{} })
18+
registerDataSource("k8s-audit", func() DataSource { return &k8sauditacquisition.Source{} })
1919
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package kubernetesauditacquisition
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net/http"
8+
9+
yaml "github.com/goccy/go-yaml"
10+
log "github.com/sirupsen/logrus"
11+
12+
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
13+
"github.com/crowdsecurity/crowdsec/pkg/metrics"
14+
)
15+
16+
type Configuration struct {
17+
ListenAddr string `yaml:"listen_addr"`
18+
ListenPort int `yaml:"listen_port"`
19+
WebhookPath string `yaml:"webhook_path"`
20+
configuration.DataSourceCommonCfg `yaml:",inline"`
21+
}
22+
23+
func (s *Source) UnmarshalConfig(yamlConfig []byte) error {
24+
k8sConfig := Configuration{}
25+
26+
err := yaml.UnmarshalWithOptions(yamlConfig, &k8sConfig, yaml.Strict())
27+
if err != nil {
28+
return fmt.Errorf("cannot parse k8s-audit configuration: %s", yaml.FormatError(err, false, false))
29+
}
30+
31+
s.config = k8sConfig
32+
33+
if s.config.ListenAddr == "" {
34+
return errors.New("listen_addr cannot be empty")
35+
}
36+
37+
if s.config.ListenPort == 0 {
38+
return errors.New("listen_port cannot be empty")
39+
}
40+
41+
if s.config.WebhookPath == "" {
42+
return errors.New("webhook_path cannot be empty")
43+
}
44+
45+
if s.config.WebhookPath[0] != '/' {
46+
s.config.WebhookPath = "/" + s.config.WebhookPath
47+
}
48+
49+
if s.config.Mode == "" {
50+
s.config.Mode = configuration.TAIL_MODE
51+
}
52+
53+
return nil
54+
}
55+
56+
func (s *Source) Configure(_ context.Context, config []byte, logger *log.Entry, metricsLevel metrics.AcquisitionMetricsLevel) error {
57+
s.logger = logger
58+
s.metricsLevel = metricsLevel
59+
60+
err := s.UnmarshalConfig(config)
61+
if err != nil {
62+
return err
63+
}
64+
65+
s.logger.Tracef("K8SAudit configuration: %+v", s.config)
66+
67+
s.addr = fmt.Sprintf("%s:%d", s.config.ListenAddr, s.config.ListenPort)
68+
69+
s.mux = http.NewServeMux()
70+
71+
s.server = &http.Server{
72+
Addr: s.addr,
73+
Handler: s.mux,
74+
Protocols: &http.Protocols{},
75+
}
76+
77+
s.server.Protocols.SetHTTP1(true)
78+
s.server.Protocols.SetUnencryptedHTTP2(true)
79+
s.server.Protocols.SetHTTP2(true)
80+
81+
s.mux.HandleFunc(s.config.WebhookPath, s.webhookHandler)
82+
83+
return nil
84+
}

pkg/acquisition/modules/kubernetesaudit/k8s_audit.go

Lines changed: 0 additions & 218 deletions
This file was deleted.

pkg/acquisition/modules/kubernetesaudit/k8s_audit_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ source: k8s-audit
4848
listen_addr: 0.0.0.0
4949
listen_port: true
5050
`,
51-
expectedErr: `[4:14] cannot unmarshal bool into Go struct field KubernetesAuditConfiguration.ListenPort of type int`,
51+
expectedErr: `[4:14] cannot unmarshal bool into Go struct field Configuration.ListenPort of type int`,
5252
},
5353
}
5454

5555
for _, test := range tests {
5656
t.Run(test.name, func(t *testing.T) {
57-
f := KubernetesAuditSource{}
57+
f := Source{}
5858

5959
err := f.UnmarshalConfig([]byte(test.config))
6060
cstest.RequireErrorContains(t, err, test.expectedErr)
@@ -86,7 +86,7 @@ webhook_path: /k8s-audit`,
8686
out := make(chan pipeline.Event)
8787
tb := &tomb.Tomb{}
8888

89-
f := KubernetesAuditSource{}
89+
f := Source{}
9090

9191
err := f.UnmarshalConfig([]byte(test.config))
9292

@@ -244,7 +244,7 @@ func TestHandler(t *testing.T) {
244244
}
245245
})
246246

247-
f := KubernetesAuditSource{}
247+
f := Source{}
248248

249249
port := 49234+idx
250250
config := fmt.Sprintf(`source: k8s-audit
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package kubernetesauditacquisition
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
6+
"github.com/crowdsecurity/crowdsec/pkg/metrics"
7+
)
8+
9+
func (*Source) GetMetrics() []prometheus.Collector {
10+
return []prometheus.Collector{
11+
metrics.K8SAuditDataSourceEventCount,
12+
metrics.K8SAuditDataSourceRequestCount,
13+
}
14+
}
15+
16+
func (*Source) GetAggregMetrics() []prometheus.Collector {
17+
return []prometheus.Collector{
18+
metrics.K8SAuditDataSourceEventCount,
19+
metrics.K8SAuditDataSourceRequestCount,
20+
}
21+
}

0 commit comments

Comments
 (0)