[receiver/awsfirehose] Add support for encoding extensions #37262
base: main
Conversation
Force-pushed from 7e6a082 to fbf5919.
The internal unmarshalers now implement plog.Unmarshaler and pmetric.Unmarshaler. This will enable extracting them later as encoding extensions; for now they remain embedded within the receiver. As a result of the interface change, the unmarshalers now unmarshal a single record at a time, which means we cannot merge resources/metrics as we go, but only after each record. This also fixes a bug in the cwmetrics unmarshaller where the unit of a metric was not considered part of its identity, and so two metrics that differed only by unit would be merged.
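The pdata interfaces involved are small. A minimal sketch of the shape the internal unmarshalers now take, assuming a hypothetical cwLogUnmarshaler type (this is illustrative, not the PR's actual code):

```go
package cwlog

import (
	"go.opentelemetry.io/collector/pdata/plog"
)

// plog.Unmarshaler (and, analogously, pmetric.Unmarshaler) is defined by
// the pdata module as:
//
//	type Unmarshaler interface {
//		UnmarshalLogs(buf []byte) (plog.Logs, error)
//	}
//
// cwLogUnmarshaler is a hypothetical stand-in for the internal unmarshaler.
type cwLogUnmarshaler struct{}

// UnmarshalLogs decodes one Firehose record into plog.Logs. Because the
// interface takes a single record, merging resources across records now
// has to happen in the caller after each call.
func (u cwLogUnmarshaler) UnmarshalLogs(record []byte) (plog.Logs, error) {
	logs := plog.NewLogs()
	// ... decode the CloudWatch payload in record into logs ...
	return logs, nil
}

// Compile-time check that the interface is satisfied.
var _ plog.Unmarshaler = cwLogUnmarshaler{}
```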
Force-pushed from fbf5919 to abc720f.
Reviewer: LGTM
```go
@@ -19,7 +19,7 @@ import (
func TestLoadConfig(t *testing.T) {
	for _, configType := range []string{
		"cwmetrics", "cwlogs", "otlp_v1", "invalid",
```
Reviewer: Testing for an invalid record type or encoding is different from testing that both an encoding and record type have been provided. Both tests should remain.
Author: Because we're now supporting extensions, the record type is only known to be valid or invalid at the time we call Start. There's a test case in there for an invalid encoding/record type; see the WithUnknownEncoding test cases for TestLogsReceiver_Start and TestMetricsReceiver_Start.
```go
// If a record type is specified, it must be valid.
// An empty string is acceptable, however, because it will use a
// telemetry-type-specific default.
if c.RecordType != "" {
	return validateRecordType(c.RecordType)
}
```
Reviewer: I don't think validation of the record type or encoding can be deferred. This has to fail fast to alert the user to their configuration error rather than allowing the collector to start and then failing to process received data.
Author: The collector will still fail fast, e.g.:

```console
$ cat local/config.yaml
receivers:
  awsfirehose:
    record_type: invalid
exporters:
  debug: {}
service:
  pipelines:
    logs:
      receivers: [awsfirehose]
      processors: []
      exporters: [debug]
$ ./bin/otelcontribcol_linux_amd64 --config local/config.yaml
2025-01-17T10:51:28.527+0800 info [email protected]/service.go:164 Setting up own telemetry...
2025-01-17T10:51:28.527+0800 info telemetry/metrics.go:70 Serving metrics {"address": "localhost:8888", "metrics level": "Normal"}
2025-01-17T10:51:28.527+0800 info builders/builders.go:26 Development component. May change in the future. {"kind": "exporter", "data_type": "logs", "name": "debug"}
2025-01-17T10:51:28.527+0800 warn [email protected]/config.go:48 record_type is deprecated, and will be removed in a future version. Use encoding instead. {"kind": "receiver", "name": "awsfirehose", "data_type": "logs"}
2025-01-17T10:51:28.530+0800 info [email protected]/service.go:230 Starting otelcontribcol... {"Version": "0.117.0-dev", "NumCPU": 16}
2025-01-17T10:51:28.530+0800 info extensions/extensions.go:39 Starting extensions...
2025-01-17T10:51:28.530+0800 error graph/graph.go:426 Failed to start component {"error": "unknown encoding extension \"invalid\"", "type": "Receiver", "id": "awsfirehose"}
2025-01-17T10:51:28.530+0800 info [email protected]/service.go:295 Starting shutdown...
2025-01-17T10:51:28.530+0800 info extensions/extensions.go:66 Stopping extensions...
2025-01-17T10:51:28.530+0800 info [email protected]/service.go:309 Shutdown complete.
Error: cannot start pipelines: unknown encoding extension "invalid"
2025/01/17 10:51:28 collector server run finished with error: cannot start pipelines: unknown encoding extension "invalid"
```
It's doing a bit more work than before it gets to the error, but AFAIK it's not possible to access extensions earlier than the Start method.
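For context, a hedged sketch of why this lookup can only happen in Start: the component.Host that exposes extensions is passed to Start and nowhere earlier. The firehoseReceiver type and its fields here are simplified stand-ins, not the receiver's actual code:

```go
package awsfirehosereceiver

import (
	"context"
	"fmt"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/pdata/plog"
)

// firehoseReceiver is an illustrative stand-in for the real receiver;
// only the Start-time extension lookup is the point here.
type firehoseReceiver struct {
	encodingID  component.ID
	unmarshaler plog.Unmarshaler
}

// Start is the first point at which host.GetExtensions() is available,
// so an unknown encoding can only be reported here, not during Validate.
func (r *firehoseReceiver) Start(_ context.Context, host component.Host) error {
	ext, ok := host.GetExtensions()[r.encodingID]
	if !ok {
		// Surfaced as "cannot start pipelines: unknown encoding extension ..."
		// in the collector output above.
		return fmt.Errorf("unknown encoding extension %q", r.encodingID)
	}
	unmarshaler, ok := ext.(plog.Unmarshaler)
	if !ok {
		return fmt.Errorf("extension %q is not a logs unmarshaler", r.encodingID)
	}
	r.unmarshaler = unmarshaler
	// ... start the HTTP server and begin accepting Firehose requests ...
	return nil
}
```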
receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go (outdated; resolved)
```go
if logs.ResourceLogs().Len() == 0 {
	return logs, errInvalidRecords
}
pdatautil.GroupByResourceLogs(logs.ResourceLogs())
```
Reviewer: Is there a benchmark to compare the effect of allocating a new resource log entry for each log record and combining them later? If not, please add one. This feels like something that could be a significant regression.
Author: There's not; I've added one. This way of grouping is indeed much more expensive than what was there before. I've reverted to grouping by the known CloudWatch attributes.
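A sketch of the shape such a benchmark takes, reusing the hypothetical cwLogUnmarshaler from the earlier sketch; the fixture record and names are illustrative, not the PR's actual benchmark:

```go
package cwlog

import "testing"

// BenchmarkUnmarshalLogs exercises per-record unmarshaling so the cost of
// allocating and grouping resource entries shows up in the allocation stats.
func BenchmarkUnmarshalLogs(b *testing.B) {
	u := cwLogUnmarshaler{} // hypothetical type from the earlier sketch
	record := []byte(`{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"g","logStream":"s","logEvents":[{"id":"1","timestamp":0,"message":"hello"}]}`)
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		if _, err := u.UnmarshalLogs(record); err != nil {
			b.Fatal(err)
		}
	}
}
```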
```diff
 }
-	return md, nil
+	metrics = expmetrics.Merge(pmetric.NewMetrics(), metrics)
```
Reviewer: Again, I would like a benchmark. This feels like a lot of duplicated work for inputs of any appreciable size.
Author: Done, and as in cwlog I've reverted to grouping by the known CloudWatch attributes rather than using expmetrics.Merge here. Note that there's still merging at the consumer level; since at that level we don't know anything about the data, it has to merge in a more generic and expensive way.
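A sketch of what grouping by known CloudWatch attributes can look like, shown for logs; the attribute names, types, and helper are illustrative assumptions, not the receiver's actual code. The point is that the key is a cheap comparable struct rather than a generic hash over arbitrary attribute maps:

```go
package cwlog

import (
	"go.opentelemetry.io/collector/pdata/plog"
)

// resourceKey identifies a resource by the attributes the unmarshaler
// itself sets, so no generic attribute hashing is needed.
type resourceKey struct {
	logGroup  string
	logStream string
}

type resourceLogsBuilder struct {
	logs  plog.Logs
	byKey map[resourceKey]plog.LogRecordSlice
}

func newResourceLogsBuilder() *resourceLogsBuilder {
	return &resourceLogsBuilder{
		logs:  plog.NewLogs(),
		byKey: map[resourceKey]plog.LogRecordSlice{},
	}
}

// recordsFor returns the log record slice for the given group/stream,
// creating the ResourceLogs entry on first use.
func (b *resourceLogsBuilder) recordsFor(group, stream string) plog.LogRecordSlice {
	key := resourceKey{logGroup: group, logStream: stream}
	if records, ok := b.byKey[key]; ok {
		return records
	}
	rl := b.logs.ResourceLogs().AppendEmpty()
	attrs := rl.Resource().Attributes()
	attrs.PutStr("cloudwatch.log.group.name", group) // illustrative attribute names
	attrs.PutStr("cloudwatch.log.stream", stream)
	records := rl.ScopeLogs().AppendEmpty().LogRecords()
	b.byKey[key] = records
	return records
}
```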
```diff
@@ -45,18 +57,16 @@ func TestUnmarshal(t *testing.T) {
 		"WithSomeInvalidRecords": {
 			filename:          "some_invalid_records",
 			wantResourceCount: 5,
-			wantMetricCount:   35,
+			wantMetricCount:   36,
```
Reviewer: Why? Was there a latent defect, or is this papering over a regression?
Author: Yeah, there was a defect. From the description:

> Due to using the above merging functions, this PR also fixes a bug in the cwmetrics unmarshaller where the unit of a metric was not considered part of its identity, causing two metrics that differed only by unit to be merged.
Author: Specifically, the MetadataNoToken metric in some_invalid_records has different units: in most cases it's "None", in one case it's "Count".
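In other words, the merge key needs the unit. A minimal sketch of the fixed identity, with illustrative field names rather than the receiver's actual struct:

```go
package cwmetrics

// metricKey sketches the identity used to merge CloudWatch metrics.
// Before the fix, unit was omitted, so MetadataNoToken with unit "None"
// and MetadataNoToken with unit "Count" collapsed into a single metric.
type metricKey struct {
	namespace string
	name      string
	unit      string // now part of the identity
}
```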
Adds a test that covers merging various combinations of resource/scope identities.
pdatautil.GroupByResourceLogs is much slower, at least partially because it's generic. Revert to a simpler grouping by known resource attributes.
- Use json-iterator for decoding JSON
- Use klauspost/compress for decompressing gzip
- Pool gzip readers (see the sketch below)
- Remove pointer type from cwMetricValue to avoid allocation
- Don't read the whole request body into memory
- Implement more efficient metrics merging
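As one example of these optimisations, a hedged sketch of pooling gzip readers with sync.Pool, assuming the klauspost/compress gzip API (a drop-in replacement for compress/gzip); the decompress helper is illustrative, not the receiver's actual code:

```go
package cwlog

import (
	"bytes"
	"sync"

	"github.com/klauspost/compress/gzip"
)

// gzipPool reuses gzip readers across requests instead of allocating a
// new one per record; Reset rewinds a pooled reader onto new input.
var gzipPool sync.Pool

func decompress(data []byte) ([]byte, error) {
	r, ok := gzipPool.Get().(*gzip.Reader)
	if !ok {
		var err error
		if r, err = gzip.NewReader(bytes.NewReader(data)); err != nil {
			return nil, err
		}
	} else if err := r.Reset(bytes.NewReader(data)); err != nil {
		gzipPool.Put(r)
		return nil, err
	}
	defer gzipPool.Put(r)

	var out bytes.Buffer
	if _, err := out.ReadFrom(r); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}
```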
@Aneurysm9 as mentioned in the thread replies, I have reverted some of the grouping changes, but grouping across records is still going to be more expensive, and I think that's unavoidable given the signature of the unmarshallers. To counteract that, I've made various optimisations unrelated to the main changes, so unmarshalling is now faster, albeit using more memory for metrics (and significantly less for logs). I may have gotten a little carried away; let me know if you'd like me to pull these into separate PRs.
Another option: we can keep the merging in the unmarshallers, which would operate on the Firehose record level, and drop the merging across records. That would be a change in behaviour, but would avoid the overhead.
I've done this in #37361
Description
The internal unmarshallers now implement plog.Unmarshaler and pmetric.Unmarshaler. This will enable extracting them later as encoding extensions; for now they remain embedded within the receiver. The existing record_type config has been deprecated and replaced by encoding, which can reference either the internal unmarshallers or encoding extensions.

As a result of the interface change, the unmarshallers now unmarshal a single record at a time, which means we cannot merge resources/metrics as we go, but only after each record. This is achieved by using existing common code: internal/exp/metrics for merging metrics, and internal/pdatautil for merging logs.

Due to using the above merging functions, this PR also fixes a bug in the cwmetrics unmarshaller where the unit of a metric was not considered part of its identity, causing two metrics that differed only by unit to be merged.

Link to tracking issue
Fixes #37113
Testing
Should be a non-functional change, so relying on existing unit tests to catch issues. Tests have been added for new extension functionality.
Documentation
Updated README.