Skip to content

Commit f7100b2

Browse files
committed
[pkg/ottl] Accessors for profile attributes
1 parent 45f97af commit f7100b2

File tree

1 file changed

+162
-0
lines changed

1 file changed

+162
-0
lines changed

pkg/ottl/contexts/internal/ctxprofile/profile.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"context"
88
"encoding/hex"
99
"errors"
10+
"fmt"
11+
"math"
1012
"time"
1113

1214
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -26,6 +28,7 @@ func PathGetSetter[K ProfileContext](path ottl.Path[K]) (ottl.GetSetter[K], erro
2628
if path == nil {
2729
return nil, ctxerror.New("nil", "nil", Name, DocRef)
2830
}
31+
fmt.Printf("PathGetterSetter: %v\n", path.Name())
2932
switch path.Name() {
3033
case "sample_type":
3134
return accessSampleType[K](), nil
@@ -80,6 +83,11 @@ func PathGetSetter[K ProfileContext](path ottl.Path[K]) (ottl.GetSetter[K], erro
8083
return accessOriginalPayloadFormat[K](), nil
8184
case "original_payload":
8285
return accessOriginalPayload[K](), nil
86+
case "attributes":
87+
if path.Keys() == nil {
88+
return accessAttributes[K](), nil
89+
}
90+
return accessAttributesKey(path.Keys()), nil
8391
default:
8492
return nil, ctxerror.New(path.Name(), path.String(), Name, DocRef)
8593
}
@@ -419,3 +427,157 @@ func accessOriginalPayload[K ProfileContext]() ottl.StandardGetSetter[K] {
419427
},
420428
}
421429
}
430+
431+
func accessAttributes[K ProfileContext]() ottl.StandardGetSetter[K] {
432+
return ottl.StandardGetSetter[K]{
433+
Getter: func(_ context.Context, tCtx K) (any, error) {
434+
fmt.Printf("accessAttributes.Getter\n")
435+
dumpMap("get map", pprofile.FromAttributeIndices(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile()))
436+
return pprofile.FromAttributeIndices(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile()), nil
437+
},
438+
Setter: func(_ context.Context, tCtx K, val any) error {
439+
fmt.Printf("accessAttributes.Setter\n")
440+
m, ok := val.(pcommon.Map)
441+
if !ok {
442+
return fmt.Errorf("expected pcommon.Map, got %T", val)
443+
}
444+
dumpMap("set map", m)
445+
tCtx.GetProfile().AttributeIndices().FromRaw([]int32{})
446+
for k, v := range m.All() {
447+
if err := PutAttribute(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile(), k, v); err != nil {
448+
return err
449+
}
450+
}
451+
return nil
452+
},
453+
}
454+
}
455+
456+
func accessAttributesKey[K Context](key []ottl.Key[K]) ottl.StandardGetSetter[K] {
457+
return ottl.StandardGetSetter[K]{
458+
Getter: func(ctx context.Context, tCtx K) (any, error) {
459+
fmt.Printf("accessAttributesKey.Getter\n")
460+
return ctxutil.GetMapValue[K](ctx, tCtx, pprofile.FromAttributeIndices(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile()), key)
461+
/*
462+
table := tCtx.GetProfile().AttributeTable()
463+
searchKey, err := key[0].String(ctx, tCtx)
464+
fmt.Printf("accessAttributesKey.Getter: %v\n", searchKey)
465+
if err != nil {
466+
return nil, err
467+
}
468+
for idx := range tCtx.GetProfile().AttributeIndices().AsRaw() {
469+
if idx >= table.Len() {
470+
continue
471+
}
472+
attr := table.At(idx)
473+
if attr.Key() == *searchKey {
474+
fmt.Printf("return accessAttributesKey.Getter: %v\n", attr.Value().AsString())
475+
return attr.Value(), nil
476+
}
477+
}
478+
return nil, fmt.Errorf("key %s not found in attribute table", *searchKey)
479+
*/
480+
},
481+
Setter: func(ctx context.Context, tCtx K, val any) error {
482+
newKey, err := key[0].String(ctx, tCtx)
483+
fmt.Printf("accessAttributesKey.Setter: %v val %v\n", *newKey, val)
484+
if err != nil {
485+
return err
486+
}
487+
fmt.Printf("accessAttributesKey.Setter: %v\n", *newKey)
488+
v := pcommon.NewValueEmpty()
489+
if err = v.FromRaw(val); err != nil {
490+
return err
491+
}
492+
return PutAttribute(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile(), *newKey, v)
493+
},
494+
}
495+
}
496+
497+
type attributable interface {
498+
AttributeIndices() pcommon.Int32Slice
499+
}
500+
501+
var errTooManyTableEntries = errors.New("too many entries in AttributeTable")
502+
503+
// PutAttribute updates an AttributeTable and a record's AttributeIndices to
504+
// add a new attribute.
505+
// The record can be any struct that implements an `AttributeIndices` method.
506+
func PutAttribute(table pprofile.AttributeTableSlice, record attributable, key string, value pcommon.Value) error {
507+
for i := range record.AttributeIndices().Len() {
508+
idx := int(record.AttributeIndices().At(i))
509+
if idx < 0 || idx >= table.Len() {
510+
return fmt.Errorf("index value %d out of range in AttributeIndices[%d]", idx, i)
511+
}
512+
attr := table.At(idx)
513+
if attr.Key() == key {
514+
if attr.Value().Equal(value) {
515+
// Attribute already exists, nothing to do.
516+
return nil
517+
}
518+
519+
// If the attribute table already contains the key/value pair, just update the index.
520+
for j := range table.Len() {
521+
a := table.At(j)
522+
if a.Key() == key && a.Value().Equal(value) {
523+
if j > math.MaxInt32 {
524+
return errTooManyTableEntries
525+
}
526+
record.AttributeIndices().SetAt(i, int32(j))
527+
return nil
528+
}
529+
}
530+
531+
if table.Len() >= math.MaxInt32 {
532+
return errTooManyTableEntries
533+
}
534+
535+
// Add the key/value pair as a new attribute to the table...
536+
entry := table.AppendEmpty()
537+
entry.SetKey(key)
538+
value.CopyTo(entry.Value())
539+
540+
// ...and update the existing index.
541+
record.AttributeIndices().SetAt(i, int32(table.Len()-1))
542+
return nil
543+
}
544+
}
545+
546+
if record.AttributeIndices().Len() >= math.MaxInt32 {
547+
return errors.New("too many entries in AttributeIndices")
548+
}
549+
550+
for j := range table.Len() {
551+
a := table.At(j)
552+
if a.Key() == key && a.Value().Equal(value) {
553+
if j > math.MaxInt32 {
554+
return errTooManyTableEntries
555+
}
556+
// Add the index of the existing attribute to the indices.
557+
record.AttributeIndices().Append(int32(j))
558+
return nil
559+
}
560+
}
561+
562+
if table.Len() >= math.MaxInt32 {
563+
return errTooManyTableEntries
564+
}
565+
566+
// Add the key/value pair as a new attribute to the table...
567+
entry := table.AppendEmpty()
568+
entry.SetKey(key)
569+
value.CopyTo(entry.Value())
570+
571+
// ...and add a new index to the indices.
572+
record.AttributeIndices().Append(int32(table.Len() - 1))
573+
return nil
574+
}
575+
576+
func dumpMap(prefix string, m pcommon.Map) {
577+
fmt.Printf("%s: ", prefix)
578+
m.Range(func(k string, v pcommon.Value) bool {
579+
fmt.Printf("{%s, %v}", k, v.AsString())
580+
return true
581+
})
582+
fmt.Printf("\n")
583+
}

0 commit comments

Comments
 (0)