diff --git a/.chloggen/feature_awslambdareceiver-s3-multi-format.yaml b/.chloggen/feature_awslambdareceiver-s3-multi-format.yaml new file mode 100644 index 0000000000000..75c5673dc224c --- /dev/null +++ b/.chloggen/feature_awslambdareceiver-s3-multi-format.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: receiver/awslambda + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add multi-format S3 log routing via the new `encodings` field in the S3 receiver configuration. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [46458] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The new `encodings` list field enables a single Lambda deployment to route S3 objects to + different decoders based on their key prefix. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/receiver/awslambdareceiver/README.md b/receiver/awslambdareceiver/README.md index 286f135bc83fe..dfaf46b17461f 100644 --- a/receiver/awslambdareceiver/README.md +++ b/receiver/awslambdareceiver/README.md @@ -165,6 +165,80 @@ service: Similar to the first example, this configuration is for collecting ELB access logs stored in S3. +### Multi-Format S3 Configuration (encodings) + +The `encodings` field enables routing different S3 object key patterns to different decoders within +a single Lambda deployment. This is useful when a Lambda function receives events from: + +- **A single S3 bucket that stores multiple log types** — for example, VPC Flow Logs and CloudTrail + logs written to the same bucket under different key prefixes. +- **Multiple S3 buckets with different log types** — for example, one bucket for VPC Flow Logs and + another for WAF logs, both configured to trigger the same Lambda function. + +`encoding` (single, top-level) and `encodings` (list) are mutually exclusive — use one or the other. + +Each entry in `encodings` supports three fields: + +| Field | Required | Description | +|----------------|----------|-------------| +| `name` | yes | Unique identifier for this entry. For known names (`vpcflow`, `cloudtrail`, etc.) the default `path_pattern` is applied automatically. | +| `encoding` | no | Extension ID of the decoder (e.g. `awslogs_encoding/vpcflow`). Omit to pass content through as raw bytes using the built-in default decoder. | +| `path_pattern` | no* | Prefix pattern matched against the S3 object key. `*` matches one path segment. Omit to use the built-in default for known names. Use `"*"` as a catch-all. | + +\* May be omitted only for built-in known names. For any other name, `path_pattern` must be set explicitly (use `"*"` for a catch-all). 
+ +Each entry in `encodings` is evaluated in order of pattern specificity (more-specific patterns are +matched first; `"*"` catch-all is matched last). Users may list entries in any order. + +#### Combining encodings with extensions + +The `encoding` field references a collector extension by its component ID. Each referenced +extension must be declared in the `extensions:` block and listed under `service.extensions`. + +The following example decodes VPC Flow Logs and CloudTrail events into structured log records, +and forwards anything else as raw bytes via the catch-all entry: + +```yaml +extensions: + awslogs_encoding/vpcflow: + format: vpcflow + vpcflow: + file_format: plain-text + awslogs_encoding/cloudtrail: + format: cloudtrail + +receivers: + awslambda: + s3: + encodings: + - name: vpcflow + encoding: awslogs_encoding/vpcflow # decode VPC Flow Log fields into structured records + - name: cloudtrail + encoding: awslogs_encoding/cloudtrail # decode CloudTrail JSON events into structured records + path_pattern: "myorg/*/CloudTrail" # optional: override default (AWSLogs/*/CloudTrail); omit to use the default + - name: catchall + path_pattern: "*" # forward anything else as raw bytes + +``` + +#### Built-in default path patterns + +The following well-known names have built-in default path patterns. When `path_pattern` is omitted +for these names, the receiver uses the corresponding default. + +| Name | Default path pattern | +|-------------------|-----------------------------------------| +| `vpcflow` | `AWSLogs/*/vpcflowlogs` | +| `cloudtrail` | `AWSLogs/*/CloudTrail` | +| `elbaccess` | `AWSLogs/*/elasticloadbalancing` | +| `waf` | `AWSLogs/*/WAFLogs` | +| `networkfirewall` | `AWSLogs/*/network-firewall` | + +In the default patterns `*` matches exactly one path segment (the AWS account ID in standard AWS +log paths). + +For any name not listed above, `path_pattern` must be specified explicitly. 
+ ### Example 3: CloudWatch Logs using CloudWatch Subscription Filters ```yaml diff --git a/receiver/awslambdareceiver/config.go b/receiver/awslambdareceiver/config.go index 600af76e68441..09a718c9231df 100644 --- a/receiver/awslambdareceiver/config.go +++ b/receiver/awslambdareceiver/config.go @@ -4,7 +4,9 @@ package awslambdareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver" import ( + "errors" "fmt" + "sort" "strings" "go.opentelemetry.io/collector/component" @@ -12,30 +14,144 @@ import ( const s3ARNPrefix = "arn:aws:s3:::" -type Config struct { - // S3 defines configuration options for S3 Lambda trigger. - S3 sharedConfig `mapstructure:"s3"` +// defaultS3PathPatterns maps known encoding names to their default S3 path patterns. +// "*" matches exactly one path segment (the AWS account ID in standard AWS log paths). +var defaultS3PathPatterns = map[string]string{ + "vpcflow": "AWSLogs/*/vpcflowlogs", + "cloudtrail": "AWSLogs/*/CloudTrail", + "elbaccess": "AWSLogs/*/elasticloadbalancing", + "waf": "AWSLogs/*/WAFLogs", + "networkfirewall": "AWSLogs/*/network-firewall", +} - // CloudWatch defines configuration options for CloudWatch Lambda trigger. - CloudWatch sharedConfig `mapstructure:"cloudwatch"` +// S3Encoding defines one entry in the S3 multi-encoding routing table. +type S3Encoding struct { + // Name identifies the encoding. For known names (vpcflow, cloudtrail, elbaccess, waf, + // networkfirewall) the default path_pattern is applied automatically. + Name string `mapstructure:"name"` - // FailureBucketARN is the ARN of receiver deployment Lambda's error destination. - FailureBucketARN string `mapstructure:"failure_bucket_arn"` + // Encoding is the extension ID for decoding (e.g. "awslogs_encoding/vpcflow"). + // If empty, content is passed through as-is using the built-in raw decoder. + Encoding string `mapstructure:"encoding"` + + // PathPattern is matched as a prefix against the S3 object key. 
+ // "*" matches exactly one path segment. Example: "AWSLogs/*/vpcflowlogs" + // If empty, the default pattern for a known Name is used. + PathPattern string `mapstructure:"path_pattern"` +} - _ struct{} // Prevent unkeyed literal initialization +// ResolvePathPattern returns the effective path pattern for this encoding entry. +// Returns the configured PathPattern if set, else the default for known names. +func (e *S3Encoding) ResolvePathPattern() string { + if e.PathPattern != "" { + return e.PathPattern + } + return defaultS3PathPatterns[e.Name] } -// sharedConfig defines configuration options shared between different AWS Lambda trigger types. -// Note - we may extend or split this into dedicated structures per trigger type in future if needed. +// Validate validates an S3Encoding entry. +func (e *S3Encoding) Validate() error { + if e.Name == "" { + return errors.New("'name' is required") + } + // Catch-all is always valid. + if e.PathPattern == catchAllPattern { + return nil + } + // Unknown name without an explicit path_pattern is not routable. + if e.PathPattern == "" { + if _, ok := defaultS3PathPatterns[e.Name]; !ok { + return fmt.Errorf("'path_pattern' is required for encoding %q (no default available); use %q for catch-all", e.Name, catchAllPattern) + } + } + return nil +} + +// sharedConfig defines configuration options shared between Lambda trigger types. type sharedConfig struct { // Encoding defines the encoding to decode incoming Lambda invocation data. - // This extension is expected to further process content of the events that are extracted from Lambda trigger. - // - // If receiving data is in different formats(ex:- a mix of VPC flow logs, CloudTrail logs), receiver is recommended - // to have separate Lambda functions with specific extension configurations. Encoding string `mapstructure:"encoding"` } +// S3Config defines configuration options for the S3 Lambda trigger. +// It supersedes the sharedConfig for the s3 key. 
+// sharedConfig is embedded with mapstructure:",squash" so the "encoding" key +// remains at the top level of the s3 block in YAML — fully backwards-compatible. +type S3Config struct { + sharedConfig `mapstructure:",squash"` + + // Encodings defines multiple encoding entries for S3 path-based routing (multi-format mode). + // Each entry maps a path_pattern prefix to an encoding extension. + // Mutually exclusive with sharedConfig.Encoding. + Encodings []S3Encoding `mapstructure:"encodings"` +} + +// Validate validates the S3Config. +func (c *S3Config) Validate() error { + if c.Encoding != "" && len(c.Encodings) > 0 { + return errors.New("'encoding' and 'encodings' are mutually exclusive; use 'encodings' for multi-format support") + } + seen := make(map[string]bool, len(c.Encodings)) + for i, e := range c.Encodings { + if err := e.Validate(); err != nil { + return fmt.Errorf("encodings[%d]: %w", i, err) + } + if seen[e.Name] { + return fmt.Errorf("encodings[%d]: duplicate encoding name %q", i, e.Name) + } + seen[e.Name] = true + } + return nil +} + +// SortedEncodings returns a copy of Encodings sorted by path pattern specificity: +// more-specific patterns first, catch-all "*" last. +// This makes matching order-independent — users can list encodings in any order. +func (c *S3Config) SortedEncodings() []S3Encoding { + if len(c.Encodings) == 0 { + return nil + } + sorted := make([]S3Encoding, len(c.Encodings)) + copy(sorted, c.Encodings) + + // Pre-split patterns once; keyed by pattern string so the map stays correct + // as the sort swaps elements. 
+ splitCache := make(map[string][]string, len(sorted)) + for _, enc := range sorted { + p := enc.ResolvePathPattern() + if _, ok := splitCache[p]; !ok { + splitCache[p] = strings.Split(p, "/") + } + } + + sort.SliceStable(sorted, func(i, j int) bool { + pi := sorted[i].ResolvePathPattern() + pj := sorted[j].ResolvePathPattern() + if isCatchAllPattern(pi) && !isCatchAllPattern(pj) { + return false + } + if !isCatchAllPattern(pi) && isCatchAllPattern(pj) { + return true + } + return comparePatternSpecificity(splitCache[pi], splitCache[pj]) < 0 + }) + return sorted +} + +// Config is the top-level configuration for the awslambda receiver. +type Config struct { + // S3 defines configuration for the S3 Lambda trigger. + S3 S3Config `mapstructure:"s3"` + + // CloudWatch defines configuration for the CloudWatch Logs Lambda trigger. + CloudWatch sharedConfig `mapstructure:"cloudwatch"` + + // FailureBucketARN is the ARN of the S3 bucket used to store failed Lambda event records. + FailureBucketARN string `mapstructure:"failure_bucket_arn"` + + _ struct{} // Prevent unkeyed literal initialization. +} + var _ component.Config = (*Config)(nil) func createDefaultConfig() component.Config { @@ -44,31 +160,26 @@ func createDefaultConfig() component.Config { func (c *Config) Validate() error { if c.FailureBucketARN != "" { - _, err := getBucketNameFromARN(c.FailureBucketARN) - if err != nil { + if _, err := getBucketNameFromARN(c.FailureBucketARN); err != nil { return fmt.Errorf("invalid failure_bucket_arn: %w", err) } } - + if err := c.S3.Validate(); err != nil { + return fmt.Errorf("invalid s3 config: %w", err) + } return nil } -// getBucketNameFromARN extracts S3 bucket name from ARN -// Example -// -// arn = "arn:aws:s3:::myBucket/folderA -// result = myBucket +// getBucketNameFromARN extracts the S3 bucket name from an ARN. 
+// Example: "arn:aws:s3:::myBucket/folderA" => "myBucket" func getBucketNameFromARN(arn string) (string, error) { if !strings.HasPrefix(arn, s3ARNPrefix) { return "", fmt.Errorf("invalid S3 ARN format: %s", arn) } - s3Path := strings.TrimPrefix(arn, s3ARNPrefix) bucket, _, _ := strings.Cut(s3Path, "/") - if bucket == "" { return "", fmt.Errorf("invalid S3 ARN format, bucket name missing: %s", arn) } - return bucket, nil } diff --git a/receiver/awslambdareceiver/config.schema.yaml b/receiver/awslambdareceiver/config.schema.yaml index 146e218e9bea9..e91c834b2b5ee 100644 --- a/receiver/awslambdareceiver/config.schema.yaml +++ b/receiver/awslambdareceiver/config.schema.yaml @@ -1,19 +1,42 @@ +$defs: + s_3_config: + description: S3Config defines configuration options for the S3 Lambda trigger. It supersedes the sharedConfig for the s3 key. sharedConfig is embedded with mapstructure:",squash" so the "encoding" key remains at the top level of the s3 block in YAML — fully backwards-compatible. + type: object + properties: + encoding: + description: Encoding defines the encoding to decode incoming Lambda invocation data. + type: string + encodings: + description: Encodings defines multiple encoding entries for S3 path-based routing (multi-format mode). Each entry maps a path_pattern prefix to an encoding extension. Mutually exclusive with sharedConfig.Encoding. + type: array + items: + $ref: s_3_encoding + s_3_encoding: + description: S3Encoding defines one entry in the S3 multi-encoding routing table. + type: object + properties: + encoding: + description: Encoding is the extension ID for decoding (e.g. "awslogs_encoding/vpcflow"). If empty, content is passed through as-is using the built-in raw decoder. + type: string + name: + description: Name identifies the encoding. For known names (vpcflow, cloudtrail, elbaccess, waf, networkfirewall) the default path_pattern is applied automatically. 
+ type: string + path_pattern: + description: 'PathPattern is matched as a prefix against the S3 object key. "*" matches exactly one path segment. Example: "AWSLogs/*/vpcflowlogs" If empty, the default pattern for a known Name is used.' + type: string +description: Config is the top-level configuration for the awslambda receiver. type: object properties: cloudwatch: - description: CloudWatch defines configuration options for CloudWatch Lambda trigger. + description: CloudWatch defines configuration for the CloudWatch Logs Lambda trigger. type: object properties: encoding: - description: Encoding defines the encoding to decode incoming Lambda invocation data. This extension is expected to further process content of the events that are extracted from Lambda trigger. If receiving data is in different formats(ex:- a mix of VPC flow logs, CloudTrail logs), receiver is recommended to have separate Lambda functions with specific extension configurations. + description: Encoding defines the encoding to decode incoming Lambda invocation data. type: string failure_bucket_arn: - description: FailureBucketARN is the ARN of receiver deployment Lambda's error destination. + description: FailureBucketARN is the ARN of the S3 bucket used to store failed Lambda event records. type: string s3: - description: S3 defines configuration options for S3 Lambda trigger. - type: object - properties: - encoding: - description: Encoding defines the encoding to decode incoming Lambda invocation data. This extension is expected to further process content of the events that are extracted from Lambda trigger. If receiving data is in different formats(ex:- a mix of VPC flow logs, CloudTrail logs), receiver is recommended to have separate Lambda functions with specific extension configurations. - type: string + description: S3 defines configuration for the S3 Lambda trigger. 
+ $ref: s_3_config diff --git a/receiver/awslambdareceiver/config_test.go b/receiver/awslambdareceiver/config_test.go index 62b54192f9c8d..e7de23c640e72 100644 --- a/receiver/awslambdareceiver/config_test.go +++ b/receiver/awslambdareceiver/config_test.go @@ -31,21 +31,15 @@ func TestLoadConfig(t *testing.T) { name: "Config with both S3 and CloudWatch encoding", componentIDToLoad: component.NewIDWithName(metadata.Type, "aws_logs_encoding"), expected: &Config{ - S3: sharedConfig{ - Encoding: "aws_logs_encoding", - }, - CloudWatch: sharedConfig{ - Encoding: "aws_logs_encoding", - }, + S3: S3Config{sharedConfig: sharedConfig{Encoding: "aws_logs_encoding"}}, + CloudWatch: sharedConfig{Encoding: "aws_logs_encoding"}, }, }, { name: "Config with both S3 config only", componentIDToLoad: component.NewIDWithName(metadata.Type, "json_log_encoding"), expected: &Config{ - S3: sharedConfig{ - Encoding: "json_log_encoding", - }, + S3: S3Config{sharedConfig: sharedConfig{Encoding: "json_log_encoding"}}, CloudWatch: sharedConfig{}, }, }, @@ -53,7 +47,7 @@ func TestLoadConfig(t *testing.T) { name: "Config with empty encoding", componentIDToLoad: component.NewIDWithName(metadata.Type, "empty_encoding"), expected: &Config{ - S3: sharedConfig{}, + S3: S3Config{}, CloudWatch: sharedConfig{}, }, }, @@ -61,11 +55,25 @@ func TestLoadConfig(t *testing.T) { name: "Config with failure bucket ARN", componentIDToLoad: component.NewIDWithName(metadata.Type, "with_failure_arn"), expected: &Config{ - S3: sharedConfig{}, + S3: S3Config{}, CloudWatch: sharedConfig{}, FailureBucketARN: "arn:aws:s3:::example", }, }, + { + name: "Config with S3 multi-encoding", + componentIDToLoad: component.NewIDWithName(metadata.Type, "s3_multi_encoding"), + expected: &Config{ + S3: S3Config{ + Encodings: []S3Encoding{ + {Name: "vpcflow", Encoding: "awslogs_encoding/vpcflow"}, + {Name: "cloudtrail", Encoding: "awslogs_encoding/cloudtrail"}, + {Name: "catchall", PathPattern: "*"}, + }, + }, + CloudWatch: 
sharedConfig{}, + }, + }, } for _, tt := range tests { @@ -84,3 +92,179 @@ func TestLoadConfig(t *testing.T) { }) } } + +func TestS3ConfigValidate(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + config S3Config + wantErr string + }{ + { + name: "empty config is valid", + config: S3Config{}, + }, + { + name: "single encoding is valid", + config: S3Config{sharedConfig: sharedConfig{Encoding: "awslogs_encoding"}}, + }, + { + name: "encodings with known names is valid", + config: S3Config{ + Encodings: []S3Encoding{ + {Name: "vpcflow", Encoding: "awslogs_encoding/vpcflow"}, + {Name: "cloudtrail", Encoding: "awslogs_encoding/cloudtrail"}, + }, + }, + }, + { + name: "encodings with custom path_pattern is valid", + config: S3Config{ + Encodings: []S3Encoding{ + {Name: "custom", Encoding: "my_encoding", PathPattern: "custom-logs/"}, + }, + }, + }, + { + name: "raw passthrough entry (no encoding) is valid", + config: S3Config{ + Encodings: []S3Encoding{ + {Name: "raw", PathPattern: "raw-logs/"}, + }, + }, + }, + { + name: "catch-all * is valid", + config: S3Config{ + Encodings: []S3Encoding{ + {Name: "catchall", PathPattern: "*"}, + }, + }, + }, + { + name: "encoding and encodings are mutually exclusive", + config: S3Config{ + sharedConfig: sharedConfig{Encoding: "awslogs_encoding"}, + Encodings: []S3Encoding{{Name: "vpcflow"}}, + }, + wantErr: "'encoding' and 'encodings' are mutually exclusive", + }, + { + name: "encoding entry without name is invalid", + config: S3Config{ + Encodings: []S3Encoding{{Encoding: "awslogs_encoding", PathPattern: "logs/"}}, + }, + wantErr: "'name' is required", + }, + { + name: "unknown format without path_pattern is invalid", + config: S3Config{ + Encodings: []S3Encoding{{Name: "custom_format", Encoding: "my_encoding"}}, + }, + wantErr: "'path_pattern' is required for encoding \"custom_format\"", + }, + { + name: "duplicate names are invalid", + config: S3Config{ + Encodings: []S3Encoding{ + {Name: "vpcflow", Encoding: 
"enc/vpc1"}, + {Name: "vpcflow", Encoding: "enc/vpc2"}, + }, + }, + wantErr: "duplicate encoding name \"vpcflow\"", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.config.Validate() + if tt.wantErr == "" { + require.NoError(t, err) + } else { + require.ErrorContains(t, err, tt.wantErr) + } + }) + } +} + +func TestS3EncodingResolvePathPattern(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + enc S3Encoding + expected string + }{ + {name: "vpcflow uses default", enc: S3Encoding{Name: "vpcflow"}, expected: "AWSLogs/*/vpcflowlogs"}, + {name: "cloudtrail uses default", enc: S3Encoding{Name: "cloudtrail"}, expected: "AWSLogs/*/CloudTrail"}, + {name: "elbaccess uses default", enc: S3Encoding{Name: "elbaccess"}, expected: "AWSLogs/*/elasticloadbalancing"}, + {name: "waf uses default", enc: S3Encoding{Name: "waf"}, expected: "AWSLogs/*/WAFLogs"}, + {name: "networkfirewall uses default", enc: S3Encoding{Name: "networkfirewall"}, expected: "AWSLogs/*/network-firewall"}, + {name: "custom overrides default", enc: S3Encoding{Name: "vpcflow", PathPattern: "AWSLogs/123/vpcflowlogs"}, expected: "AWSLogs/123/vpcflowlogs"}, + {name: "unknown without path_pattern returns empty", enc: S3Encoding{Name: "unknown"}, expected: ""}, + {name: "catch-all stays as *", enc: S3Encoding{Name: "catchall", PathPattern: "*"}, expected: "*"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.expected, tt.enc.ResolvePathPattern()) + }) + } +} + +func TestS3ConfigSortedEncodings(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + config S3Config + expectedOrder []string + }{ + { + name: "empty returns nil", + config: S3Config{}, + expectedOrder: nil, + }, + { + name: "catch-all moved to last regardless of input order", + config: S3Config{Encodings: []S3Encoding{ + {Name: "catchall", PathPattern: "*"}, + {Name: "vpcflow"}, + {Name: "cloudtrail"}, + }}, + expectedOrder: 
[]string{"vpcflow", "cloudtrail", "catchall"}, + }, + { + name: "specific account beats wildcard account", + config: S3Config{Encodings: []S3Encoding{ + {Name: "vpc-any", PathPattern: "AWSLogs/*/vpcflowlogs"}, + {Name: "vpc-specific", PathPattern: "AWSLogs/123456789012/vpcflowlogs"}, + }}, + expectedOrder: []string{"vpc-specific", "vpc-any"}, + }, + { + name: "longer pattern beats shorter", + config: S3Config{Encodings: []S3Encoding{ + {Name: "vpc-any", PathPattern: "AWSLogs/*/vpcflowlogs"}, + {Name: "vpc-region", PathPattern: "AWSLogs/*/vpcflowlogs/us-east-1"}, + }}, + expectedOrder: []string{"vpc-region", "vpc-any"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sorted := tt.config.SortedEncodings() + if tt.expectedOrder == nil { + require.Nil(t, sorted) + return + } + var names []string + for _, e := range sorted { + names = append(names, e.Name) + } + require.Equal(t, tt.expectedOrder, names) + }) + } +} diff --git a/receiver/awslambdareceiver/handler.go b/receiver/awslambdareceiver/handler.go index dcac469d6203d..b603b8d0fb49a 100644 --- a/receiver/awslambdareceiver/handler.go +++ b/receiver/awslambdareceiver/handler.go @@ -173,7 +173,7 @@ func (*s3Handler) handlerType() eventType { func (s *s3Handler) handle(ctx context.Context, event json.RawMessage) error { var err error - parsedEvent, err := s.parseEvent(event) + parsedEvent, err := parseS3Event(event) if err != nil { return fmt.Errorf("failed to parse the event: %w", err) } @@ -213,13 +213,15 @@ func (s *s3Handler) handle(ctx context.Context, event json.RawMessage) error { return nil } -func (*s3Handler) parseEvent(raw json.RawMessage) (event events.S3EventRecord, err error) { +// parseS3Event parses a raw JSON S3 event notification and returns the single S3 event record. +// S3 event notifications always contain exactly one record. 
+func parseS3Event(raw json.RawMessage) (events.S3EventRecord, error) { var message events.S3Event if err := gojson.Unmarshal(raw, &message); err != nil { return events.S3EventRecord{}, fmt.Errorf("failed to unmarshal S3 event notification: %w", err) } - // Records cannot be more than 1 in case of s3 event notifications + // This receiver processes one S3 object per invocation; reject events with != 1 record. if len(message.Records) > 1 || len(message.Records) == 0 { return events.S3EventRecord{}, fmt.Errorf("s3 event notification should contain one record instead of %d", len(message.Records)) } @@ -227,6 +229,101 @@ func (*s3Handler) parseEvent(raw json.RawMessage) (event events.S3EventRecord, e return message.Records[0], nil } +// multiFormatS3LogsHandler handles S3 events with multiple log encodings. +// It uses the router to select the correct LogsDecoderFactory based on the +// S3 object key path, then streams the object through the selected decoder. +type multiFormatS3LogsHandler struct { + s3Service internal.S3Service + logger *zap.Logger + router *logsDecoderRouter + consumer consumer.Logs +} + +func newMultiFormatS3LogsHandler( + service internal.S3Service, + baseLogger *zap.Logger, + router *logsDecoderRouter, + consumer consumer.Logs, +) *multiFormatS3LogsHandler { + return &multiFormatS3LogsHandler{ + s3Service: service, + logger: baseLogger.Named("s3-multiformat"), + router: router, + consumer: consumer, + } +} + +func (*multiFormatS3LogsHandler) handlerType() eventType { + return s3Event +} + +func (s *multiFormatS3LogsHandler) handle(ctx context.Context, event json.RawMessage) error { + parsedEvent, err := parseS3Event(event) + if err != nil { + return fmt.Errorf("failed to parse the event: %w", err) + } + + objectKey := parsedEvent.S3.Object.URLDecodedKey + + s.logger.Debug("Processing S3 event notification (multi-encoding).", + zap.String("File", objectKey), + zap.String("S3Bucket", parsedEvent.S3.Bucket.Arn), + ) + + if parsedEvent.S3.Object.Size 
== 0 { + s.logger.Info("Empty object, skipping download", zap.String("File", objectKey)) + return nil + } + + // Route to the correct decoder based on the object key path. + decoderFactory, encodingName, err := s.router.GetDecoder(objectKey) + if err != nil { + return fmt.Errorf("failed to route S3 object: %w", err) + } + + s.logger.Debug("Matched encoding for S3 object", + zap.String("File", objectKey), + zap.String("Encoding", encodingName), + ) + + reader, err := s.s3Service.GetReader(ctx, parsedEvent.S3.Bucket.Name, objectKey) + if err != nil { + return err + } + + wrappedReader, err := gunzipIfNeeded(reader) + if err != nil { + return fmt.Errorf("failed to derive reader with wrapper: %w", err) + } + defer func() { + if gzReader, ok := wrappedReader.(*gzip.Reader); ok { + _ = gzReader.Close() + } + }() + + decoder, err := decoderFactory.NewLogsDecoder(wrappedReader, encoding.WithFlushBytes(s3StreamBatchSize), encoding.WithFlushItems(0)) + if err != nil { + return fmt.Errorf("failed to create logs decoder for encoding %q: %w", encodingName, err) + } + + for { + var logs plog.Logs + logs, err = decoder.DecodeLogs() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return fmt.Errorf("failed to decode S3 logs (encoding %q): %w", encodingName, err) + } + enrichS3Logs(logs, parsedEvent) + if err = s.consumer.ConsumeLogs(getEnrichedContext(ctx, parsedEvent), logs); err != nil { + return checkConsumerErrorAndWrap(err) + } + } + + return nil +} + // cwLogsSubscriptionHandler is specialized in CloudWatch log stream subscription filter events type cwLogsSubscriptionHandler struct { logsDecoder encoding.LogsDecoderFactory diff --git a/receiver/awslambdareceiver/handler_test.go b/receiver/awslambdareceiver/handler_test.go index 6457eaa099ee6..2ddbb8d05aca0 100644 --- a/receiver/awslambdareceiver/handler_test.go +++ b/receiver/awslambdareceiver/handler_test.go @@ -307,18 +307,12 @@ func TestS3HandlerParseEvent(t *testing.T) { }, } - ctr := gomock.NewController(t) 
- s3Service := internal.NewMockS3Service(ctr) - s3Service.EXPECT().ReadObject(gomock.Any(), gomock.Any(), gomock.Any()).Return([]byte("S3 content"), nil).AnyTimes() - - handler := newS3LogsHandler(s3Service, zap.NewNop(), &customLogUnmarshaler{}, &noOpLogsConsumer{}) - for _, test := range tests { t.Run(test.name, func(t *testing.T) { marshal, err := json.Marshal(test.input) require.NoError(t, err) - event, err := handler.parseEvent(marshal) + event, err := parseS3Event(marshal) if test.isError { require.Error(t, err) @@ -656,6 +650,178 @@ func (n *mockPlogEventHandler) handle(context.Context, json.RawMessage) error { return nil } +func TestMultiFormatS3LogsHandler(t *testing.T) { + t.Parallel() + + type s3Content struct { + bucketName string + objectKey string + data []byte + } + + tests := []struct { + name string + s3Event events.S3Event + s3MockContent s3Content + encodings []S3Encoding + decoders map[string]encoding.LogsDecoderFactory + expectedErr string + }{ + { + name: "routes VPC flow log to correct decoder", + s3Event: events.S3Event{Records: []events.S3EventRecord{{ + EventSource: "aws:s3", + AWSRegion: "us-east-1", + EventTime: time.Unix(1764625361, 0), + S3: events.S3Entity{ + Bucket: events.S3Bucket{Name: "test-bucket", Arn: "arn:aws:s3:::test-bucket"}, + Object: events.S3Object{ + Key: "AWSLogs/123/vpcflowlogs/us-east-1/file.log.gz", + URLDecodedKey: "AWSLogs/123/vpcflowlogs/us-east-1/file.log.gz", + Size: 10, + }, + }, + }}}, + s3MockContent: s3Content{ + bucketName: "test-bucket", + objectKey: "AWSLogs/123/vpcflowlogs/us-east-1/file.log.gz", + data: []byte("vpc flow log data"), + }, + encodings: []S3Encoding{ + {Name: "vpcflow", Encoding: "awslogs_encoding/vpc"}, + {Name: "cloudtrail", Encoding: "awslogs_encoding/ct"}, + }, + decoders: map[string]encoding.LogsDecoderFactory{ + "vpcflow": &customLogUnmarshaler{}, + "cloudtrail": &customLogUnmarshaler{}, + }, + }, + { + name: "routes CloudTrail to correct decoder", + s3Event: events.S3Event{Records: 
[]events.S3EventRecord{{ + EventSource: "aws:s3", + AWSRegion: "us-east-1", + EventTime: time.Unix(1764625361, 0), + S3: events.S3Entity{ + Bucket: events.S3Bucket{Name: "test-bucket", Arn: "arn:aws:s3:::test-bucket"}, + Object: events.S3Object{ + Key: "AWSLogs/123/CloudTrail/us-east-1/file.json.gz", + URLDecodedKey: "AWSLogs/123/CloudTrail/us-east-1/file.json.gz", + Size: 10, + }, + }, + }}}, + s3MockContent: s3Content{ + bucketName: "test-bucket", + objectKey: "AWSLogs/123/CloudTrail/us-east-1/file.json.gz", + data: []byte("cloudtrail data"), + }, + encodings: []S3Encoding{ + {Name: "vpcflow", Encoding: "awslogs_encoding/vpc"}, + {Name: "cloudtrail", Encoding: "awslogs_encoding/ct"}, + }, + decoders: map[string]encoding.LogsDecoderFactory{ + "vpcflow": &customLogUnmarshaler{}, + "cloudtrail": &customLogUnmarshaler{}, + }, + }, + { + name: "no matching pattern returns error", + s3Event: events.S3Event{Records: []events.S3EventRecord{{ + EventSource: "aws:s3", + S3: events.S3Entity{ + Bucket: events.S3Bucket{Name: "test-bucket", Arn: "arn:aws:s3:::test-bucket"}, + Object: events.S3Object{ + Key: "unknown/path/file.log", + URLDecodedKey: "unknown/path/file.log", + Size: 10, + }, + }, + }}}, + s3MockContent: s3Content{ + bucketName: "test-bucket", + objectKey: "unknown/path/file.log", + data: []byte("some data"), + }, + encodings: []S3Encoding{{Name: "vpcflow", Encoding: "awslogs_encoding/vpc"}}, + decoders: map[string]encoding.LogsDecoderFactory{ + "vpcflow": &customLogUnmarshaler{}, + }, + expectedErr: "no encoding matches S3 object key", + }, + { + name: "catch-all routes unmatched object to default decoder", + s3Event: events.S3Event{Records: []events.S3EventRecord{{ + EventSource: "aws:s3", + AWSRegion: "us-east-1", + EventTime: time.Unix(1764625361, 0), + S3: events.S3Entity{ + Bucket: events.S3Bucket{Name: "test-bucket", Arn: "arn:aws:s3:::test-bucket"}, + Object: events.S3Object{ + Key: "random/path/file.log", + URLDecodedKey: "random/path/file.log", + Size: 
10, + }, + }, + }}}, + s3MockContent: s3Content{ + bucketName: "test-bucket", + objectKey: "random/path/file.log", + data: []byte("random data"), + }, + encodings: []S3Encoding{ + {Name: "vpcflow", Encoding: "awslogs_encoding/vpc"}, + {Name: "catchall", PathPattern: "*"}, // no encoding => default decoder + }, + decoders: map[string]encoding.LogsDecoderFactory{ + "vpcflow": &customLogUnmarshaler{}, + }, + }, + { + name: "skips empty object", + s3Event: events.S3Event{Records: []events.S3EventRecord{{ + EventSource: "aws:s3", + S3: events.S3Entity{ + Bucket: events.S3Bucket{Name: "test-bucket", Arn: "arn:aws:s3:::test-bucket"}, + Object: events.S3Object{ + Key: "AWSLogs/123/vpcflowlogs/file.log", + URLDecodedKey: "AWSLogs/123/vpcflowlogs/file.log", + Size: 0, + }, + }, + }}}, + s3MockContent: s3Content{bucketName: "test-bucket", objectKey: "AWSLogs/123/vpcflowlogs/file.log"}, + encodings: []S3Encoding{{Name: "vpcflow", Encoding: "awslogs_encoding/vpc"}}, + decoders: map[string]encoding.LogsDecoderFactory{"vpcflow": &customLogUnmarshaler{}}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctr := gomock.NewController(t) + s3Service := internal.NewMockS3Service(ctr) + s3Service.EXPECT(). + GetReader(gomock.Any(), test.s3MockContent.bucketName, test.s3MockContent.objectKey). + Return(io.NopCloser(bytes.NewReader(test.s3MockContent.data)), nil). 
+ AnyTimes() + + defaultDecoder := internal.NewDefaultS3LogsDecoder() + router := newLogsDecoderRouter(test.encodings, test.decoders, defaultDecoder) + handler := newMultiFormatS3LogsHandler(s3Service, zap.NewNop(), router, &noOpLogsConsumer{}) + + event, err := json.Marshal(test.s3Event) + require.NoError(t, err) + + errP := handler.handle(t.Context(), event) + if test.expectedErr != "" { + require.ErrorContains(t, errP, test.expectedErr) + } else { + require.NoError(t, errP) + } + }) + } +} + func loadCompressedData(t *testing.T, file string) string { data, err := os.ReadFile(file) require.NoError(t, err) diff --git a/receiver/awslambdareceiver/match.go b/receiver/awslambdareceiver/match.go new file mode 100644 index 0000000000000..15c08bfb4cee4 --- /dev/null +++ b/receiver/awslambdareceiver/match.go @@ -0,0 +1,77 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awslambdareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver" + +// catchAllPattern is a special pattern value that matches any path. +// Use this as a fallback/catch-all encoding entry. +const catchAllPattern = "*" + +// matchPrefixWithWildcard checks whether targetParts has patternParts as a prefix. +// Each pattern segment must be either an exact literal or "*" (matches any single segment). +// The target may have more segments than the pattern (prefix semantics). +// +// The caller is responsible for splitting the object key and the pattern on "/" before calling. +// +// Examples: +// +// matchPrefixWithWildcard( +// []string{"AWSLogs", "123456789012", "vpcflowlogs", "file.gz"}, +// []string{"AWSLogs", "*", "vpcflowlogs"}, +// ) => true +// matchPrefixWithWildcard([]string{"any", "path"}, []string{"*"}) => true +func matchPrefixWithWildcard(targetParts, patternParts []string) bool { + // Target must have at least as many segments as the pattern. 
+ if len(targetParts) < len(patternParts) { + return false + } + + for i, p := range patternParts { + if p == "*" { + continue // wildcard matches any single segment + } + if p != targetParts[i] { + return false + } + } + + return true +} + +// comparePatternSpecificity compares two pre-split patterns by specificity. +// Returns negative if a is more specific, positive if b is more specific, 0 if equal. +// +// Rules (evaluated left-to-right per segment): +// - Exact segment beats "*" at the same position. +// - Longer pattern (more segments) beats a shorter one. +func comparePatternSpecificity(partsA, partsB []string) int { + minLen := min(len(partsA), len(partsB)) + + for i := range minLen { + aSpec := segmentSpecificity(partsA[i]) + bSpec := segmentSpecificity(partsB[i]) + if aSpec != bSpec { + // Higher score = more specific = should sort earlier. + return bSpec - aSpec + } + } + + // Longer pattern is more specific. + return len(partsB) - len(partsA) +} + +// segmentSpecificity returns a specificity score for a single pattern segment. +// Higher score = more specific. +// - 1: exact literal +// - 0: wildcard "*" +func segmentSpecificity(segment string) int { + if segment == "*" { + return 0 + } + return 1 +} + +// isCatchAllPattern returns true if the pattern is the bare catch-all "*". +func isCatchAllPattern(pattern string) bool { + return pattern == catchAllPattern +} diff --git a/receiver/awslambdareceiver/match_test.go b/receiver/awslambdareceiver/match_test.go new file mode 100644 index 0000000000000..a40eecdf8e8b5 --- /dev/null +++ b/receiver/awslambdareceiver/match_test.go @@ -0,0 +1,162 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awslambdareceiver + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +// splitForTest splits a "/" separated path string into parts for use in match tests. 
+func splitForTest(s string) []string { + return strings.Split(s, "/") +} + +func TestMatchPrefixWithWildcard(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + pattern string + objectKey string + want bool + }{ + // AWS default patterns — literal prefix + single-segment wildcard + { + name: "VPC Flow Logs matches", + pattern: "AWSLogs/*/vpcflowlogs", + objectKey: "AWSLogs/123456789012/vpcflowlogs/us-east-1/2024/01/15/file.log.gz", + want: true, + }, + { + name: "CloudTrail matches", + pattern: "AWSLogs/*/CloudTrail", + objectKey: "AWSLogs/123456789012/CloudTrail/us-east-1/2024/01/15/file.json.gz", + want: true, + }, + { + name: "ELB Access matches", + pattern: "AWSLogs/*/elasticloadbalancing", + objectKey: "AWSLogs/123456789012/elasticloadbalancing/us-east-1/2024/01/15/file.log.gz", + want: true, + }, + { + name: "WAF logs matches", + pattern: "AWSLogs/*/WAFLogs", + objectKey: "AWSLogs/123456789012/WAFLogs/my-web-acl/2024/01/15/file.log.gz", + want: true, + }, + { + name: "VPC pattern rejects CloudTrail key", + pattern: "AWSLogs/*/vpcflowlogs", + objectKey: "AWSLogs/123456789012/CloudTrail/us-east-1/file.json.gz", + want: false, + }, + // Specific account override — exact literal at account segment + { + name: "specific account matches correct account", + pattern: "AWSLogs/123456789012/vpcflowlogs", + objectKey: "AWSLogs/123456789012/vpcflowlogs/us-east-1/file.log.gz", + want: true, + }, + { + name: "specific account rejects wrong account", + pattern: "AWSLogs/123456789012/vpcflowlogs", + objectKey: "AWSLogs/999999999999/vpcflowlogs/us-east-1/file.log.gz", + want: false, + }, + // Catch-all and edge cases + { + name: "catch-all * matches any path", + pattern: "*", + objectKey: "AWSLogs/123456789012/vpcflowlogs/file.log.gz", + want: true, + }, + { + name: "custom path with wildcard matches deeper key", + pattern: "my-app/*/logs", + objectKey: "my-app/production/logs/2024/app.log", + want: true, + }, + { + name: "path shorter than pattern does 
+			name:   "wildcard-only beats catch-all (longer wins)",
+			a:      "*/*",
+			b:      "*",
specificity", tt.a, tt.b) + } + }) + } +} diff --git a/receiver/awslambdareceiver/receiver.go b/receiver/awslambdareceiver/receiver.go index 6bb213601f14c..8850b2afa7e82 100644 --- a/receiver/awslambdareceiver/receiver.go +++ b/receiver/awslambdareceiver/receiver.go @@ -229,40 +229,71 @@ func newLogsHandler( ) (handlerProvider, error) { logger := set.Logger - var err error - s3LogsDecoder := internal.NewDefaultS3LogsDecoder() - if cfg.S3.Encoding != "" { - logger.Info("Using configured S3 encoding for logs", zap.String("encoding", cfg.S3.Encoding)) - - s3LogsDecoder, err = resolveLogsDecoder(host, cfg.S3.Encoding) - if err != nil { - return nil, err - } - } - s3Service, err := s3Provider.GetService(ctx) if err != nil { return nil, fmt.Errorf("unable to load the S3 service: %w", err) } + registry := make(handlerRegistry) + + // S3: multi-encoding or single-encoding + if len(cfg.S3.Encodings) > 0 { + s3Router, buildErr := buildS3LogsRouter(host, cfg.S3, logger) + if buildErr != nil { + return nil, fmt.Errorf("failed to build S3 multi-encoding router: %w", buildErr) + } + registry[s3Event] = newMultiFormatS3LogsHandler(s3Service, logger, s3Router, next) + } else { + s3LogsDecoder := internal.NewDefaultS3LogsDecoder() + if cfg.S3.Encoding != "" { + logger.Info("Using configured S3 encoding for logs", zap.String("encoding", cfg.S3.Encoding)) + s3LogsDecoder, err = resolveLogsDecoder(host, cfg.S3.Encoding) + if err != nil { + return nil, err + } + } + registry[s3Event] = newS3LogsHandler(s3Service, logger, s3LogsDecoder, next) + } + + // CloudWatch: single-encoding path unchanged in this PR. cwDecoder := internal.NewDefaultCWLogsDecoder() if cfg.CloudWatch.Encoding != "" { logger.Info("Using configured CloudWatch encoding for logs", zap.String("encoding", cfg.CloudWatch.Encoding)) - cwDecoder, err = resolveLogsDecoder(host, cfg.CloudWatch.Encoding) if err != nil { return nil, err } } - - // Register handlers. Logs supports S3 and CloudWatch Logs subscription events. 
- registry := make(handlerRegistry) - registry[s3Event] = newS3LogsHandler(s3Service, logger, s3LogsDecoder, next) registry[cwEvent] = newCWLogsSubscriptionHandler(cwDecoder, next) return newHandlerProvider(registry), nil } +// buildS3LogsRouter constructs a logsDecoderRouter from the S3 encodings config. +// Encodings are sorted by path pattern specificity before being passed to the router. +func buildS3LogsRouter(host component.Host, cfg S3Config, logger *zap.Logger) (*logsDecoderRouter, error) { + sortedEncodings := cfg.SortedEncodings() + decoders := make(map[string]encoding.LogsDecoderFactory, len(sortedEncodings)) + + for _, enc := range sortedEncodings { + if enc.Encoding == "" { + continue // raw passthrough uses the default decoder + } + decoder, err := resolveLogsDecoder(host, enc.Encoding) + if err != nil { + return nil, fmt.Errorf("failed to resolve encoding for S3 entry %q: %w", enc.Name, err) + } + logger.Info("Registered decoder for S3 encoding entry", + zap.String("name", enc.Name), + zap.String("encoding", enc.Encoding), + zap.String("pattern", enc.ResolvePathPattern()), + ) + decoders[enc.Name] = decoder + } + + return newLogsDecoderRouter(sortedEncodings, decoders, internal.NewDefaultS3LogsDecoder()), nil +} + func newMetricsHandler( ctx context.Context, cfg *Config, diff --git a/receiver/awslambdareceiver/receiver_test.go b/receiver/awslambdareceiver/receiver_test.go index 6fb469979f7e5..fa194ff68aecd 100644 --- a/receiver/awslambdareceiver/receiver_test.go +++ b/receiver/awslambdareceiver/receiver_test.go @@ -11,9 +11,11 @@ import ( "testing" "github.com/aws/aws-lambda-go/events" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -206,9 
+208,7 @@ func TestStartRequiresLambdaEnvironment(t *testing.T) { func TestProcessLambdaEvent(t *testing.T) { commonCfg := Config{ - S3: sharedConfig{ - Encoding: "awslogs", - }, + S3: S3Config{sharedConfig: sharedConfig{Encoding: "awslogs"}}, } commonLogger := zap.NewNop() @@ -554,6 +554,84 @@ func TestHandleCustomTrigger(t *testing.T) { } } +func TestS3MultiEncodingConfig_Validate(t *testing.T) { + // Verify that a config with S3 Encodings (all raw passthrough, no extension needed) + // passes validation and has the expected number of entries. + cfg := &Config{ + S3: S3Config{ + Encodings: []S3Encoding{ + {Name: "vpcflow"}, // raw passthrough, no extension needed + {Name: "catchall", PathPattern: "*"}, + }, + }, + CloudWatch: sharedConfig{}, + } + require.NoError(t, cfg.Validate()) + require.Len(t, cfg.S3.Encodings, 2) +} + +func TestNewLogsHandler_MultiEncodingS3_Branch(t *testing.T) { + // Verify that newLogsHandler uses the multi-format branch when cfg.S3.Encodings is non-empty. + cfg := &Config{ + S3: S3Config{ + Encodings: []S3Encoding{ + {Name: "vpcflow"}, // raw passthrough, uses default decoder + {Name: "catchall", PathPattern: "*"}, // catch-all, uses default decoder + }, + }, + CloudWatch: sharedConfig{}, + } + require.NoError(t, cfg.Validate()) + + ctr := gomock.NewController(t) + s3Service := internal.NewMockS3Service(ctr) + s3Provider := internal.NewMockS3Provider(ctr) + s3Provider.EXPECT().GetService(gomock.Any()).Return(s3Service, nil) + + settings := receivertest.NewNopSettings(metadata.Type) + host := componenttest.NewNopHost() // no extensions needed — all raw passthrough + sink := &consumertest.LogsSink{} + + hp, err := newLogsHandler(t.Context(), cfg, settings, host, sink, s3Provider) + require.NoError(t, err) + require.NotNil(t, hp) + + // The registered handler must be the multi-format variant. 
+ handler, err := hp.getHandler(s3Event) + require.NoError(t, err) + _, ok := handler.(*multiFormatS3LogsHandler) + require.True(t, ok, "expected *multiFormatS3LogsHandler, got %T", handler) +} + +func TestBuildS3LogsRouter_RawPassthrough(t *testing.T) { + // Verify that buildS3LogsRouter succeeds when all encodings are raw passthrough + // (no extension references), and the resulting router routes correctly. + cfg := S3Config{ + Encodings: []S3Encoding{ + {Name: "vpcflow"}, // default pattern, no encoding = raw + {Name: "catchall", PathPattern: "*"}, // catch-all, no encoding = raw + }, + } + + // Use a nop host — no extensions needed since all entries are raw passthrough. + host := componenttest.NewNopHost() + router, err := buildS3LogsRouter(host, cfg, zap.NewNop()) + require.NoError(t, err) + require.NotNil(t, router) + + // VPC flow log key should match the vpcflow entry. + decoder, name, err := router.GetDecoder("AWSLogs/123/vpcflowlogs/us-east-1/file.log.gz") + require.NoError(t, err) + assert.Equal(t, "vpcflow", name) + assert.NotNil(t, decoder) + + // Random key should fall through to catch-all. + decoder, name, err = router.GetDecoder("random/path/file.log") + require.NoError(t, err) + assert.Equal(t, "catchall", name) + assert.NotNil(t, decoder) +} + type mockExtensionWithPLogUnmarshaler struct { mockExtension // Embed the base mock implementation. plog.Unmarshaler // Add the unmarshaler interface when needed. 
diff --git a/receiver/awslambdareceiver/router.go b/receiver/awslambdareceiver/router.go new file mode 100644 index 0000000000000..e7c42cb48c249 --- /dev/null +++ b/receiver/awslambdareceiver/router.go @@ -0,0 +1,76 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awslambdareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver" + +import ( + "fmt" + "strings" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" +) + +// routedEncoding pairs an S3Encoding with its pre-split path pattern parts. +type routedEncoding struct { + enc S3Encoding + patternParts []string +} + +// logsDecoderRouter routes S3 object keys to the appropriate LogsDecoderFactory +// based on path pattern matching with wildcards. +// +// Encodings must be pre-sorted by specificity (most specific first, catch-all last). +// Use S3Config.SortedEncodings() to obtain a correctly sorted slice. +type logsDecoderRouter struct { + encodings []routedEncoding + decoders map[string]encoding.LogsDecoderFactory + defaultDecoder encoding.LogsDecoderFactory +} + +// newLogsDecoderRouter creates a new S3 logs decoder router. +// - encodings: pre-sorted S3Encoding slice. +// - decoders: maps encoding.Name => LogsDecoderFactory for entries with an Encoding field set. +// - defaultDecoder: used for entries with no Encoding (raw passthrough). +// +// Path patterns are split on "/" at construction time so that GetDecoder only +// splits the object key once per S3 event rather than once per pattern. 
+func newLogsDecoderRouter( + encodings []S3Encoding, + decoders map[string]encoding.LogsDecoderFactory, + defaultDecoder encoding.LogsDecoderFactory, +) *logsDecoderRouter { + routed := make([]routedEncoding, len(encodings)) + for i, enc := range encodings { + routed[i] = routedEncoding{ + enc: enc, + patternParts: strings.Split(enc.ResolvePathPattern(), "/"), + } + } + return &logsDecoderRouter{ + encodings: routed, + decoders: decoders, + defaultDecoder: defaultDecoder, + } +} + +// GetDecoder returns the LogsDecoderFactory and encoding name for the given S3 object key. +// It splits the object key once, then iterates encodings in order (most specific first) +// and returns on the first match. Returns an error if no encoding matches. +func (r *logsDecoderRouter) GetDecoder(objectKey string) (encoding.LogsDecoderFactory, string, error) { + targetParts := strings.Split(objectKey, "/") + for _, re := range r.encodings { + if !matchPrefixWithWildcard(targetParts, re.patternParts) { + continue + } + // No encoding => raw passthrough using the default decoder. 
+ if re.enc.Encoding == "" { + return r.defaultDecoder, re.enc.Name, nil + } + decoder, ok := r.decoders[re.enc.Name] + if !ok { + return nil, "", fmt.Errorf("no decoder registered for encoding %q", re.enc.Name) + } + return decoder, re.enc.Name, nil + } + return nil, "", fmt.Errorf("no encoding matches S3 object key: %s", objectKey) +} diff --git a/receiver/awslambdareceiver/router_test.go b/receiver/awslambdareceiver/router_test.go new file mode 100644 index 0000000000000..5f8aa3857ac68 --- /dev/null +++ b/receiver/awslambdareceiver/router_test.go @@ -0,0 +1,148 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awslambdareceiver + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver/internal" +) + +func TestLogsDecoderRouter_GetDecoder(t *testing.T) { + t.Parallel() + + defaultDecoder := internal.NewDefaultS3LogsDecoder() + vpcDecoder := internal.NewDefaultS3LogsDecoder() + ctDecoder := internal.NewDefaultS3LogsDecoder() + + tests := []struct { + name string + encodings []S3Encoding + decoders map[string]encoding.LogsDecoderFactory + objectKey string + expectedFormat string + expectError bool + }{ + { + name: "matches vpcflow default pattern", + encodings: []S3Encoding{ + {Name: "vpcflow", Encoding: "awslogs_encoding/vpc"}, + {Name: "cloudtrail", Encoding: "awslogs_encoding/ct"}, + }, + decoders: map[string]encoding.LogsDecoderFactory{ + "vpcflow": vpcDecoder, + "cloudtrail": ctDecoder, + }, + objectKey: "AWSLogs/123456789012/vpcflowlogs/us-east-1/2024/01/15/file.log.gz", + expectedFormat: "vpcflow", + }, + { + name: "matches cloudtrail default pattern", + encodings: []S3Encoding{ + {Name: "vpcflow", Encoding: "awslogs_encoding/vpc"}, + {Name: "cloudtrail", Encoding: "awslogs_encoding/ct"}, + }, + 
decoders: map[string]encoding.LogsDecoderFactory{ + "vpcflow": vpcDecoder, + "cloudtrail": ctDecoder, + }, + objectKey: "AWSLogs/123456789012/CloudTrail/us-east-1/2024/01/15/file.json.gz", + expectedFormat: "cloudtrail", + }, + { + name: "catch-all matches unrecognized key", + encodings: []S3Encoding{ + {Name: "vpcflow", Encoding: "awslogs_encoding/vpc"}, + {Name: "catchall", PathPattern: "*"}, + }, + decoders: map[string]encoding.LogsDecoderFactory{ + "vpcflow": vpcDecoder, + }, + objectKey: "random/path/to/file.log", + expectedFormat: "catchall", + }, + { + name: "raw passthrough (no encoding) uses default decoder", + encodings: []S3Encoding{ + {Name: "vpcflow", Encoding: "awslogs_encoding/vpc"}, + {Name: "raw", PathPattern: "raw-logs"}, + }, + decoders: map[string]encoding.LogsDecoderFactory{ + "vpcflow": vpcDecoder, + }, + objectKey: "raw-logs/file.txt", + expectedFormat: "raw", + }, + { + name: "no matching pattern returns error", + encodings: []S3Encoding{ + {Name: "vpcflow", Encoding: "awslogs_encoding/vpc"}, + }, + decoders: map[string]encoding.LogsDecoderFactory{ + "vpcflow": vpcDecoder, + }, + objectKey: "completely/different/path/file.log", + expectError: true, + }, + { + name: "custom path pattern with wildcard", + encodings: []S3Encoding{ + {Name: "custom", Encoding: "custom_encoding", PathPattern: "my-app/*/logs"}, + }, + decoders: map[string]encoding.LogsDecoderFactory{ + "custom": vpcDecoder, + }, + objectKey: "my-app/production/logs/2024/file.log", + expectedFormat: "custom", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + router := newLogsDecoderRouter(tt.encodings, tt.decoders, defaultDecoder) + decoder, formatName, err := router.GetDecoder(tt.objectKey) + + if tt.expectError { + assert.Error(t, err) + assert.Nil(t, decoder) + } else { + require.NoError(t, err) + assert.NotNil(t, decoder) + assert.Equal(t, tt.expectedFormat, formatName) + } + }) + } +} + +func TestLogsDecoderRouter_PatternPriority(t *testing.T) { + 
t.Parallel() + + defaultDecoder := internal.NewDefaultS3LogsDecoder() + vpcDecoder := internal.NewDefaultS3LogsDecoder() + catchallDecoder := internal.NewDefaultS3LogsDecoder() + + // Formats passed in already sorted (as SortedEncodings() would return). + encodings := []S3Encoding{ + {Name: "vpcflow"}, // default: AWSLogs/*/vpcflowlogs + {Name: "catchall", PathPattern: "*"}, + } + decoders := map[string]encoding.LogsDecoderFactory{ + "vpcflow": vpcDecoder, + "catchall": catchallDecoder, + } + + router := newLogsDecoderRouter(encodings, decoders, defaultDecoder) + + _, name, err := router.GetDecoder("AWSLogs/123/vpcflowlogs/us-east-1/file.log") + require.NoError(t, err) + assert.Equal(t, "vpcflow", name, "VPC flow log should match vpcflow, not catchall") + + _, name, err = router.GetDecoder("random/file.log") + require.NoError(t, err) + assert.Equal(t, "catchall", name, "Random path should fall through to catchall") +} diff --git a/receiver/awslambdareceiver/testdata/config.yaml b/receiver/awslambdareceiver/testdata/config.yaml index 78e1194d9c800..baa21a8d89436 100644 --- a/receiver/awslambdareceiver/testdata/config.yaml +++ b/receiver/awslambdareceiver/testdata/config.yaml @@ -11,4 +11,14 @@ awslambda/json_log_encoding: awslambda/empty_encoding: awslambda/with_failure_arn: - failure_bucket_arn: arn:aws:s3:::example \ No newline at end of file + failure_bucket_arn: arn:aws:s3:::example + +awslambda/s3_multi_encoding: + s3: + encodings: + - name: vpcflow + encoding: awslogs_encoding/vpcflow + - name: cloudtrail + encoding: awslogs_encoding/cloudtrail + - name: catchall + path_pattern: "*" \ No newline at end of file