Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions consumer/consumererror/partial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package consumererror // import "go.opentelemetry.io/collector/consumer/consumererror"

import "errors"

// PartialError reports that only part of a batch was processed: some items
// succeeded while others failed. It records how many items failed so that
// observability code can report partial success accurately.
//
// It is primarily intended for receivers that can partially process incoming
// data. For example, a receiver that successfully processes 80 out of 100
// metrics can return a PartialError with failedCount=20.
//
// The receiverhelper uses this information to compute:
//   - accepted items = total received - failed count
//   - failed/refused items = failed count (based on error type)
type PartialError struct {
	// err describes what went wrong with the failed items. It may be nil
	// (zero value, or a nil error passed to NewPartialError).
	err error
	// failedCount is the number of items that failed. NewPartialError never
	// stores a negative value here.
	failedCount int
}

// NewPartialError creates an error indicating partial failure.
// failedCount is the number of items that failed to be processed; negative
// values are clamped to zero. The wrapped error should describe what went
// wrong with the failed items.
//
// The returned error is always non-nil, even when err is nil, because the
// failed count alone still signals a partial failure.
//
// Example usage:
//
//	// In a receiver that processes metrics
//	if len(translationErrors) > 0 {
//		return consumererror.NewPartialError(
//			fmt.Errorf("failed to translate %d metrics", len(translationErrors)),
//			len(translationErrors),
//		)
//	}
//
// To indicate that the failure was due to downstream refusal, wrap with NewDownstream:
//
//	return consumererror.NewDownstream(
//		consumererror.NewPartialError(err, failedCount),
//	)
func NewPartialError(err error, failedCount int) error {
	if failedCount < 0 {
		failedCount = 0
	}
	return &PartialError{
		err:         err,
		failedCount: failedCount,
	}
}

// Error implements the error interface. It returns the wrapped error's
// message, or a generic message when no error was wrapped.
func (p *PartialError) Error() string {
	// Guard against a nil wrapped error so that a zero-value PartialError or
	// NewPartialError(nil, n) never panics when it is logged or formatted.
	if p.err == nil {
		return "partial failure"
	}
	return p.err.Error()
}

// Unwrap returns the wrapped error for use with errors.Is and errors.As.
// It returns nil when no error was wrapped.
func (p *PartialError) Unwrap() error {
	return p.err
}

// FailedCount returns the number of items that failed to be processed.
func (p *PartialError) FailedCount() int {
	return p.failedCount
}

// GetPartialError searches the error chain of err for a *PartialError and
// returns the first one found, or nil when the chain contains none
// (including when err itself is nil).
//
// Observability code typically calls this to decide whether an error
// represents a partial failure and, if so, to read the failed item count.
func GetPartialError(err error) *PartialError {
	var found *PartialError
	if !errors.As(err, &found) {
		return nil
	}
	return found
}
127 changes: 127 additions & 0 deletions consumer/consumererror/partial_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package consumererror

import (
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestNewPartialError checks that NewPartialError preserves the wrapped error
// and its message, stores the failed count, and clamps negative counts to zero.
func TestNewPartialError(t *testing.T) {
	origErr := errors.New("translation failed")

	t.Run("basic", func(t *testing.T) {
		got := NewPartialError(origErr, 5)
		assert.ErrorIs(t, got, origErr)
		assert.Equal(t, "translation failed", got.Error())

		partial := GetPartialError(got)
		require.NotNil(t, partial)
		assert.Equal(t, 5, partial.FailedCount())
	})

	// Cases that only verify the stored count.
	countCases := []struct {
		name  string
		input int
		want  int
	}{
		{name: "zero_count", input: 0, want: 0},
		{name: "negative_count_becomes_zero", input: -1, want: 0},
	}
	for _, tc := range countCases {
		t.Run(tc.name, func(t *testing.T) {
			partial := GetPartialError(NewPartialError(origErr, tc.input))
			require.NotNil(t, partial)
			assert.Equal(t, tc.want, partial.FailedCount())
		})
	}
}

func TestGetPartialError(t *testing.T) {
t.Run("nil_error", func(t *testing.T) {
pe := GetPartialError(nil)
assert.Nil(t, pe)
})

t.Run("non_partial_error", func(t *testing.T) {
err := errors.New("regular error")
pe := GetPartialError(err)
assert.Nil(t, pe)
})

t.Run("wrapped_partial_error", func(t *testing.T) {
origErr := errors.New("inner error")
partialErr := NewPartialError(origErr, 10)
wrappedErr := fmt.Errorf("outer: %w", partialErr)

pe := GetPartialError(wrappedErr)
require.NotNil(t, pe)
assert.Equal(t, 10, pe.FailedCount())
})
}

// TestPartialError_Unwrap checks that an error wrapped by a PartialError can
// be recovered from it through errors.As.
func TestPartialError_Unwrap(t *testing.T) {
	wrapped := error(testErrorType{"testError"})

	pErr := NewPartialError(wrapped, 7)
	require.NotNil(t, GetPartialError(pErr))

	// Start from an empty target that differs from the wrapped value, so a
	// later match proves errors.As actually filled it in.
	extracted := testErrorType{}
	require.NotEqual(t, wrapped, extracted)

	require.True(t, errors.As(pErr, &extracted))
	require.Equal(t, wrapped, extracted)
}

func TestPartialError_CombinedWithDownstream(t *testing.T) {
origErr := errors.New("queue full")
partialErr := NewPartialError(origErr, 30)
downstreamErr := NewDownstream(partialErr)

// Should be recognized as downstream error
assert.True(t, IsDownstream(downstreamErr))

// Should still be able to extract the partial error
pe := GetPartialError(downstreamErr)
require.NotNil(t, pe)
assert.Equal(t, 30, pe.FailedCount())

// Original error should still be accessible
assert.ErrorIs(t, downstreamErr, origErr)
}

func TestPartialError_CombinedWithPermanent(t *testing.T) {
origErr := errors.New("invalid format")
partialErr := NewPartialError(origErr, 15)
permanentErr := NewPermanent(partialErr)

// Should be recognized as permanent error
assert.True(t, IsPermanent(permanentErr))

// Should still be able to extract the partial error
pe := GetPartialError(permanentErr)
require.NotNil(t, pe)
assert.Equal(t, 15, pe.FailedCount())

// Original error should still be accessible
assert.ErrorIs(t, permanentErr, origErr)
}

func TestPartialError_DownstreamWrappingPartial(t *testing.T) {
// Test the recommended pattern: downstream wrapping partial
origErr := errors.New("pipeline refused")
partialErr := NewPartialError(origErr, 20)
combinedErr := NewDownstream(partialErr)

// All checks should work
assert.True(t, IsDownstream(combinedErr))
pe := GetPartialError(combinedErr)
require.NotNil(t, pe)
assert.Equal(t, 20, pe.FailedCount())
assert.ErrorIs(t, combinedErr, origErr)
}
22 changes: 18 additions & 4 deletions receiver/receiverhelper/obsreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,17 +182,31 @@ func (rec *ObsReport) endOp(
numRefused := 0
numFailedErrors := 0
if err != nil {
numAccepted = 0
// Check if this is a partial error with a specific failed count.
// This allows receivers to report partial success when some items
// were processed successfully while others failed.
failedCount := numReceivedItems // default: all items failed
if pe := consumererror.GetPartialError(err); pe != nil {
failedCount = pe.FailedCount()
// Sanity check: failed count cannot exceed received items
if failedCount > numReceivedItems {
failedCount = numReceivedItems
}
}

// Calculate accepted items (supports partial success)
numAccepted = numReceivedItems - failedCount

// If gate is enabled, we distinguish between refused and failed.
if NewReceiverMetricsGate.IsEnabled() {
if consumererror.IsDownstream(err) {
numRefused = numReceivedItems
numRefused = failedCount
} else {
numFailedErrors = numReceivedItems
numFailedErrors = failedCount
}
} else {
// When the gate is disabled, all errors are considered "refused".
numRefused = numReceivedItems
numRefused = failedCount
}
}

Expand Down
Loading