Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions pkg/beholder/batch_emitter_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package beholder

import (
"context"
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -157,8 +158,22 @@ func (e *ChipIngressBatchEmitterService) emitInternal(ctx context.Context, body
// for metric recording — OTel Add is non-blocking and tolerates
// cancelled contexts.
if sendErr != nil {
e.metrics.eventsDropped.Add(ctx, 1, metricAttrs)
e.eng.Errorw("failed to emit to chip ingress", "error", sendErr, "domain", domain, "entity", entity)
var pubErr *batch.PublishError
if errors.As(sendErr, &pubErr) {
// Per-event partial delivery failure: server accepted the batch but
// rejected this individual event (e.g. schema validation, encode error).
e.metrics.eventsDropped.Add(ctx, 1, e.dropMetricAttrsFor(domain, entity, "partial_delivery"))
e.eng.Errorw("failed to emit to chip ingress (partial delivery)",
"error_code", pubErr.Code.String(),
"reason", pubErr.Reason,
"domain", domain,
"entity", entity,
)
} else {
// Whole-batch RPC failure (network error, timeout, auth, etc.).
e.metrics.eventsDropped.Add(ctx, 1, e.dropMetricAttrsFor(domain, entity, "rpc_error"))
e.eng.Errorw("failed to emit to chip ingress", "error", sendErr, "domain", domain, "entity", entity)
}
} else {
e.metrics.eventsSent.Add(ctx, 1, metricAttrs)
}
Expand Down Expand Up @@ -191,6 +206,18 @@ func (e *ChipIngressBatchEmitterService) metricAttrsFor(domain, entity string) o
return v.(otelmetric.MeasurementOption)
}

// dropMetricAttrsFor returns a measurement option for the eventsDropped counter that
// includes domain, entity, and a failure_type label distinguishing partial per-event
// failures ("partial_delivery") from whole-batch RPC failures ("rpc_error").
// Not cached — drop paths are not on the hot path.
func (e *ChipIngressBatchEmitterService) dropMetricAttrsFor(domain, entity, failureType string) otelmetric.MeasurementOption {
return otelmetric.WithAttributeSet(attribute.NewSet(
attribute.String("domain", domain),
attribute.String("entity", entity),
attribute.String("failure_type", failureType),
))
}

func newBatchEmitterMetrics(meter otelmetric.Meter) (batchEmitterMetrics, error) {
eventsSent, err := meter.Int64Counter("chip_ingress.events_sent",
otelmetric.WithDescription("Total events successfully sent via PublishBatch"),
Expand Down
110 changes: 110 additions & 0 deletions pkg/beholder/batch_emitter_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,116 @@ func TestChipIngressBatchEmitterService_EmitWithCallback(t *testing.T) {
})
}

func TestChipIngressBatchEmitterService_PartialDeliveryError(t *testing.T) {
t.Run("logs error_code and reason on per-event PublishError", func(t *testing.T) {
lggr, observed := logger.TestObserved(t, zap.InfoLevel)

clientMock := mocks.NewClient(t)
clientMock.EXPECT().Close().Return(nil).Maybe()

partialResp := &chipingress.PublishResponse{
Results: []*chipingress.PublishResult{
{
Error: &chipingress.PublishError{
ErrorCode: chipingress.PublishErrorCode(1), // VALIDATION_FAILED
Reason: "schema not found",
},
},
},
}
done := make(chan struct{})
clientMock.
On("PublishBatch", mock.Anything, mock.Anything).
Return(partialResp, nil).
Run(func(_ mock.Arguments) { close(done) }).
Once()

cfg := newTestConfig()
cfg.ChipIngressMaxBatchSize = 1
cfg.ChipIngressSendInterval = time.Second

emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, lggr)
require.NoError(t, err)
require.NoError(t, emitter.Start(t.Context()))

err = emitter.Emit(t.Context(), []byte("body"),
beholder.AttrKeyDomain, "platform",
beholder.AttrKeyEntity, "TestEvent",
)
require.NoError(t, err)

select {
case <-done:
case <-time.After(3 * time.Second):
t.Fatal("timeout waiting for publish")
}
require.NoError(t, emitter.Close())

logs := observed.FilterMessage("failed to emit to chip ingress (partial delivery)")
require.GreaterOrEqual(t, logs.Len(), 1, "expected partial delivery error log")
entry := logs.All()[0]
assert.Equal(t, zap.ErrorLevel, entry.Level)
fieldMap := logFieldMap(entry)
assert.Equal(t, "platform", fieldMap["domain"])
assert.Equal(t, "TestEvent", fieldMap["entity"])
assert.Contains(t, fieldMap, "error_code")
assert.Equal(t, "schema not found", fieldMap["reason"])
})

t.Run("records events_dropped with partial_delivery failure_type", func(t *testing.T) {
reader, restore := useEmitterTestMeterProvider(t)
defer restore()

clientMock := mocks.NewClient(t)
clientMock.EXPECT().Close().Return(nil).Maybe()

partialResp := &chipingress.PublishResponse{
Results: []*chipingress.PublishResult{
{
Error: &chipingress.PublishError{
ErrorCode: chipingress.PublishErrorCode(1),
Reason: "encode error",
},
},
},
}
done := make(chan struct{})
clientMock.
On("PublishBatch", mock.Anything, mock.Anything).
Return(partialResp, nil).
Run(func(_ mock.Arguments) { close(done) }).
Once()

cfg := newTestConfig()
cfg.ChipIngressMaxBatchSize = 1
cfg.ChipIngressSendInterval = time.Second

emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t))
require.NoError(t, err)
require.NoError(t, emitter.Start(t.Context()))

err = emitter.Emit(t.Context(), []byte("body"),
beholder.AttrKeyDomain, "platform",
beholder.AttrKeyEntity, "PartialEvent",
)
require.NoError(t, err)

select {
case <-done:
case <-time.After(3 * time.Second):
t.Fatal("timeout waiting for publish")
}
require.NoError(t, emitter.Close())

rm := collectEmitterMetrics(t, reader)
metric := mustEmitterMetric(t, rm, "chip_ingress.events_dropped")
sum, ok := metric.Data.(metricdata.Sum[int64])
require.True(t, ok)
dp := mustEmitterInt64SumPoint(t, sum, "failure_type", "partial_delivery", "entity", "PartialEvent")
assert.GreaterOrEqual(t, dp.Value, int64(1))
})
}

func TestChipIngressBatchEmitterService_Metrics(t *testing.T) {
t.Run("records events_sent on successful publish", func(t *testing.T) {
reader, restore := useEmitterTestMeterProvider(t)
Expand Down
Loading