Skip to content

Commit 7d96c01

Browse files
committed
[pkg/ottl] Accessors for profile attributes
1 parent 6d93f06 commit 7d96c01

File tree

1 file changed

+163
-0
lines changed

1 file changed

+163
-0
lines changed

pkg/ottl/contexts/internal/ctxprofile/profile.go

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ package ctxprofile // import "github.com/open-telemetry/opentelemetry-collector-
66
import (
77
"context"
88
"encoding/hex"
9+
"errors"
10+
"fmt"
11+
"math"
912
"time"
1013

1114
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -25,6 +28,7 @@ func PathGetSetter[K ProfileContext](path ottl.Path[K]) (ottl.GetSetter[K], erro
2528
if path == nil {
2629
return nil, ctxerror.New("nil", "nil", Name, DocRef)
2730
}
31+
fmt.Printf("PathGetterSetter: %v\n", path.Name())
2832
switch path.Name() {
2933
case "sample_type":
3034
return accessSampleType[K](), nil
@@ -79,6 +83,11 @@ func PathGetSetter[K ProfileContext](path ottl.Path[K]) (ottl.GetSetter[K], erro
7983
return accessOriginalPayloadFormat[K](), nil
8084
case "original_payload":
8185
return accessOriginalPayload[K](), nil
86+
case "attributes":
87+
if path.Keys() == nil {
88+
return accessAttributes[K](), nil
89+
}
90+
return accessAttributesKey(path.Keys()), nil
8291
default:
8392
return nil, ctxerror.New(path.Name(), path.String(), Name, DocRef)
8493
}
@@ -412,3 +421,157 @@ func accessOriginalPayload[K ProfileContext]() ottl.StandardGetSetter[K] {
412421
},
413422
}
414423
}
424+
425+
func accessAttributes[K ProfileContext]() ottl.StandardGetSetter[K] {
426+
return ottl.StandardGetSetter[K]{
427+
Getter: func(_ context.Context, tCtx K) (any, error) {
428+
fmt.Printf("accessAttributes.Getter\n")
429+
dumpMap("get map", pprofile.FromAttributeIndices(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile()))
430+
return pprofile.FromAttributeIndices(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile()), nil
431+
},
432+
Setter: func(_ context.Context, tCtx K, val any) error {
433+
fmt.Printf("accessAttributes.Setter\n")
434+
m, ok := val.(pcommon.Map)
435+
if !ok {
436+
return fmt.Errorf("expected pcommon.Map, got %T", val)
437+
}
438+
dumpMap("set map", m)
439+
tCtx.GetProfile().AttributeIndices().FromRaw([]int32{})
440+
for k, v := range m.All() {
441+
if err := PutAttribute(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile(), k, v); err != nil {
442+
return err
443+
}
444+
}
445+
return nil
446+
},
447+
}
448+
}
449+
450+
func accessAttributesKey[K Context](key []ottl.Key[K]) ottl.StandardGetSetter[K] {
451+
return ottl.StandardGetSetter[K]{
452+
Getter: func(ctx context.Context, tCtx K) (any, error) {
453+
fmt.Printf("accessAttributesKey.Getter\n")
454+
return ctxutil.GetMapValue[K](ctx, tCtx, pprofile.FromAttributeIndices(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile()), key)
455+
/*
456+
table := tCtx.GetProfile().AttributeTable()
457+
searchKey, err := key[0].String(ctx, tCtx)
458+
fmt.Printf("accessAttributesKey.Getter: %v\n", searchKey)
459+
if err != nil {
460+
return nil, err
461+
}
462+
for idx := range tCtx.GetProfile().AttributeIndices().AsRaw() {
463+
if idx >= table.Len() {
464+
continue
465+
}
466+
attr := table.At(idx)
467+
if attr.Key() == *searchKey {
468+
fmt.Printf("return accessAttributesKey.Getter: %v\n", attr.Value().AsString())
469+
return attr.Value(), nil
470+
}
471+
}
472+
return nil, fmt.Errorf("key %s not found in attribute table", *searchKey)
473+
*/
474+
},
475+
Setter: func(ctx context.Context, tCtx K, val any) error {
476+
newKey, err := key[0].String(ctx, tCtx)
477+
fmt.Printf("accessAttributesKey.Setter: %v val %v\n", *newKey, val)
478+
if err != nil {
479+
return err
480+
}
481+
fmt.Printf("accessAttributesKey.Setter: %v\n", *newKey)
482+
v := pcommon.NewValueEmpty()
483+
if err = v.FromRaw(val); err != nil {
484+
return err
485+
}
486+
return PutAttribute(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile(), *newKey, v)
487+
},
488+
}
489+
}
490+
491+
type attributable interface {
492+
AttributeIndices() pcommon.Int32Slice
493+
}
494+
495+
var errTooManyTableEntries = errors.New("too many entries in AttributeTable")
496+
497+
// PutAttribute updates an AttributeTable and a record's AttributeIndices to
498+
// add a new attribute.
499+
// The record can be any struct that implements an `AttributeIndices` method.
500+
func PutAttribute(table pprofile.AttributeTableSlice, record attributable, key string, value pcommon.Value) error {
501+
for i := range record.AttributeIndices().Len() {
502+
idx := int(record.AttributeIndices().At(i))
503+
if idx < 0 || idx >= table.Len() {
504+
return fmt.Errorf("index value %d out of range in AttributeIndices[%d]", idx, i)
505+
}
506+
attr := table.At(idx)
507+
if attr.Key() == key {
508+
if attr.Value().Equal(value) {
509+
// Attribute already exists, nothing to do.
510+
return nil
511+
}
512+
513+
// If the attribute table already contains the key/value pair, just update the index.
514+
for j := range table.Len() {
515+
a := table.At(j)
516+
if a.Key() == key && a.Value().Equal(value) {
517+
if j > math.MaxInt32 {
518+
return errTooManyTableEntries
519+
}
520+
record.AttributeIndices().SetAt(i, int32(j))
521+
return nil
522+
}
523+
}
524+
525+
if table.Len() >= math.MaxInt32 {
526+
return errTooManyTableEntries
527+
}
528+
529+
// Add the key/value pair as a new attribute to the table...
530+
entry := table.AppendEmpty()
531+
entry.SetKey(key)
532+
value.CopyTo(entry.Value())
533+
534+
// ...and update the existing index.
535+
record.AttributeIndices().SetAt(i, int32(table.Len()-1))
536+
return nil
537+
}
538+
}
539+
540+
if record.AttributeIndices().Len() >= math.MaxInt32 {
541+
return errors.New("too many entries in AttributeIndices")
542+
}
543+
544+
for j := range table.Len() {
545+
a := table.At(j)
546+
if a.Key() == key && a.Value().Equal(value) {
547+
if j > math.MaxInt32 {
548+
return errTooManyTableEntries
549+
}
550+
// Add the index of the existing attribute to the indices.
551+
record.AttributeIndices().Append(int32(j))
552+
return nil
553+
}
554+
}
555+
556+
if table.Len() >= math.MaxInt32 {
557+
return errTooManyTableEntries
558+
}
559+
560+
// Add the key/value pair as a new attribute to the table...
561+
entry := table.AppendEmpty()
562+
entry.SetKey(key)
563+
value.CopyTo(entry.Value())
564+
565+
// ...and add a new index to the indices.
566+
record.AttributeIndices().Append(int32(table.Len() - 1))
567+
return nil
568+
}
569+
570+
func dumpMap(prefix string, m pcommon.Map) {
571+
fmt.Printf("%s: ", prefix)
572+
m.Range(func(k string, v pcommon.Value) bool {
573+
fmt.Printf("{%s, %v}", k, v.AsString())
574+
return true
575+
})
576+
fmt.Printf("\n")
577+
}

0 commit comments

Comments
 (0)