
Commit 900e36d

[receiver/awslambda] Add multi-format S3 log routing (#47237)
#### Description

This PR adds support for routing S3 objects to different encoding extensions based on their key prefix within a single Lambda deployment. This is useful when a Lambda receives events from S3 buckets that store multiple log types (e.g. VPC Flow Logs and CloudTrail in the same bucket, or across multiple buckets with different log types).

We introduce a new `encodings` field in the S3 receiver config that can be used like this:

```yaml
extensions:
  awslogs_encoding/vpcflow:
    format: vpcflow
    vpcflow:
      file_format: plain-text
  awslogs_encoding/cloudtrail:
    format: cloudtrail

receivers:
  awslambda:
    s3:
      encodings:
        - name: vpcflow
          encoding: awslogs_encoding/vpcflow # decode VPC Flow Log fields into structured records
        - name: cloudtrail
          encoding: awslogs_encoding/cloudtrail # decode CloudTrail JSON events into structured records
          path_pattern: "myorg/*/CloudTrail" # optional: override default (AWSLogs/*/CloudTrail); omit to use the default
        - name: catchall
          path_pattern: "*" # forward anything else as raw bytes
```

The existing `encoding` field is unchanged. `encoding` and `encodings` are mutually exclusive.

#### Link to tracking issue

Part of #46458

#### Testing

Unit Testing. Pending E2E testing.

#### Documentation

Readme has been updated.
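To make the key-prefix routing concrete, here is a minimal Go sketch of segment-wise prefix matching. It relies on the README statement in this commit that `*` matches exactly one path segment. The helper name `matchesPrefix` is hypothetical, and comparing whole `/`-separated segments is an assumption; the receiver's actual matcher is not part of this view and may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// matchesPrefix reports whether key starts with pattern, comparing
// "/"-separated segments. A "*" segment in the pattern matches exactly one
// key segment; a pattern that is just "*" is treated as a catch-all.
// Hypothetical helper: not the receiver's real implementation.
func matchesPrefix(pattern, key string) bool {
	if pattern == "*" {
		return true
	}
	patSegs := strings.Split(pattern, "/")
	keySegs := strings.Split(key, "/")
	if len(keySegs) < len(patSegs) {
		return false // key is shorter than the pattern, so it cannot match
	}
	for i, p := range patSegs {
		if p != "*" && p != keySegs[i] {
			return false
		}
	}
	return true
}

func main() {
	key := "AWSLogs/123456789012/CloudTrail/us-east-1/2024/01/01/file.json.gz"
	fmt.Println(matchesPrefix("AWSLogs/*/CloudTrail", key))  // true
	fmt.Println(matchesPrefix("AWSLogs/*/vpcflowlogs", key)) // false
	fmt.Println(matchesPrefix("*", key))                     // true
}
```

Under these assumptions, the `cloudtrail` entry from the example config would claim the key above, and only keys that match no specific pattern would fall through to the catch-all entry.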
1 parent 658f8cd commit 900e36d

15 files changed

Lines changed: 1304 additions & 83 deletions
Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
+component: receiver/awslambda
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add multi-format S3 log routing via the new `encodings` field in the S3 receiver configuration.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [46458]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  The new `encodings` list field enables a single Lambda deployment to route S3 objects to
+  different decoders based on their key prefix.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

receiver/awslambdareceiver/README.md

Lines changed: 74 additions & 0 deletions
@@ -289,6 +289,80 @@ service:

 Similar to the first example, this configuration is for collecting ELB access logs stored in S3.

+### Multi-Format S3 Configuration (encodings)
+
+The `encodings` field enables routing different S3 object key patterns to different decoders within
+a single Lambda deployment. This is useful when a Lambda function receives events from:
+
+- **A single S3 bucket that stores multiple log types** — for example, VPC Flow Logs and CloudTrail
+  logs written to the same bucket under different key prefixes.
+- **Multiple S3 buckets with different log types** — for example, one bucket for VPC Flow Logs and
+  another for WAF logs, both configured to trigger the same Lambda function.
+
+`encoding` (single, top-level) and `encodings` (list) are mutually exclusive — use one or the other.
+
+Each entry in `encodings` supports three fields:
+
+| Field | Required | Description |
+|----------------|----------|-------------|
+| `name` | yes | Unique identifier for this entry. For known names (`vpcflow`, `cloudtrail`, etc.) the default `path_pattern` is applied automatically. |
+| `encoding` | no | Extension ID of the decoder (e.g. `awslogs_encoding/vpcflow`). Omit to pass content through as raw bytes using the built-in default decoder. |
+| `path_pattern` | no* | Prefix pattern matched against the S3 object key. `*` matches one path segment. Omit to use the built-in default for known names. Use `"*"` as a catch-all. |
+
+\* May be omitted only for built-in known names. For any other name, `path_pattern` must be set explicitly (use `"*"` for a catch-all).
+
+Each entry in `encodings` is evaluated in order of pattern specificity (more-specific patterns are
+matched first; `"*"` catch-all is matched last). Users may list entries in any order.
+
+#### Combining encodings with extensions
+
+The `encoding` field references a collector extension by its component ID. Each referenced
+extension must be declared in the `extensions:` block and listed under `service.extensions`.
+
+The following example decodes VPC Flow Logs and CloudTrail events into structured log records,
+and forwards anything else as raw bytes via the catch-all entry:
+
+```yaml
+extensions:
+  awslogs_encoding/vpcflow:
+    format: vpcflow
+    vpcflow:
+      file_format: plain-text
+  awslogs_encoding/cloudtrail:
+    format: cloudtrail
+
+receivers:
+  awslambda:
+    s3:
+      encodings:
+        - name: vpcflow
+          encoding: awslogs_encoding/vpcflow # decode VPC Flow Log fields into structured records
+        - name: cloudtrail
+          encoding: awslogs_encoding/cloudtrail # decode CloudTrail JSON events into structured records
+          path_pattern: "myorg/*/CloudTrail" # optional: override default (AWSLogs/*/CloudTrail); omit to use the default
+        - name: catchall
+          path_pattern: "*" # forward anything else as raw bytes
+
+```
+
+#### Built-in default path patterns
+
+The following well-known names have built-in default path patterns. When `path_pattern` is omitted
+for these names, the receiver uses the corresponding default.
+
+| Name | Default path pattern |
+|-------------------|-----------------------------------------|
+| `vpcflow` | `AWSLogs/*/vpcflowlogs` |
+| `cloudtrail` | `AWSLogs/*/CloudTrail` |
+| `elbaccess` | `AWSLogs/*/elasticloadbalancing` |
+| `waf` | `AWSLogs/*/WAFLogs` |
+| `networkfirewall` | `AWSLogs/*/network-firewall` |
+
+In the default patterns `*` matches exactly one path segment (the AWS account ID in standard AWS
+log paths).
+
+For any name not listed above, `path_pattern` must be specified explicitly.
+
 ### Example 3: CloudWatch Logs using CloudWatch Subscription Filters

 ```yaml

receiver/awslambdareceiver/benchmark_test.go

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@ func BenchmarkHandleS3Notification(b *testing.B) {
 	service := internal.NewMockS3Service(gomock.NewController(b))
 	service.EXPECT().ReadObject(gomock.Any(), bucket, object).Return([]byte("bucket content"), nil).AnyTimes()

-	handler := newS3LogsHandler(service, zap.NewNop(), &customLogUnmarshaler{}, &noOpLogsConsumer{})
+	handler := newS3LogsHandler(service, zap.NewNop(), fixedLogsDecoder(&customLogUnmarshaler{}), &noOpLogsConsumer{})

 	b.Run("HandleS3Event", func(b *testing.B) {
 		b.ReportAllocs()

receiver/awslambdareceiver/config.go

Lines changed: 134 additions & 25 deletions
@@ -4,38 +4,152 @@
 package awslambdareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver"

 import (
+	"errors"
 	"fmt"
+	"sort"
 	"strings"

 	"go.opentelemetry.io/collector/component"
 )

 const s3ARNPrefix = "arn:aws:s3:::"

-type Config struct {
-	// S3 defines configuration options for S3 Lambda trigger.
-	S3 sharedConfig `mapstructure:"s3"`
+// defaultS3PathPatterns maps known encoding names to their default S3 path patterns.
+// "*" matches exactly one path segment (the AWS account ID in standard AWS log paths).
+var defaultS3PathPatterns = map[string]string{
+	"vpcflow":         "AWSLogs/*/vpcflowlogs",
+	"cloudtrail":      "AWSLogs/*/CloudTrail",
+	"elbaccess":       "AWSLogs/*/elasticloadbalancing",
+	"waf":             "AWSLogs/*/WAFLogs",
+	"networkfirewall": "AWSLogs/*/network-firewall",
+}

-	// CloudWatch defines configuration options for CloudWatch Lambda trigger.
-	CloudWatch sharedConfig `mapstructure:"cloudwatch"`
+// S3Encoding defines one entry in the S3 multi-encoding routing table.
+type S3Encoding struct {
+	// Name identifies the encoding. For known names (vpcflow, cloudtrail, elbaccess, waf,
+	// networkfirewall) the default path_pattern is applied automatically.
+	Name string `mapstructure:"name"`

-	// FailureBucketARN is the ARN of receiver deployment Lambda's error destination.
-	FailureBucketARN string `mapstructure:"failure_bucket_arn"`
+	// Encoding is the extension ID for decoding (e.g. "awslogs_encoding/vpcflow").
+	// If empty, content is passed through as-is using the built-in raw decoder.
+	Encoding string `mapstructure:"encoding"`
+
+	// PathPattern is matched as a prefix against the S3 object key.
+	// "*" matches exactly one path segment. Example: "AWSLogs/*/vpcflowlogs"
+	// If empty, the default pattern for a known Name is used.
+	PathPattern string `mapstructure:"path_pattern"`
+}
+
+// resolvePathPattern returns the effective path pattern for this encoding entry.
+// Returns the configured PathPattern if set, else the default for known names.
+func (e *S3Encoding) resolvePathPattern() string {
+	if e.PathPattern != "" {
+		return e.PathPattern
+	}
+	return defaultS3PathPatterns[e.Name]
+}

-	_ struct{} // Prevent unkeyed literal initialization
+// Validate validates an S3Encoding entry.
+func (e *S3Encoding) Validate() error {
+	if e.Name == "" {
+		return errors.New("'name' is required")
+	}
+	// Unknown name without an explicit path_pattern is not routable.
+	if e.PathPattern == "" {
+		if _, ok := defaultS3PathPatterns[e.Name]; !ok {
+			return fmt.Errorf("'path_pattern' is required for encoding %q (no default available); use %q for catch-all", e.Name, catchAllPattern)
+		}
+	}
+	return nil
 }

-// sharedConfig defines configuration options shared between different AWS Lambda trigger types.
-// Note - we may extend or split this into dedicated structures per trigger type in future if needed.
+// sharedConfig defines configuration options shared between Lambda trigger types.
 type sharedConfig struct {
 	// Encoding defines the encoding to decode incoming Lambda invocation data.
-	// This extension is expected to further process content of the events that are extracted from Lambda trigger.
-	//
-	// If receiving data is in different formats(ex:- a mix of VPC flow logs, CloudTrail logs), receiver is recommended
-	// to have separate Lambda functions with specific extension configurations.
 	Encoding string `mapstructure:"encoding"`
 }

+// S3Config defines configuration options for the S3 Lambda trigger.
+// It supersedes the sharedConfig for the s3 key.
+// sharedConfig is embedded with mapstructure:",squash" so the "encoding" key
+// remains at the top level of the s3 block in YAML — fully backwards-compatible.
+type S3Config struct {
+	sharedConfig `mapstructure:",squash"`
+
+	// Encodings defines multiple encoding entries for S3 path-based routing (multi-format mode).
+	// Each entry maps a path_pattern prefix to an encoding extension.
+	// Mutually exclusive with sharedConfig.Encoding.
+	//
+	// Only supported for logs signal type. Metrics receivers reject configs that set this field.
+	Encodings []S3Encoding `mapstructure:"encodings"`
+}
+
+// Validate validates the S3Config.
+func (c *S3Config) Validate() error {
+	if c.Encoding != "" && len(c.Encodings) > 0 {
+		return errors.New("'encoding' and 'encodings' are mutually exclusive; use 'encodings' for multi-format support")
+	}
+	seen := make(map[string]bool, len(c.Encodings))
+	for i, e := range c.Encodings {
+		if err := e.Validate(); err != nil {
+			return fmt.Errorf("encodings[%d]: %w", i, err)
+		}
+		if seen[e.Name] {
+			return fmt.Errorf("encodings[%d]: duplicate encoding name %q", i, e.Name)
+		}
+		seen[e.Name] = true
+	}
+	return nil
+}
+
+// sortedEncodings returns a copy of Encodings sorted by path pattern specificity:
+// more-specific patterns first, catch-all "*" last.
+// This makes matching order-independent — users can list encodings in any order.
+func (c *S3Config) sortedEncodings() []S3Encoding {
+	if len(c.Encodings) == 0 {
+		return nil
+	}
+	sorted := make([]S3Encoding, len(c.Encodings))
+	copy(sorted, c.Encodings)
+
+	// Pre-split patterns once; keyed by pattern string so the map stays correct
+	// as the sort swaps elements.
+	splitCache := make(map[string][]string, len(sorted))
+	for _, enc := range sorted {
+		p := enc.resolvePathPattern()
+		if _, ok := splitCache[p]; !ok {
+			splitCache[p] = strings.Split(p, "/")
+		}
+	}
+
+	sort.SliceStable(sorted, func(i, j int) bool {
+		pi := sorted[i].resolvePathPattern()
+		pj := sorted[j].resolvePathPattern()
+		if isCatchAllPattern(pi) && !isCatchAllPattern(pj) {
+			return false
+		}
+		if !isCatchAllPattern(pi) && isCatchAllPattern(pj) {
+			return true
+		}
+		return comparePatternSpecificity(splitCache[pi], splitCache[pj]) < 0
+	})
+	return sorted
+}
+
+// Config is the top-level configuration for the awslambda receiver.
+type Config struct {
+	// S3 defines configuration for the S3 Lambda trigger.
+	S3 S3Config `mapstructure:"s3"`
+
+	// CloudWatch defines configuration for the CloudWatch Logs Lambda trigger.
+	CloudWatch sharedConfig `mapstructure:"cloudwatch"`
+
+	// FailureBucketARN is the ARN of the S3 bucket used to store failed Lambda event records.
+	FailureBucketARN string `mapstructure:"failure_bucket_arn"`
+
+	_ struct{} // Prevent unkeyed literal initialization.
+}
+
 var _ component.Config = (*Config)(nil)

 func createDefaultConfig() component.Config {
@@ -44,31 +158,26 @@ func createDefaultConfig() component.Config {

 func (c *Config) Validate() error {
 	if c.FailureBucketARN != "" {
-		_, err := getBucketNameFromARN(c.FailureBucketARN)
-		if err != nil {
+		if _, err := getBucketNameFromARN(c.FailureBucketARN); err != nil {
 			return fmt.Errorf("invalid failure_bucket_arn: %w", err)
 		}
 	}
-
+	if err := c.S3.Validate(); err != nil {
+		return fmt.Errorf("invalid s3 config: %w", err)
+	}
 	return nil
 }

-// getBucketNameFromARN extracts S3 bucket name from ARN
-// Example
-//
-// arn = "arn:aws:s3:::myBucket/folderA
-// result = myBucket
+// getBucketNameFromARN extracts the S3 bucket name from an ARN.
+// Example: "arn:aws:s3:::myBucket/folderA" => "myBucket"
 func getBucketNameFromARN(arn string) (string, error) {
 	if !strings.HasPrefix(arn, s3ARNPrefix) {
 		return "", fmt.Errorf("invalid S3 ARN format: %s", arn)
 	}
-
 	s3Path := strings.TrimPrefix(arn, s3ARNPrefix)
 	bucket, _, _ := strings.Cut(s3Path, "/")
-
 	if bucket == "" {
 		return "", fmt.Errorf("invalid S3 ARN format, bucket name missing: %s", arn)
 	}
-
 	return bucket, nil
 }
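
`sortedEncodings` calls `isCatchAllPattern` and `comparePatternSpecificity`, which are defined outside the hunks shown here. The Go sketch below shows one plausible ordering rule so the "specific first, catch-all last" behaviour is easier to picture. The rule itself (more segments wins, then fewer `*` segments) is an assumption rather than the commit's actual comparison, and the names `isCatchAll`, `compareSpecificity`, and `sortBySpecificity` are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// isCatchAll reports whether p is the bare "*" pattern.
func isCatchAll(p string) bool { return p == "*" }

// wildcards counts the "*" segments in a split pattern.
func wildcards(segs []string) int {
	n := 0
	for _, s := range segs {
		if s == "*" {
			n++
		}
	}
	return n
}

// compareSpecificity returns a negative value when a is more specific than b.
// Assumed rule (not taken from the commit): more segments wins, then fewer
// "*" segments wins; otherwise the two patterns tie.
func compareSpecificity(a, b []string) int {
	if len(a) != len(b) {
		return len(b) - len(a)
	}
	return wildcards(a) - wildcards(b)
}

// sortBySpecificity returns a sorted copy: specific patterns first, "*" last.
func sortBySpecificity(patterns []string) []string {
	sorted := append([]string(nil), patterns...)
	sort.SliceStable(sorted, func(i, j int) bool {
		pi, pj := sorted[i], sorted[j]
		if isCatchAll(pi) != isCatchAll(pj) {
			return isCatchAll(pj) // the non-catch-all pattern sorts first
		}
		return compareSpecificity(strings.Split(pi, "/"), strings.Split(pj, "/")) < 0
	})
	return sorted
}

func main() {
	in := []string{"*", "AWSLogs/*/CloudTrail", "AWSLogs/123456789012/CloudTrail/us-east-1"}
	fmt.Println(sortBySpecificity(in))
	// prints: [AWSLogs/123456789012/CloudTrail/us-east-1 AWSLogs/*/CloudTrail *]
}
```

A stable sort keeps the configured order for patterns that tie, so the result stays deterministic for a given config.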
Lines changed: 32 additions & 9 deletions
@@ -1,19 +1,42 @@
+$defs:
+  s_3_config:
+    description: S3Config defines configuration options for the S3 Lambda trigger. It supersedes the sharedConfig for the s3 key. sharedConfig is embedded with mapstructure:",squash" so the "encoding" key remains at the top level of the s3 block in YAML — fully backwards-compatible.
+    type: object
+    properties:
+      encoding:
+        description: Encoding defines the encoding to decode incoming Lambda invocation data.
+        type: string
+      encodings:
+        description: Encodings defines multiple encoding entries for S3 path-based routing (multi-format mode). Each entry maps a path_pattern prefix to an encoding extension. Mutually exclusive with sharedConfig.Encoding. Only supported for logs signal type. Metrics receivers reject configs that set this field.
+        type: array
+        items:
+          $ref: s_3_encoding
+  s_3_encoding:
+    description: S3Encoding defines one entry in the S3 multi-encoding routing table.
+    type: object
+    properties:
+      encoding:
+        description: Encoding is the extension ID for decoding (e.g. "awslogs_encoding/vpcflow"). If empty, content is passed through as-is using the built-in raw decoder.
+        type: string
+      name:
+        description: Name identifies the encoding. For known names (vpcflow, cloudtrail, elbaccess, waf, networkfirewall) the default path_pattern is applied automatically.
+        type: string
+      path_pattern:
+        description: 'PathPattern is matched as a prefix against the S3 object key. "*" matches exactly one path segment. Example: "AWSLogs/*/vpcflowlogs" If empty, the default pattern for a known Name is used.'
+        type: string
+description: Config is the top-level configuration for the awslambda receiver.
 type: object
 properties:
   cloudwatch:
-    description: CloudWatch defines configuration options for CloudWatch Lambda trigger.
+    description: CloudWatch defines configuration for the CloudWatch Logs Lambda trigger.
     type: object
     properties:
       encoding:
-        description: Encoding defines the encoding to decode incoming Lambda invocation data. This extension is expected to further process content of the events that are extracted from Lambda trigger. If receiving data is in different formats(ex:- a mix of VPC flow logs, CloudTrail logs), receiver is recommended to have separate Lambda functions with specific extension configurations.
+        description: Encoding defines the encoding to decode incoming Lambda invocation data.
         type: string
   failure_bucket_arn:
-    description: FailureBucketARN is the ARN of receiver deployment Lambda's error destination.
+    description: FailureBucketARN is the ARN of the S3 bucket used to store failed Lambda event records.
     type: string
   s3:
-    description: S3 defines configuration options for S3 Lambda trigger.
-    type: object
-    properties:
-      encoding:
-        description: Encoding defines the encoding to decode incoming Lambda invocation data. This extension is expected to further process content of the events that are extracted from Lambda trigger. If receiving data is in different formats(ex:- a mix of VPC flow logs, CloudTrail logs), receiver is recommended to have separate Lambda functions with specific extension configurations.
-        type: string
+    description: S3 defines configuration for the S3 Lambda trigger.
+    $ref: s_3_config
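
The schema only describes field types; the cross-field rules (mutual exclusivity, required `path_pattern` for unknown names, unique names) are enforced at runtime by the `Validate` methods in `config.go`. The Go sketch below restates those rules in a self-contained form to show which configurations pass and which fail. The types `entry` and `s3Config` and the `knownNames` set are simplified stand-ins introduced for illustration, and the error texts are shortened relative to the commit.

```go
package main

import (
	"errors"
	"fmt"
)

// knownNames mirrors the keys of defaultS3PathPatterns in config.go.
var knownNames = map[string]bool{
	"vpcflow": true, "cloudtrail": true, "elbaccess": true, "waf": true, "networkfirewall": true,
}

// entry and s3Config are simplified stand-ins for S3Encoding and S3Config.
type entry struct{ Name, Encoding, PathPattern string }

type s3Config struct {
	Encoding  string
	Encodings []entry
}

// validate applies the same rules as the Validate methods in config.go.
func (c s3Config) validate() error {
	if c.Encoding != "" && len(c.Encodings) > 0 {
		return errors.New("'encoding' and 'encodings' are mutually exclusive")
	}
	seen := make(map[string]bool, len(c.Encodings))
	for i, e := range c.Encodings {
		switch {
		case e.Name == "":
			return fmt.Errorf("encodings[%d]: 'name' is required", i)
		case e.PathPattern == "" && !knownNames[e.Name]:
			return fmt.Errorf("encodings[%d]: 'path_pattern' is required for encoding %q", i, e.Name)
		case seen[e.Name]:
			return fmt.Errorf("encodings[%d]: duplicate encoding name %q", i, e.Name)
		}
		seen[e.Name] = true
	}
	return nil
}

func main() {
	ok := s3Config{Encodings: []entry{{Name: "vpcflow"}, {Name: "catchall", PathPattern: "*"}}}
	both := s3Config{Encoding: "awslogs_encoding", Encodings: []entry{{Name: "waf"}}}
	noPattern := s3Config{Encodings: []entry{{Name: "custom"}}}

	fmt.Println(ok.validate())        // valid: known name plus explicit catch-all
	fmt.Println(both.validate())      // rejected: both fields set
	fmt.Println(noPattern.validate()) // rejected: unknown name without path_pattern
}
```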
