From abca3e9e73189b09adf07ca769fcf002126501e7 Mon Sep 17 00:00:00 2001
From: Sanchit2662
Date: Mon, 19 Jan 2026 03:28:51 +0530
Subject: [PATCH] receiver: Add partial success support for receivers

Signed-off-by: Sanchit2662
---
 consumer/consumererror/partial.go         |  80 +++++++
 consumer/consumererror/partial_test.go    | 127 +++++++++++
 receiver/receiverhelper/obsreport.go      |  22 +-
 receiver/receiverhelper/obsreport_test.go | 256 ++++++++++++++++++++++
 4 files changed, 481 insertions(+), 4 deletions(-)
 create mode 100644 consumer/consumererror/partial.go
 create mode 100644 consumer/consumererror/partial_test.go

diff --git a/consumer/consumererror/partial.go b/consumer/consumererror/partial.go
new file mode 100644
index 00000000000..2a0d6f4f853
--- /dev/null
+++ b/consumer/consumererror/partial.go
@@ -0,0 +1,80 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package consumererror // import "go.opentelemetry.io/collector/consumer/consumererror"
+
+import "errors"
+
+// PartialError indicates that some items in a batch were processed successfully
+// while others failed. It carries the count of failed items so that observability
+// infrastructure can accurately report partial success.
+//
+// This error type is primarily intended for use by receivers that can partially
+// process incoming data. For example, a receiver that successfully processes
+// 80 out of 100 metrics can return a PartialError with failedCount=20.
+//
+// The receiverhelper will use this information to correctly compute:
+//   - accepted items = total received - failed count
+//   - failed/refused items = failed count (which of the two depends on the error type)
+type PartialError struct {
+	err         error
+	failedCount int
+}
+
+// NewPartialError creates an error indicating partial failure.
+// failedCount is the number of items that failed to be processed; negative values are treated as 0.
+// The wrapped error should describe what went wrong with the failed items.
+//
+// Example usage:
+//
+//	// In a receiver that processes metrics
+//	if len(translationErrors) > 0 {
+//		return consumererror.NewPartialError(
+//			fmt.Errorf("failed to translate %d metrics", len(translationErrors)),
+//			len(translationErrors),
+//		)
+//	}
+//
+// To indicate that the failure was due to downstream refusal, wrap with NewDownstream:
+//
+//	return consumererror.NewDownstream(
+//		consumererror.NewPartialError(err, failedCount),
+//	)
+func NewPartialError(err error, failedCount int) error {
+	if failedCount < 0 {
+		failedCount = 0
+	}
+	return &PartialError{err: err, failedCount: failedCount}
+}
+
+// Error implements the error interface; without a wrapped error it returns a generic message instead of panicking.
+func (p *PartialError) Error() string {
+	if p == nil || p.err == nil {
+		return "partial error"
+	}
+	return p.err.Error()
+}
+
+// Unwrap returns the wrapped error for use with errors.Is and errors.As.
+func (p *PartialError) Unwrap() error {
+	return p.err
+}
+
+// FailedCount returns the number of items that failed to be processed.
+func (p *PartialError) FailedCount() int {
+	return p.failedCount
+}
+
+// GetPartialError extracts a PartialError from an error chain if present.
+// Returns nil if no PartialError is found in the error chain.
+//
+// This function is typically used by observability infrastructure to
+// determine if an error represents a partial failure and extract the
+// count of failed items.
+func GetPartialError(err error) *PartialError {
+	var found *PartialError
+	if !errors.As(err, &found) {
+		return nil
+	}
+	return found
+}
diff --git a/consumer/consumererror/partial_test.go b/consumer/consumererror/partial_test.go
new file mode 100644
index 00000000000..78e3d69fc4d
--- /dev/null
+++ b/consumer/consumererror/partial_test.go
@@ -0,0 +1,127 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package consumererror
+
+import (
+	"errors"
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestNewPartialError(t *testing.T) {
+	origErr := errors.New("translation failed")
+
+	t.Run("basic", func(t *testing.T) {
+		err := NewPartialError(origErr, 5)
+		assert.ErrorIs(t, err, origErr)
+		assert.Equal(t, "translation failed", err.Error())
+
+		pe := GetPartialError(err)
+		require.NotNil(t, pe)
+		assert.Equal(t, 5, pe.FailedCount())
+	})
+
+	t.Run("zero_count", func(t *testing.T) {
+		err := NewPartialError(origErr, 0)
+		pe := GetPartialError(err)
+		require.NotNil(t, pe)
+		assert.Equal(t, 0, pe.FailedCount())
+	})
+
+	t.Run("negative_count_becomes_zero", func(t *testing.T) {
+		err := NewPartialError(origErr, -1)
+		pe := GetPartialError(err)
+		require.NotNil(t, pe)
+		assert.Equal(t, 0, pe.FailedCount())
+	})
+}
+
+func TestGetPartialError(t *testing.T) {
+	t.Run("nil_error", func(t *testing.T) {
+		pe := GetPartialError(nil)
+		assert.Nil(t, pe)
+	})
+
+	t.Run("non_partial_error", func(t *testing.T) {
+		err := errors.New("regular error")
+		pe := GetPartialError(err)
+		assert.Nil(t, pe)
+	})
+
+	t.Run("wrapped_partial_error", func(t *testing.T) {
+		origErr := errors.New("inner error")
+		partialErr := NewPartialError(origErr, 10)
+		wrappedErr := fmt.Errorf("outer: %w", partialErr)
+
+		pe := GetPartialError(wrappedErr)
+		require.NotNil(t, pe)
+		assert.Equal(t, 10, pe.FailedCount())
+	})
+}
+
+func TestPartialError_Unwrap(t *testing.T) {
+	var innerErr error = testErrorType{"testError"}
+
+	partialErr := NewPartialError(innerErr, 7)
+	require.NotNil(t, GetPartialError(partialErr))
+
+	// Verify the inner error can be extracted with errors.As
+	target := testErrorType{}
+	require.NotEqual(t, innerErr, target)
+
+	isWrapped := errors.As(partialErr, &target)
+	require.True(t, isWrapped)
+	require.Equal(t, innerErr, target)
+}
+
+func TestPartialError_CombinedWithDownstream(t *testing.T) {
+	origErr := errors.New("queue full")
+	partialErr := NewPartialError(origErr, 30)
+	downstreamErr := NewDownstream(partialErr)
+
+	// Should be recognized as downstream error
+	assert.True(t, IsDownstream(downstreamErr))
+
+	// Should still be able to extract the partial error
+	pe := GetPartialError(downstreamErr)
+	require.NotNil(t, pe)
+	assert.Equal(t, 30, pe.FailedCount())
+
+	// Original error should still be accessible
+	assert.ErrorIs(t, downstreamErr, origErr)
+}
+
+func TestPartialError_CombinedWithPermanent(t *testing.T) {
+	origErr := errors.New("invalid format")
+	partialErr := NewPartialError(origErr, 15)
+	permanentErr := NewPermanent(partialErr)
+
+	// Should be recognized as permanent error
+	assert.True(t, IsPermanent(permanentErr))
+
+	// Should still be able to extract the partial error
+	pe := GetPartialError(permanentErr)
+	require.NotNil(t, pe)
+	assert.Equal(t, 15, pe.FailedCount())
+
+	// Original error should still be accessible
+	assert.ErrorIs(t, permanentErr, origErr)
+}
+
+func TestPartialError_PartialWrappingDownstream(t *testing.T) {
+	// Inverse nesting: a partial error whose cause is a downstream error.
+	origErr := errors.New("pipeline refused")
+	downstreamErr := NewDownstream(origErr)
+	combinedErr := NewPartialError(downstreamErr, 20)
+
+	// Classification and count must not depend on the nesting order.
+	assert.True(t, IsDownstream(combinedErr))
+	pe := GetPartialError(combinedErr)
+	require.NotNil(t, pe)
+	assert.Equal(t, 20, pe.FailedCount())
+	assert.ErrorIs(t, combinedErr, origErr)
+}
diff --git a/receiver/receiverhelper/obsreport.go b/receiver/receiverhelper/obsreport.go
index
ece4d7efac0..accde32332c 100644
--- a/receiver/receiverhelper/obsreport.go
+++ b/receiver/receiverhelper/obsreport.go
@@ -182,17 +182,31 @@ func (rec *ObsReport) endOp(
 	numRefused := 0
 	numFailedErrors := 0
 	if err != nil {
-		numAccepted = 0
+		// Check if this is a partial error with a specific failed count.
+		// This allows receivers to report partial success when some items
+		// were processed successfully while others failed.
+		failedCount := numReceivedItems // default: all items failed
+		if pe := consumererror.GetPartialError(err); pe != nil {
+			failedCount = pe.FailedCount()
+			// Sanity check: failed count cannot exceed received items
+			if failedCount > numReceivedItems {
+				failedCount = numReceivedItems
+			}
+		}
+
+		// Calculate accepted items (supports partial success)
+		numAccepted = numReceivedItems - failedCount
+
 		// If gate is enabled, we distinguish between refused and failed.
 		if NewReceiverMetricsGate.IsEnabled() {
 			if consumererror.IsDownstream(err) {
-				numRefused = numReceivedItems
+				numRefused = failedCount
 			} else {
-				numFailedErrors = numReceivedItems
+				numFailedErrors = failedCount
 			}
 		} else {
 			// When the gate is disabled, all errors are considered "refused".
-			numRefused = numReceivedItems
- numRefused = numReceivedItems + numRefused = failedCount } } diff --git a/receiver/receiverhelper/obsreport_test.go b/receiver/receiverhelper/obsreport_test.go index 95b2e3e3678..00f0509b697 100644 --- a/receiver/receiverhelper/obsreport_test.go +++ b/receiver/receiverhelper/obsreport_test.go @@ -772,6 +772,262 @@ func TestCheckReceiverProfilesViews(t *testing.T) { }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) } +func TestReceivePartialSuccess(t *testing.T) { + originalState := NewReceiverMetricsGate.IsEnabled() + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(NewReceiverMetricsGate.ID(), originalState)) + }) + + for _, tc := range []struct { + name string + enabled bool + }{{"gate_enabled", true}, {"gate_disabled", false}} { + t.Run(tc.name, func(t *testing.T) { + require.NoError(t, featuregate.GlobalRegistry().Set(NewReceiverMetricsGate.ID(), tc.enabled)) + + t.Run("partial_failure_internal", func(t *testing.T) { + testTelemetry(t, func(t *testing.T, tt *componenttest.Telemetry) { + rec, err := newReceiver(ObsReportSettings{ + ReceiverID: receiverID, + Transport: transport, + ReceiverCreateSettings: receiver.Settings{ID: receiverID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + }) + require.NoError(t, err) + + ctx := rec.StartMetricsOp(context.Background()) + // 100 items received, 20 failed internally (not downstream) + partialErr := consumererror.NewPartialError(errors.New("parse error"), 20) + rec.EndMetricsOp(ctx, format, 100, partialErr) + + // Verify metrics: 80 accepted, 20 failed (or refused if gate disabled) + metadatatest.AssertEqualReceiverAcceptedMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(80), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + if 
tc.enabled { + // Gate enabled: internal errors go to failed + metadatatest.AssertEqualReceiverFailedMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(20), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverRefusedMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(0), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + } else { + // Gate disabled: all errors go to refused + metadatatest.AssertEqualReceiverRefusedMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(20), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverFailedMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(0), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + } + + // Verify span attributes + spans := tt.SpanRecorder.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedMetricPointsKey, Value: attribute.Int64Value(80)}) + if tc.enabled { + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedMetricPointsKey, Value: attribute.Int64Value(20)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedMetricPointsKey, Value: 
attribute.Int64Value(0)}) + } else { + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedMetricPointsKey, Value: attribute.Int64Value(20)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedMetricPointsKey, Value: attribute.Int64Value(0)}) + } + assert.Equal(t, codes.Error, span.Status().Code) + }) + }) + + t.Run("partial_failure_downstream", func(t *testing.T) { + testTelemetry(t, func(t *testing.T, tt *componenttest.Telemetry) { + rec, err := newReceiver(ObsReportSettings{ + ReceiverID: receiverID, + Transport: transport, + ReceiverCreateSettings: receiver.Settings{ID: receiverID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + }) + require.NoError(t, err) + + ctx := rec.StartTracesOp(context.Background()) + // 100 items received, 30 refused by downstream + partialErr := consumererror.NewDownstream( + consumererror.NewPartialError(errors.New("queue full"), 30), + ) + rec.EndTracesOp(ctx, format, 100, partialErr) + + // Verify metrics: 70 accepted, 30 refused + metadatatest.AssertEqualReceiverAcceptedSpans(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(70), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverRefusedSpans(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(30), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverFailedSpans(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + 
Value: int64(0), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + // Verify span attributes + spans := tt.SpanRecorder.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedSpansKey, Value: attribute.Int64Value(70)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedSpansKey, Value: attribute.Int64Value(30)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedSpansKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Error, span.Status().Code) + }) + }) + + t.Run("partial_failure_exceeds_total", func(t *testing.T) { + // Test that failed count is capped at received items + testTelemetry(t, func(t *testing.T, tt *componenttest.Telemetry) { + rec, err := newReceiver(ObsReportSettings{ + ReceiverID: receiverID, + Transport: transport, + ReceiverCreateSettings: receiver.Settings{ID: receiverID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + }) + require.NoError(t, err) + + ctx := rec.StartLogsOp(context.Background()) + // 50 items received, but error claims 100 failed (should be capped to 50) + partialErr := consumererror.NewPartialError(errors.New("bad count"), 100) + rec.EndLogsOp(ctx, format, 50, partialErr) + + // Verify metrics: 0 accepted (capped), all to failed/refused + metadatatest.AssertEqualReceiverAcceptedLogRecords(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(0), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + if tc.enabled { + metadatatest.AssertEqualReceiverFailedLogRecords(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + 
attribute.String(internal.TransportKey, transport)), + Value: int64(50), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + } else { + metadatatest.AssertEqualReceiverRefusedLogRecords(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(50), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + } + }) + }) + + t.Run("zero_partial_failure", func(t *testing.T) { + // Test partial error with zero failed count (edge case) + testTelemetry(t, func(t *testing.T, tt *componenttest.Telemetry) { + rec, err := newReceiver(ObsReportSettings{ + ReceiverID: receiverID, + Transport: transport, + ReceiverCreateSettings: receiver.Settings{ID: receiverID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + }) + require.NoError(t, err) + + ctx := rec.StartMetricsOp(context.Background()) + // 100 items received, 0 failed (all accepted, but still an error) + partialErr := consumererror.NewPartialError(errors.New("warning only"), 0) + rec.EndMetricsOp(ctx, format, 100, partialErr) + + // Verify metrics: all 100 accepted + metadatatest.AssertEqualReceiverAcceptedMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(100), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverRefusedMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(0), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + 
metadatatest.AssertEqualReceiverFailedMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(0), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + // Span should still show error status + spans := tt.SpanRecorder.Ended() + require.Len(t, spans, 1) + assert.Equal(t, codes.Error, spans[0].Status().Code) + }) + }) + }) + } +} + func testTelemetry(t *testing.T, testFunc func(t *testing.T, tt *componenttest.Telemetry)) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })