Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .chloggen/feature_awslambdareceiver-s3-multi-format.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: receiver/awslambda

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add multi-format S3 log routing via the new `encodings` field in the S3 receiver configuration.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [46458]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The new `encodings` list field enables a single Lambda deployment to route S3 objects to
different decoders based on their key prefix.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
74 changes: 74 additions & 0 deletions receiver/awslambdareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,80 @@ service:

Similar to the first example, this configuration is for collecting ELB access logs stored in S3.

### Multi-Format S3 Configuration (encodings)

The `encodings` field enables routing different S3 object key patterns to different decoders within
a single Lambda deployment. This is useful when a Lambda function receives events from:

- **A single S3 bucket that stores multiple log types** — for example, VPC Flow Logs and CloudTrail
logs written to the same bucket under different key prefixes.
- **Multiple S3 buckets with different log types** — for example, one bucket for VPC Flow Logs and
another for WAF logs, both configured to trigger the same Lambda function.

`encoding` (single, top-level) and `encodings` (list) are mutually exclusive — use one or the other.

Each entry in `encodings` supports three fields:

| Field | Required | Description |
|----------------|----------|-------------|
| `name` | yes | Unique identifier for this entry. For known names (`vpcflow`, `cloudtrail`, etc.) the default `path_pattern` is applied automatically. |
| `encoding` | no | Extension ID of the decoder (e.g. `awslogs_encoding/vpcflow`). Omit to pass content through as raw bytes using the built-in default decoder. |
| `path_pattern` | no* | Prefix pattern matched against the S3 object key. `*` matches one path segment. Omit to use the built-in default for known names. Use `"*"` as a catch-all. |

\* May be omitted only for built-in known names. For any other name, `path_pattern` must be set explicitly (use `"*"` for a catch-all).

Each entry in `encodings` is evaluated in order of pattern specificity (more-specific patterns are
matched first; `"*"` catch-all is matched last). Users may list entries in any order.

#### Combining encodings with extensions

The `encoding` field references a collector extension by its component ID. Each referenced
extension must be declared in the `extensions:` block and listed under `service.extensions`.

The following example decodes VPC Flow Logs and CloudTrail events into structured log records,
and forwards anything else as raw bytes via the catch-all entry:

```yaml
extensions:
awslogs_encoding/vpcflow:
format: vpcflow
vpcflow:
file_format: plain-text
awslogs_encoding/cloudtrail:
format: cloudtrail

receivers:
awslambda:
s3:
encodings:
- name: vpcflow
encoding: awslogs_encoding/vpcflow # decode VPC Flow Log fields into structured records
- name: cloudtrail
encoding: awslogs_encoding/cloudtrail # decode CloudTrail JSON events into structured records
path_pattern: "myorg/*/CloudTrail" # optional: override default (AWSLogs/*/CloudTrail); omit to use the default
- name: catchall
path_pattern: "*" # forward anything else as raw bytes

```

#### Built-in default path patterns

The following well-known names have built-in default path patterns. When `path_pattern` is omitted
for these names, the receiver uses the corresponding default.

| Name | Default path pattern |
|-------------------|-----------------------------------------|
| `vpcflow` | `AWSLogs/*/vpcflowlogs` |
| `cloudtrail` | `AWSLogs/*/CloudTrail` |
| `elbaccess` | `AWSLogs/*/elasticloadbalancing` |
| `waf` | `AWSLogs/*/WAFLogs` |
| `networkfirewall` | `AWSLogs/*/network-firewall` |

In the default patterns `*` matches exactly one path segment (the AWS account ID in standard AWS
log paths).

For any name not listed above, `path_pattern` must be specified explicitly.

### Example 3: CloudWatch Logs using CloudWatch Subscription Filters

```yaml
Expand Down
161 changes: 136 additions & 25 deletions receiver/awslambdareceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,154 @@
package awslambdareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver"

import (
"errors"
"fmt"
"sort"
"strings"

"go.opentelemetry.io/collector/component"
)

const s3ARNPrefix = "arn:aws:s3:::"

type Config struct {
// S3 defines configuration options for S3 Lambda trigger.
S3 sharedConfig `mapstructure:"s3"`
// defaultS3PathPatterns maps known encoding names to their default S3 path patterns.
// "*" matches exactly one path segment (the AWS account ID in standard AWS log paths).
var defaultS3PathPatterns = map[string]string{
"vpcflow": "AWSLogs/*/vpcflowlogs",
"cloudtrail": "AWSLogs/*/CloudTrail",
"elbaccess": "AWSLogs/*/elasticloadbalancing",
"waf": "AWSLogs/*/WAFLogs",
"networkfirewall": "AWSLogs/*/network-firewall",
}

// CloudWatch defines configuration options for CloudWatch Lambda trigger.
CloudWatch sharedConfig `mapstructure:"cloudwatch"`
// S3Encoding defines one entry in the S3 multi-encoding routing table.
type S3Encoding struct {
// Name identifies the encoding. For known names (vpcflow, cloudtrail, elbaccess, waf,
// networkfirewall) the default path_pattern is applied automatically.
Name string `mapstructure:"name"`

// FailureBucketARN is the ARN of receiver deployment Lambda's error destination.
FailureBucketARN string `mapstructure:"failure_bucket_arn"`
// Encoding is the extension ID for decoding (e.g. "awslogs_encoding/vpcflow").
// If empty, content is passed through as-is using the built-in raw decoder.
Encoding string `mapstructure:"encoding"`

// PathPattern is matched as a prefix against the S3 object key.
// "*" matches exactly one path segment. Example: "AWSLogs/*/vpcflowlogs"
// If empty, the default pattern for a known Name is used.
PathPattern string `mapstructure:"path_pattern"`
}

_ struct{} // Prevent unkeyed literal initialization
// ResolvePathPattern returns the effective path pattern for this encoding entry.
// Returns the configured PathPattern if set, else the default for known names.
func (e *S3Encoding) ResolvePathPattern() string {
if e.PathPattern != "" {
return e.PathPattern
}
return defaultS3PathPatterns[e.Name]
}

// sharedConfig defines configuration options shared between different AWS Lambda trigger types.
// Note - we may extend or split this into dedicated structures per trigger type in future if needed.
// Validate validates an S3Encoding entry.
func (e *S3Encoding) Validate() error {
if e.Name == "" {
return errors.New("'name' is required")
}
// Catch-all is always valid.
if e.PathPattern == catchAllPattern {
return nil
}
// Unknown name without an explicit path_pattern is not routable.
if e.PathPattern == "" {
if _, ok := defaultS3PathPatterns[e.Name]; !ok {
return fmt.Errorf("'path_pattern' is required for encoding %q (no default available); use %q for catch-all", e.Name, catchAllPattern)
}
}
return nil
}

// sharedConfig defines configuration options shared between Lambda trigger types.
type sharedConfig struct {
// Encoding defines the encoding to decode incoming Lambda invocation data.
// This extension is expected to further process content of the events that are extracted from Lambda trigger.
//
// If receiving data is in different formats(ex:- a mix of VPC flow logs, CloudTrail logs), receiver is recommended
// to have separate Lambda functions with specific extension configurations.
Encoding string `mapstructure:"encoding"`
}

// S3Config defines configuration options for the S3 Lambda trigger.
// It supersedes the sharedConfig for the s3 key.
// sharedConfig is embedded with mapstructure:",squash" so the "encoding" key
// remains at the top level of the s3 block in YAML — fully backwards-compatible.
type S3Config struct {
sharedConfig `mapstructure:",squash"`

// Encodings defines multiple encoding entries for S3 path-based routing (multi-format mode).
// Each entry maps a path_pattern prefix to an encoding extension.
// Mutually exclusive with sharedConfig.Encoding.
Encodings []S3Encoding `mapstructure:"encodings"`
}

// Validate validates the S3Config.
func (c *S3Config) Validate() error {
if c.Encoding != "" && len(c.Encodings) > 0 {
return errors.New("'encoding' and 'encodings' are mutually exclusive; use 'encodings' for multi-format support")
}
seen := make(map[string]bool, len(c.Encodings))
for i, e := range c.Encodings {
if err := e.Validate(); err != nil {
return fmt.Errorf("encodings[%d]: %w", i, err)
}
if seen[e.Name] {
return fmt.Errorf("encodings[%d]: duplicate encoding name %q", i, e.Name)
}
seen[e.Name] = true
}
return nil
}

// SortedEncodings returns a copy of Encodings sorted by path pattern specificity:
// more-specific patterns first, catch-all "*" last.
// This makes matching order-independent — users can list encodings in any order.
func (c *S3Config) SortedEncodings() []S3Encoding {
if len(c.Encodings) == 0 {
return nil
}
sorted := make([]S3Encoding, len(c.Encodings))
copy(sorted, c.Encodings)

// Pre-split patterns once; keyed by pattern string so the map stays correct
// as the sort swaps elements.
splitCache := make(map[string][]string, len(sorted))
for _, enc := range sorted {
p := enc.ResolvePathPattern()
if _, ok := splitCache[p]; !ok {
splitCache[p] = strings.Split(p, "/")
}
}

sort.SliceStable(sorted, func(i, j int) bool {
pi := sorted[i].ResolvePathPattern()
pj := sorted[j].ResolvePathPattern()
if isCatchAllPattern(pi) && !isCatchAllPattern(pj) {
return false
}
if !isCatchAllPattern(pi) && isCatchAllPattern(pj) {
return true
}
return comparePatternSpecificity(splitCache[pi], splitCache[pj]) < 0
})
return sorted
}

// Config is the top-level configuration for the awslambda receiver.
type Config struct {
// S3 defines configuration for the S3 Lambda trigger.
S3 S3Config `mapstructure:"s3"`

// CloudWatch defines configuration for the CloudWatch Logs Lambda trigger.
CloudWatch sharedConfig `mapstructure:"cloudwatch"`

// FailureBucketARN is the ARN of the S3 bucket used to store failed Lambda event records.
FailureBucketARN string `mapstructure:"failure_bucket_arn"`

_ struct{} // Prevent unkeyed literal initialization.
}

var _ component.Config = (*Config)(nil)

func createDefaultConfig() component.Config {
Expand All @@ -44,31 +160,26 @@ func createDefaultConfig() component.Config {

func (c *Config) Validate() error {
if c.FailureBucketARN != "" {
_, err := getBucketNameFromARN(c.FailureBucketARN)
if err != nil {
if _, err := getBucketNameFromARN(c.FailureBucketARN); err != nil {
return fmt.Errorf("invalid failure_bucket_arn: %w", err)
}
}

if err := c.S3.Validate(); err != nil {
return fmt.Errorf("invalid s3 config: %w", err)
}
return nil
}

// getBucketNameFromARN extracts S3 bucket name from ARN
// Example
//
// arn = "arn:aws:s3:::myBucket/folderA
// result = myBucket
// getBucketNameFromARN extracts the S3 bucket name from an ARN.
// Example: "arn:aws:s3:::myBucket/folderA" => "myBucket"
func getBucketNameFromARN(arn string) (string, error) {
if !strings.HasPrefix(arn, s3ARNPrefix) {
return "", fmt.Errorf("invalid S3 ARN format: %s", arn)
}

s3Path := strings.TrimPrefix(arn, s3ARNPrefix)
bucket, _, _ := strings.Cut(s3Path, "/")

if bucket == "" {
return "", fmt.Errorf("invalid S3 ARN format, bucket name missing: %s", arn)
}

return bucket, nil
}
41 changes: 32 additions & 9 deletions receiver/awslambdareceiver/config.schema.yaml
Original file line number Diff line number Diff line change
@@ -1,19 +1,42 @@
$defs:
s_3_config:
description: S3Config defines configuration options for the S3 Lambda trigger. It supersedes the sharedConfig for the s3 key. sharedConfig is embedded with mapstructure:",squash" so the "encoding" key remains at the top level of the s3 block in YAML — fully backwards-compatible.
type: object
properties:
encoding:
description: Encoding defines the encoding to decode incoming Lambda invocation data.
type: string
encodings:
description: Encodings defines multiple encoding entries for S3 path-based routing (multi-format mode). Each entry maps a path_pattern prefix to an encoding extension. Mutually exclusive with sharedConfig.Encoding.
type: array
items:
$ref: s_3_encoding
s_3_encoding:
description: S3Encoding defines one entry in the S3 multi-encoding routing table.
type: object
properties:
encoding:
description: Encoding is the extension ID for decoding (e.g. "awslogs_encoding/vpcflow"). If empty, content is passed through as-is using the built-in raw decoder.
type: string
name:
description: Name identifies the encoding. For known names (vpcflow, cloudtrail, elbaccess, waf, networkfirewall) the default path_pattern is applied automatically.
type: string
path_pattern:
description: 'PathPattern is matched as a prefix against the S3 object key. "*" matches exactly one path segment. Example: "AWSLogs/*/vpcflowlogs" If empty, the default pattern for a known Name is used.'
type: string
description: Config is the top-level configuration for the awslambda receiver.
type: object
properties:
cloudwatch:
description: CloudWatch defines configuration options for CloudWatch Lambda trigger.
description: CloudWatch defines configuration for the CloudWatch Logs Lambda trigger.
type: object
properties:
encoding:
description: Encoding defines the encoding to decode incoming Lambda invocation data. This extension is expected to further process content of the events that are extracted from Lambda trigger. If receiving data is in different formats(ex:- a mix of VPC flow logs, CloudTrail logs), receiver is recommended to have separate Lambda functions with specific extension configurations.
description: Encoding defines the encoding to decode incoming Lambda invocation data.
type: string
failure_bucket_arn:
description: FailureBucketARN is the ARN of receiver deployment Lambda's error destination.
description: FailureBucketARN is the ARN of the S3 bucket used to store failed Lambda event records.
type: string
s3:
description: S3 defines configuration options for S3 Lambda trigger.
type: object
properties:
encoding:
description: Encoding defines the encoding to decode incoming Lambda invocation data. This extension is expected to further process content of the events that are extracted from Lambda trigger. If receiving data is in different formats(ex:- a mix of VPC flow logs, CloudTrail logs), receiver is recommended to have separate Lambda functions with specific extension configurations.
type: string
description: S3 defines configuration for the S3 Lambda trigger.
$ref: s_3_config
Loading
Loading