Skip to content

Commit abca3e9

Browse files
committed
receiver: Add partial success support for receivers
Signed-off-by: Sanchit2662 <[email protected]>
1 parent 7c31dd5 commit abca3e9

File tree

4 files changed

+481
-4
lines changed

4 files changed

+481
-4
lines changed

consumer/consumererror/partial.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package consumererror // import "go.opentelemetry.io/collector/consumer/consumererror"
5+
6+
import "errors"
7+
8+
// PartialError reports that only part of a batch could be processed: some
// items succeeded while the rest failed. It wraps the underlying cause
// together with the number of items that failed, so that observability
// infrastructure can report the partial outcome accurately.
//
// The type is aimed mainly at receivers that are able to process incoming
// data partially. A receiver that handles 80 out of 100 metrics successfully,
// for instance, can return a PartialError whose failed count is 20.
//
// The receiverhelper derives the following from it:
//   - accepted items = total received - failed count
//   - failed/refused items = failed count (depending on the error type)
type PartialError struct {
	// err is the wrapped cause describing what went wrong.
	err error
	// failedCount is the number of items that were not processed.
	failedCount int
}
23+
24+
// NewPartialError creates an error indicating partial failure.
25+
// failedCount is the number of items that failed to be processed.
26+
// The wrapped error should describe what went wrong with the failed items.
27+
//
28+
// Example usage:
29+
//
30+
// // In a receiver that processes metrics
31+
// if len(translationErrors) > 0 {
32+
// return consumererror.NewPartialError(
33+
// fmt.Errorf("failed to translate %d metrics", len(translationErrors)),
34+
// len(translationErrors),
35+
// )
36+
// }
37+
//
38+
// To indicate that the failure was due to downstream refusal, wrap with NewDownstream:
39+
//
40+
// return consumererror.NewDownstream(
41+
// consumererror.NewPartialError(err, failedCount),
42+
// )
43+
func NewPartialError(err error, failedCount int) error {
44+
if failedCount < 0 {
45+
failedCount = 0
46+
}
47+
return &PartialError{
48+
err: err,
49+
failedCount: failedCount,
50+
}
51+
}
52+
53+
// Error implements the error interface.
54+
func (p *PartialError) Error() string {
55+
return p.err.Error()
56+
}
57+
58+
// Unwrap returns the wrapped error for use with errors.Is and errors.As.
59+
func (p *PartialError) Unwrap() error {
60+
return p.err
61+
}
62+
63+
// FailedCount returns the number of items that failed to be processed.
64+
func (p *PartialError) FailedCount() int {
65+
return p.failedCount
66+
}
67+
68+
// GetPartialError extracts a PartialError from an error chain if present.
69+
// Returns nil if no PartialError is found in the error chain.
70+
//
71+
// This function is typically used by observability infrastructure to
72+
// determine if an error represents a partial failure and extract the
73+
// count of failed items.
74+
func GetPartialError(err error) *PartialError {
75+
var pe *PartialError
76+
if errors.As(err, &pe) {
77+
return pe
78+
}
79+
return nil
80+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package consumererror
5+
6+
import (
7+
"errors"
8+
"fmt"
9+
"testing"
10+
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
func TestNewPartialError(t *testing.T) {
16+
origErr := errors.New("translation failed")
17+
18+
t.Run("basic", func(t *testing.T) {
19+
err := NewPartialError(origErr, 5)
20+
assert.ErrorIs(t, err, origErr)
21+
assert.Equal(t, "translation failed", err.Error())
22+
23+
pe := GetPartialError(err)
24+
require.NotNil(t, pe)
25+
assert.Equal(t, 5, pe.FailedCount())
26+
})
27+
28+
t.Run("zero_count", func(t *testing.T) {
29+
err := NewPartialError(origErr, 0)
30+
pe := GetPartialError(err)
31+
require.NotNil(t, pe)
32+
assert.Equal(t, 0, pe.FailedCount())
33+
})
34+
35+
t.Run("negative_count_becomes_zero", func(t *testing.T) {
36+
err := NewPartialError(origErr, -1)
37+
pe := GetPartialError(err)
38+
require.NotNil(t, pe)
39+
assert.Equal(t, 0, pe.FailedCount())
40+
})
41+
}
42+
43+
func TestGetPartialError(t *testing.T) {
44+
t.Run("nil_error", func(t *testing.T) {
45+
pe := GetPartialError(nil)
46+
assert.Nil(t, pe)
47+
})
48+
49+
t.Run("non_partial_error", func(t *testing.T) {
50+
err := errors.New("regular error")
51+
pe := GetPartialError(err)
52+
assert.Nil(t, pe)
53+
})
54+
55+
t.Run("wrapped_partial_error", func(t *testing.T) {
56+
origErr := errors.New("inner error")
57+
partialErr := NewPartialError(origErr, 10)
58+
wrappedErr := fmt.Errorf("outer: %w", partialErr)
59+
60+
pe := GetPartialError(wrappedErr)
61+
require.NotNil(t, pe)
62+
assert.Equal(t, 10, pe.FailedCount())
63+
})
64+
}
65+
66+
func TestPartialError_Unwrap(t *testing.T) {
67+
var innerErr error = testErrorType{"testError"}
68+
69+
partialErr := NewPartialError(innerErr, 7)
70+
require.NotNil(t, GetPartialError(partialErr))
71+
72+
// Verify the inner error can be extracted with errors.As
73+
target := testErrorType{}
74+
require.NotEqual(t, innerErr, target)
75+
76+
isWrapped := errors.As(partialErr, &target)
77+
require.True(t, isWrapped)
78+
require.Equal(t, innerErr, target)
79+
}
80+
81+
func TestPartialError_CombinedWithDownstream(t *testing.T) {
82+
origErr := errors.New("queue full")
83+
partialErr := NewPartialError(origErr, 30)
84+
downstreamErr := NewDownstream(partialErr)
85+
86+
// Should be recognized as downstream error
87+
assert.True(t, IsDownstream(downstreamErr))
88+
89+
// Should still be able to extract the partial error
90+
pe := GetPartialError(downstreamErr)
91+
require.NotNil(t, pe)
92+
assert.Equal(t, 30, pe.FailedCount())
93+
94+
// Original error should still be accessible
95+
assert.ErrorIs(t, downstreamErr, origErr)
96+
}
97+
98+
func TestPartialError_CombinedWithPermanent(t *testing.T) {
99+
origErr := errors.New("invalid format")
100+
partialErr := NewPartialError(origErr, 15)
101+
permanentErr := NewPermanent(partialErr)
102+
103+
// Should be recognized as permanent error
104+
assert.True(t, IsPermanent(permanentErr))
105+
106+
// Should still be able to extract the partial error
107+
pe := GetPartialError(permanentErr)
108+
require.NotNil(t, pe)
109+
assert.Equal(t, 15, pe.FailedCount())
110+
111+
// Original error should still be accessible
112+
assert.ErrorIs(t, permanentErr, origErr)
113+
}
114+
115+
func TestPartialError_DownstreamWrappingPartial(t *testing.T) {
116+
// Test the recommended pattern: downstream wrapping partial
117+
origErr := errors.New("pipeline refused")
118+
partialErr := NewPartialError(origErr, 20)
119+
combinedErr := NewDownstream(partialErr)
120+
121+
// All checks should work
122+
assert.True(t, IsDownstream(combinedErr))
123+
pe := GetPartialError(combinedErr)
124+
require.NotNil(t, pe)
125+
assert.Equal(t, 20, pe.FailedCount())
126+
assert.ErrorIs(t, combinedErr, origErr)
127+
}

receiver/receiverhelper/obsreport.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,17 +182,31 @@ func (rec *ObsReport) endOp(
182182
numRefused := 0
183183
numFailedErrors := 0
184184
if err != nil {
185-
numAccepted = 0
185+
// Check if this is a partial error with a specific failed count.
186+
// This allows receivers to report partial success when some items
187+
// were processed successfully while others failed.
188+
failedCount := numReceivedItems // default: all items failed
189+
if pe := consumererror.GetPartialError(err); pe != nil {
190+
failedCount = pe.FailedCount()
191+
// Sanity check: failed count cannot exceed received items
192+
if failedCount > numReceivedItems {
193+
failedCount = numReceivedItems
194+
}
195+
}
196+
197+
// Calculate accepted items (supports partial success)
198+
numAccepted = numReceivedItems - failedCount
199+
186200
// If gate is enabled, we distinguish between refused and failed.
187201
if NewReceiverMetricsGate.IsEnabled() {
188202
if consumererror.IsDownstream(err) {
189-
numRefused = numReceivedItems
203+
numRefused = failedCount
190204
} else {
191-
numFailedErrors = numReceivedItems
205+
numFailedErrors = failedCount
192206
}
193207
} else {
194208
// When the gate is disabled, all errors are considered "refused".
195-
numRefused = numReceivedItems
209+
numRefused = failedCount
196210
}
197211
}
198212

0 commit comments

Comments
 (0)