|
7 | 7 | "context" |
8 | 8 | "encoding/hex" |
9 | 9 | "errors" |
| 10 | + "fmt" |
| 11 | + "math" |
10 | 12 | "time" |
11 | 13 |
|
12 | 14 | "go.opentelemetry.io/collector/pdata/pcommon" |
@@ -80,6 +82,11 @@ func PathGetSetter[K ProfileContext](path ottl.Path[K]) (ottl.GetSetter[K], erro |
80 | 82 | return accessOriginalPayloadFormat[K](), nil |
81 | 83 | case "original_payload": |
82 | 84 | return accessOriginalPayload[K](), nil |
| 85 | + case "attributes": |
| 86 | + if path.Keys() == nil { |
| 87 | + return accessAttributes[K](), nil |
| 88 | + } |
| 89 | + return accessAttributesKey(path.Keys()), nil |
83 | 90 | default: |
84 | 91 | return nil, ctxerror.New(path.Name(), path.String(), Name, DocRef) |
85 | 92 | } |
@@ -419,3 +426,124 @@ func accessOriginalPayload[K ProfileContext]() ottl.StandardGetSetter[K] { |
419 | 426 | }, |
420 | 427 | } |
421 | 428 | } |
| 429 | + |
| 430 | +func accessAttributes[K ProfileContext]() ottl.StandardGetSetter[K] { |
| 431 | + return ottl.StandardGetSetter[K]{ |
| 432 | + Getter: func(_ context.Context, tCtx K) (any, error) { |
| 433 | + return pprofile.FromAttributeIndices(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile()), nil |
| 434 | + }, |
| 435 | + Setter: func(_ context.Context, tCtx K, val any) error { |
| 436 | + m, ok := val.(pcommon.Map) |
| 437 | + if !ok { |
| 438 | + return fmt.Errorf("expected pcommon.Map, got %T", val) |
| 439 | + } |
| 440 | + tCtx.GetProfile().AttributeIndices().FromRaw([]int32{}) |
| 441 | + for k, v := range m.All() { |
| 442 | + if err := PutAttribute(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile(), k, v); err != nil { |
| 443 | + return err |
| 444 | + } |
| 445 | + } |
| 446 | + return nil |
| 447 | + }, |
| 448 | + } |
| 449 | +} |
| 450 | + |
| 451 | +func accessAttributesKey[K Context](key []ottl.Key[K]) ottl.StandardGetSetter[K] { |
| 452 | + return ottl.StandardGetSetter[K]{ |
| 453 | + Getter: func(ctx context.Context, tCtx K) (any, error) { |
| 454 | + return ctxutil.GetMapValue[K](ctx, tCtx, pprofile.FromAttributeIndices(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile()), key) |
| 455 | + }, |
| 456 | + Setter: func(ctx context.Context, tCtx K, val any) error { |
| 457 | + newKey, err := key[0].String(ctx, tCtx) |
| 458 | + if err != nil { |
| 459 | + return err |
| 460 | + } |
| 461 | + v := pcommon.NewValueEmpty() |
| 462 | + if err = v.FromRaw(val); err != nil { |
| 463 | + return err |
| 464 | + } |
| 465 | + return PutAttribute(tCtx.GetProfile().AttributeTable(), tCtx.GetProfile(), *newKey, v) |
| 466 | + }, |
| 467 | + } |
| 468 | +} |
| 469 | + |
| 470 | +// TODO: Remove the following code once https://github.com/open-telemetry/opentelemetry-collector/pull/12798 is merged. |
| 471 | + |
| 472 | +type attributable interface { |
| 473 | + AttributeIndices() pcommon.Int32Slice |
| 474 | +} |
| 475 | + |
| 476 | +var errTooManyTableEntries = errors.New("too many entries in AttributeTable") |
| 477 | + |
| 478 | +// PutAttribute updates an AttributeTable and a record's AttributeIndices to |
| 479 | +// add a new attribute. |
| 480 | +// The record can be any struct that implements an `AttributeIndices` method. |
| 481 | +func PutAttribute(table pprofile.AttributeTableSlice, record attributable, key string, value pcommon.Value) error { |
| 482 | + for i := range record.AttributeIndices().Len() { |
| 483 | + idx := int(record.AttributeIndices().At(i)) |
| 484 | + if idx < 0 || idx >= table.Len() { |
| 485 | + return fmt.Errorf("index value %d out of range in AttributeIndices[%d]", idx, i) |
| 486 | + } |
| 487 | + attr := table.At(idx) |
| 488 | + if attr.Key() == key { |
| 489 | + if attr.Value().Equal(value) { |
| 490 | + // Attribute already exists, nothing to do. |
| 491 | + return nil |
| 492 | + } |
| 493 | + |
| 494 | + // If the attribute table already contains the key/value pair, just update the index. |
| 495 | + for j := range table.Len() { |
| 496 | + a := table.At(j) |
| 497 | + if a.Key() == key && a.Value().Equal(value) { |
| 498 | + if j > math.MaxInt32 { |
| 499 | + return errTooManyTableEntries |
| 500 | + } |
| 501 | + record.AttributeIndices().SetAt(i, int32(j)) |
| 502 | + return nil |
| 503 | + } |
| 504 | + } |
| 505 | + |
| 506 | + if table.Len() >= math.MaxInt32 { |
| 507 | + return errTooManyTableEntries |
| 508 | + } |
| 509 | + |
| 510 | + // Add the key/value pair as a new attribute to the table... |
| 511 | + entry := table.AppendEmpty() |
| 512 | + entry.SetKey(key) |
| 513 | + value.CopyTo(entry.Value()) |
| 514 | + |
| 515 | + // ...and update the existing index. |
| 516 | + record.AttributeIndices().SetAt(i, int32(table.Len()-1)) |
| 517 | + return nil |
| 518 | + } |
| 519 | + } |
| 520 | + |
| 521 | + if record.AttributeIndices().Len() >= math.MaxInt32 { |
| 522 | + return errors.New("too many entries in AttributeIndices") |
| 523 | + } |
| 524 | + |
| 525 | + for j := range table.Len() { |
| 526 | + a := table.At(j) |
| 527 | + if a.Key() == key && a.Value().Equal(value) { |
| 528 | + if j > math.MaxInt32 { |
| 529 | + return errTooManyTableEntries |
| 530 | + } |
| 531 | + // Add the index of the existing attribute to the indices. |
| 532 | + record.AttributeIndices().Append(int32(j)) |
| 533 | + return nil |
| 534 | + } |
| 535 | + } |
| 536 | + |
| 537 | + if table.Len() >= math.MaxInt32 { |
| 538 | + return errTooManyTableEntries |
| 539 | + } |
| 540 | + |
| 541 | + // Add the key/value pair as a new attribute to the table... |
| 542 | + entry := table.AppendEmpty() |
| 543 | + entry.SetKey(key) |
| 544 | + value.CopyTo(entry.Value()) |
| 545 | + |
| 546 | + // ...and add a new index to the indices. |
| 547 | + record.AttributeIndices().Append(int32(table.Len() - 1)) |
| 548 | + return nil |
| 549 | +} |
0 commit comments