Skip to content

Commit 5404f7c

Browse files
authored
Add profiles support to kafka receiver (open-telemetry#41479)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This adds profiles support to the kafka receiver. Related: the exporter PR: open-telemetry#41369
1 parent 2206632 commit 5404f7c

17 files changed

+407
-17
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkareceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add profiles support
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [41479]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/kafkareceiver/README.md

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
<!-- status autogenerated section -->
44
| Status | |
55
| ------------- |-----------|
6-
| Stability | [beta]: metrics, logs, traces |
6+
| Stability | [development]: profiles |
7+
| | [beta]: metrics, logs, traces |
78
| Distributions | [core], [contrib] |
89
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fkafka%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fkafka) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fkafka%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fkafka) |
910
| Code coverage | [![codecov](https://codecov.io/github/open-telemetry/opentelemetry-collector-contrib/graph/main/badge.svg?component=receiver_kafka)](https://app.codecov.io/gh/open-telemetry/opentelemetry-collector-contrib/tree/main/?components%5B0%5D=receiver_kafka&displayType=list) |
1011
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@pavolloffay](https://www.github.com/pavolloffay), [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@axw](https://www.github.com/axw) |
1112

13+
[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
1214
[beta]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#beta
1315
[core]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol
1416
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
@@ -48,6 +50,9 @@ The following settings can be optionally configured:
4850
- `traces`
4951
- `topic` (default = otlp\_spans): The name of the Kafka topic from which to consume traces.
5052
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
53+
- `profiles`
54+
- `topic` (default = otlp\_profiles): The name of the Kafka topic from which to consume profiles.
55+
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
5156
- `topic` (Deprecated [v0.124.0]: use `logs::topic`, `traces::topic`, or `metrics::topic`).
5257
If this is set, it will take precedence over the default value for those fields.
5358
- `encoding` (Deprecated [v0.124.0]: use `logs::encoding`, `traces::encoding`, or `metrics::encoding`).
@@ -109,10 +114,10 @@ The following settings can be optionally configured:
109114
**Note: this can block the entire partition in case a message processing returns a permanent error**
110115
- `header_extraction`:
111116
- `extract_headers` (default = false): Allows user to attach header fields to resource attributes in otel pipeline
112-
- `headers` (default = []): List of headers they'd like to extract from kafka record.
113-
**Note: Matching pattern will be `exact`. Regexes are not supported as of now.**
117+
- `headers` (default = []): List of headers they'd like to extract from kafka record.
118+
**Note: Matching pattern will be `exact`. Regexes are not supported as of now.**
114119
- `error_backoff`: [BackOff](https://github.com/open-telemetry/opentelemetry-collector/blob/v0.116.0/config/configretry/backoff.go#L27-L43) configuration in case of errors
115-
- `enabled`: (default = false) Whether to enable backoff when next consumers return errors
120+
- `enabled`: (default = false) Whether to enable backoff when next consumers return errors
116121
- `initial_interval`: The time to wait after the first error before retrying
117122
- `max_interval`: The upper bound on backoff interval between consecutive retries
118123
- `multiplier`: The value multiplied by the backoff interval bounds
@@ -190,7 +195,7 @@ be configured to extract and attach specific headers as resource attributes. e.g
190195
```yaml
191196
receivers:
192197
kafka:
193-
header_extraction:
198+
header_extraction:
194199
extract_headers: true
195200
headers: ["header1", "header2"]
196201
```

receiver/kafkareceiver/config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ type Config struct {
2727
// Traces holds configuration about how traces should be consumed.
2828
Traces TopicEncodingConfig `mapstructure:"traces"`
2929

30+
// Profiles holds configuration about how profiles should be consumed.
31+
Profiles TopicEncodingConfig `mapstructure:"profiles"`
32+
3033
// Topic holds the name of the Kafka topic from which to consume data.
3134
//
3235
// Topic has no default. If explicitly specified, it will take precedence
@@ -80,6 +83,9 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error {
8083
if zeroConfig.Traces.Topic == "" {
8184
c.Traces.Topic = c.Topic
8285
}
86+
if zeroConfig.Profiles.Topic == "" {
87+
c.Profiles.Topic = c.Topic
88+
}
8389
}
8490
if c.Encoding != "" {
8591
if zeroConfig.Logs.Encoding == "" {
@@ -91,6 +97,9 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error {
9197
if zeroConfig.Traces.Encoding == "" {
9298
c.Traces.Encoding = c.Encoding
9399
}
100+
if zeroConfig.Profiles.Encoding == "" {
101+
c.Profiles.Encoding = c.Encoding
102+
}
94103
}
95104

96105
// Set OnPermanentError default value to inherit from OnError for backward compatibility
@@ -119,6 +128,7 @@ type TopicEncodingConfig struct {
119128
// - "otlp_spans" for traces
120129
// - "otlp_metrics" for metrics
121130
// - "otlp_logs" for logs
131+
// - "otlp_profiles" for profiles
122132
Topic string `mapstructure:"topic"`
123133

124134
// Encoding holds the expected encoding of messages for the signal type

receiver/kafkareceiver/config_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ func TestLoadConfig(t *testing.T) {
5858
Topic: "spans",
5959
Encoding: "otlp_proto",
6060
},
61+
Profiles: TopicEncodingConfig{
62+
Topic: "spans",
63+
Encoding: "otlp_proto",
64+
},
6165
Topic: "spans",
6266
ErrorBackOff: configretry.BackOffConfig{
6367
Enabled: false,
@@ -88,6 +92,10 @@ func TestLoadConfig(t *testing.T) {
8892
Topic: "legacy_topic",
8993
Encoding: "otlp_proto",
9094
},
95+
Profiles: TopicEncodingConfig{
96+
Topic: "legacy_topic",
97+
Encoding: "otlp_proto",
98+
},
9199
Topic: "legacy_topic",
92100
ErrorBackOff: configretry.BackOffConfig{
93101
Enabled: false,
@@ -111,6 +119,10 @@ func TestLoadConfig(t *testing.T) {
111119
Topic: "otlp_spans",
112120
Encoding: "legacy_encoding",
113121
},
122+
Profiles: TopicEncodingConfig{
123+
Topic: "otlp_profiles",
124+
Encoding: "legacy_encoding",
125+
},
114126
Encoding: "legacy_encoding",
115127
ErrorBackOff: configretry.BackOffConfig{
116128
Enabled: false,
@@ -158,6 +170,10 @@ func TestLoadConfig(t *testing.T) {
158170
Topic: "otlp_spans",
159171
Encoding: "otlp_proto",
160172
},
173+
Profiles: TopicEncodingConfig{
174+
Topic: "otlp_profiles",
175+
Encoding: "otlp_proto",
176+
},
161177
ErrorBackOff: configretry.BackOffConfig{
162178
Enabled: true,
163179
InitialInterval: 1 * time.Second,
@@ -189,6 +205,10 @@ func TestLoadConfig(t *testing.T) {
189205
Topic: "otlp_spans",
190206
Encoding: "otlp_proto",
191207
},
208+
Profiles: TopicEncodingConfig{
209+
Topic: "otlp_profiles",
210+
Encoding: "otlp_proto",
211+
},
192212
ErrorBackOff: configretry.BackOffConfig{
193213
Enabled: false,
194214
},
@@ -211,6 +231,10 @@ func TestLoadConfig(t *testing.T) {
211231
Topic: "otlp_spans",
212232
Encoding: "otlp_proto",
213233
},
234+
Profiles: TopicEncodingConfig{
235+
Topic: "otlp_profiles",
236+
Encoding: "otlp_proto",
237+
},
214238
MessageMarking: MessageMarking{
215239
After: true,
216240
OnError: true,
@@ -238,6 +262,10 @@ func TestLoadConfig(t *testing.T) {
238262
Topic: "otlp_spans",
239263
Encoding: "otlp_proto",
240264
},
265+
Profiles: TopicEncodingConfig{
266+
Topic: "otlp_profiles",
267+
Encoding: "otlp_proto",
268+
},
241269
MessageMarking: MessageMarking{
242270
After: false,
243271
OnError: false,
@@ -265,6 +293,10 @@ func TestLoadConfig(t *testing.T) {
265293
Topic: "otlp_spans",
266294
Encoding: "otlp_proto",
267295
},
296+
Profiles: TopicEncodingConfig{
297+
Topic: "otlp_profiles",
298+
Encoding: "otlp_proto",
299+
},
268300
MessageMarking: MessageMarking{
269301
After: true,
270302
OnError: true,

receiver/kafkareceiver/documentation.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,21 @@ Number of metric points failed to be unmarshaled
278278
| topic | The Kafka topic. | Any Str |
279279
| partition | The Kafka topic partition. | Any Int |
280280

281+
### otelcol_kafka_receiver_unmarshal_failed_profiles
282+
283+
Number of profiles failed to be unmarshaled
284+
285+
| Unit | Metric Type | Value Type | Monotonic |
286+
| ---- | ----------- | ---------- | --------- |
287+
| 1 | Sum | Int | true |
288+
289+
#### Attributes
290+
291+
| Name | Description | Values |
292+
| ---- | ----------- | ------ |
293+
| topic | The Kafka topic. | Any Str |
294+
| partition | The Kafka topic partition. | Any Int |
295+
281296
### otelcol_kafka_receiver_unmarshal_failed_spans
282297

283298
Number of spans failed to be unmarshaled

receiver/kafkareceiver/encoding.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"go.opentelemetry.io/collector/component"
1212
"go.opentelemetry.io/collector/pdata/plog"
1313
"go.opentelemetry.io/collector/pdata/pmetric"
14+
"go.opentelemetry.io/collector/pdata/pprofile"
1415
"go.opentelemetry.io/collector/pdata/ptrace"
1516
"go.opentelemetry.io/collector/receiver"
1617

@@ -127,6 +128,24 @@ func loadEncodingExtension[T any](host component.Host, encoding, signalType stri
127128
return unmarshaler, nil
128129
}
129130

131+
func newProfilesUnmarshaler(encoding string, _ receiver.Settings, host component.Host) (pprofile.Unmarshaler, error) {
132+
// Extensions take precedence.
133+
if unmarshaler, err := loadEncodingExtension[pprofile.Unmarshaler](host, encoding, "profiles"); err != nil {
134+
if !errors.Is(err, errInvalidComponentType) && !errors.Is(err, errUnknownEncodingExtension) {
135+
return nil, err
136+
}
137+
} else {
138+
return unmarshaler, nil
139+
}
140+
switch encoding {
141+
case "otlp_proto":
142+
return &pprofile.ProtoUnmarshaler{}, nil
143+
case "otlp_json":
144+
return &pprofile.JSONUnmarshaler{}, nil
145+
}
146+
return nil, fmt.Errorf("unrecognized profiles encoding %q", encoding)
147+
}
148+
130149
// encodingToComponentID attempts to parse the encoding string as a component ID.
131150
func encodingToComponentID(encoding string) (*component.ID, error) {
132151
var id component.ID

receiver/kafkareceiver/encoding_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@ import (
1616
"go.opentelemetry.io/collector/pdata/pcommon"
1717
"go.opentelemetry.io/collector/pdata/plog"
1818
"go.opentelemetry.io/collector/pdata/pmetric"
19+
"go.opentelemetry.io/collector/pdata/pprofile"
1920
"go.opentelemetry.io/collector/pdata/ptrace"
21+
"go.opentelemetry.io/collector/pdata/testdata"
2022
"go.opentelemetry.io/collector/receiver/receivertest"
2123
"golang.org/x/text/encoding/unicode"
2224

2325
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
2426
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
27+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pprofiletest"
2528
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/ptracetest"
2629
zipkinthriftconverter "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinthriftconverter"
2730
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2"
@@ -41,6 +44,10 @@ var (
4144
component.Component
4245
ptrace.Unmarshaler
4346
}
47+
customProfilesUnmarshalerExtension struct {
48+
component.Component
49+
pprofile.Unmarshaler
50+
}
4451
)
4552

4653
func TestNewLogsUnmarshaler(t *testing.T) {
@@ -376,6 +383,60 @@ func TestNewTracesUnmarshalerExtension(t *testing.T) {
376383
assert.Nil(t, u)
377384
}
378385

386+
func TestNewProfilesUnmarshaler(t *testing.T) {
387+
profiles := testdata.GenerateProfiles(3)
388+
389+
otlpProtoProfiles, err := (&pprofile.ProtoMarshaler{}).MarshalProfiles(profiles)
390+
require.NoError(t, err)
391+
otlpJSONProfiles, err := (&pprofile.JSONMarshaler{}).MarshalProfiles(profiles)
392+
require.NoError(t, err)
393+
394+
for _, tc := range []struct {
395+
encoding string
396+
input []byte
397+
check func(*testing.T, pprofile.Profiles)
398+
}{
399+
{
400+
encoding: "otlp_proto",
401+
input: otlpProtoProfiles,
402+
check: func(t *testing.T, actual pprofile.Profiles) {
403+
assert.NoError(t, pprofiletest.CompareProfiles(profiles, actual))
404+
},
405+
},
406+
{
407+
encoding: "otlp_json",
408+
input: otlpJSONProfiles,
409+
check: func(t *testing.T, actual pprofile.Profiles) {
410+
assert.NoError(t, pprofiletest.CompareProfiles(profiles, actual))
411+
},
412+
},
413+
} {
414+
t.Run(tc.encoding, func(t *testing.T) {
415+
u := mustNewProfilesUnmarshaler(t, tc.encoding, componenttest.NewNopHost())
416+
out, err := u.UnmarshalProfiles(tc.input)
417+
require.NoError(t, err)
418+
tc.check(t, out)
419+
})
420+
}
421+
}
422+
423+
func TestNewProfilesUnmarshalerExtension(t *testing.T) {
424+
settings := receivertest.NewNopSettings(metadata.Type)
425+
426+
// Verify extensions take precedence over built-in unmarshalers.
427+
u := mustNewProfilesUnmarshaler(t, "otlp_proto", extensionsHost{
428+
component.MustNewID("otlp_proto"): &customProfilesUnmarshalerExtension,
429+
})
430+
assert.Equal(t, &customProfilesUnmarshalerExtension, u)
431+
432+
// Specifying an extension for a different type should fail fast.
433+
u, err := newProfilesUnmarshaler("not_profiles", settings, extensionsHost{
434+
component.MustNewID("not_profiles"): &customLogsUnmarshalerExtension,
435+
})
436+
require.EqualError(t, err, `extension "not_profiles" is not a profiles unmarshaler`)
437+
assert.Nil(t, u)
438+
}
439+
379440
func mustNewLogsUnmarshaler(tb testing.TB, encoding string, host component.Host) plog.Unmarshaler {
380441
settings := receivertest.NewNopSettings(metadata.Type)
381442
u, err := newLogsUnmarshaler(encoding, settings, host)
@@ -397,6 +458,13 @@ func mustNewTracesUnmarshaler(tb testing.TB, encoding string, host component.Hos
397458
return u
398459
}
399460

461+
func mustNewProfilesUnmarshaler(tb testing.TB, encoding string, host component.Host) pprofile.Unmarshaler {
462+
settings := receivertest.NewNopSettings(metadata.Type)
463+
u, err := newProfilesUnmarshaler(encoding, settings, host)
464+
require.NoError(tb, err)
465+
return u
466+
}
467+
400468
type extensionsHost map[component.ID]component.Component
401469

402470
func (h extensionsHost) GetExtensions() map[component.ID]component.Component {

0 commit comments

Comments
 (0)