From 9bbce683468a201203e30c73c51e69b0e876ddaa Mon Sep 17 00:00:00 2001 From: Coleton Pierson Date: Wed, 13 Apr 2022 13:56:39 +0700 Subject: [PATCH] feat(processor): added key-value and CEF processor (#49) --- go.mod | 2 + go.sum | 5 + internal/app/builtin.go | 51 +++++----- internal/processor/kv/cef.go | 143 +++++++++++++++++++++++++++ internal/processor/kv/cef_test.go | 156 ++++++++++++++++++++++++++++++ internal/processor/kv/kv.go | 140 +++++++++++++++++++++++++++ internal/processor/kv/kv_test.go | 47 +++++++++ 7 files changed, 521 insertions(+), 23 deletions(-) create mode 100644 internal/processor/kv/cef.go create mode 100644 internal/processor/kv/cef_test.go create mode 100644 internal/processor/kv/kv.go create mode 100644 internal/processor/kv/kv_test.go diff --git a/go.mod b/go.mod index 9a1bd9f..d090ad0 100644 --- a/go.mod +++ b/go.mod @@ -6,12 +6,14 @@ require ( cloud.google.com/go/pubsub v1.19.0 cloud.google.com/go/storage v1.10.0 github.com/aws/aws-sdk-go v1.43.18 + github.com/dlclark/regexp2 v1.4.0 github.com/go-resty/resty/v2 v2.7.0 github.com/golang/protobuf v1.5.2 github.com/google/cel-go v0.10.1 github.com/google/uuid v1.3.0 github.com/gookit/validate v1.3.1 github.com/influxdata/go-syslog/v3 v3.0.0 + github.com/jjeffery/kv v0.8.1 github.com/sirupsen/logrus v1.8.1 github.com/spf13/cobra v1.4.0 github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index 422c0c1..5003137 100644 --- a/go.sum +++ b/go.sum @@ -108,6 +108,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dlclark/regexp2 v1.4.0 h1:F1rxgk7p4uKjwIQxBs9oAXe5CqrXlCduYEJvrF4u93E= +github.com/dlclark/regexp2 v1.4.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -268,6 +270,8 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/go-syslog/v3 v3.0.0 h1:jichmjSZlYK0VMmlz+k4WeOQd7z745YLsvGMqwtYt4I= github.com/influxdata/go-syslog/v3 v3.0.0/go.mod h1:tulsOp+CecTAYC27u9miMgq21GqXRW6VdKbOG+QSP4Q= +github.com/jjeffery/kv v0.8.1 h1:S4/KbfPVNTGM/YCt5F+Za93o09119VNtOT+rLL4Gls0= +github.com/jjeffery/kv v0.8.1/go.mod h1:iHA3uy+umBqxcJFr+e+gaGAv1OcyHlU6rSo3TcR61yQ= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= @@ -541,6 +545,7 @@ golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/internal/app/builtin.go b/internal/app/builtin.go index 273fdec..bbfadb3 100644 --- a/internal/app/builtin.go +++ b/internal/app/builtin.go @@ -1,44 +1,49 @@ package app import ( - filein "github.com/ThoronicLLC/collector/internal/input/file" - "github.com/ThoronicLLC/collector/internal/input/msgraph" - pubsubin "github.com/ThoronicLLC/collector/internal/input/pubsub" - "github.com/ThoronicLLC/collector/internal/input/syslog" - fileout "github.com/ThoronicLLC/collector/internal/output/file" - "github.com/ThoronicLLC/collector/internal/output/gcs" - "github.com/ThoronicLLC/collector/internal/output/log_analytics" - pubsubout "github.com/ThoronicLLC/collector/internal/output/pubsub" - "github.com/ThoronicLLC/collector/internal/output/s3" - "github.com/ThoronicLLC/collector/internal/output/stdout" - "github.com/ThoronicLLC/collector/internal/processor/cel" - syslog_processor "github.com/ThoronicLLC/collector/internal/processor/syslog" "github.com/ThoronicLLC/collector/pkg/core" + + file_input "github.com/ThoronicLLC/collector/internal/input/file" + msgraph_input "github.com/ThoronicLLC/collector/internal/input/msgraph" + pubsub_input "github.com/ThoronicLLC/collector/internal/input/pubsub" + syslog_input "github.com/ThoronicLLC/collector/internal/input/syslog" + + cel_processor "github.com/ThoronicLLC/collector/internal/processor/cel" + kv_processor "github.com/ThoronicLLC/collector/internal/processor/kv" + syslog_processor "github.com/ThoronicLLC/collector/internal/processor/syslog" + + file_output "github.com/ThoronicLLC/collector/internal/output/file" + gcs_output "github.com/ThoronicLLC/collector/internal/output/gcs" + log_analytics_output "github.com/ThoronicLLC/collector/internal/output/log_analytics" + pubsub_output "github.com/ThoronicLLC/collector/internal/output/pubsub" + s3_output "github.com/ThoronicLLC/collector/internal/output/s3" + stdout_output "github.com/ThoronicLLC/collector/internal/output/stdout" ) func AddInternalInputs() map[string]core.InputHandler { return map[string]core.InputHandler{ - filein.InputName: filein.Handler(), - pubsubin.InputName: pubsubin.Handler(), - syslog.InputName: syslog.Handler(), - msgraph.InputName: msgraph.Handler(), + file_input.InputName: file_input.Handler(), + pubsub_input.InputName: pubsub_input.Handler(), + syslog_input.InputName: syslog_input.Handler(), + msgraph_input.InputName: msgraph_input.Handler(), } } func AddInternalProcessors() map[string]core.ProcessHandler { return map[string]core.ProcessHandler{ - cel.ProcessorName: cel.Handler(), + cel_processor.ProcessorName: cel_processor.Handler(), syslog_processor.ProcessorName: syslog_processor.Handler(), + kv_processor.ProcessorName: kv_processor.Handler(), } } func AddInternalOutputs() map[string]core.OutputHandler { return map[string]core.OutputHandler{ - fileout.OutputName: fileout.Handler(), - stdout.OutputName: stdout.Handler(), - s3.OutputName: s3.Handler(), - gcs.OutputName: gcs.Handler(), - log_analytics.OutputName: log_analytics.Handler(), - pubsubout.OutputName: pubsubout.Handler(), + file_output.OutputName: file_output.Handler(), + stdout_output.OutputName: stdout_output.Handler(), + s3_output.OutputName: s3_output.Handler(), + gcs_output.OutputName: gcs_output.Handler(), + log_analytics_output.OutputName: log_analytics_output.Handler(), + pubsub_output.OutputName: pubsub_output.Handler(), } } diff --git a/internal/processor/kv/cef.go b/internal/processor/kv/cef.go new file mode 100644 index 0000000..11d4a61 --- /dev/null +++ b/internal/processor/kv/cef.go @@ -0,0 +1,143 @@ +package kv + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + + "github.com/dlclark/regexp2" +) + +type cefEvent struct { + Version string + DeviceVendor string + DeviceProduct string + DeviceVersion string + DeviceEventClassId string + Name string + Severity string + Extensions map[string]string +} + +func parseCef(event string) ([]byte, error) { + cefMsg, err := cefStringToObject(event) + + if err != nil { + return nil, err + } + + // Marshal JSON string + jsonString, err := json.Marshal(cefMsg) + + // Handle errors + if err != nil { + fmt.Println(3) + return nil, err + } + + return jsonString, nil +} + +func cefStringToObject(cefString string) (*cefEvent, error) { + // Split by CEF separator + arr := strings.Split(cefString, "|") + + if len(arr) < 7 { + return nil, fmt.Errorf("invalid CEF format") + } + + version := "" + + if strings.Contains(arr[0], ":") { + // Split first field to validate CEF + validate := strings.Split(arr[0], ":") + + // Validate that it is a valid CEF message + if validate[0] != "CEF" { + return nil, fmt.Errorf("invalid CEF format") + } + + version = validate[1] + } else { + if _, err := strconv.Atoi(arr[0]); err != nil { + return nil, fmt.Errorf("invalid CEF format") + } + version = arr[0] + } + + // Get extensions + extensions := strings.Join(arr[7:], "|") + + // Replace colons with {{COLON}} + safeExtensions := strings.ReplaceAll(extensions, ":", "{{COLON}}") + safeExtensions = strings.ReplaceAll(safeExtensions, `\\=`, "{{EQUAL_ESCAPE_2}}") + safeExtensions = strings.ReplaceAll(safeExtensions, `\=`, "{{EQUAL_ESCAPE_1}}") + + // Replace non KV spaces with {{SPACE}} + re := regexp2.MustCompile(`\s(?!([\w\-]+)\=)`, 0) + safeExtensions2, err := re.Replace(safeExtensions, "{{SPACE}}", -1, -1) + + // Parse extensions in key value format + keyValueMap, err := parseKeyValue(safeExtensions2, true) + + // Handle error + if err != nil { + return nil, err + } + + // Restore colons + newKeyValueMap := make(map[string]string, 0) + for k, v := range keyValueMap { + newKey := strings.ReplaceAll(k, `{{SPACE}}`, " ") + newKey = strings.ReplaceAll(newKey, `{{EQUAL_ESCAPE_1}}`, `\=`) + newKey = strings.ReplaceAll(newKey, `{{EQUAL_ESCAPE_2}}`, `\\=`) + newKey = strings.ReplaceAll(newKey, `{{COLON}}`, ":") + + newValue := strings.ReplaceAll(v, `{{SPACE}}`, " ") + newValue = strings.ReplaceAll(newValue, `{{EQUAL_ESCAPE_1}}`, `\=`) + newValue = strings.ReplaceAll(newValue, `{{EQUAL_ESCAPE_2}}`, `\\=`) + newValue = strings.ReplaceAll(newValue, `{{COLON}}`, ":") + newValue = strings.TrimSpace(newValue) + + newKeyValueMap[newKey] = newValue + } + + // Build CEF event + cefEvent := &cefEvent{ + Version: version, + DeviceVendor: cefEscapeField(arr[1]), + DeviceProduct: cefEscapeField(arr[2]), + DeviceVersion: cefEscapeField(arr[3]), + DeviceEventClassId: cefEscapeField(arr[4]), + Name: cefEscapeField(arr[5]), + Severity: cefEscapeField(arr[6]), + Extensions: newKeyValueMap, + } + + return cefEvent, nil +} + +// Unescape CEF fields +func cefEscapeField(field string) string { + + replacer := strings.NewReplacer( + "\\\\", "\\", + "\\|", "|", + "\\n", "\n", + ) + + return replacer.Replace(field) +} + +// Unescape CEF extensions +func cefEscapeExtension(field string) string { + + replacer := strings.NewReplacer( + "\\\\", "\\", + "\\n", "\n", + "\\=", "=", + ) + + return replacer.Replace(field) +} diff --git a/internal/processor/kv/cef_test.go b/internal/processor/kv/cef_test.go new file mode 100644 index 0000000..486ab55 --- /dev/null +++ b/internal/processor/kv/cef_test.go @@ -0,0 +1,156 @@ +package kv + +import "testing" + +var cefMessage1 = "0|illusive|illusive|3.1.128.1719|illusive:heartbeat|Heartbeat|0|dvc=10.118.182.162 rt=1600239263565 cat=illusive:SYS" +var cefMessage2 = "CEF:0|Cool Vendor|Cool Product|1.0|FLAKY_EVENT|Something flaky happened.|3|requestClientApplication=Go-http-client/1.1 sourceAddress=127.0.0.1" +var cefMessage3 = `0|illusive|illusive|3.1.128.1719|illusive:audit|Audit|5|msg=theuser@domain.local logged out {User role \\= ROLE_ADMIN; Source address \\= 10.120.10.152} dvc=10.105.33.50 rt=1600239250955 duser=theuser@domain.local cat=illusive:info outcome=SUCCESS` +var cefMessage4 = `illusive|illusive|3.1.128.1719|illusive:heartbeat|Heartbeat|0|dvc=10.118.182.162 rt=1600239263565 cat=illusive:SYS` +var cefMessage5 = `CEF:0|Cool Vendor|Cool Product|1.0|FLAKY_EVENT|Something flaky happened.|3|requestClientApplication=Go-http-client/1.1 blank2= blank3= blank4= sourceAddress=127.0.0.1 blank=` + +func TestParseCef(t *testing.T) { + expectedVersion := "0" + expectedDeviceVendor := "illusive" + expectedDeviceEventClassId := "illusive:heartbeat" + expectedName := "Heartbeat" + + if _, err := parseCef(cefMessage1); err != nil { + t.Fatalf("failed to parse CEF message: %v", err) + } + + cefObj, err := cefStringToObject(cefMessage1) + + if err != nil { + t.Errorf("failed to parse CEF message: %v", err) + } + + if cefObj == nil { + t.Errorf("null CEF message returned") + } + + if cefObj.Version != expectedVersion { + t.Errorf("cefStringToObject(message1).Version got %s; expected %s", cefObj.Version, expectedVersion) + } + + if cefObj.DeviceVendor != expectedDeviceVendor { + t.Errorf("cefStringToObject(message1).DeviceVendor got %s; expected %s", cefObj.DeviceVendor, expectedDeviceVendor) + } + + if cefObj.Name != expectedName { + t.Errorf("cefStringToObject(message1).Name got %s; expected %s", cefObj.Name, expectedName) + } + + if cefObj.DeviceEventClassId != expectedDeviceEventClassId { + t.Errorf("cefStringToObject(message1).DeviceEventClassId got %s; expected %s", cefObj.DeviceEventClassId, expectedDeviceEventClassId) + } + +} + +func TestParseCef2(t *testing.T) { + expectedVersion := "0" + expectedDeviceVendor := "Cool Vendor" + expectedDeviceProduct := "Cool Product" + expectedDeviceEventClassId := "FLAKY_EVENT" + + if _, err := parseCef(cefMessage2); err != nil { + t.Errorf("failed to parse CEF message: %v", err) + } + + cefObj, err := cefStringToObject(cefMessage2) + + if err != nil { + t.Errorf("failed to parse CEF message: %v", err) + } + + if cefObj == nil { + t.Errorf("null CEF message returned") + } + + if cefObj.Version != expectedVersion { + t.Errorf("cefStringToObject(message2).Version got %s; expected %s", cefObj.Version, expectedVersion) + } + + if cefObj.DeviceVendor != expectedDeviceVendor { + t.Errorf("cefStringToObject(message2).DeviceVendor got %s; expected %s", cefObj.DeviceVendor, expectedDeviceVendor) + } + + if cefObj.DeviceProduct != expectedDeviceProduct { + t.Errorf("cefStringToObject(message2).DeviceProduct got %s; expected %s", cefObj.DeviceProduct, expectedDeviceProduct) + } + + if cefObj.DeviceEventClassId != expectedDeviceEventClassId { + t.Errorf("cefStringToObject(message2).DeviceEventClassId got %s; expected %s", cefObj.DeviceEventClassId, expectedDeviceEventClassId) + } + +} + +func TestParseCef3(t *testing.T) { + cefExpectedKeyValuePair1 := []string{"msg", `theuser@domain.local logged out {User role \\= ROLE_ADMIN; Source address \\= 10.120.10.152}`} + cefExpectedKeyValuePair2 := []string{"duser", "theuser@domain.local"} + cefExpectedKeyValuePair3 := []string{"outcome", "SUCCESS"} + cefExpectedKeyValues := [][]string{cefExpectedKeyValuePair1, cefExpectedKeyValuePair2, cefExpectedKeyValuePair3} + + if _, err := parseCef(cefMessage3); err != nil { + t.Errorf("failed to parse CEF message: %v", err) + } + + cefObj, err := cefStringToObject(cefMessage3) + + if err != nil { + t.Errorf("failed to parse CEF message: %v", err) + } + + if cefObj == nil { + t.Errorf("null CEF message returned") + } + + for _, v := range cefExpectedKeyValues { + if cefObj.Extensions[v[0]] != v[1] { + t.Errorf(`cefStringToObject(message3).Extensions["%s"] got %s; expected %s`, v[0], cefObj.Extensions[v[0]], v[1]) + } + } + +} + +func TestParseCef4(t *testing.T) { + if _, err := parseCef(cefMessage4); err == nil { + t.Errorf("failed to error on invalid CEF message: %v", err) + } +} + +func TestParseCef5(t *testing.T) { + expectedVersion := "0" + expectedDeviceVendor := "Cool Vendor" + expectedDeviceProduct := "Cool Product" + expectedDeviceEventClassId := "FLAKY_EVENT" + + if _, err := parseCef(cefMessage5); err != nil { + t.Errorf("failed to parse CEF message: %v", err) + } + + cefObj, err := cefStringToObject(cefMessage5) + + if err != nil { + t.Errorf("failed to parse CEF message: %v", err) + } + + if cefObj == nil { + t.Errorf("null CEF message returned") + } + + if cefObj.Version != expectedVersion { + t.Errorf("cefStringToObject(message5).Version got %s; expected %s", cefObj.Version, expectedVersion) + } + + if cefObj.DeviceVendor != expectedDeviceVendor { + t.Errorf("cefStringToObject(message5).DeviceVendor got %s; expected %s", cefObj.DeviceVendor, expectedDeviceVendor) + } + + if cefObj.DeviceProduct != expectedDeviceProduct { + t.Errorf("cefStringToObject(message5).DeviceProduct got %s; expected %s", cefObj.DeviceProduct, expectedDeviceProduct) + } + + if cefObj.DeviceEventClassId != expectedDeviceEventClassId { + t.Errorf("cefStringToObject(message5).DeviceEventClassId got %s; expected %s", cefObj.DeviceEventClassId, expectedDeviceEventClassId) + } +} diff --git a/internal/processor/kv/kv.go b/internal/processor/kv/kv.go new file mode 100644 index 0000000..29ed5a5 --- /dev/null +++ b/internal/processor/kv/kv.go @@ -0,0 +1,140 @@ +package kv + +import ( + "encoding/json" + "fmt" + "io" + "regexp" + "strings" + + "github.com/ThoronicLLC/collector/pkg/core" + "github.com/jjeffery/kv" + log "github.com/sirupsen/logrus" +) + +var ProcessorName = "kv" + +type Config struct { + Type string `json:"type" validate:"in:raw,cef"` +} + +type kvProcessor struct { + config Config + logger *log.Entry +} + +func Handler() core.ProcessHandler { + return func(config []byte) (core.Processor, error) { + // Set config defaults + conf := Config{} + + // Unmarshal config + err := json.Unmarshal(config, &conf) + if err != nil { + return nil, fmt.Errorf("issue unmarshalling CEL config: %s", err) + } + + // Validate config + err = core.ValidateStruct(&conf) + if err != nil { + return nil, err + } + + return &kvProcessor{ + config: conf, + logger: log.WithField("processor", "syslog"), + }, nil + } +} + +func (processor *kvProcessor) Process(inputFile string, writer io.Writer) error { + // Use the file reader utility to pass our function + err := core.FileReader(inputFile, func(s string) { + // Clean line of any extra spaces for CEL detection + cleanLine := strings.TrimSpace(s) + + // Return if clean line is empty + if cleanLine == "" { + if log.IsLevelEnabled(log.DebugLevel) { + processor.logger.Debugf("line is empty: %s", cleanLine) + } + return + } + + switch processor.config.Type { + case "raw": + msg, err := parseKV(cleanLine) + if err != nil { + processor.logger.Errorf("issue parsing line: %s", err) + } else { + _, _ = writer.Write(msg) + } + case "cef": + msg, err := parseCef(cleanLine) + if err != nil { + processor.logger.Errorf("issue parsing line: %s", err) + } else { + _, _ = writer.Write(msg) + } + } + + }) + + return err +} + +// parseKeyValue will take a key value formatted string and convert it into a key value map +func parseKeyValue(event string, cef bool) (map[string]string, error) { + // Clear out empty key values + reg := regexp.MustCompile("[a-zA-Z0-9]+=[ ]") + newEvent := reg.ReplaceAllString(event, " ") + + // fix ending + if newEvent[len(newEvent)-1] == '=' { + reg := regexp.MustCompile("[ ][a-zA-Z0-9]+=$") + newEvent = reg.ReplaceAllString(newEvent, "") + } + + // Use KeyValue library to parse + text, list := kv.Parse([]byte(newEvent)) + + // If return text then an error occurred during parsing + if len(text) > 0 { + return nil, fmt.Errorf(`invalid key value format at: "%s"`, string(text)) + } + + // Convert from list to a map + elementMap := make(map[string]string) + for i := 0; i < len(list); i += 2 { + key := list[i].(string) + value := list[i+1].(string) + if cef { + elementMap[cefEscapeExtension(key)] = cefEscapeExtension(value) + } else { + elementMap[key] = value + } + } + + return elementMap, nil +} + +// ConstructKeyValue will take a key value formatted string and convert it into a key value json object +func parseKV(event string) ([]byte, error) { + // Parse key value string + result, err := parseKeyValue(event, false) + + // Handle errors + if err != nil { + return nil, err + } + + // Marshal JSON string + jsonString, err := json.Marshal(result) + + // Handle errors + if err != nil { + return nil, err + } + + return jsonString, nil +} diff --git a/internal/processor/kv/kv_test.go b/internal/processor/kv/kv_test.go new file mode 100644 index 0000000..092d771 --- /dev/null +++ b/internal/processor/kv/kv_test.go @@ -0,0 +1,47 @@ +package kv + +import ( + "testing" + + "github.com/jjeffery/kv" +) + +var kvMessage1 = []byte(`dvc=10.118.182.162 rt=1600239263565 cat=illusive{{COLON}}SYS`) +var kvMessage2 = []byte(`message this stuff dvc=10.118.182.162 rt=1600239263565 cat=illusive{{COLON}}SYS`) +var kvMessage3 = []byte(`dvc==10.118.182.162 rt==1600239263565 cat==illusive{{COLON}}SYS`) + +func TestParseKV(t *testing.T) { + kvExpectedLength := 6 + kvExpectedKeyValuePair1 := []string{"dvc", "10.118.182.162"} + kvExpectedKeyValuePair2 := []string{"rt", "1600239263565"} + kvExpectedKeyValuePair3 := []string{"cat", "illusive{{COLON}}SYS"} + kvExpectedKeyValues := [][]string{kvExpectedKeyValuePair1, kvExpectedKeyValuePair2, kvExpectedKeyValuePair3} + + if text, list := kv.Parse(kvMessage1); text != nil { + t.Fatalf("failed to parse KV message") + } else if len(list) != kvExpectedLength { + t.Fatalf("len(list) got %v; expected %v", len(list), kvExpectedLength) + } + + resultMap, err := parseKeyValue(string(kvMessage1), false) + + if err != nil { + t.Fatalf("failed to parse KV message") + } + + for _, v := range kvExpectedKeyValues { + if resultMap[v[0]] != v[1] { + t.Errorf(`resultMap["%s"] got %s; expected %s`, v[0], resultMap[v[0]], v[1]) + } + } +} + +func TestParseKV2(t *testing.T) { + if text, _ := kv.Parse(kvMessage2); text == nil { + t.Errorf("failed to error on invalid KV message") + } + + if text, _ := kv.Parse(kvMessage3); text == nil { + t.Errorf("failed to error on invalid KV message") + } +}