Skip to content

Commit 2523a93

Browse files
committed
[processor/transform] Add support for profiles
1 parent 34acefd commit 2523a93

File tree

17 files changed

+1616
-18
lines changed

17 files changed

+1616
-18
lines changed

internal/filter/filterottl/filter.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
1111
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
1212
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
13+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlprofile"
1314
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource"
1415
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope"
1516
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
@@ -121,6 +122,27 @@ func NewBoolExprForLogWithOptions(conditions []string, functions map[string]ottl
121122
return &c, nil
122123
}
123124

125+
// NewBoolExprForProfile creates a BoolExpr[ottlprofile.TransformContext] that will return true if any of the given OTTL conditions evaluate to true.
126+
// The passed in functions should use the ottlprofile.TransformContext.
127+
// If a function named `match` is not present in the function map it will be added automatically so that parsing works as expected
128+
func NewBoolExprForProfile(conditions []string, functions map[string]ottl.Factory[ottlprofile.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings) (*ottl.ConditionSequence[ottlprofile.TransformContext], error) {
129+
return NewBoolExprForProfileWithOptions(conditions, functions, errorMode, set, nil)
130+
}
131+
132+
// NewBoolExprForLogWithOptions is like NewBoolExprForLog, but with additional options.
133+
func NewBoolExprForProfileWithOptions(conditions []string, functions map[string]ottl.Factory[ottlprofile.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings, parserOptions []ottl.Option[ottlprofile.TransformContext]) (*ottl.ConditionSequence[ottlprofile.TransformContext], error) {
134+
parser, err := ottlprofile.NewParser(functions, set, parserOptions...)
135+
if err != nil {
136+
return nil, err
137+
}
138+
statements, err := parser.ParseConditions(conditions)
139+
if err != nil {
140+
return nil, err
141+
}
142+
c := ottlprofile.NewConditionSequence(statements, set, ottlprofile.WithConditionSequenceErrorMode(errorMode))
143+
return &c, nil
144+
}
145+
124146
// NewBoolExprForResource creates a BoolExpr[ottlresource.TransformContext] that will return true if any of the given OTTL conditions evaluate to true.
125147
// The passed in functions should use the ottlresource.TransformContext.
126148
// If a function named `match` is not present in the function map it will be added automatically so that parsing works as expected

internal/filter/filterottl/functions.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
1414
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
1515
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
16+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlprofile"
1617
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource"
1718
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope"
1819
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
@@ -52,6 +53,10 @@ func StandardLogFuncs() map[string]ottl.Factory[ottllog.TransformContext] {
5253
return ottlfuncs.StandardConverters[ottllog.TransformContext]()
5354
}
5455

56+
func StandardProfileFuncs() map[string]ottl.Factory[ottlprofile.TransformContext] {
57+
return ottlfuncs.StandardConverters[ottlprofile.TransformContext]()
58+
}
59+
5560
func StandardResourceFuncs() map[string]ottl.Factory[ottlresource.TransformContext] {
5661
return ottlfuncs.StandardConverters[ottlresource.TransformContext]()
5762
}

internal/filter/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ require (
6262
golang.org/x/net v0.37.0 // indirect
6363
golang.org/x/sys v0.31.0 // indirect
6464
golang.org/x/text v0.23.0 // indirect
65-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
65+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250227231956-55c901821b1e // indirect
6666
google.golang.org/grpc v1.71.0 // indirect
6767
google.golang.org/protobuf v1.36.5 // indirect
6868
gopkg.in/yaml.v2 v2.4.0 // indirect

internal/filter/go.sum

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/ottl/contexts/ottlprofile/profile.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ import (
2222
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/logprofile"
2323
)
2424

25+
// Experimental: *NOTE* this constant is subject to change or removal in the future.
26+
const ContextName = ctxprofile.Name
27+
2528
var (
2629
_ ctxresource.Context = TransformContext{}
2730
_ ctxscope.Context = TransformContext{}
@@ -46,8 +49,6 @@ type TransformContext struct {
4649
resourceProfiles pprofile.ResourceProfiles
4750
}
4851

49-
type Option func(*ottl.Parser[TransformContext])
50-
5152
type TransformContextOption func(*TransformContext)
5253

5354
func NewTransformContext(profile pprofile.Profile, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeProfiles pprofile.ScopeProfiles, resourceProfiles pprofile.ResourceProfiles, options ...TransformContextOption) TransformContext {
@@ -107,7 +108,7 @@ type pathExpressionParser struct {
107108
cacheGetSetter ottl.PathExpressionParser[TransformContext]
108109
}
109110

110-
func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySettings component.TelemetrySettings, options ...Option) (ottl.Parser[TransformContext], error) {
111+
func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySettings component.TelemetrySettings, options ...ottl.Option[TransformContext]) (ottl.Parser[TransformContext], error) {
111112
pep := pathExpressionParser{
112113
telemetrySettings: telemetrySettings,
113114
cacheGetSetter: ctxcache.PathExpressionParser(getCache),
@@ -131,7 +132,7 @@ func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySet
131132
// otherwise an error is reported.
132133
//
133134
// Experimental: *NOTE* this option is subject to change or removal in the future.
134-
func EnablePathContextNames() Option {
135+
func EnablePathContextNames() ottl.Option[TransformContext] {
135136
return func(p *ottl.Parser[TransformContext]) {
136137
ottl.WithPathContextNames[TransformContext]([]string{
137138
ctxprofile.Name,

processor/transformprocessor/factory.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,31 @@ import (
99

1010
"go.opentelemetry.io/collector/component"
1111
"go.opentelemetry.io/collector/consumer"
12+
"go.opentelemetry.io/collector/consumer/xconsumer"
1213
"go.opentelemetry.io/collector/processor"
1314
"go.opentelemetry.io/collector/processor/processorhelper"
15+
"go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper"
16+
"go.opentelemetry.io/collector/processor/xprocessor"
1417

1518
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
1619
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
1720
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
1821
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metadata"
1922
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"
23+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/profiles"
2024
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
2125
)
2226

2327
var processorCapabilities = consumer.Capabilities{MutatesData: true}
2428

2529
func NewFactory() processor.Factory {
26-
return processor.NewFactory(
30+
return xprocessor.NewFactory(
2731
metadata.Type,
2832
createDefaultConfig,
29-
processor.WithLogs(createLogsProcessor, metadata.LogsStability),
30-
processor.WithTraces(createTracesProcessor, metadata.TracesStability),
31-
processor.WithMetrics(createMetricsProcessor, metadata.MetricsStability),
33+
xprocessor.WithLogs(createLogsProcessor, metadata.LogsStability),
34+
xprocessor.WithTraces(createTracesProcessor, metadata.TracesStability),
35+
xprocessor.WithMetrics(createMetricsProcessor, metadata.MetricsStability),
36+
xprocessor.WithProfiles(createProfilesProcessor, metadata.ProfilesStability),
3237
)
3338
}
3439

@@ -104,3 +109,25 @@ func createMetricsProcessor(
104109
proc.ProcessMetrics,
105110
processorhelper.WithCapabilities(processorCapabilities))
106111
}
112+
113+
func createProfilesProcessor(
114+
ctx context.Context,
115+
set processor.Settings,
116+
cfg component.Config,
117+
nextConsumer xconsumer.Profiles,
118+
) (xprocessor.Profiles, error) {
119+
oCfg := cfg.(*Config)
120+
oCfg.logger = set.Logger
121+
122+
proc, err := profiles.NewProcessor(oCfg.MetricStatements, oCfg.ErrorMode, set.TelemetrySettings)
123+
if err != nil {
124+
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
125+
}
126+
return xprocessorhelper.NewProfiles(
127+
ctx,
128+
set,
129+
cfg,
130+
nextConsumer,
131+
proc.ProcessProfiles,
132+
xprocessorhelper.WithCapabilities(processorCapabilities))
133+
}

processor/transformprocessor/go.mod

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ require (
2828
go.opentelemetry.io/collector/component/componenttest v0.122.2-0.20250319144947-41a9ea7f7402
2929
go.opentelemetry.io/collector/confmap/xconfmap v0.122.2-0.20250319144947-41a9ea7f7402
3030
go.opentelemetry.io/collector/consumer/consumertest v0.122.2-0.20250319144947-41a9ea7f7402
31+
go.opentelemetry.io/collector/consumer/xconsumer v0.122.2-0.20250319144947-41a9ea7f7402
32+
go.opentelemetry.io/collector/pdata/pprofile v0.122.2-0.20250319144947-41a9ea7f7402
33+
go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.122.1
3134
go.opentelemetry.io/collector/processor/processortest v0.122.2-0.20250319144947-41a9ea7f7402
35+
go.opentelemetry.io/collector/processor/xprocessor v0.122.2-0.20250319144947-41a9ea7f7402
3236
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
3337
)
3438

@@ -67,18 +71,15 @@ require (
6771
github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6 // indirect
6872
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
6973
go.opentelemetry.io/collector/component/componentstatus v0.122.2-0.20250319144947-41a9ea7f7402 // indirect
70-
go.opentelemetry.io/collector/consumer/xconsumer v0.122.2-0.20250319144947-41a9ea7f7402 // indirect
71-
go.opentelemetry.io/collector/pdata/pprofile v0.122.2-0.20250319144947-41a9ea7f7402 // indirect
7274
go.opentelemetry.io/collector/pdata/testdata v0.122.2-0.20250319144947-41a9ea7f7402 // indirect
7375
go.opentelemetry.io/collector/pipeline v0.122.2-0.20250319144947-41a9ea7f7402 // indirect
74-
go.opentelemetry.io/collector/processor/xprocessor v0.122.2-0.20250319144947-41a9ea7f7402 // indirect
7576
go.opentelemetry.io/otel v1.35.0 // indirect
7677
go.opentelemetry.io/otel/sdk v1.35.0 // indirect
7778
go.opentelemetry.io/otel/sdk/metric v1.35.0 // indirect
7879
golang.org/x/net v0.37.0 // indirect
7980
golang.org/x/sys v0.31.0 // indirect
8081
golang.org/x/text v0.23.0 // indirect
81-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
82+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250227231956-55c901821b1e // indirect
8283
google.golang.org/grpc v1.71.0 // indirect
8384
google.golang.org/protobuf v1.36.5 // indirect
8485
gopkg.in/yaml.v2 v2.4.0 // indirect

processor/transformprocessor/go.sum

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/transformprocessor/internal/common/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@ const (
2222
Metric ContextID = "metric"
2323
DataPoint ContextID = "datapoint"
2424
Log ContextID = "log"
25+
Profile ContextID = "profile"
2526
)
2627

2728
func (c *ContextID) UnmarshalText(text []byte) error {
2829
str := ContextID(strings.ToLower(string(text)))
2930
switch str {
30-
case Resource, Scope, Span, SpanEvent, Metric, DataPoint, Log:
31+
case Resource, Scope, Span, SpanEvent, Metric, DataPoint, Log, Profile:
3132
*c = str
3233
return nil
3334
default:

processor/transformprocessor/internal/common/processor.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"go.opentelemetry.io/collector/pdata/pcommon"
1111
"go.opentelemetry.io/collector/pdata/plog"
1212
"go.opentelemetry.io/collector/pdata/pmetric"
13+
"go.opentelemetry.io/collector/pdata/pprofile"
1314
"go.opentelemetry.io/collector/pdata/ptrace"
1415

1516
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr"
@@ -84,6 +85,24 @@ func (r resourceStatements) ConsumeLogs(ctx context.Context, ld plog.Logs, cache
8485
return nil
8586
}
8687

88+
func (r resourceStatements) ConsumeProfiles(ctx context.Context, ld pprofile.Profiles, cache *pcommon.Map) error {
89+
for i := 0; i < ld.ResourceProfiles().Len(); i++ {
90+
rprofiles := ld.ResourceProfiles().At(i)
91+
tCtx := ottlresource.NewTransformContext(rprofiles.Resource(), rprofiles, ottlresource.WithCache(cache))
92+
condition, err := r.BoolExpr.Eval(ctx, tCtx)
93+
if err != nil {
94+
return err
95+
}
96+
if condition {
97+
err := r.Execute(ctx, tCtx)
98+
if err != nil {
99+
return err
100+
}
101+
}
102+
}
103+
return nil
104+
}
105+
87106
var _ baseContext = &scopeStatements{}
88107

89108
type scopeStatements struct {
@@ -158,10 +177,32 @@ func (s scopeStatements) ConsumeLogs(ctx context.Context, ld plog.Logs, cache *p
158177
return nil
159178
}
160179

180+
func (s scopeStatements) ConsumeProfiles(ctx context.Context, ld pprofile.Profiles, cache *pcommon.Map) error {
181+
for i := 0; i < ld.ResourceProfiles().Len(); i++ {
182+
rprofiles := ld.ResourceProfiles().At(i)
183+
for j := 0; j < rprofiles.ScopeProfiles().Len(); j++ {
184+
sprofiles := rprofiles.ScopeProfiles().At(j)
185+
tCtx := ottlscope.NewTransformContext(sprofiles.Scope(), rprofiles.Resource(), sprofiles, ottlscope.WithCache(cache))
186+
condition, err := s.BoolExpr.Eval(ctx, tCtx)
187+
if err != nil {
188+
return err
189+
}
190+
if condition {
191+
err := s.Execute(ctx, tCtx)
192+
if err != nil {
193+
return err
194+
}
195+
}
196+
}
197+
}
198+
return nil
199+
}
200+
161201
type baseContext interface {
162202
TracesConsumer
163203
MetricsConsumer
164204
LogsConsumer
205+
ProfilesConsumer
165206
}
166207

167208
func withCommonContextParsers[R any]() ottl.ParserCollectionOption[R] {

0 commit comments

Comments
 (0)