Skip to content

Commit 9bbce68

Browse files
authored
feat(processor): added key-value and CEF processor (#49)
1 parent 807e2cc commit 9bbce68

File tree

7 files changed

+521
-23
lines changed

7 files changed

+521
-23
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ require (
66
cloud.google.com/go/pubsub v1.19.0
77
cloud.google.com/go/storage v1.10.0
88
github.com/aws/aws-sdk-go v1.43.18
9+
github.com/dlclark/regexp2 v1.4.0
910
github.com/go-resty/resty/v2 v2.7.0
1011
github.com/golang/protobuf v1.5.2
1112
github.com/google/cel-go v0.10.1
1213
github.com/google/uuid v1.3.0
1314
github.com/gookit/validate v1.3.1
1415
github.com/influxdata/go-syslog/v3 v3.0.0
16+
github.com/jjeffery/kv v0.8.1
1517
github.com/sirupsen/logrus v1.8.1
1618
github.com/spf13/cobra v1.4.0
1719
github.com/spf13/pflag v1.0.5

go.sum

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
108108
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
109109
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
110110
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
111+
github.com/dlclark/regexp2 v1.4.0 h1:F1rxgk7p4uKjwIQxBs9oAXe5CqrXlCduYEJvrF4u93E=
112+
github.com/dlclark/regexp2 v1.4.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc=
111113
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
112114
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
113115
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -268,6 +270,8 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH
268270
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
269271
github.com/influxdata/go-syslog/v3 v3.0.0 h1:jichmjSZlYK0VMmlz+k4WeOQd7z745YLsvGMqwtYt4I=
270272
github.com/influxdata/go-syslog/v3 v3.0.0/go.mod h1:tulsOp+CecTAYC27u9miMgq21GqXRW6VdKbOG+QSP4Q=
273+
github.com/jjeffery/kv v0.8.1 h1:S4/KbfPVNTGM/YCt5F+Za93o09119VNtOT+rLL4Gls0=
274+
github.com/jjeffery/kv v0.8.1/go.mod h1:iHA3uy+umBqxcJFr+e+gaGAv1OcyHlU6rSo3TcR61yQ=
271275
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
272276
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
273277
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
@@ -541,6 +545,7 @@ golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5h
541545
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
542546
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
543547
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
548+
golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
544549
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
545550
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
546551
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

internal/app/builtin.go

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,49 @@
11
package app
22

33
import (
4-
filein "github.com/ThoronicLLC/collector/internal/input/file"
5-
"github.com/ThoronicLLC/collector/internal/input/msgraph"
6-
pubsubin "github.com/ThoronicLLC/collector/internal/input/pubsub"
7-
"github.com/ThoronicLLC/collector/internal/input/syslog"
8-
fileout "github.com/ThoronicLLC/collector/internal/output/file"
9-
"github.com/ThoronicLLC/collector/internal/output/gcs"
10-
"github.com/ThoronicLLC/collector/internal/output/log_analytics"
11-
pubsubout "github.com/ThoronicLLC/collector/internal/output/pubsub"
12-
"github.com/ThoronicLLC/collector/internal/output/s3"
13-
"github.com/ThoronicLLC/collector/internal/output/stdout"
14-
"github.com/ThoronicLLC/collector/internal/processor/cel"
15-
syslog_processor "github.com/ThoronicLLC/collector/internal/processor/syslog"
164
"github.com/ThoronicLLC/collector/pkg/core"
5+
6+
file_input "github.com/ThoronicLLC/collector/internal/input/file"
7+
msgraph_input "github.com/ThoronicLLC/collector/internal/input/msgraph"
8+
pubsub_input "github.com/ThoronicLLC/collector/internal/input/pubsub"
9+
syslog_input "github.com/ThoronicLLC/collector/internal/input/syslog"
10+
11+
cel_processor "github.com/ThoronicLLC/collector/internal/processor/cel"
12+
kv_processor "github.com/ThoronicLLC/collector/internal/processor/kv"
13+
syslog_processor "github.com/ThoronicLLC/collector/internal/processor/syslog"
14+
15+
file_output "github.com/ThoronicLLC/collector/internal/output/file"
16+
gcs_output "github.com/ThoronicLLC/collector/internal/output/gcs"
17+
log_analytics_output "github.com/ThoronicLLC/collector/internal/output/log_analytics"
18+
pubsub_output "github.com/ThoronicLLC/collector/internal/output/pubsub"
19+
s3_output "github.com/ThoronicLLC/collector/internal/output/s3"
20+
stdout_output "github.com/ThoronicLLC/collector/internal/output/stdout"
1721
)
1822

1923
func AddInternalInputs() map[string]core.InputHandler {
2024
return map[string]core.InputHandler{
21-
filein.InputName: filein.Handler(),
22-
pubsubin.InputName: pubsubin.Handler(),
23-
syslog.InputName: syslog.Handler(),
24-
msgraph.InputName: msgraph.Handler(),
25+
file_input.InputName: file_input.Handler(),
26+
pubsub_input.InputName: pubsub_input.Handler(),
27+
syslog_input.InputName: syslog_input.Handler(),
28+
msgraph_input.InputName: msgraph_input.Handler(),
2529
}
2630
}
2731

2832
func AddInternalProcessors() map[string]core.ProcessHandler {
2933
return map[string]core.ProcessHandler{
30-
cel.ProcessorName: cel.Handler(),
34+
cel_processor.ProcessorName: cel_processor.Handler(),
3135
syslog_processor.ProcessorName: syslog_processor.Handler(),
36+
kv_processor.ProcessorName: kv_processor.Handler(),
3237
}
3338
}
3439

3540
func AddInternalOutputs() map[string]core.OutputHandler {
3641
return map[string]core.OutputHandler{
37-
fileout.OutputName: fileout.Handler(),
38-
stdout.OutputName: stdout.Handler(),
39-
s3.OutputName: s3.Handler(),
40-
gcs.OutputName: gcs.Handler(),
41-
log_analytics.OutputName: log_analytics.Handler(),
42-
pubsubout.OutputName: pubsubout.Handler(),
42+
file_output.OutputName: file_output.Handler(),
43+
stdout_output.OutputName: stdout_output.Handler(),
44+
s3_output.OutputName: s3_output.Handler(),
45+
gcs_output.OutputName: gcs_output.Handler(),
46+
log_analytics_output.OutputName: log_analytics_output.Handler(),
47+
pubsub_output.OutputName: pubsub_output.Handler(),
4348
}
4449
}

internal/processor/kv/cef.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package kv
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"strconv"
7+
"strings"
8+
9+
"github.com/dlclark/regexp2"
10+
)
11+
12+
type cefEvent struct {
13+
Version string
14+
DeviceVendor string
15+
DeviceProduct string
16+
DeviceVersion string
17+
DeviceEventClassId string
18+
Name string
19+
Severity string
20+
Extensions map[string]string
21+
}
22+
23+
func parseCef(event string) ([]byte, error) {
24+
cefMsg, err := cefStringToObject(event)
25+
26+
if err != nil {
27+
return nil, err
28+
}
29+
30+
// Marshal JSON string
31+
jsonString, err := json.Marshal(cefMsg)
32+
33+
// Handle errors
34+
if err != nil {
35+
fmt.Println(3)
36+
return nil, err
37+
}
38+
39+
return jsonString, nil
40+
}
41+
42+
func cefStringToObject(cefString string) (*cefEvent, error) {
43+
// Split by CEF separator
44+
arr := strings.Split(cefString, "|")
45+
46+
if len(arr) < 7 {
47+
return nil, fmt.Errorf("invalid CEF format")
48+
}
49+
50+
version := ""
51+
52+
if strings.Contains(arr[0], ":") {
53+
// Split first field to validate CEF
54+
validate := strings.Split(arr[0], ":")
55+
56+
// Validate that it is a valid CEF message
57+
if validate[0] != "CEF" {
58+
return nil, fmt.Errorf("invalid CEF format")
59+
}
60+
61+
version = validate[1]
62+
} else {
63+
if _, err := strconv.Atoi(arr[0]); err != nil {
64+
return nil, fmt.Errorf("invalid CEF format")
65+
}
66+
version = arr[0]
67+
}
68+
69+
// Get extensions
70+
extensions := strings.Join(arr[7:], "|")
71+
72+
// Replace colons with {{COLON}}
73+
safeExtensions := strings.ReplaceAll(extensions, ":", "{{COLON}}")
74+
safeExtensions = strings.ReplaceAll(safeExtensions, `\\=`, "{{EQUAL_ESCAPE_2}}")
75+
safeExtensions = strings.ReplaceAll(safeExtensions, `\=`, "{{EQUAL_ESCAPE_1}}")
76+
77+
// Replace non KV spaces with {{SPACE}}
78+
re := regexp2.MustCompile(`\s(?!([\w\-]+)\=)`, 0)
79+
safeExtensions2, err := re.Replace(safeExtensions, "{{SPACE}}", -1, -1)
80+
81+
// Parse extensions in key value format
82+
keyValueMap, err := parseKeyValue(safeExtensions2, true)
83+
84+
// Handle error
85+
if err != nil {
86+
return nil, err
87+
}
88+
89+
// Restore colons
90+
newKeyValueMap := make(map[string]string, 0)
91+
for k, v := range keyValueMap {
92+
newKey := strings.ReplaceAll(k, `{{SPACE}}`, " ")
93+
newKey = strings.ReplaceAll(newKey, `{{EQUAL_ESCAPE_1}}`, `\=`)
94+
newKey = strings.ReplaceAll(newKey, `{{EQUAL_ESCAPE_2}}`, `\\=`)
95+
newKey = strings.ReplaceAll(newKey, `{{COLON}}`, ":")
96+
97+
newValue := strings.ReplaceAll(v, `{{SPACE}}`, " ")
98+
newValue = strings.ReplaceAll(newValue, `{{EQUAL_ESCAPE_1}}`, `\=`)
99+
newValue = strings.ReplaceAll(newValue, `{{EQUAL_ESCAPE_2}}`, `\\=`)
100+
newValue = strings.ReplaceAll(newValue, `{{COLON}}`, ":")
101+
newValue = strings.TrimSpace(newValue)
102+
103+
newKeyValueMap[newKey] = newValue
104+
}
105+
106+
// Build CEF event
107+
cefEvent := &cefEvent{
108+
Version: version,
109+
DeviceVendor: cefEscapeField(arr[1]),
110+
DeviceProduct: cefEscapeField(arr[2]),
111+
DeviceVersion: cefEscapeField(arr[3]),
112+
DeviceEventClassId: cefEscapeField(arr[4]),
113+
Name: cefEscapeField(arr[5]),
114+
Severity: cefEscapeField(arr[6]),
115+
Extensions: newKeyValueMap,
116+
}
117+
118+
return cefEvent, nil
119+
}
120+
121+
// Unescape CEF fields
122+
func cefEscapeField(field string) string {
123+
124+
replacer := strings.NewReplacer(
125+
"\\\\", "\\",
126+
"\\|", "|",
127+
"\\n", "\n",
128+
)
129+
130+
return replacer.Replace(field)
131+
}
132+
133+
// Unescape CEF extensions
134+
func cefEscapeExtension(field string) string {
135+
136+
replacer := strings.NewReplacer(
137+
"\\\\", "\\",
138+
"\\n", "\n",
139+
"\\=", "=",
140+
)
141+
142+
return replacer.Replace(field)
143+
}

0 commit comments

Comments
 (0)