Skip to content

Commit 388bae7

Browse files
dmathieumx-psi
andauthored
[chore] add exporterhelperprofiles package (open-telemetry#11226)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This is extracted from open-telemetry#11131. It adds profiles support for the epxorterhelper, so both the otlp and otlphttp exporters can support that signal. It requires open-telemetry#11207✅ and open-telemetry#11225✅ to be merged first. --------- Co-authored-by: Pablo Baeyens <[email protected]> Co-authored-by: Pablo Baeyens <[email protected]>
1 parent 4f2a8d3 commit 388bae7

File tree

12 files changed

+1041
-4
lines changed

12 files changed

+1041
-4
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
include ../../../Makefile.Common
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterhelperprofiles // import "go.opentelemetry.io/collector/exporter/exporterhelper/exporterhelperprofiles"
5+
6+
import (
7+
"errors"
8+
)
9+
10+
var (
11+
// errNilConfig is returned when an empty name is given.
12+
errNilConfig = errors.New("nil config")
13+
// errNilLogger is returned when a logger is nil
14+
errNilLogger = errors.New("nil logger")
15+
// errNilPushProfileData is returned when a nil PushProfiles is given.
16+
errNilPushProfileData = errors.New("nil PushProfiles")
17+
// errNilProfilesConverter is returned when a nil RequestFromProfilesFunc is given.
18+
errNilProfilesConverter = errors.New("nil RequestFromProfilesFunc")
19+
)
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
module go.opentelemetry.io/collector/exporter/exporterhelper/exporterhelperprofiles
2+
3+
go 1.22.0
4+
5+
require (
6+
github.com/stretchr/testify v1.9.0
7+
go.opentelemetry.io/collector/component v0.111.0
8+
go.opentelemetry.io/collector/config/configretry v1.17.0
9+
go.opentelemetry.io/collector/consumer v0.111.0
10+
go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles v0.0.0-00010101000000-000000000000
11+
go.opentelemetry.io/collector/consumer/consumerprofiles v0.111.0
12+
go.opentelemetry.io/collector/consumer/consumertest v0.111.0
13+
go.opentelemetry.io/collector/exporter v0.111.0
14+
go.opentelemetry.io/collector/exporter/exporterprofiles v0.111.0
15+
go.opentelemetry.io/collector/pdata v1.17.0
16+
go.opentelemetry.io/collector/pdata/pprofile v0.111.0
17+
go.opentelemetry.io/collector/pdata/testdata v0.111.0
18+
go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.0.0-00010101000000-000000000000
19+
go.opentelemetry.io/otel v1.31.0
20+
go.opentelemetry.io/otel/sdk v1.31.0
21+
go.opentelemetry.io/otel/trace v1.31.0
22+
go.uber.org/zap v1.27.0
23+
)
24+
25+
require (
26+
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
27+
github.com/davecgh/go-spew v1.1.1 // indirect
28+
github.com/go-logr/logr v1.4.2 // indirect
29+
github.com/go-logr/stdr v1.2.2 // indirect
30+
github.com/gogo/protobuf v1.3.2 // indirect
31+
github.com/google/uuid v1.6.0 // indirect
32+
github.com/json-iterator/go v1.1.12 // indirect
33+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
34+
github.com/modern-go/reflect2 v1.0.2 // indirect
35+
github.com/pmezard/go-difflib v1.0.0 // indirect
36+
go.opentelemetry.io/collector/config/configtelemetry v0.111.0 // indirect
37+
go.opentelemetry.io/collector/extension v0.111.0 // indirect
38+
go.opentelemetry.io/collector/extension/experimental/storage v0.111.0 // indirect
39+
go.opentelemetry.io/collector/pipeline v0.111.0 // indirect
40+
go.opentelemetry.io/collector/receiver v0.111.0 // indirect
41+
go.opentelemetry.io/collector/receiver/receiverprofiles v0.111.0 // indirect
42+
go.opentelemetry.io/otel/metric v1.31.0 // indirect
43+
go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect
44+
go.uber.org/multierr v1.11.0 // indirect
45+
golang.org/x/net v0.28.0 // indirect
46+
golang.org/x/sys v0.26.0 // indirect
47+
golang.org/x/text v0.17.0 // indirect
48+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
49+
google.golang.org/grpc v1.67.1 // indirect
50+
google.golang.org/protobuf v1.35.1 // indirect
51+
gopkg.in/yaml.v3 v3.0.1 // indirect
52+
)
53+
54+
replace go.opentelemetry.io/collector/consumer/consumertest => ../../../consumer/consumertest
55+
56+
replace go.opentelemetry.io/collector/pdata/pprofile => ../../../pdata/pprofile
57+
58+
replace go.opentelemetry.io/collector/pdata/testdata => ../../../pdata/testdata
59+
60+
replace go.opentelemetry.io/collector/exporter => ../../
61+
62+
replace go.opentelemetry.io/collector/consumer => ../../../consumer
63+
64+
replace go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles => ../../../consumer/consumererror/consumererrorprofiles
65+
66+
replace go.opentelemetry.io/collector/receiver => ../../../receiver
67+
68+
replace go.opentelemetry.io/collector/consumer/consumerprofiles => ../../../consumer/consumerprofiles
69+
70+
replace go.opentelemetry.io/collector/component => ../../../component
71+
72+
replace go.opentelemetry.io/collector/receiver/receiverprofiles => ../../../receiver/receiverprofiles
73+
74+
replace go.opentelemetry.io/collector/extension => ../../../extension
75+
76+
replace go.opentelemetry.io/collector/pdata => ../../../pdata
77+
78+
replace go.opentelemetry.io/collector/exporter/exporterprofiles => ../../exporterprofiles
79+
80+
replace go.opentelemetry.io/collector/config/configtelemetry => ../../../config/configtelemetry
81+
82+
replace go.opentelemetry.io/collector/config/configretry => ../../../config/configretry
83+
84+
replace go.opentelemetry.io/collector/pipeline/pipelineprofiles => ../../../pipeline/pipelineprofiles
85+
86+
replace go.opentelemetry.io/collector/extension/experimental/storage => ../../../extension/experimental/storage
87+
88+
replace go.opentelemetry.io/collector/pipeline => ../../../pipeline

exporter/exporterhelper/exporterhelperprofiles/go.sum

Lines changed: 98 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterhelperprofiles // import "go.opentelemetry.io/collector/exporter/exporterhelper/exporterhelperprofiles"
5+
6+
import (
7+
"context"
8+
"errors"
9+
10+
"go.uber.org/zap"
11+
12+
"go.opentelemetry.io/collector/component"
13+
"go.opentelemetry.io/collector/consumer/consumererror"
14+
"go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles"
15+
"go.opentelemetry.io/collector/consumer/consumerprofiles"
16+
"go.opentelemetry.io/collector/exporter"
17+
"go.opentelemetry.io/collector/exporter/exporterhelper"
18+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
19+
"go.opentelemetry.io/collector/exporter/exporterprofiles"
20+
"go.opentelemetry.io/collector/exporter/exporterqueue"
21+
"go.opentelemetry.io/collector/pdata/pprofile"
22+
"go.opentelemetry.io/collector/pipeline/pipelineprofiles"
23+
)
24+
25+
var profilesMarshaler = &pprofile.ProtoMarshaler{}
26+
var profilesUnmarshaler = &pprofile.ProtoUnmarshaler{}
27+
28+
type profilesRequest struct {
29+
pd pprofile.Profiles
30+
pusher consumerprofiles.ConsumeProfilesFunc
31+
}
32+
33+
func newProfilesRequest(pd pprofile.Profiles, pusher consumerprofiles.ConsumeProfilesFunc) exporterhelper.Request {
34+
return &profilesRequest{
35+
pd: pd,
36+
pusher: pusher,
37+
}
38+
}
39+
40+
func newProfileRequestUnmarshalerFunc(pusher consumerprofiles.ConsumeProfilesFunc) exporterqueue.Unmarshaler[exporterhelper.Request] {
41+
return func(bytes []byte) (exporterhelper.Request, error) {
42+
profiles, err := profilesUnmarshaler.UnmarshalProfiles(bytes)
43+
if err != nil {
44+
return nil, err
45+
}
46+
return newProfilesRequest(profiles, pusher), nil
47+
}
48+
}
49+
50+
func profilesRequestMarshaler(req exporterhelper.Request) ([]byte, error) {
51+
return profilesMarshaler.MarshalProfiles(req.(*profilesRequest).pd)
52+
}
53+
54+
func (req *profilesRequest) OnError(err error) exporterhelper.Request {
55+
var profileError consumererrorprofiles.Profiles
56+
if errors.As(err, &profileError) {
57+
return newProfilesRequest(profileError.Data(), req.pusher)
58+
}
59+
return req
60+
}
61+
62+
func (req *profilesRequest) Export(ctx context.Context) error {
63+
return req.pusher(ctx, req.pd)
64+
}
65+
66+
func (req *profilesRequest) ItemsCount() int {
67+
return req.pd.SampleCount()
68+
}
69+
70+
type profileExporter struct {
71+
*internal.BaseExporter
72+
consumerprofiles.Profiles
73+
}
74+
75+
// NewProfilesExporter creates an exporterprofiles.Profiles that records observability metrics and wraps every request with a Span.
76+
func NewProfilesExporter(
77+
ctx context.Context,
78+
set exporter.Settings,
79+
cfg component.Config,
80+
pusher consumerprofiles.ConsumeProfilesFunc,
81+
options ...exporterhelper.Option,
82+
) (exporterprofiles.Profiles, error) {
83+
if cfg == nil {
84+
return nil, errNilConfig
85+
}
86+
if pusher == nil {
87+
return nil, errNilPushProfileData
88+
}
89+
profilesOpts := []exporterhelper.Option{
90+
internal.WithMarshaler(profilesRequestMarshaler), internal.WithUnmarshaler(newProfileRequestUnmarshalerFunc(pusher)),
91+
internal.WithBatchFuncs(mergeProfiles, mergeSplitProfiles),
92+
}
93+
return NewProfilesRequestExporter(ctx, set, requestFromProfiles(pusher), append(profilesOpts, options...)...)
94+
}
95+
96+
// RequestFromProfilesFunc converts pprofile.Profiles into a user-defined Request.
97+
// Experimental: This API is at the early stage of development and may change without backward compatibility
98+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
99+
type RequestFromProfilesFunc func(context.Context, pprofile.Profiles) (exporterhelper.Request, error)
100+
101+
// requestFromProfiles returns a RequestFromProfilesFunc that converts pprofile.Profiles into a Request.
102+
func requestFromProfiles(pusher consumerprofiles.ConsumeProfilesFunc) RequestFromProfilesFunc {
103+
return func(_ context.Context, profiles pprofile.Profiles) (exporterhelper.Request, error) {
104+
return newProfilesRequest(profiles, pusher), nil
105+
}
106+
}
107+
108+
// NewProfilesRequestExporter creates a new profiles exporter based on a custom ProfilesConverter and RequestSender.
109+
// Experimental: This API is at the early stage of development and may change without backward compatibility
110+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
111+
func NewProfilesRequestExporter(
112+
_ context.Context,
113+
set exporter.Settings,
114+
converter RequestFromProfilesFunc,
115+
options ...exporterhelper.Option,
116+
) (exporterprofiles.Profiles, error) {
117+
if set.Logger == nil {
118+
return nil, errNilLogger
119+
}
120+
121+
if converter == nil {
122+
return nil, errNilProfilesConverter
123+
}
124+
125+
be, err := internal.NewBaseExporter(set, pipelineprofiles.SignalProfiles, newProfilesExporterWithObservability, options...)
126+
if err != nil {
127+
return nil, err
128+
}
129+
130+
tc, err := consumerprofiles.NewProfiles(func(ctx context.Context, pd pprofile.Profiles) error {
131+
req, cErr := converter(ctx, pd)
132+
if cErr != nil {
133+
set.Logger.Error("Failed to convert profiles. Dropping data.",
134+
zap.Int("dropped_samples", pd.SampleCount()),
135+
zap.Error(err))
136+
return consumererror.NewPermanent(cErr)
137+
}
138+
return be.Send(ctx, req)
139+
}, be.ConsumerOptions...)
140+
141+
return &profileExporter{
142+
BaseExporter: be,
143+
Profiles: tc,
144+
}, err
145+
}
146+
147+
type profilesExporterWithObservability struct {
148+
internal.BaseRequestSender
149+
obsrep *internal.ObsReport
150+
}
151+
152+
func newProfilesExporterWithObservability(obsrep *internal.ObsReport) internal.RequestSender {
153+
return &profilesExporterWithObservability{obsrep: obsrep}
154+
}
155+
156+
func (tewo *profilesExporterWithObservability) Send(ctx context.Context, req exporterhelper.Request) error {
157+
c := tewo.obsrep.StartProfilesOp(ctx)
158+
numSamples := req.ItemsCount()
159+
// Forward the data to the next consumer (this pusher is the next).
160+
err := tewo.NextSender.Send(c, req)
161+
tewo.obsrep.EndProfilesOp(c, numSamples, err)
162+
return err
163+
}

0 commit comments

Comments
 (0)