Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/feat_cloudtrail-from-cloudwatch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: extension/awslogs_encoding

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for processing CloudTrail logs from CloudWatch subscription filters

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [45354]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"messageType":"DATA_MESSAGE","owner":"123456789010","logGroup":"/aws/cloudtrail/logs","logStream":"123456789010_CloudTrail_us-east-1","subscriptionFilters":["CloudTrailLogSubscription"],"logEvents":[{"id":"eventId1","timestamp":1418530010000,"message":"{\"eventVersion\":\"1.11\",\"userIdentity\":{\"type\":\"AWSService\",\"invokedBy\":\"s3.amazonaws.com\"},\"eventTime\":\"2026-01-28T09:26:59Z\",\"eventSource\":\"s3.amazonaws.com\",\"eventName\":\"PutObject\",\"awsRegion\":\"eu-west-1\",\"sourceIPAddress\":\"s3.amazonaws.com\",\"userAgent\":\"s3.amazonaws.com\",\"requestParameters\":{\"bucketName\":\"elastic-sar-bucket-fe44ff52c91b2efa4bccb8e07c85a202\",\"Host\":\"s3.eu-west-1.amazonaws.com\",\"key\":\"AWSLogs/627286350134/s3/2026-01-28-09-26-59-AB82D7306CDE5EF8\"},\"responseElements\":{\"x-amz-server-side-encryption\":\"AES256\"},\"additionalEventData\":{\"SignatureVersion\":\"SigV4\",\"CipherSuite\":\"TLS_AES_128_GCM_SHA256\",\"bytesTransferredIn\":576,\"SSEApplied\":\"Default_SSE_S3\",\"AuthenticationMethod\":\"AuthHeader\",\"x-amz-id-2\":\"8QsIi8XYiGCTf9wDCAncwPM9QKnLiD3Suz4Gg6fcnNzKucKThMi2CYA67zvsIIF9BxHHs1ZPopka8xkHvWV1yem9xlB2NXGJ\",\"bytesTransferredOut\":0},\"requestID\":\"EH45WHPCAE9G5SX8\",\"eventID\":\"5cb53c22-36b5-3ca3-a389-dae3eec919df\",\"readOnly\":false,\"resources\":[{\"accountId\":\"627286350134\",\"type\":\"AWS::S3::Bucket\",\"ARN\":\"arn:aws:s3:::elastic-sar-bucket-fe44ff52c91b2efa4bccb8e07c85a202\"},{\"type\":\"AWS::S3::Object\",\"ARN\":\"arn:aws:s3:::elastic-sar-bucket-fe44ff52c91b2efa4bccb8e07c85a202/AWSLogs/627286350134/s3/2026-01-28-09-26-59-AB82D7306CDE5EF8\"}],\"eventType\":\"AwsApiCall\",\"managementEvent\":false,\"recipientAccountId\":\"627286350134\",\"sharedEventID\":\"ed5bfe2a-b8d8-4a7a-9267-fb58ca3b2f77\",\"vpcEndpointId\":\"AWS Internal\",\"vpcEndpointAccountId\":\"AWS Internal\",\"eventCategory\":\"Data\"}"}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
resourceLogs:
- resource:
attributes:
- key: cloud.provider
value:
stringValue: aws
- key: aws.log.group.names
value:
stringValue: /aws/cloudtrail/logs
- key: aws.log.stream.names
value:
stringValue: 123456789010_CloudTrail_us-east-1
- key: cloud.region
value:
stringValue: eu-west-1
- key: cloud.account.id
value:
stringValue: "627286350134"
scopeLogs:
- logRecords:
- attributes:
- key: aws.cloudtrail.event_version
value:
stringValue: "1.11"
- key: aws.cloudtrail.event_id
value:
stringValue: 5cb53c22-36b5-3ca3-a389-dae3eec919df
- key: rpc.method
value:
stringValue: PutObject
- key: rpc.system
value:
stringValue: AwsApiCall
- key: rpc.service
value:
stringValue: s3.amazonaws.com
- key: aws.request_id
value:
stringValue: EH45WHPCAE9G5SX8
- key: aws.event.category
value:
stringValue: Data
- key: aws.event.read_only
value:
boolValue: false
- key: aws.event.management
value:
boolValue: false
- key: source.address
value:
stringValue: s3.amazonaws.com
- key: user_agent.original
value:
stringValue: s3.amazonaws.com
- key: aws.user_identity.invoked_by
value:
stringValue: s3.amazonaws.com
- key: aws.user_identity.principal.type
value:
stringValue: AWSService
- key: aws.shared_event_id
value:
stringValue: ed5bfe2a-b8d8-4a7a-9267-fb58ca3b2f77
- key: aws.request.parameters
value:
kvlistValue:
values:
- key: bucketName
value:
stringValue: elastic-sar-bucket-fe44ff52c91b2efa4bccb8e07c85a202
- key: Host
value:
stringValue: s3.eu-west-1.amazonaws.com
- key: key
value:
stringValue: AWSLogs/627286350134/s3/2026-01-28-09-26-59-AB82D7306CDE5EF8
- key: aws.response.elements
value:
kvlistValue:
values:
- key: x-amz-server-side-encryption
value:
stringValue: AES256
- key: aws.cloudtrail.additional_event_data
value:
kvlistValue:
values:
- key: CipherSuite
value:
stringValue: TLS_AES_128_GCM_SHA256
- key: bytesTransferredIn
value:
doubleValue: 576
- key: SSEApplied
value:
stringValue: Default_SSE_S3
- key: AuthenticationMethod
value:
stringValue: AuthHeader
- key: x-amz-id-2
value:
stringValue: 8QsIi8XYiGCTf9wDCAncwPM9QKnLiD3Suz4Gg6fcnNzKucKThMi2CYA67zvsIIF9BxHHs1ZPopka8xkHvWV1yem9xlB2NXGJ
- key: bytesTransferredOut
value:
doubleValue: 0
- key: SignatureVersion
value:
stringValue: SigV4
- key: aws.resources
value:
arrayValue:
values:
- kvlistValue:
values:
- key: account.id
value:
stringValue: "627286350134"
- key: type
value:
stringValue: AWS::S3::Bucket
- key: arn
value:
stringValue: arn:aws:s3:::elastic-sar-bucket-fe44ff52c91b2efa4bccb8e07c85a202
- kvlistValue:
values:
- key: type
value:
stringValue: AWS::S3::Object
- key: arn
value:
stringValue: arn:aws:s3:::elastic-sar-bucket-fe44ff52c91b2efa4bccb8e07c85a202/AWSLogs/627286350134/s3/2026-01-28-09-26-59-AB82D7306CDE5EF8
body: {}
timeUnixNano: "1769592419000000000"
scope:
attributes:
- key: encoding.format
value:
stringValue: aws.cloudtrail
name: github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension
version: test-version
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strconv"
"time"

"github.com/aws/aws-lambda-go/events"
gojson "github.com/goccy/go-json"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -163,11 +164,16 @@ func (u *CloudTrailLogUnmarshaler) UnmarshalAWSLogs(reader io.Reader) (plog.Logs
return plog.Logs{}, fmt.Errorf("failed to extract the first JSON key: %w", err)
}

// CloudWatch subscription filter format
if firstKey == "messageType" {
return u.fromCloudWatch(bufferedReader)
}
Comment on lines +167 to +170
Copy link
Contributor

@Kavindu-Dodan Kavindu-Dodan Jan 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately this won't work with CloudWatch subscription filter.

  • For CloudTrail to S3, we have a well-defined Records root key [1], so we can work with firstKey
  • For CloudTrail digest, we have a fallback as there's no root key that helps to detect the type [2]
  • For CloudWatch subscription filter, there are also multiple root keys [3], so it's not guaranteed that messageType is the firstKey. It could be owner or logGroup, depending on how AWS serializes the payload to JSON

So IMO we must fall back in this order:

  • First check firstKey to be Records: this is guaranteed per the current contract -> stream processing of the JSON
  • Then, as a fallback, perform a json.RawMessage unmarshal
    • Check for known CW keys like logGroup or messageType -> this is a CW trigger
    • Try unmarshaling as a Digest record

Let me know your thoughts :)

Footnotes

  1. https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-log-file-examples.html

  2. https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-log-file-validation-digest-file-structure.html

  3. https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#LambdaFunctionExample


decoder := gojson.NewDecoder(bufferedReader)

// Records indicates a CloudTrail log file
// Records indicates a CloudTrail log file from S3
if firstKey == "Records" {
return u.processRecords(decoder)
return u.fromS3(decoder)
}

// Try to parse as a CloudTrail digest record
Expand All @@ -180,11 +186,55 @@ func (u *CloudTrailLogUnmarshaler) UnmarshalAWSLogs(reader io.Reader) (plog.Logs
return u.processDigestRecord(cloudTrailDigest)
}

// fromCloudWatch converts a CloudWatch Logs subscription filter payload, whose
// log events each carry one CloudTrail record as their message, into plog.Logs.
//
// Every event becomes one log record under a single resource/scope pair. The
// resource is tagged with the log group and log stream taken from the
// CloudWatch envelope, plus whatever setResourceAttributes derives from the
// first CloudTrail record.
//
// An empty plog.Logs is returned when the payload is a CloudWatch control
// message or contains no log events. An error is returned when the envelope or
// any event message cannot be decoded, or when setLogRecord fails.
func (u *CloudTrailLogUnmarshaler) fromCloudWatch(reader *bufio.Reader) (plog.Logs, error) {
	// messageType value CloudWatch Logs uses for destination reachability checks.
	const controlMessageType = "CONTROL_MESSAGE"

	var cwLog events.CloudwatchLogsData
	if err := gojson.NewDecoder(reader).Decode(&cwLog); err != nil {
		return plog.Logs{}, fmt.Errorf("failed to unmarshal data as cloudwatch logs event: %w", err)
	}

	// Control messages carry a plain-text notice rather than a CloudTrail
	// record, so decoding their events as JSON would fail the whole payload.
	// They hold no telemetry; treat them like an empty payload.
	if cwLog.MessageType == controlMessageType || len(cwLog.LogEvents) == 0 {
		return plog.NewLogs(), nil
	}

	logs, resourceLogs, scopeLogs := u.createLogs()

	// Set CloudWatch-specific resource attributes
	resourceAttrs := resourceLogs.Resource().Attributes()
	resourceAttrs.PutStr(string(conventions.CloudProviderKey), conventions.CloudProviderAWS.Value.AsString())
	resourceAttrs.PutStr(string(conventions.AWSLogGroupNamesKey), cwLog.LogGroup)
	resourceAttrs.PutStr(string(conventions.AWSLogStreamNamesKey), cwLog.LogStream)

	logRecords := scopeLogs.LogRecords()
	logRecords.EnsureCapacity(len(cwLog.LogEvents))

	for i := range cwLog.LogEvents {
		// Parse message as a single CloudTrail record
		var record CloudTrailRecord
		if err := gojson.Unmarshal([]byte(cwLog.LogEvents[i].Message), &record); err != nil {
			return plog.Logs{}, fmt.Errorf("failed to unmarshal CloudTrail event from message: %w", err)
		}

		// All events share one resource, so only the first record contributes
		// the record-derived resource attributes.
		if i == 0 {
			u.setResourceAttributes(resourceAttrs, record)
		}

		if err := u.setLogRecord(logRecords.AppendEmpty(), &record); err != nil {
			return plog.Logs{}, err
		}
	}

	return logs, nil
}

// processRecords is specialized in processing CloudTrail log records.
// Implementation works with a gojson.Decoder to efficiently stream through potentially large log files.
func (u *CloudTrailLogUnmarshaler) processRecords(decoder *gojson.Decoder) (plog.Logs, error) {
logs := plog.NewLogs()

func (u *CloudTrailLogUnmarshaler) fromS3(decoder *gojson.Decoder) (plog.Logs, error) {
// Check opening bracket
if token, err := decoder.Token(); err != nil || token != gojson.Delim('{') {
return plog.Logs{}, fmt.Errorf("expected '{': %w", err)
Expand All @@ -200,10 +250,7 @@ func (u *CloudTrailLogUnmarshaler) processRecords(decoder *gojson.Decoder) (plog
return plog.Logs{}, fmt.Errorf("expected '[': %w", err)
}

// Create a single resource logs entry for all records
resourceLogs := logs.ResourceLogs().AppendEmpty()
scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
u.setCommonScopeAttributes(scopeLogs)
logs, resourceLogs, scopeLogs := u.createLogs()

logRecords := scopeLogs.LogRecords()
// Pre-allocate space for log records to improve performance
Expand Down Expand Up @@ -291,6 +338,15 @@ func (u *CloudTrailLogUnmarshaler) setCommonScopeAttributes(scope plog.ScopeLogs
scope.Scope().Attributes().PutStr(constants.FormatIdentificationTag, "aws."+constants.FormatCloudTrailLog)
}

// createLogs returns a fresh plog.Logs that already contains one ResourceLogs
// entry holding one ScopeLogs entry, with the unmarshaler's common scope
// attributes applied to that scope. The container and both entries are
// returned so callers can populate the resource and append log records.
func (u *CloudTrailLogUnmarshaler) createLogs() (plog.Logs, plog.ResourceLogs, plog.ScopeLogs) {
	out := plog.NewLogs()

	rl := out.ResourceLogs().AppendEmpty()
	sl := rl.ScopeLogs().AppendEmpty()

	// Stamp the scope before handing it back so every caller gets it tagged.
	u.setCommonScopeAttributes(sl)

	return out, rl, sl
}

func (*CloudTrailLogUnmarshaler) setResourceAttributes(attrs pcommon.Map, record CloudTrailRecord) {
attrs.PutStr(string(conventions.CloudProviderKey), conventions.CloudProviderAWS.Value.AsString())
attrs.PutStr(string(conventions.CloudRegionKey), record.AwsRegion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ func TestCloudTrailLogUnmarshaler_UnmarshalAWSLogs_Valid(t *testing.T) {
outputLogsFile: "cloudtrail_log_expected_with_uid_feature.yaml",
userIDFeature: true,
},
{
name: "Valid CloudWatch subscription filter format",
inputLogsFile: "cloudtrail_log_cw.json",
outputLogsFile: "cloudtrail_log_cw_expected.yaml",
userIDFeature: true,
},
}

for _, test := range tests {
Expand Down