Skip to content

Commit ecc282f

Browse files
committed
[pkg/ottl] Accessors for profile attributes
1 parent f658cf2 commit ecc282f

File tree

4 files changed

+182
-1
lines changed

4 files changed

+182
-1
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: pkg/ottl
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add accessors for profile attributes
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39681]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

pkg/ottl/contexts/internal/ctxprofile/profile.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ package ctxprofile // import "github.com/open-telemetry/opentelemetry-collector-
66
import (
77
"context"
88
"encoding/hex"
9+
"errors"
10+
"fmt"
11+
"math"
912
"time"
1013

1114
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -79,6 +82,11 @@ func PathGetSetter[K ProfileContext](path ottl.Path[K]) (ottl.GetSetter[K], erro
7982
return accessOriginalPayloadFormat[K](), nil
8083
case "original_payload":
8184
return accessOriginalPayload[K](), nil
85+
case "attributes":
86+
if path.Keys() == nil {
87+
return accessAttributes[K](), nil
88+
}
89+
return accessAttributesKey(path.Keys()), nil
8290
default:
8391
return nil, ctxerror.New(path.Name(), path.String(), Name, DocRef)
8492
}
@@ -412,3 +420,124 @@ func accessOriginalPayload[K ProfileContext]() ottl.StandardGetSetter[K] {
412420
},
413421
}
414422
}
423+
424+
func accessAttributes[K ProfileContext]() ottl.StandardGetSetter[K] {
425+
return ottl.StandardGetSetter[K]{
426+
Getter: func(_ context.Context, tCtx K) (any, error) {
427+
return pprofile.FromAttributeIndices(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile()), nil
428+
},
429+
Setter: func(_ context.Context, tCtx K, val any) error {
430+
m, ok := val.(pcommon.Map)
431+
if !ok {
432+
return fmt.Errorf("expected pcommon.Map, got %T", val)
433+
}
434+
tCtx.GetProfile().AttributeIndices().FromRaw([]int32{})
435+
for k, v := range m.All() {
436+
if err := PutAttribute(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile(), k, v); err != nil {
437+
return err
438+
}
439+
}
440+
return nil
441+
},
442+
}
443+
}
444+
445+
func accessAttributesKey[K Context](key []ottl.Key[K]) ottl.StandardGetSetter[K] {
446+
return ottl.StandardGetSetter[K]{
447+
Getter: func(ctx context.Context, tCtx K) (any, error) {
448+
return ctxutil.GetMapValue[K](ctx, tCtx, pprofile.FromAttributeIndices(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile()), key)
449+
},
450+
Setter: func(ctx context.Context, tCtx K, val any) error {
451+
newKey, err := key[0].String(ctx, tCtx)
452+
if err != nil {
453+
return err
454+
}
455+
v := pcommon.NewValueEmpty()
456+
if err = v.FromRaw(val); err != nil {
457+
return err
458+
}
459+
return PutAttribute(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile(), *newKey, v)
460+
},
461+
}
462+
}
463+
464+
// TODO: Remove the following code once https://github.com/open-telemetry/opentelemetry-collector/pull/12798 is merged.
465+
466+
type attributable interface {
467+
AttributeIndices() pcommon.Int32Slice
468+
}
469+
470+
var errTooManyTableEntries = errors.New("too many entries in AttributeTable")
471+
472+
// PutAttribute updates an AttributeTable and a record's AttributeIndices to
473+
// add a new attribute.
474+
// The record can be any struct that implements an `AttributeIndices` method.
475+
func PutAttribute(table pprofile.AttributeTableSlice, record attributable, key string, value pcommon.Value) error {
476+
for i := range record.AttributeIndices().Len() {
477+
idx := int(record.AttributeIndices().At(i))
478+
if idx < 0 || idx >= table.Len() {
479+
return fmt.Errorf("index value %d out of range in AttributeIndices[%d]", idx, i)
480+
}
481+
attr := table.At(idx)
482+
if attr.Key() == key {
483+
if attr.Value().Equal(value) {
484+
// Attribute already exists, nothing to do.
485+
return nil
486+
}
487+
488+
// If the attribute table already contains the key/value pair, just update the index.
489+
for j := range table.Len() {
490+
a := table.At(j)
491+
if a.Key() == key && a.Value().Equal(value) {
492+
if j > math.MaxInt32 {
493+
return errTooManyTableEntries
494+
}
495+
record.AttributeIndices().SetAt(i, int32(j))
496+
return nil
497+
}
498+
}
499+
500+
if table.Len() >= math.MaxInt32 {
501+
return errTooManyTableEntries
502+
}
503+
504+
// Add the key/value pair as a new attribute to the table...
505+
entry := table.AppendEmpty()
506+
entry.SetKey(key)
507+
value.CopyTo(entry.Value())
508+
509+
// ...and update the existing index.
510+
record.AttributeIndices().SetAt(i, int32(table.Len()-1))
511+
return nil
512+
}
513+
}
514+
515+
if record.AttributeIndices().Len() >= math.MaxInt32 {
516+
return errors.New("too many entries in AttributeIndices")
517+
}
518+
519+
for j := range table.Len() {
520+
a := table.At(j)
521+
if a.Key() == key && a.Value().Equal(value) {
522+
if j > math.MaxInt32 {
523+
return errTooManyTableEntries
524+
}
525+
// Add the index of the existing attribute to the indices.
526+
record.AttributeIndices().Append(int32(j))
527+
return nil
528+
}
529+
}
530+
531+
if table.Len() >= math.MaxInt32 {
532+
return errTooManyTableEntries
533+
}
534+
535+
// Add the key/value pair as a new attribute to the table...
536+
entry := table.AppendEmpty()
537+
entry.SetKey(key)
538+
value.CopyTo(entry.Value())
539+
540+
// ...and add a new index to the indices.
541+
record.AttributeIndices().Append(int32(table.Len() - 1))
542+
return nil
543+
}

pkg/ottl/contexts/internal/ctxprofile/profile_test.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,17 @@ import (
1313
"go.opentelemetry.io/collector/pdata/pcommon"
1414
"go.opentelemetry.io/collector/pdata/pprofile"
1515

16+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
1617
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/pathtest"
18+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottltest"
1719
)
1820

1921
func TestPathGetSetter(t *testing.T) {
2022
// create tests
2123
tests := []struct {
2224
path string
2325
val any
26+
keys []ottl.Key[*profileContext]
2427
setFails bool
2528
}{
2629
{
@@ -124,19 +127,39 @@ func TestPathGetSetter(t *testing.T) {
124127
path: "original_payload",
125128
val: []byte{1, 2, 3},
126129
},
130+
{
131+
path: "attributes",
132+
val: func() pcommon.Map {
133+
m := pcommon.NewMap()
134+
m.PutStr("akey", "val")
135+
return m
136+
}(),
137+
},
138+
{
139+
path: "attributes",
140+
keys: []ottl.Key[*profileContext]{
141+
&pathtest.Key[*profileContext]{
142+
S: ottltest.Strp("akey"),
143+
},
144+
},
145+
val: "val",
146+
},
127147
}
128148

129149
for _, tt := range tests {
130150
t.Run(tt.path, func(t *testing.T) {
131151
pathParts := strings.Split(tt.path, " ")
132152
path := &pathtest.Path[*profileContext]{N: pathParts[0]}
153+
if tt.keys != nil {
154+
path.KeySlice = tt.keys
155+
}
133156
if len(pathParts) > 1 {
134157
path.NextPath = &pathtest.Path[*profileContext]{N: pathParts[1]}
135158
}
136159

137160
profile := pprofile.NewProfile()
138161

139-
accessor, err := PathGetSetter[*profileContext](path)
162+
accessor, err := PathGetSetter(path)
140163
require.NoError(t, err)
141164

142165
err = accessor.Set(context.Background(), newProfileContext(profile), tt.val)

pkg/ottl/contexts/ottlprofile/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ The following paths are supported.
2222
| instrumentation_scope.version | version of the instrumentation scope of the profile being processed | string |
2323
| instrumentation_scope.attributes | instrumentation scope attributes of the data point being processed | pcommon.Map |
2424
| instrumentation_scope.attributes\[""\] | the value of the instrumentation scope attribute of the data point being processed. Supports multiple indexes to access nested fields. | string, bool, int64, float64, pcommon.Map, pcommon.Slice, []byte or nil |
25+
| profile.attributes | attributes of the profile being processed | pcommon.Map |
26+
| profile.attributes\[""\] | the value of the attribute of the profile being processed. Supports multiple indexes to access nested fields. | string, bool, int64, float64, pcommon.Map, pcommon.Slice, []byte or nil |
2527
| profile.sample_type | the sample types of the profile being processed | pprofile.ValueTypeSlice |
2628
| profile.sample | the samples of the profile being processed | pprofile.SampleSlice |
2729
| profile.mapping_table | the mapping table of the profile being processed | pprofile.MappingSlice |

0 commit comments

Comments
 (0)