-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[receiver/awsfirehose] Add support for encoding extensions #37262
base: main
Are you sure you want to change the base?
Changes from all commits
abc720f
a2f0c9d
2637d55
fcc16d1
e5a941d
53458aa
2fcec6b
7a93d77
fd580ba
07d1b52
e493ed4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# Use this changelog template to create an entry for release notes. | ||
|
||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: enhancement | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) | ||
component: awsfirehosereceiver | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: Add support for encoding extensions | ||
|
||
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. | ||
issues: [37113] | ||
|
||
# (Optional) One or more lines of additional information to render under the primary note. | ||
# These lines will be padded with 2 spaces and then inserted directly into the document. | ||
# Use pipe (|) for multiline entries. | ||
subtext: Adds `encoding` config setting, and deprecates the `record_type` setting. | ||
|
||
# If your change doesn't affect end users or the exported elements of any package, | ||
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. | ||
# Optional: The change log or logs in which this entry should be included. | ||
# e.g. '[user]' or '[user, api]' | ||
# Include 'user' if the change is relevant to end users. | ||
# Include 'api' if there is a change to a library API. | ||
# Default: '[user]' | ||
change_logs: [user] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package awsfirehosereceiver | ||
|
||
import ( | ||
"bytes" | ||
"compress/gzip" | ||
"context" | ||
"fmt" | ||
"math/rand/v2" | ||
"net/http" | ||
"testing" | ||
|
||
"go.opentelemetry.io/collector/consumer/consumertest" | ||
"go.uber.org/zap" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream" | ||
) | ||
|
||
func BenchmarkLogsConsumer_cwlogs(b *testing.B) { | ||
// numLogGroups is the maximum number of unique log groups | ||
// to use across the generated logs, using a random generator. | ||
const numLogGroups = 10 | ||
|
||
// numRecords is the number of records in the Firehose envelope. | ||
for _, numRecords := range []int{10, 100} { | ||
// numLogs is the number of CoudWatch log records within a Firehose record. | ||
for _, numLogs := range []int{1, 10} { | ||
b.Run(fmt.Sprintf("%dresources_%drecords_%dlogs", numLogGroups, numRecords, numLogs), func(b *testing.B) { | ||
lc := &logsConsumer{ | ||
unmarshaler: cwlog.NewUnmarshaler(zap.NewNop()), | ||
consumer: consumertest.NewNop(), | ||
} | ||
records := make([][]byte, numRecords) | ||
for i := range records { | ||
records[i] = makeCloudWatchLogRecord(numLogs, numLogGroups) | ||
} | ||
b.ResetTimer() | ||
for i := 0; i < b.N; i++ { | ||
code, err := lc.Consume(context.Background(), records, nil) | ||
if err != nil { | ||
b.Fatal(err) | ||
} | ||
if code != http.StatusOK { | ||
b.Fatalf("expected status code 200, got %d", code) | ||
} | ||
} | ||
}) | ||
} | ||
} | ||
} | ||
|
||
func BenchmarkMetricsConsumer_cwmetrics(b *testing.B) { | ||
// numStreams is the maximum number of unique metric streams | ||
// to use across the generated metrics, using a random generator. | ||
const numStreams = 10 | ||
|
||
// numRecords is the number of records in the Firehose envelope. | ||
for _, numRecords := range []int{10, 100} { | ||
// numMetrics is the number of CoudWatch metrics within a Firehose record. | ||
for _, numMetrics := range []int{1, 10} { | ||
b.Run(fmt.Sprintf("%dresources_%drecords_%dmetrics", numStreams, numRecords, numMetrics), func(b *testing.B) { | ||
mc := &metricsConsumer{ | ||
unmarshaler: cwmetricstream.NewUnmarshaler(zap.NewNop()), | ||
consumer: consumertest.NewNop(), | ||
} | ||
records := make([][]byte, numRecords) | ||
for i := range records { | ||
records[i] = makeCloudWatchMetricRecord(numMetrics, numStreams) | ||
} | ||
b.ResetTimer() | ||
for i := 0; i < b.N; i++ { | ||
code, err := mc.Consume(context.Background(), records, nil) | ||
if err != nil { | ||
b.Fatal(err) | ||
} | ||
if code != http.StatusOK { | ||
b.Fatalf("expected status code 200, got %d", code) | ||
} | ||
} | ||
}) | ||
} | ||
} | ||
} | ||
|
||
func makeCloudWatchLogRecord(numLogs, numLogGroups int) []byte { | ||
var buf bytes.Buffer | ||
w := gzip.NewWriter(&buf) | ||
for i := 0; i < numLogs; i++ { | ||
group := rand.IntN(numLogGroups) | ||
fmt.Fprintf(w, | ||
`{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"group_%d","logStream":"stream","logEvents":[{"id":"the_id","timestamp":1725594035523,"message":"message %d"}]}`, | ||
group, i, | ||
) | ||
fmt.Fprintln(w) | ||
} | ||
if err := w.Close(); err != nil { | ||
panic(err) | ||
} | ||
return buf.Bytes() | ||
} | ||
|
||
func makeCloudWatchMetricRecord(numMetrics, numStreams int) []byte { | ||
var buf bytes.Buffer | ||
for i := 0; i < numMetrics; i++ { | ||
stream := rand.IntN(numStreams) | ||
fmt.Fprintf(&buf, | ||
`{"metric_stream_name":"stream_%d","account_id":"1234567890","region":"us-east-1","namespace":"AWS/NATGateway","metric_name":"metric_%d","dimensions":{"NatGatewayId":"nat-01a4160dfb995b990"},"timestamp":1643916720000,"value":{"max":0.0,"min":0.0,"sum":0.0,"count":2.0},"unit":"Count"}`, | ||
stream, i, | ||
) | ||
fmt.Fprintln(&buf) | ||
} | ||
return buf.Bytes() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,7 @@ import ( | |
|
||
func TestLoadConfig(t *testing.T) { | ||
for _, configType := range []string{ | ||
"cwmetrics", "cwlogs", "otlp_v1", "invalid", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Testing for an invalid record type or encoding is different from testing that both an encoding and record type have been provided. Both tests should remain. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because we're now supporting extensions, the record type is only known to be valid/invalid at the time we call |
||
"cwmetrics", "cwlogs", "otlp_v1", | ||
} { | ||
t.Run(configType, func(t *testing.T) { | ||
fileName := configType + "_config.yaml" | ||
|
@@ -34,24 +34,35 @@ func TestLoadConfig(t *testing.T) { | |
require.NoError(t, sub.Unmarshal(cfg)) | ||
|
||
err = component.ValidateConfig(cfg) | ||
if configType == "invalid" { | ||
assert.Error(t, err) | ||
} else { | ||
assert.NoError(t, err) | ||
require.Equal(t, &Config{ | ||
RecordType: configType, | ||
AccessKey: "some_access_key", | ||
ServerConfig: confighttp.ServerConfig{ | ||
Endpoint: "0.0.0.0:4433", | ||
TLSSetting: &configtls.ServerConfig{ | ||
Config: configtls.Config{ | ||
CertFile: "server.crt", | ||
KeyFile: "server.key", | ||
}, | ||
assert.NoError(t, err) | ||
require.Equal(t, &Config{ | ||
RecordType: configType, | ||
AccessKey: "some_access_key", | ||
ServerConfig: confighttp.ServerConfig{ | ||
Endpoint: "0.0.0.0:4433", | ||
TLSSetting: &configtls.ServerConfig{ | ||
Config: configtls.Config{ | ||
CertFile: "server.crt", | ||
KeyFile: "server.key", | ||
}, | ||
}, | ||
}, cfg) | ||
} | ||
}, | ||
}, cfg) | ||
}) | ||
} | ||
} | ||
|
||
func TestLoadConfigInvalid(t *testing.T) { | ||
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "invalid_config.yaml")) | ||
require.NoError(t, err) | ||
|
||
factory := NewFactory() | ||
cfg := factory.CreateDefaultConfig() | ||
|
||
sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "").String()) | ||
require.NoError(t, err) | ||
require.NoError(t, sub.Unmarshal(cfg)) | ||
|
||
err = component.ValidateConfig(cfg) | ||
assert.Equal(t, errRecordTypeEncodingSet, err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think validation of the record type or encoding can be deferred. This has to fail fast to alert the user to their configuration error rather than allowing the collector to start and then failing to process received data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The collector will still fail fast. e.g.
It's doing a bit more work than before it gets to the error, but AFAIK it's not possible to access extensions earlier than the
Start
method.