Skip to content

Commit abdc205

Browse files
committed
[pkg/ottl] Accessors for profile attributes
1 parent 245b083 commit abdc205

File tree

1 file changed

+127
-0
lines changed

1 file changed

+127
-0
lines changed

pkg/ottl/contexts/internal/ctxprofile/profile.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"context"
88
"encoding/hex"
99
"errors"
10+
"fmt"
11+
"math"
1012
"time"
1113

1214
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -26,6 +28,7 @@ func PathGetSetter[K ProfileContext](path ottl.Path[K]) (ottl.GetSetter[K], erro
2628
if path == nil {
2729
return nil, ctxerror.New("nil", "nil", Name, DocRef)
2830
}
31+
fmt.Printf("PathGetterSetter: %v\n", path.Name())
2932
switch path.Name() {
3033
case "sample_type":
3134
return accessSampleType[K](), nil
@@ -80,6 +83,11 @@ func PathGetSetter[K ProfileContext](path ottl.Path[K]) (ottl.GetSetter[K], erro
8083
return accessOriginalPayloadFormat[K](), nil
8184
case "original_payload":
8285
return accessOriginalPayload[K](), nil
86+
case "attributes":
87+
if path.Keys() == nil {
88+
return accessAttributes[K](), nil
89+
}
90+
return accessAttributesKey(path.Keys()), nil
8391
default:
8492
return nil, ctxerror.New(path.Name(), path.String(), Name, DocRef)
8593
}
@@ -419,3 +427,122 @@ func accessOriginalPayload[K ProfileContext]() ottl.StandardGetSetter[K] {
419427
},
420428
}
421429
}
430+
431+
func accessAttributes[K ProfileContext]() ottl.StandardGetSetter[K] {
432+
return ottl.StandardGetSetter[K]{
433+
Getter: func(_ context.Context, tCtx K) (any, error) {
434+
return pprofile.FromAttributeIndices(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile()), nil
435+
},
436+
Setter: func(_ context.Context, tCtx K, val any) error {
437+
m, ok := val.(pcommon.Map)
438+
if !ok {
439+
return fmt.Errorf("expected pcommon.Map, got %T", val)
440+
}
441+
tCtx.GetProfile().AttributeIndices().FromRaw([]int32{})
442+
for k, v := range m.All() {
443+
if err := PutAttribute(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile(), k, v); err != nil {
444+
return err
445+
}
446+
}
447+
return nil
448+
},
449+
}
450+
}
451+
452+
func accessAttributesKey[K Context](key []ottl.Key[K]) ottl.StandardGetSetter[K] {
453+
return ottl.StandardGetSetter[K]{
454+
Getter: func(ctx context.Context, tCtx K) (any, error) {
455+
return ctxutil.GetMapValue[K](ctx, tCtx, pprofile.FromAttributeIndices(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile()), key)
456+
},
457+
Setter: func(ctx context.Context, tCtx K, val any) error {
458+
newKey, err := key[0].String(ctx, tCtx)
459+
if err != nil {
460+
return err
461+
}
462+
v := pcommon.NewValueEmpty()
463+
if err = ctxutil.SetIndexableValue[K](ctx, tCtx, v, val, key[1:]); err != nil {
464+
return err
465+
}
466+
return PutAttribute(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile(), *newKey, v)
467+
},
468+
}
469+
}
470+
471+
type attributable interface {
472+
AttributeIndices() pcommon.Int32Slice
473+
}
474+
475+
var errTooManyTableEntries = errors.New("too many entries in AttributeTable")
476+
477+
// PutAttribute updates an AttributeTable and a record's AttributeIndices to
478+
// add a new attribute.
479+
// The record can be any struct that implements an `AttributeIndices` method.
480+
func PutAttribute(table pprofile.AttributeTableSlice, record attributable, key string, value pcommon.Value) error {
481+
for i := range record.AttributeIndices().Len() {
482+
idx := int(record.AttributeIndices().At(i))
483+
if idx < 0 || idx >= table.Len() {
484+
return fmt.Errorf("index value %d out of range in AttributeIndices[%d]", idx, i)
485+
}
486+
attr := table.At(idx)
487+
if attr.Key() == key {
488+
if attr.Value().Equal(value) {
489+
// Attribute already exists, nothing to do.
490+
return nil
491+
}
492+
493+
// If the attribute table already contains the key/value pair, just update the index.
494+
for j := range table.Len() {
495+
a := table.At(j)
496+
if a.Key() == key && a.Value().Equal(value) {
497+
if j > math.MaxInt32 {
498+
return errTooManyTableEntries
499+
}
500+
record.AttributeIndices().SetAt(i, int32(j))
501+
return nil
502+
}
503+
}
504+
505+
if table.Len() >= math.MaxInt32 {
506+
return errTooManyTableEntries
507+
}
508+
509+
// Add the key/value pair as a new attribute to the table...
510+
entry := table.AppendEmpty()
511+
entry.SetKey(key)
512+
value.CopyTo(entry.Value())
513+
514+
// ...and update the existing index.
515+
record.AttributeIndices().SetAt(i, int32(table.Len()-1))
516+
return nil
517+
}
518+
}
519+
520+
if record.AttributeIndices().Len() >= math.MaxInt32 {
521+
return errors.New("too many entries in AttributeIndices")
522+
}
523+
524+
for j := range table.Len() {
525+
a := table.At(j)
526+
if a.Key() == key && a.Value().Equal(value) {
527+
if j > math.MaxInt32 {
528+
return errTooManyTableEntries
529+
}
530+
// Add the index of the existing attribute to the indices.
531+
record.AttributeIndices().Append(int32(j))
532+
return nil
533+
}
534+
}
535+
536+
if table.Len() >= math.MaxInt32 {
537+
return errTooManyTableEntries
538+
}
539+
540+
// Add the key/value pair as a new attribute to the table...
541+
entry := table.AppendEmpty()
542+
entry.SetKey(key)
543+
value.CopyTo(entry.Value())
544+
545+
// ...and add a new index to the indices.
546+
record.AttributeIndices().Append(int32(table.Len() - 1))
547+
return nil
548+
}

0 commit comments

Comments
 (0)