Skip to content
Draft
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,5 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

replace github.com/smartcontractkit/chainlink-common/pkg/chipingress => ./pkg/chipingress
2 changes: 0 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions pkg/beholder/batch_emitter_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type ChipIngressBatchEmitterService struct {
services.Service
eng *services.Engine

batchClient *batch.Client
batchClient *batch.Client
resourceAttrs map[string]string

metricAttrsCache sync.Map // map[string]otelmetric.MeasurementOption
metrics batchEmitterMetrics
Expand Down Expand Up @@ -84,8 +85,9 @@ func NewChipIngressBatchEmitterService(client chipingress.Client, cfg Config, lg
}

e := &ChipIngressBatchEmitterService{
batchClient: batchClient,
metrics: metrics,
batchClient: batchClient,
resourceAttrs: resourceAttributesToStringMap(cfg.ResourceAttributes),
metrics: metrics,
}

e.Service, e.eng = services.Config{
Expand Down Expand Up @@ -136,7 +138,7 @@ func (e *ChipIngressBatchEmitterService) emitInternal(ctx context.Context, body

attributes := newAttributes(attrKVs...)

event, err := chipingress.NewEvent(domain, entity, body, attributes)
event, err := chipingress.NewEventWithOpts(domain, entity, body, attributes, chipingress.WithResourceAttributeExtensions(e.resourceAttrs))
if err != nil {
return fmt.Errorf("failed to create CloudEvent: %w", err)
}
Expand Down
44 changes: 44 additions & 0 deletions pkg/beholder/batch_emitter_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,50 @@ func TestChipIngressBatchEmitterService_Emit(t *testing.T) {
})
}

func TestChipIngressBatchEmitterService_ResourceAttributes(t *testing.T) {
clientMock := mocks.NewClient(t)
clientMock.EXPECT().Close().Return(nil).Maybe()

var mu sync.Mutex
var receivedBatch *chipingress.CloudEventBatch
clientMock.
On("PublishBatch", mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
mu.Lock()
defer mu.Unlock()
receivedBatch = args.Get(1).(*chipingress.CloudEventBatch)
}).
Return(nil, nil)

cfg := newTestConfig()
cfg.ChipIngressSendInterval = 50 * time.Millisecond
cfg.ResourceAttributes = []attribute.KeyValue{attribute.String("chain_id", "1")}

emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t))
require.NoError(t, err)
require.NoError(t, emitter.Start(t.Context()))

err = emitter.Emit(t.Context(), []byte("test-payload"),
beholder.AttrKeyDomain, "my-domain",
beholder.AttrKeyEntity, "my-entity",
)
require.NoError(t, err)

assert.Eventually(t, func() bool {
mu.Lock()
defer mu.Unlock()
return receivedBatch != nil
}, 2*time.Second, 10*time.Millisecond)

require.NoError(t, emitter.Close())

mu.Lock()
defer mu.Unlock()
require.Len(t, receivedBatch.Events, 1)
require.NotNil(t, receivedBatch.Events[0].Attributes["chainid"])
assert.Equal(t, "1", receivedBatch.Events[0].Attributes["chainid"].GetCeString())
}

func TestChipIngressBatchEmitterService_CloudEventFormat(t *testing.T) {
clientMock := mocks.NewClient(t)
clientMock.EXPECT().Close().Return(nil).Maybe()
Expand Down
48 changes: 38 additions & 10 deletions pkg/beholder/chip_ingress_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,24 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"
)

// resourceAttrExtensions holds resource attributes to stamp as CloudEvent extensions on every
// emitted event. It is stored behind a pointer on ChipIngressEmitter (rather than as a bare map
// field) so the struct itself stays a comparable type — a map field would make it incomparable,
// which is an exported-API-breaking change per apidiff. A nil *resourceAttrExtensions means no
// resource attributes are configured.
type resourceAttrExtensions struct {
attrs map[string]string
}

// ChipIngressEmitter wraps a synchronous chipingress.Client.Publish call
// in a fire-and-forget goroutine so callers are never blocked.
type ChipIngressEmitter struct {
client chipingress.Client
lggr logger.Logger
stopCh services.StopChan
wg services.WaitGroup
closed atomic.Bool
client chipingress.Client
lggr logger.Logger
resourceAttrs *resourceAttrExtensions
stopCh services.StopChan
wg services.WaitGroup
closed atomic.Bool
}

func NewChipIngressEmitter(client chipingress.Client) (Emitter, error) {
Expand All @@ -31,8 +41,15 @@ type ChipIngressEmitterConfig struct {
Lggr logger.Logger
}

// New creates a ChipIngressEmitter from the config.
// New creates a ChipIngressEmitter from the config, with no resource attributes configured.
func (c ChipIngressEmitterConfig) New(client chipingress.Client) (Emitter, error) {
return c.NewWithResourceAttributes(client, nil)
}

// NewWithResourceAttributes creates a ChipIngressEmitter from the config, additionally stamping
// attrs as CloudEvent extensions (via chipingress.WithResourceAttributeExtensions) on every
// emitted event.
func (c ChipIngressEmitterConfig) NewWithResourceAttributes(client chipingress.Client, attrs map[string]string) (Emitter, error) {
if client == nil {
return nil, errors.New("chip ingress client is nil")
}
Expand All @@ -41,10 +58,16 @@ func (c ChipIngressEmitterConfig) New(client chipingress.Client) (Emitter, error
lggr = logger.Nop()
}

var resourceAttrs *resourceAttrExtensions
if len(attrs) > 0 {
resourceAttrs = &resourceAttrExtensions{attrs: attrs}
}

return &ChipIngressEmitter{
client: client,
lggr: lggr,
stopCh: make(services.StopChan),
client: client,
lggr: lggr,
resourceAttrs: resourceAttrs,
stopCh: make(services.StopChan),
}, nil
}

Expand All @@ -65,7 +88,12 @@ func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...a
return err
}

event, err := chipingress.NewEvent(sourceDomain, entityType, body, newAttributes(attrKVs...))
var event chipingress.CloudEvent
if c.resourceAttrs != nil {
event, err = chipingress.NewEventWithOpts(sourceDomain, entityType, body, newAttributes(attrKVs...), chipingress.WithResourceAttributeExtensions(c.resourceAttrs.attrs))
} else {
event, err = chipingress.NewEvent(sourceDomain, entityType, body, newAttributes(attrKVs...))
}
if err != nil {
return err
}
Expand Down
54 changes: 54 additions & 0 deletions pkg/beholder/chip_ingress_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
)
Expand Down Expand Up @@ -73,6 +74,59 @@ func TestChipIngressEmit(t *testing.T) {
assert.Error(t, err)
})

t.Run("resource attributes are stamped as sanitized CE extensions", func(t *testing.T) {
clientMock := mocks.NewClient(t)

var published *chipingress.CloudEventPb
clientMock.
On("Publish", mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
published = args.Get(1).(*chipingress.CloudEventPb)
}).
Return(nil, nil)
clientMock.On("Close").Return(nil)

emitter, err := beholder.ChipIngressEmitterConfig{
Lggr: logger.Test(t),
}.NewWithResourceAttributes(clientMock, map[string]string{"chain_id": "1"})
require.NoError(t, err)

err = emitter.Emit(t.Context(), body, beholder.AttrKeyDomain, domain, beholder.AttrKeyEntity, entity)
require.NoError(t, err)

require.NoError(t, emitter.Close())
clientMock.AssertExpectations(t)

require.NotNil(t, published)
require.NotNil(t, published.Attributes["chainid"])
assert.Equal(t, "1", published.Attributes["chainid"].GetCeString())
})

t.Run("New(client) with no resource attributes stamps no extra extensions (backward-compat)", func(t *testing.T) {
clientMock := mocks.NewClient(t)

var published *chipingress.CloudEventPb
clientMock.
On("Publish", mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
published = args.Get(1).(*chipingress.CloudEventPb)
}).
Return(nil, nil)
clientMock.On("Close").Return(nil)

emitter, err := beholder.ChipIngressEmitterConfig{Lggr: logger.Test(t)}.New(clientMock)
require.NoError(t, err)

err = emitter.Emit(t.Context(), body, beholder.AttrKeyDomain, domain, beholder.AttrKeyEntity, entity)
require.NoError(t, err)

require.NoError(t, emitter.Close())
clientMock.AssertExpectations(t)

require.NotNil(t, published)
assert.Nil(t, published.Attributes["chainid"])
})

t.Run("logs error when Publish fails", func(t *testing.T) {
clientMock := mocks.NewClient(t)

Expand Down
7 changes: 6 additions & 1 deletion pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
// eventually we will remove the dual source emitter and just use chip ingress
if cfg.ChipIngressEmitterEnabled || cfg.ChipIngressEmitterGRPCEndpoint != "" {
var opts []chipingress.Opt
resourceAttrs := resourceAttributesToStringMap(cfg.ResourceAttributes)

if cfg.ChipIngressInsecureConnection {
opts = append(opts, chipingress.WithInsecureConnection())
Expand All @@ -215,6 +216,10 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
opts = append(opts, chipingress.WithMeterProvider(meterProvider))
opts = append(opts, chipingress.WithTracerProvider(tracerProvider))

if len(resourceAttrs) > 0 {
opts = append(opts, chipingress.WithHeaderProvider(chipingress.NewStaticHeaderProvider(chipingress.SanitizeMetadataHeaders(resourceAttrs))))
}

chipIngressClient, err = chipingress.NewClient(cfg.ChipIngressEmitterGRPCEndpoint, opts...)
if err != nil {
return nil, err
Expand All @@ -235,7 +240,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
// teardown after parent close hook completes.
chipIngressEmitter = noCloseEmitter{Emitter: batchEmitterService}
} else {
chipIngressEmitter, err = ChipIngressEmitterConfig{Lggr: lggr}.New(chipIngressClient)
chipIngressEmitter, err = ChipIngressEmitterConfig{Lggr: lggr}.NewWithResourceAttributes(chipIngressClient, resourceAttrs)
if err != nil {
return nil, fmt.Errorf("failed to create chip ingress emitter: %w", err)
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/beholder/resource_attributes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package beholder

import "go.opentelemetry.io/otel/attribute"

// resourceAttributesToStringMap converts OTel resource attributes into a plain string map,
// using attribute.Value.Emit for canonical stringification of any value type. This is the
// single source of truth used to derive both the gRPC metadata headers and the CloudEvent
// extension keys/values sent to ChipIngress, so both mechanisms stay consistent.
func resourceAttributesToStringMap(attrs []attribute.KeyValue) map[string]string {
m := make(map[string]string, len(attrs))
for _, kv := range attrs {
m[string(kv.Key)] = kv.Value.Emit()
}
return m
}
28 changes: 28 additions & 0 deletions pkg/beholder/resource_attributes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package beholder

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
)

func TestResourceAttributesToStringMap(t *testing.T) {
attrs := []attribute.KeyValue{
attribute.String("chain_id", "1"),
attribute.Bool("is_bootstrap", true),
attribute.Int64("node_index", 42),
}

got := resourceAttributesToStringMap(attrs)

assert.Equal(t, map[string]string{
"chain_id": "1",
"is_bootstrap": "true",
"node_index": "42",
}, got)
}

func TestResourceAttributesToStringMap_Empty(t *testing.T) {
assert.Empty(t, resourceAttributesToStringMap(nil))
}
Loading
Loading