diff --git a/.chloggen/feat_aws-logs-waf-logs.yaml b/.chloggen/feat_aws-logs-waf-logs.yaml new file mode 100644 index 0000000000000..37228f83f1fd2 --- /dev/null +++ b/.chloggen/feat_aws-logs-waf-logs.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: extension/awslogs_encoding + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adopt encoding extension streaming contract for WAF logs + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [46214] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/extension/encoding/awslogsencodingextension/README.md b/extension/encoding/awslogsencodingextension/README.md index 9200db2def1d5..51de195cfbe2b 100644 --- a/extension/encoding/awslogsencodingextension/README.md +++ b/extension/encoding/awslogsencodingextension/README.md @@ -185,6 +185,7 @@ The table below summarizes streaming support details for each log type, along wi |---------------------|---------------------|-----------------------------|----------------------------------------------------------------------------------------------| | Network Firewall | Alert/Flow/TLS | Bytes processed | | | Subscription filter | - | Number of records processed | Supports processing multi-line inputs and offset tracks number of records that get processed | +| WAF Logs | - | Bytes processed | | ## Produced Records per Format diff --git a/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/benchmark_test.go b/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/benchmark_test.go index 03ec45f1e0456..533c0caa44e43 100644 --- a/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/benchmark_test.go +++ b/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/benchmark_test.go @@ -44,7 +44,7 @@ func BenchmarkUnmarshalLogs(b *testing.B) { }, } - u := wafLogUnmarshaler{ + u := WafLogUnmarshaler{ buildInfo: component.BuildInfo{}, } diff --git a/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/missing_webaclid_log.json b/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/missing_webaclid_log.json index 48d31d524cfb8..feef745e039ab 100644 --- a/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/missing_webaclid_log.json +++ b/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/missing_webaclid_log.json @@ -1,100 +1 @@ -{ - "timestamp":1748208718574, - "formatVersion":1, - "terminatingRuleId":"Default_Action", - "terminatingRuleType":"REGULAR", - "action":"ALLOW", - "terminatingRuleMatchDetails":[ ], - "httpSourceName":"CF", - "httpSourceId":"E3DTJP8YLL6OBQ", - "ruleGroupList":[ ], - "rateBasedRuleList":[ - { - "rateBasedRuleId":"arn:aws:wafv2:us-east-1:123456789101_MANAGED:global/ipset/e3132a63-134d-4da9-a0c4-b166ddd6de6c_77ce5c35-14fa-4731-9710-86216d568f12_IPV4/77ce5c35-14fa-4731-9710-86216d568f12", - "rateBasedRuleName":"rule-1", - "limitKey":"IP", - "maxRateAllowed":10000, - "evaluationWindowSec":300, - "limitValue":"178.84.204.171" - } - ], - "nonTerminatingMatchingRules":[ ], - "requestHeadersInserted":null, - "responseCodeSent":null, - "httpRequest":{ - "clientIp":"178.84.204.171", - "country":"NL", - "headers":[ - { - "name":"host", - "value":"dsx1234tsajqz63.cloudfront.net" - }, - { - "name":"user-agent", - "value":"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:138.0) Gecko/20100101 Firefox/138.0" - }, - { - "name":"accept", - "value":"image/avif,image/webp,image/png,image/svg+xml,image/*;q=0.8,*/*;q=0.5" - }, - { - "name":"accept-language", - "value":"en-US,en;q=0.5" - }, - { - "name":"accept-encoding", - "value":"gzip, deflate, br, zstd" - }, - { - "name":"referer", - "value":"https://dsx88tsajqz63.cloudfront.net/" - }, - { - "name":"sec-fetch-dest", - "value":"image" - }, - { - "name":"sec-fetch-mode", - "value":"no-cors" - }, - { - "name":"sec-fetch-site", - "value":"same-origin" - }, - { - "name":"dnt", - "value":"1" - }, - { - "name":"sec-gpc", - "value":"1" - }, - { - "name":"priority", - "value":"u=6" - }, - { - "name":"te", - "value":"trailers" - } - ], - "uri":"/favicon.ico", - "args":"", - "httpVersion":"HTTP/2.0", - "httpMethod":"GET", - "requestId":"n6LHLPqblIh_4qRsVj0940K9LxKyrkiUUE7lyMol1eTptabtlhHiXQ==", - "fragment":"", - "scheme":"https", - "host":"dsx88tsajqz63.cloudfront.net" - }, - "labels":[ - { - "name":"awswaf:clientip:geo:country:NL" - }, - { - "name":"awswaf:clientip:geo:region:NL-NH" - } - ], - "ja3Fingerprint":"6f7889b9fb1a62a9577e685c1fcfa919", - "ja4Fingerprint":"t13d1717h2_5b57614c22b0_3cbfd9057e0d" -} \ No newline at end of file +{"timestamp":1748208718574,"formatVersion":1,"terminatingRuleId":"Default_Action","terminatingRuleType":"REGULAR","action":"ALLOW","terminatingRuleMatchDetails":[],"httpSourceName":"CF","httpSourceId":"E3DTJP8YLL6OBQ","ruleGroupList":[],"rateBasedRuleList":[{"rateBasedRuleId":"arn:aws:wafv2:us-east-1:123456789101_MANAGED:global/ipset/e3132a63-134d-4da9-a0c4-b166ddd6de6c_77ce5c35-14fa-4731-9710-86216d568f12_IPV4/77ce5c35-14fa-4731-9710-86216d568f12","rateBasedRuleName":"rule-1","limitKey":"IP","maxRateAllowed":10000,"evaluationWindowSec":300,"limitValue":"178.84.204.171"}],"nonTerminatingMatchingRules":[],"requestHeadersInserted":null,"responseCodeSent":null,"httpRequest":{"clientIp":"178.84.204.171","country":"NL","headers":[{"name":"host","value":"dsx1234tsajqz63.cloudfront.net"},{"name":"user-agent","value":"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:138.0) Gecko/20100101 Firefox/138.0"},{"name":"accept","value":"image/avif,image/webp,image/png,image/svg+xml,image/*;q=0.8,*/*;q=0.5"},{"name":"accept-language","value":"en-US,en;q=0.5"},{"name":"accept-encoding","value":"gzip, deflate, br, zstd"},{"name":"referer","value":"https://dsx88tsajqz63.cloudfront.net/"},{"name":"sec-fetch-dest","value":"image"},{"name":"sec-fetch-mode","value":"no-cors"},{"name":"sec-fetch-site","value":"same-origin"},{"name":"dnt","value":"1"},{"name":"sec-gpc","value":"1"},{"name":"priority","value":"u=6"},{"name":"te","value":"trailers"}],"uri":"/favicon.ico","args":"","httpVersion":"HTTP/2.0","httpMethod":"GET","requestId":"n6LHLPqblIh_4qRsVj0940K9LxKyrkiUUE7lyMol1eTptabtlhHiXQ==","fragment":"","scheme":"https","host":"dsx88tsajqz63.cloudfront.net"},"labels":[{"name":"awswaf:clientip:geo:country:NL"},{"name":"awswaf:clientip:geo:region:NL-NH"}],"ja3Fingerprint":"6f7889b9fb1a62a9577e685c1fcfa919","ja4Fingerprint":"t13d1717h2_5b57614c22b0_3cbfd9057e0d"} \ No newline at end of file diff --git a/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/valid_log.json b/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/valid_log.json index 2b90e20093d0a..b5607a91c292e 100644 --- a/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/valid_log.json +++ b/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/valid_log.json @@ -1,101 +1 @@ -{ - "timestamp":1748208718574, - "formatVersion":1, - "webaclId":"arn:aws:wafv2:us-east-1:123456789101:global/webacl/open-telemetry-waf/e3132a63-134d-4da9-a0c4-b166ddd6de6c", - "terminatingRuleId":"Default_Action", - "terminatingRuleType":"REGULAR", - "action":"ALLOW", - "terminatingRuleMatchDetails":[ ], - "httpSourceName":"CF", - "httpSourceId":"E3DTJP8YLL6OBQ", - "ruleGroupList":[ ], - "rateBasedRuleList":[ - { - "rateBasedRuleId":"arn:aws:wafv2:us-east-1:123456789101_MANAGED:global/ipset/e3132a63-134d-4da9-a0c4-b166ddd6de6c_77ce5c35-14fa-4731-9710-86216d568f12_IPV4/77ce5c35-14fa-4731-9710-86216d568f12", - "rateBasedRuleName":"rule-1", - "limitKey":"IP", - "maxRateAllowed":10000, - "evaluationWindowSec":300, - "limitValue":"178.84.204.171" - } - ], - "nonTerminatingMatchingRules":[ ], - "requestHeadersInserted":null, - "responseCodeSent":null, - "httpRequest":{ - "clientIp":"178.84.204.171", - "country":"NL", - "headers":[ - { - "name":"host", - "value":"dsx1234tsajqz63.cloudfront.net" - }, - { - "name":"user-agent", - "value":"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:138.0) Gecko/20100101 Firefox/138.0" - }, - { - "name":"accept", - "value":"image/avif,image/webp,image/png,image/svg+xml,image/*;q=0.8,*/*;q=0.5" - }, - { - "name":"accept-language", - "value":"en-US,en;q=0.5" - }, - { - "name":"accept-encoding", - "value":"gzip, deflate, br, zstd" - }, - { - "name":"referer", - "value":"https://dsx88tsajqz63.cloudfront.net/" - }, - { - "name":"sec-fetch-dest", - "value":"image" - }, - { - "name":"sec-fetch-mode", - "value":"no-cors" - }, - { - "name":"sec-fetch-site", - "value":"same-origin" - }, - { - "name":"dnt", - "value":"1" - }, - { - "name":"sec-gpc", - "value":"1" - }, - { - "name":"priority", - "value":"u=6" - }, - { - "name":"te", - "value":"trailers" - } - ], - "uri":"/favicon.ico", - "args":"", - "httpVersion":"HTTP/2.0", - "httpMethod":"GET", - "requestId":"n6LHLPqblIh_4qRsVj0940K9LxKyrkiUUE7lyMol1eTptabtlhHiXQ==", - "fragment":"", - "scheme":"https", - "host":"dsx88tsajqz63.cloudfront.net" - }, - "labels":[ - { - "name":"awswaf:clientip:geo:country:NL" - }, - { - "name":"awswaf:clientip:geo:region:NL-NH" - } - ], - "ja3Fingerprint":"6f7889b9fb1a62a9577e685c1fcfa919", - "ja4Fingerprint":"t13d1717h2_5b57614c22b0_3cbfd9057e0d" -} \ No newline at end of file +{"timestamp":1748208718574,"formatVersion":1,"webaclId":"arn:aws:wafv2:us-east-1:123456789101:global/webacl/open-telemetry-waf/e3132a63-134d-4da9-a0c4-b166ddd6de6c","terminatingRuleId":"Default_Action","terminatingRuleType":"REGULAR","action":"ALLOW","terminatingRuleMatchDetails":[],"httpSourceName":"CF","httpSourceId":"E3DTJP8YLL6OBQ","ruleGroupList":[],"rateBasedRuleList":[{"rateBasedRuleId":"arn:aws:wafv2:us-east-1:123456789101_MANAGED:global/ipset/e3132a63-134d-4da9-a0c4-b166ddd6de6c_77ce5c35-14fa-4731-9710-86216d568f12_IPV4/77ce5c35-14fa-4731-9710-86216d568f12","rateBasedRuleName":"rule-1","limitKey":"IP","maxRateAllowed":10000,"evaluationWindowSec":300,"limitValue":"178.84.204.171"}],"nonTerminatingMatchingRules":[],"requestHeadersInserted":null,"responseCodeSent":null,"httpRequest":{"clientIp":"178.84.204.171","country":"NL","headers":[{"name":"host","value":"dsx1234tsajqz63.cloudfront.net"},{"name":"user-agent","value":"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:138.0) Gecko/20100101 Firefox/138.0"},{"name":"accept","value":"image/avif,image/webp,image/png,image/svg+xml,image/*;q=0.8,*/*;q=0.5"},{"name":"accept-language","value":"en-US,en;q=0.5"},{"name":"accept-encoding","value":"gzip, deflate, br, zstd"},{"name":"referer","value":"https://dsx88tsajqz63.cloudfront.net/"},{"name":"sec-fetch-dest","value":"image"},{"name":"sec-fetch-mode","value":"no-cors"},{"name":"sec-fetch-site","value":"same-origin"},{"name":"dnt","value":"1"},{"name":"sec-gpc","value":"1"},{"name":"priority","value":"u=6"},{"name":"te","value":"trailers"}],"uri":"/favicon.ico","args":"","httpVersion":"HTTP/2.0","httpMethod":"GET","requestId":"n6LHLPqblIh_4qRsVj0940K9LxKyrkiUUE7lyMol1eTptabtlhHiXQ==","fragment":"","scheme":"https","host":"dsx88tsajqz63.cloudfront.net"},"labels":[{"name":"awswaf:clientip:geo:country:NL"},{"name":"awswaf:clientip:geo:region:NL-NH"}],"ja3Fingerprint":"6f7889b9fb1a62a9577e685c1fcfa919","ja4Fingerprint":"t13d1717h2_5b57614c22b0_3cbfd9057e0d"} \ No newline at end of file diff --git a/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/valid_log_multi.json b/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/valid_log_multi.json new file mode 100644 index 0000000000000..aab2e8e59d830 --- /dev/null +++ b/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/valid_log_multi.json @@ -0,0 +1,2 @@ +{"timestamp":1748208718574,"formatVersion":1,"webaclId":"arn:aws:wafv2:us-east-1:123456789101:global/webacl/open-telemetry-waf/e3132a63-134d-4da9-a0c4-b166ddd6de6c","terminatingRuleId":"Default_Action","terminatingRuleType":"REGULAR","action":"ALLOW","terminatingRuleMatchDetails":[],"httpSourceName":"CF","httpSourceId":"E3DTJP8YLL6OBQ","ruleGroupList":[],"rateBasedRuleList":[{"rateBasedRuleId":"arn:aws:wafv2:us-east-1:123456789101_MANAGED:global/ipset/e3132a63-134d-4da9-a0c4-b166ddd6de6c_77ce5c35-14fa-4731-9710-86216d568f12_IPV4/77ce5c35-14fa-4731-9710-86216d568f12","rateBasedRuleName":"rule-1","limitKey":"IP","maxRateAllowed":10000,"evaluationWindowSec":300,"limitValue":"178.84.204.171"}],"nonTerminatingMatchingRules":[],"requestHeadersInserted":null,"responseCodeSent":null,"httpRequest":{"clientIp":"178.84.204.171","country":"NL","headers":[{"name":"host","value":"dsx1234tsajqz63.cloudfront.net"},{"name":"user-agent","value":"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:138.0) Gecko/20100101 Firefox/138.0"},{"name":"accept","value":"image/avif,image/webp,image/png,image/svg+xml,image/*;q=0.8,*/*;q=0.5"},{"name":"accept-language","value":"en-US,en;q=0.5"},{"name":"accept-encoding","value":"gzip, deflate, br, zstd"},{"name":"referer","value":"https://dsx88tsajqz63.cloudfront.net/"},{"name":"sec-fetch-dest","value":"image"},{"name":"sec-fetch-mode","value":"no-cors"},{"name":"sec-fetch-site","value":"same-origin"},{"name":"dnt","value":"1"},{"name":"sec-gpc","value":"1"},{"name":"priority","value":"u=6"},{"name":"te","value":"trailers"}],"uri":"/favicon.ico","args":"","httpVersion":"HTTP/2.0","httpMethod":"GET","requestId":"n6LHLPqblIh_4qRsVj0940K9LxKyrkiUUE7lyMol1eTptabtlhHiXQ==","fragment":"","scheme":"https","host":"dsx88tsajqz63.cloudfront.net"},"labels":[{"name":"awswaf:clientip:geo:country:NL"},{"name":"awswaf:clientip:geo:region:NL-NH"}],"ja3Fingerprint":"6f7889b9fb1a62a9577e685c1fcfa919","ja4Fingerprint":"t13d1717h2_5b57614c22b0_3cbfd9057e0d"} +{"timestamp":1683355579981,"formatVersion":1,"webaclId":"arn:aws:wafv2:us-east-1:123456789101:global/webacl/open-telemetry-waf/e3132a63-134d-4da9-a0c4-b166ddd6de6c","terminatingRuleId":"RateBasedRule","terminatingRuleType":"RATE_BASED","action":"BLOCK","terminatingRuleMatchDetails":[],"httpSourceName":"APIGW","httpSourceId":"EXAMPLE11:rjvegx5guh:CanaryTest","ruleGroupList":[],"rateBasedRuleList":[{"rateBasedRuleId":"arn:aws:wafv2:us-east-1:123456789101_MANAGED:global/ipset/e3132a63-134d-4da9-a0c4-b166ddd6de6c_77ce5c35-14fa-4731-9710-86216d568f12_IPV4/77ce5c35-14fa-4731-9710-86216d568f12","rateBasedRuleName":"RateBasedRule","limitKey":"CUSTOMKEYS","maxRateAllowed":100,"evaluationWindowSec":"120","customValues":[{"key":"HEADER","name":"dogname","value":"ella"}]}],"nonTerminatingMatchingRules":[],"requestHeadersInserted":null,"responseCodeSent":null,"httpRequest":{"clientIp":"52.46.82.45","country":"FR","headers":[{"name":"X-Forwarded-For","value":"52.46.82.45"},{"name":"X-Forwarded-Proto","value":"https"},{"name":"X-Forwarded-Port","value":"443"},{"name":"Host","value":"rjvegx5guh.execute-api.eu-west-3.amazonaws.com"},{"name":"X-Amzn-Trace-Id","value":"Root=1-645566cf-7cb058b04d9bb3ee01dc4036"},{"name":"dogname","value":"ella"},{"name":"User-Agent","value":"RateBasedRuleTestKoipOneKeyModulePV2"},{"name":"Accept-Encoding","value":"gzip,deflate"}],"uri":"/CanaryTest","args":"","httpVersion":"HTTP/1.1","httpMethod":"GET","requestId":"Ed0AiHF_CGYF-DA="}} \ No newline at end of file diff --git a/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/valid_log_multi_1.yaml b/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/valid_log_multi_1.yaml new file mode 100644 index 0000000000000..28b8164721b92 --- /dev/null +++ b/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/valid_log_multi_1.yaml @@ -0,0 +1,113 @@ +resourceLogs: + - resource: + attributes: + - key: cloud.provider + value: + stringValue: aws + - key: cloud.region + value: + stringValue: us-east-1 + - key: cloud.account.id + value: + stringValue: "123456789101" + - key: cloud.resource_id + value: + stringValue: arn:aws:wafv2:us-east-1:123456789101:global/webacl/open-telemetry-waf/e3132a63-134d-4da9-a0c4-b166ddd6de6c + scopeLogs: + - logRecords: + - attributes: + - key: network.protocol.name + value: + stringValue: http + - key: network.protocol.version + value: + stringValue: "2.0" + - key: aws.waf.terminating_rule.type + value: + stringValue: REGULAR + - key: aws.waf.terminating_rule.id + value: + stringValue: Default_Action + - key: aws.waf.action + value: + stringValue: ALLOW + - key: aws.waf.source.id + value: + stringValue: E3DTJP8YLL6OBQ + - key: aws.waf.source.name + value: + stringValue: CF + - key: http.request.header.host + value: + stringValue: dsx1234tsajqz63.cloudfront.net + - key: http.request.header.user-agent + value: + stringValue: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:138.0) Gecko/20100101 Firefox/138.0 + - key: http.request.header.accept + value: + stringValue: image/avif,image/webp,image/png,image/svg+xml,image/*;q=0.8,*/*;q=0.5 + - key: http.request.header.accept-language + value: + stringValue: en-US,en;q=0.5 + - key: http.request.header.accept-encoding + value: + stringValue: gzip, deflate, br, zstd + - key: http.request.header.referer + value: + stringValue: https://dsx88tsajqz63.cloudfront.net/ + - key: http.request.header.sec-fetch-dest + value: + stringValue: image + - key: http.request.header.sec-fetch-mode + value: + stringValue: no-cors + - key: http.request.header.sec-fetch-site + value: + stringValue: same-origin + - key: http.request.header.dnt + value: + stringValue: "1" + - key: http.request.header.sec-gpc + value: + stringValue: "1" + - key: http.request.header.priority + value: + stringValue: u=6 + - key: http.request.header.te + value: + stringValue: trailers + - key: client.address + value: + stringValue: 178.84.204.171 + - key: server.address + value: + stringValue: dsx88tsajqz63.cloudfront.net + - key: url.path + value: + stringValue: /favicon.ico + - key: http.request.method + value: + stringValue: GET + - key: aws.request_id + value: + stringValue: n6LHLPqblIh_4qRsVj0940K9LxKyrkiUUE7lyMol1eTptabtlhHiXQ== + - key: url.scheme + value: + stringValue: https + - key: geo.country.iso_code + value: + stringValue: NL + - key: tls.client.ja3 + value: + stringValue: 6f7889b9fb1a62a9577e685c1fcfa919 + - key: tls.client.ja4 + value: + stringValue: t13d1717h2_5b57614c22b0_3cbfd9057e0d + body: {} + timeUnixNano: "1748208718574000000" + scope: + attributes: + - key: encoding.format + value: + stringValue: aws.waf + name: github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension \ No newline at end of file diff --git a/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/valid_log_multi_2.yaml b/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/valid_log_multi_2.yaml new file mode 100644 index 0000000000000..9c220c241078c --- /dev/null +++ b/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/testdata/valid_log_multi_2.yaml @@ -0,0 +1,86 @@ +resourceLogs: + - resource: + attributes: + - key: cloud.provider + value: + stringValue: aws + - key: cloud.region + value: + stringValue: us-east-1 + - key: cloud.account.id + value: + stringValue: "123456789101" + - key: cloud.resource_id + value: + stringValue: arn:aws:wafv2:us-east-1:123456789101:global/webacl/open-telemetry-waf/e3132a63-134d-4da9-a0c4-b166ddd6de6c + scopeLogs: + - logRecords: + - attributes: + - key: network.protocol.name + value: + stringValue: http + - key: network.protocol.version + value: + stringValue: "1.1" + - key: aws.waf.terminating_rule.type + value: + stringValue: RATE_BASED + - key: aws.waf.terminating_rule.id + value: + stringValue: RateBasedRule + - key: aws.waf.action + value: + stringValue: BLOCK + - key: aws.waf.source.id + value: + stringValue: EXAMPLE11:rjvegx5guh:CanaryTest + - key: aws.waf.source.name + value: + stringValue: APIGW + - key: http.request.header.X-Forwarded-For + value: + stringValue: 52.46.82.45 + - key: http.request.header.X-Forwarded-Proto + value: + stringValue: https + - key: http.request.header.X-Forwarded-Port + value: + stringValue: "443" + - key: http.request.header.Host + value: + stringValue: rjvegx5guh.execute-api.eu-west-3.amazonaws.com + - key: http.request.header.X-Amzn-Trace-Id + value: + stringValue: Root=1-645566cf-7cb058b04d9bb3ee01dc4036 + - key: http.request.header.dogname + value: + stringValue: ella + - key: http.request.header.User-Agent + value: + stringValue: RateBasedRuleTestKoipOneKeyModulePV2 + - key: http.request.header.Accept-Encoding + value: + stringValue: gzip,deflate + - key: client.address + value: + stringValue: 52.46.82.45 + - key: url.path + value: + stringValue: /CanaryTest + - key: http.request.method + value: + stringValue: GET + - key: aws.request_id + value: + stringValue: Ed0AiHF_CGYF-DA= + - key: geo.country.iso_code + value: + stringValue: FR + body: {} + timeUnixNano: "1683355579981000000" + scope: + attributes: + - key: encoding.format + value: + stringValue: aws.waf + name: github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension \ No newline at end of file diff --git a/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/unmarshaler.go b/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/unmarshaler.go index 1c52666b7b690..ad837040a040a 100644 --- a/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/unmarshaler.go +++ b/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/unmarshaler.go @@ -4,7 +4,6 @@ package waf // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf" import ( - "bufio" "errors" "fmt" "io" @@ -16,21 +15,13 @@ import ( "go.opentelemetry.io/collector/pdata/plog" conventions "go.opentelemetry.io/otel/semconv/v1.38.0" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension/internal/unmarshaler" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/xstreamencoding" ) -type wafLogUnmarshaler struct { - buildInfo component.BuildInfo -} - -func NewWAFLogUnmarshaler(buildInfo component.BuildInfo) unmarshaler.AWSUnmarshaler { - return &wafLogUnmarshaler{ - buildInfo: buildInfo, - } -} - // See log fields: https://docs.aws.amazon.com/waf/latest/developerguide/logging-fields.html. type wafLog struct { Timestamp int64 `json:"timestamp"` @@ -61,86 +52,115 @@ type wafLog struct { Ja4Fingerprint string `json:"ja4Fingerprint"` } -func (w *wafLogUnmarshaler) UnmarshalAWSLogs(reader io.Reader) (plog.Logs, error) { - logs := plog.NewLogs() - - resourceLogs := logs.ResourceLogs().AppendEmpty() - resourceLogs.Resource().Attributes().PutStr( - string(conventions.CloudProviderKey), - conventions.CloudProviderAWS.Value.AsString(), - ) +var _ unmarshaler.StreamingLogsUnmarshaler = (*WafLogUnmarshaler)(nil) - scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() - scopeLogs.Scope().SetName(metadata.ScopeName) - scopeLogs.Scope().SetVersion(w.buildInfo.Version) - scopeLogs.Scope().Attributes().PutStr(constants.FormatIdentificationTag, "aws."+constants.FormatWAFLog) - - scanner := bufio.NewScanner(reader) - webACLID := "" - for scanner.Scan() { - logLine := scanner.Bytes() +type WafLogUnmarshaler struct { + buildInfo component.BuildInfo +} - var log wafLog - if err := gojson.Unmarshal(logLine, &log); err != nil { - return plog.Logs{}, fmt.Errorf("failed to unmarshal WAF log: %w", err) - } - if log.WebACLID == "" { - return plog.Logs{}, errors.New("invalid WAF log: empty webaclId field") - } - if webACLID != "" && log.WebACLID != webACLID { - return plog.Logs{}, fmt.Errorf( - "unexpected: new webaclId %q is different than previous one %q", - webACLID, - log.WebACLID, - ) - } - webACLID = log.WebACLID +func NewWAFLogUnmarshaler(buildInfo component.BuildInfo) *WafLogUnmarshaler { + return &WafLogUnmarshaler{ + buildInfo: buildInfo, + } +} - record := scopeLogs.LogRecords().AppendEmpty() - if err := w.addWAFLog(log, record); err != nil { - return plog.Logs{}, err - } +func (w *WafLogUnmarshaler) UnmarshalAWSLogs(reader io.Reader) (plog.Logs, error) { + // Decode as a stream but flush all at once using flush options + streamUnmarshaler, err := w.NewLogsDecoder(reader, encoding.WithFlushItems(0), encoding.WithFlushBytes(0)) + if err != nil { + return plog.Logs{}, err } - if err := setResourceAttributes(resourceLogs, webACLID); err != nil { - return plog.Logs{}, fmt.Errorf("failed to get resource attributes: %w", err) + logs, err := streamUnmarshaler.DecodeLogs() + if err != nil { + //nolint:errorlint + if err == io.EOF { + // EOF indicates no logs were found, return any logs that's available + return logs, nil + } + return plog.Logs{}, err } return logs, nil } -// setResourceAttributes based on the web ACL ID -func setResourceAttributes(resourceLogs plog.ResourceLogs, webACLID string) error { - expectedFormat := "arn:aws:wafv2:::/webacl//" - value, remaining, _ := strings.Cut(webACLID, "arn:aws:wafv2:") - if value != "" { - return fmt.Errorf("webaclId %q does not have expected prefix %q", webACLID, "arn:aws:wafv2:") - } - if remaining == "" { - return fmt.Errorf("webaclId %q contains no data after expected prefix %q", webACLID, "arn:aws:wafv2:") +// NewLogsDecoder returns a LogsDecoder that processes AWS WAF logs from the provided reader. +// Parses JSON-formatted logs containing WAF events (web ACL evaluations, actions, HTTP request details). +// Supports offset-based streaming; offset tracks bytes processed +func (w *WafLogUnmarshaler) NewLogsDecoder(reader io.Reader, options ...encoding.DecoderOption) (encoding.LogsDecoder, error) { + scannerHelper, err := xstreamencoding.NewScannerHelper(reader, options...) + if err != nil { + return nil, err } - value, remaining, _ = strings.Cut(remaining, ":") - if value == "" { - return fmt.Errorf("could not find region in webaclId %q", webACLID) - } - resourceLogs.Resource().Attributes().PutStr(string(conventions.CloudRegionKey), value) + var sharedWebACLID string + + decodeF := func() (plog.Logs, error) { + logs := plog.NewLogs() + + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceLogs.Resource().Attributes().PutStr( + string(conventions.CloudProviderKey), + conventions.CloudProviderAWS.Value.AsString(), + ) + + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + scopeLogs.Scope().SetName(metadata.ScopeName) + scopeLogs.Scope().SetVersion(w.buildInfo.Version) + scopeLogs.Scope().Attributes().PutStr(constants.FormatIdentificationTag, "aws."+constants.FormatWAFLog) + + for { + logLine, flush, err := scannerHelper.ScanBytes() + if err != nil { + if !errors.Is(err, io.EOF) { + return plog.Logs{}, fmt.Errorf("error reading WAF logs from stream:: %w", err) + } + + if len(logLine) == 0 { + break + } + } + + var log wafLog + if err := gojson.Unmarshal(logLine, &log); err != nil { + return plog.Logs{}, fmt.Errorf("failed to unmarshal WAF log: %w", err) + } + if log.WebACLID == "" { + return plog.Logs{}, errors.New("invalid WAF log: empty webaclId field") + } + if sharedWebACLID != "" && log.WebACLID != sharedWebACLID { + return plog.Logs{}, fmt.Errorf( + "unexpected: new webaclId %q is different than previous one %q", + log.WebACLID, + sharedWebACLID, + ) + } + sharedWebACLID = log.WebACLID + record := scopeLogs.LogRecords().AppendEmpty() + if err := w.addWAFLog(log, record); err != nil { + return plog.Logs{}, err + } + + if flush { + break + } + } - value, remaining, _ = strings.Cut(remaining, ":") - if value == "" { - return fmt.Errorf("could not find account in webaclId %q", webACLID) - } - resourceLogs.Resource().Attributes().PutStr(string(conventions.CloudAccountIDKey), value) + if err := setResourceAttributes(resourceLogs, sharedWebACLID); err != nil { + return plog.Logs{}, fmt.Errorf("failed to get resource attributes: %w", err) + } - if remaining == "" { - return fmt.Errorf("webaclId %q does not have expected format %q", webACLID, expectedFormat) + if scopeLogs.LogRecords().Len() == 0 { + return logs, io.EOF + } + + return logs, nil } - resourceLogs.Resource().Attributes().PutStr(string(conventions.CloudResourceIDKey), webACLID) - return nil + return xstreamencoding.NewLogsDecoderAdapter(decodeF, scannerHelper.Offset), nil } -func (*wafLogUnmarshaler) addWAFLog(log wafLog, record plog.LogRecord) error { +func (*WafLogUnmarshaler) addWAFLog(log wafLog, record plog.LogRecord) error { // timestamp is in milliseconds, so we need to convert it to ns first nanos := log.Timestamp * 1_000_000 ts := pcommon.Timestamp(nanos) @@ -193,3 +213,34 @@ func (*wafLogUnmarshaler) addWAFLog(log wafLog, record plog.LogRecord) error { return nil } + +// setResourceAttributes based on the web ACL ID +func setResourceAttributes(resourceLogs plog.ResourceLogs, webACLID string) error { + expectedFormat := "arn:aws:wafv2:::/webacl//" + value, remaining, _ := strings.Cut(webACLID, "arn:aws:wafv2:") + if value != "" { + return fmt.Errorf("webaclId %q does not have expected prefix %q", webACLID, "arn:aws:wafv2:") + } + if remaining == "" { + return fmt.Errorf("webaclId %q contains no data after expected prefix %q", webACLID, "arn:aws:wafv2:") + } + + value, remaining, _ = strings.Cut(remaining, ":") + if value == "" { + return fmt.Errorf("could not find region in webaclId %q", webACLID) + } + resourceLogs.Resource().Attributes().PutStr(string(conventions.CloudRegionKey), value) + + value, remaining, _ = strings.Cut(remaining, ":") + if value == "" { + return fmt.Errorf("could not find account in webaclId %q", webACLID) + } + resourceLogs.Resource().Attributes().PutStr(string(conventions.CloudAccountIDKey), value) + + if remaining == "" { + return fmt.Errorf("webaclId %q does not have expected format %q", webACLID, expectedFormat) + } + + resourceLogs.Resource().Attributes().PutStr(string(conventions.CloudResourceIDKey), webACLID) + return nil +} diff --git a/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/unmarshaler_test.go b/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/unmarshaler_test.go index c2aba338cc6ff..7ca055126f569 100644 --- a/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/unmarshaler_test.go +++ b/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf/unmarshaler_test.go @@ -5,17 +5,19 @@ package waf import ( "bytes" + "errors" + "fmt" "io" "os" "path/filepath" "testing" - gojson "github.com/goccy/go-json" "github.com/klauspost/compress/gzip" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/plog" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" ) @@ -38,10 +40,8 @@ func compressToGZIPReader(t *testing.T, buf []byte) io.Reader { func readAndCompressLogFile(t *testing.T, dir, file string) io.Reader { data, err := os.ReadFile(filepath.Join(dir, file)) require.NoError(t, err) - compacted := bytes.NewBuffer([]byte{}) - err = gojson.Compact(compacted, data) require.NoError(t, err) - return compressToGZIPReader(t, compacted.Bytes()) + return compressToGZIPReader(t, data) } func TestUnmarshalLogs(t *testing.T) { @@ -67,7 +67,7 @@ func TestUnmarshalLogs(t *testing.T) { }, } - u := wafLogUnmarshaler{buildInfo: component.BuildInfo{}} + u := WafLogUnmarshaler{buildInfo: component.BuildInfo{}} for name, test := range tests { t.Run(name, func(t *testing.T) { logs, err := u.UnmarshalAWSLogs(test.reader) @@ -85,6 +85,66 @@ func TestUnmarshalLogs(t *testing.T) { } } +func TestNewLogsDecoder(t *testing.T) { + directory := "testdata" + expectPattern := "valid_log_multi_%d.yaml" + + tests := []struct { + name string + offset int64 + index int + }{ + { + name: "Normal streaming", + offset: 0, + index: 0, + }, + { + name: "Stream with offset", + offset: 1983, // skip first record + index: 1, // start from first index + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + input := readAndCompressLogFile(t, directory, "valid_log_multi.json") + + wafUnmarshaler := NewWAFLogUnmarshaler(component.BuildInfo{}) + // Flush after every log for testing purposes & set offset + streamer, err := wafUnmarshaler.NewLogsDecoder(input, encoding.WithFlushItems(1), encoding.WithOffset(tt.offset)) + require.NoError(t, err) + + index := tt.index + for { + index++ + + var logs plog.Logs + logs, err = streamer.DecodeLogs() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + t.Errorf("failed to unmarshal log %d: %v", index, err) + } + + // To check or update offset, uncomment offset below + // fmt.Println(streamer.Offset()) + + var expectedLogs plog.Logs + expectedLogs, err = golden.ReadLogs(filepath.Join(directory, fmt.Sprintf(expectPattern, index))) + require.NoError(t, err) + require.NoError(t, plogtest.CompareLogs(expectedLogs, logs, plogtest.IgnoreResourceLogsOrder())) + } + + // expect EOF after all logs are read + _, err = streamer.DecodeLogs() + require.ErrorIs(t, err, io.EOF) + }) + } +} + func TestSetKeyAttributes(t *testing.T) { t.Parallel()