Skip to content

Commit abc720f

Browse files
committed
Refactor unmarshalers to fit into encoding framework
The internal unmarshalers now implement plog.Unmarshaler and pmetric.Unmarshaler. This will enable extracting them later as encoding extensions; for now they remain embedded within the receiver. As a result of the interface change, the unmarshalers now unmarshal a single record at a time, which means we cannot merge resources/metrics as we go, but only after each record. This also fixes a bug in the cwmetrics unmarshaller where the unit of a metric was not considered part of its identity, and so two metrics that differed only by unit would be merged.
1 parent 0788185 commit abc720f

28 files changed

+719
-781
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: awsfirehosereceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support for encoding extensions
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37113]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Adds `encoding` config setting, and deprecates the `record_type` setting.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/awsfirehosereceiver/README.md

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,25 +45,28 @@ See [documentation](https://github.com/open-telemetry/opentelemetry-collector/bl
4545

4646
A `cert_file` and `key_file` are required.
4747

48-
### record_type:
49-
The type of record being received from the delivery stream. Each unmarshaler handles a specific type, so the field allows the receiver to use the correct one.
48+
### encoding:
49+
50+
The ID of an [encoding extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/encoding) for decoding logs or metrics.
51+
This configuration also supports the built-in encodings listed in the [Encodings](#encodings) section.
52+
If no encoding is specified, then the receiver will default to a signal-specific encoding: `cwmetrics` for metrics, and `cwlogs` for logs.
5053

51-
default: `cwmetrics`
54+
### record_type:
5255

53-
See the [Record Types](#record-types) section for all available options.
56+
Deprecated, use `encoding` instead. `record_type` will be removed in a future release; it is an alias for `encoding`.
5457

5558
### access_key (Optional):
5659
The access key to be checked on each request received. This can be set when creating or updating the delivery stream.
5760
See [documentation](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-http) for details.
5861

59-
## Record Types
62+
## Encodings
6063

6164
### cwmetrics
62-
The record type for the CloudWatch metric stream. Expects the format for the records to be JSON.
65+
The encoding for the CloudWatch metric stream. Expects the format for the records to be JSON.
6366
See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Metric-Streams.html) for details.
6467

6568
### cwlogs
66-
The record type for the CloudWatch log stream. Expects the format for the records to be JSON.
69+
The encoding for the CloudWatch log stream. Expects the format for the records to be JSON.
6770
For example:
6871

6972
```json
@@ -84,5 +87,5 @@ For example:
8487
```
8588

8689
### otlp_v1
87-
The OTLP v1 format as produced by CloudWatch metric streams.
90+
The OTLP v1 encoding as produced by CloudWatch metric streams.
8891
See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-100.html) for details.

receiver/awsfirehosereceiver/config.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,22 @@ import (
88

99
"go.opentelemetry.io/collector/config/confighttp"
1010
"go.opentelemetry.io/collector/config/configopaque"
11+
"go.uber.org/zap"
1112
)
1213

1314
type Config struct {
1415
// ServerConfig is used to set up the Firehose delivery
1516
// endpoint. The Firehose delivery stream expects an HTTPS
1617
// endpoint, so TLSSettings must be used to enable that.
1718
confighttp.ServerConfig `mapstructure:",squash"`
18-
// RecordType is the key used to determine which unmarshaler to use
19-
// when receiving the requests.
19+
// Encoding identifies the encoding of records received from
20+
// Firehose. Defaults to telemetry-specific encodings: "cwlog"
21+
// for logs, and "cwmetrics" for metrics.
22+
Encoding string `mapstructure:"encoding"`
23+
// RecordType is an alias for Encoding for backwards compatibility.
24+
// It is an error to specify both encoding and record_type.
25+
//
26+
// Deprecated: use Encoding instead.
2027
RecordType string `mapstructure:"record_type"`
2128
// AccessKey is checked against the one received with each request.
2229
// This can be set when creating or updating the Firehose delivery
@@ -30,10 +37,14 @@ func (c *Config) Validate() error {
3037
if c.Endpoint == "" {
3138
return errors.New("must specify endpoint")
3239
}
33-
// If a record type is specified, it must be valid.
34-
// An empty string is acceptable, however, because it will use a telemetry-type-specific default.
35-
if c.RecordType != "" {
36-
return validateRecordType(c.RecordType)
40+
if c.RecordType != "" && c.Encoding != "" {
41+
return errors.New("record_type must not be set when encoding is set")
3742
}
3843
return nil
3944
}
45+
46+
func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) {
47+
if cfg.RecordType != "" {
48+
logger.Warn("record_type is deprecated, and will be removed in a future version. Use encoding instead.")
49+
}
50+
}

receiver/awsfirehosereceiver/config_test.go

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919

2020
func TestLoadConfig(t *testing.T) {
2121
for _, configType := range []string{
22-
"cwmetrics", "cwlogs", "otlp_v1", "invalid",
22+
"cwmetrics", "cwlogs", "otlp_v1",
2323
} {
2424
t.Run(configType, func(t *testing.T) {
2525
fileName := configType + "_config.yaml"
@@ -34,24 +34,36 @@ func TestLoadConfig(t *testing.T) {
3434
require.NoError(t, sub.Unmarshal(cfg))
3535

3636
err = component.ValidateConfig(cfg)
37-
if configType == "invalid" {
38-
assert.Error(t, err)
39-
} else {
40-
assert.NoError(t, err)
41-
require.Equal(t, &Config{
42-
RecordType: configType,
43-
AccessKey: "some_access_key",
44-
ServerConfig: confighttp.ServerConfig{
45-
Endpoint: "0.0.0.0:4433",
46-
TLSSetting: &configtls.ServerConfig{
47-
Config: configtls.Config{
48-
CertFile: "server.crt",
49-
KeyFile: "server.key",
50-
},
37+
assert.NoError(t, err)
38+
require.Equal(t, &Config{
39+
RecordType: configType,
40+
AccessKey: "some_access_key",
41+
ServerConfig: confighttp.ServerConfig{
42+
Endpoint: "0.0.0.0:4433",
43+
TLSSetting: &configtls.ServerConfig{
44+
Config: configtls.Config{
45+
CertFile: "server.crt",
46+
KeyFile: "server.key",
5147
},
5248
},
53-
}, cfg)
54-
}
49+
},
50+
}, cfg)
5551
})
5652
}
5753
}
54+
55+
func TestLoadConfigInvalid(t *testing.T) {
56+
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "invalid_config.yaml"))
57+
require.NoError(t, err)
58+
59+
factory := NewFactory()
60+
cfg := factory.CreateDefaultConfig()
61+
62+
sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "").String())
63+
require.NoError(t, err)
64+
require.NoError(t, sub.Unmarshal(cfg))
65+
66+
err = component.ValidateConfig(cfg)
67+
require.Error(t, err)
68+
assert.EqualError(t, err, "record_type must not be set when encoding is set")
69+
}

receiver/awsfirehosereceiver/factory.go

Lines changed: 6 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -5,34 +5,19 @@ package awsfirehosereceiver // import "github.com/open-telemetry/opentelemetry-c
55

66
import (
77
"context"
8-
"errors"
98

109
"go.opentelemetry.io/collector/component"
1110
"go.opentelemetry.io/collector/config/confighttp"
1211
"go.opentelemetry.io/collector/consumer"
1312
"go.opentelemetry.io/collector/receiver"
14-
"go.uber.org/zap"
1513

1614
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
17-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler"
18-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog"
19-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream"
20-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream"
2115
)
2216

2317
const (
2418
defaultEndpoint = "localhost:4433"
2519
)
2620

27-
var (
28-
errUnrecognizedRecordType = errors.New("unrecognized record type")
29-
availableRecordTypes = map[string]bool{
30-
cwmetricstream.TypeStr: true,
31-
cwlog.TypeStr: true,
32-
otlpmetricstream.TypeStr: true,
33-
}
34-
)
35-
3621
// NewFactory creates a receiver factory for awsfirehose. Currently, only
3722
// available in metrics pipelines.
3823
func NewFactory() receiver.Factory {
@@ -43,34 +28,6 @@ func NewFactory() receiver.Factory {
4328
receiver.WithLogs(createLogsReceiver, metadata.LogsStability))
4429
}
4530

46-
// validateRecordType checks the available record types for the
47-
// passed in one and returns an error if not found.
48-
func validateRecordType(recordType string) error {
49-
if _, ok := availableRecordTypes[recordType]; !ok {
50-
return errUnrecognizedRecordType
51-
}
52-
return nil
53-
}
54-
55-
// defaultMetricsUnmarshalers creates a map of the available metrics
56-
// unmarshalers.
57-
func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.MetricsUnmarshaler {
58-
cwmsu := cwmetricstream.NewUnmarshaler(logger)
59-
otlpv1msu := otlpmetricstream.NewUnmarshaler(logger)
60-
return map[string]unmarshaler.MetricsUnmarshaler{
61-
cwmsu.Type(): cwmsu,
62-
otlpv1msu.Type(): otlpv1msu,
63-
}
64-
}
65-
66-
// defaultLogsUnmarshalers creates a map of the available logs unmarshalers.
67-
func defaultLogsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.LogsUnmarshaler {
68-
u := cwlog.NewUnmarshaler(logger)
69-
return map[string]unmarshaler.LogsUnmarshaler{
70-
u.Type(): u,
71-
}
72-
}
73-
7431
// createDefaultConfig creates a default config with the endpoint set
7532
// to port 8443 and the record type set to the CloudWatch metric stream.
7633
func createDefaultConfig() component.Config {
@@ -88,7 +45,9 @@ func createMetricsReceiver(
8845
cfg component.Config,
8946
nextConsumer consumer.Metrics,
9047
) (receiver.Metrics, error) {
91-
return newMetricsReceiver(cfg.(*Config), set, defaultMetricsUnmarshalers(set.Logger), nextConsumer)
48+
c := cfg.(*Config)
49+
handleDeprecatedConfig(c, set.Logger)
50+
return newMetricsReceiver(c, set, nextConsumer)
9251
}
9352

9453
// createMetricsReceiver implements the CreateMetricsReceiver function type.
@@ -98,5 +57,7 @@ func createLogsReceiver(
9857
cfg component.Config,
9958
nextConsumer consumer.Logs,
10059
) (receiver.Logs, error) {
101-
return newLogsReceiver(cfg.(*Config), set, defaultLogsUnmarshalers(set.Logger), nextConsumer)
60+
c := cfg.(*Config)
61+
handleDeprecatedConfig(c, set.Logger)
62+
return newLogsReceiver(c, set, nextConsumer)
10263
}

receiver/awsfirehosereceiver/factory_test.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ import (
1111
"go.opentelemetry.io/collector/component/componenttest"
1212
"go.opentelemetry.io/collector/consumer/consumertest"
1313
"go.opentelemetry.io/collector/receiver/receivertest"
14-
15-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream"
1614
)
1715

1816
func TestValidConfig(t *testing.T) {
@@ -41,10 +39,3 @@ func TestCreateLogsReceiver(t *testing.T) {
4139
require.NoError(t, err)
4240
require.NotNil(t, r)
4341
}
44-
45-
func TestValidateRecordType(t *testing.T) {
46-
require.NoError(t, validateRecordType(defaultMetricsRecordType))
47-
require.NoError(t, validateRecordType(defaultLogsRecordType))
48-
require.NoError(t, validateRecordType(otlpmetricstream.TypeStr))
49-
require.Error(t, validateRecordType("nop"))
50-
}

receiver/awsfirehosereceiver/go.mod

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ go 1.22.0
44

55
require (
66
github.com/gogo/protobuf v1.3.2
7+
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.117.1-0.20250114172347-71aae791d7f8
8+
github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.117.1-0.20250114172347-71aae791d7f8
79
github.com/stretchr/testify v1.10.0
810
go.opentelemetry.io/collector/component v0.117.1-0.20250114172347-71aae791d7f8
911
go.opentelemetry.io/collector/component/componentstatus v0.117.1-0.20250114172347-71aae791d7f8
@@ -24,6 +26,7 @@ require (
2426
)
2527

2628
require (
29+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
2730
github.com/davecgh/go-spew v1.1.1 // indirect
2831
github.com/felixge/httpsnoop v1.0.4 // indirect
2932
github.com/fsnotify/fsnotify v1.8.0 // indirect
@@ -41,6 +44,7 @@ require (
4144
github.com/mitchellh/reflectwalk v1.0.2 // indirect
4245
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
4346
github.com/modern-go/reflect2 v1.0.2 // indirect
47+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.117.1-0.20250114172347-71aae791d7f8 // indirect
4448
github.com/pierrec/lz4/v4 v4.1.22 // indirect
4549
github.com/pmezard/go-difflib v1.0.0 // indirect
4650
github.com/rs/cors v1.11.1 // indirect
@@ -75,3 +79,13 @@ retract (
7579
v0.76.1
7680
v0.65.0
7781
)
82+
83+
replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ../../internal/pdatautil
84+
85+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil
86+
87+
replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics
88+
89+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
90+
91+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

receiver/awsfirehosereceiver/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)