Skip to content

Commit f5acb3b

Browse files
[pkg/stanza] Fix transformer operators logging errors at ERROR level when on_error is set to quiet mode (#45366)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Fix transformer operators logging errors at ERROR level when `on_error` is set to quiet mode (`send_quiet` or `drop_quiet`) <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #42646 <!--Describe what testing was performed and which tests were added.--> #### Testing Updated unit tests and added tests to all operators, which should support `on_error` configuration. --------- Signed-off-by: Paulo Dias <paulodias.gm@gmail.com> Co-authored-by: Andrzej Stencel <andrzej.stencel@elastic.co>
1 parent da14a0e commit f5acb3b

File tree

3 files changed

+325
-5
lines changed

3 files changed

+325
-5
lines changed

.chloggen/fix_42646.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: "bug_fix"
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: "pkg/stanza"
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Fix transformer operators logging errors at ERROR level when `on_error` is set to quiet mode"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [42646]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

pkg/stanza/operator/helper/transformer.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,12 @@ func (t *TransformerOperator) ProcessBatchWithTransform(ctx context.Context, ent
106106
}
107107

108108
if err = transform(ent); err != nil {
109-
errs = append(errs, t.HandleEntryErrorWithWrite(ctx, ent, err, write))
109+
if handleErr := t.HandleEntryErrorWithWrite(ctx, ent, err, write); handleErr != nil {
110+
// Only append error if not in quiet mode
111+
if !t.isQuietMode() {
112+
errs = append(errs, handleErr)
113+
}
114+
}
110115
continue
111116
}
112117

@@ -130,7 +135,12 @@ func (t *TransformerOperator) ProcessWith(ctx context.Context, entry *entry.Entr
130135
}
131136

132137
if err := transform(entry); err != nil {
133-
return t.HandleEntryError(ctx, entry, err)
138+
handleErr := t.HandleEntryError(ctx, entry, err)
139+
// Return nil for quiet modes to prevent error from bubbling up
140+
if t.isQuietMode() {
141+
return nil
142+
}
143+
return handleErr
134144
}
135145
return t.Write(ctx, entry)
136146
}
@@ -145,7 +155,7 @@ func (t *TransformerOperator) HandleEntryErrorWithWrite(ctx context.Context, ent
145155
return errors.New("got a nil entry, this should not happen and is potentially a bug")
146156
}
147157

148-
if t.OnError == SendOnErrorQuiet || t.OnError == DropOnErrorQuiet {
158+
if t.isQuietMode() {
149159
// No need to construct the zap attributes if logging not enabled at debug level.
150160
if t.Logger().Core().Enabled(zapcore.DebugLevel) {
151161
t.Logger().Debug("Failed to process entry", zapAttributes(entry, t.OnError, err)...)
@@ -162,6 +172,11 @@ func (t *TransformerOperator) HandleEntryErrorWithWrite(ctx context.Context, ent
162172
return err
163173
}
164174

175+
// isQuietMode returns true if the operator is configured to use quiet mode
176+
func (t *TransformerOperator) isQuietMode() bool {
177+
return t.OnError == DropOnErrorQuiet || t.OnError == SendOnErrorQuiet
178+
}
179+
165180
func (t *TransformerOperator) Skip(_ context.Context, entry *entry.Entry) (bool, error) {
166181
if t.IfExpr == nil {
167182
return false, nil

pkg/stanza/operator/helper/transformer_test.go

Lines changed: 280 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func TestTransformerDropOnErrorQuiet(t *testing.T) {
151151
}
152152

153153
err := transformer.ProcessWith(ctx, testEntry, transform)
154-
require.Error(t, err)
154+
require.NoError(t, err)
155155
output.AssertNotCalled(t, "Process", mock.Anything, mock.Anything)
156156

157157
// Test output logs
@@ -251,7 +251,7 @@ func TestTransformerSendOnErrorQuiet(t *testing.T) {
251251
}
252252

253253
err := transformer.ProcessWith(ctx, testEntry, transform)
254-
require.Error(t, err)
254+
require.NoError(t, err)
255255
output.AssertCalled(t, "Process", mock.Anything, mock.Anything)
256256

257257
// Test output logs
@@ -380,6 +380,284 @@ func TestTransformerDoesNotSplitBatches(t *testing.T) {
380380
output.AssertNumberOfCalls(t, "ProcessBatch", 1)
381381
}
382382

383+
// TestBatchProcessingAllEntriesFailQuietMode tests that when all entries fail in a batch with quiet mode,
384+
// no errors are returned and entries are handled according to the quiet mode (drop or send).
385+
func TestBatchProcessingAllEntriesFailQuietMode(t *testing.T) {
386+
testCases := []struct {
387+
name string
388+
onError string
389+
expectProcessed bool
390+
expectError bool
391+
expectedLogLevel zapcore.Level
392+
}{
393+
{
394+
name: "DropOnErrorQuiet",
395+
onError: DropOnErrorQuiet,
396+
expectProcessed: false,
397+
expectError: false,
398+
expectedLogLevel: zapcore.DebugLevel,
399+
},
400+
{
401+
name: "SendOnErrorQuiet",
402+
onError: SendOnErrorQuiet,
403+
expectProcessed: true,
404+
expectError: false,
405+
expectedLogLevel: zapcore.DebugLevel,
406+
},
407+
}
408+
409+
for _, tc := range testCases {
410+
t.Run(tc.name, func(t *testing.T) {
411+
output := &testutil.Operator{}
412+
output.On("ID").Return("test-output")
413+
output.On("ProcessBatch", mock.Anything, mock.Anything).Return(nil)
414+
415+
obs, logs := observer.New(zapcore.DebugLevel)
416+
set := componenttest.NewNopTelemetrySettings()
417+
set.Logger = zap.New(obs)
418+
419+
transformer := TransformerOperator{
420+
OnError: tc.onError,
421+
WriterOperator: WriterOperator{
422+
BasicOperator: BasicOperator{
423+
OperatorID: "test-id",
424+
OperatorType: "test-type",
425+
set: set,
426+
},
427+
OutputOperators: []operator.Operator{output},
428+
OutputIDs: []string{"test-output"},
429+
},
430+
}
431+
432+
ctx := t.Context()
433+
testEntry1 := entry.New()
434+
testEntry2 := entry.New()
435+
testEntry3 := entry.New()
436+
testEntries := []*entry.Entry{testEntry1, testEntry2, testEntry3}
437+
438+
// Transform function that always fails
439+
transform := func(_ *entry.Entry) error {
440+
return errors.New("transform failure")
441+
}
442+
443+
err := transformer.ProcessBatchWithTransform(ctx, testEntries, transform)
444+
445+
if tc.expectError {
446+
require.Error(t, err)
447+
} else {
448+
require.NoError(t, err, "expected no error in quiet mode")
449+
}
450+
451+
// Verify that logs were created at the expected level
452+
require.Equal(t, 3, logs.Len(), "expected 3 log entries for 3 failed entries")
453+
for _, logEntry := range logs.All() {
454+
require.Equal(t, tc.expectedLogLevel, logEntry.Level, "expected log level to be %v", tc.expectedLogLevel)
455+
}
456+
457+
// Verify ProcessBatch was called
458+
output.AssertCalled(t, "ProcessBatch", mock.Anything, mock.Anything)
459+
460+
// Check if entries were sent or dropped
461+
calls := output.Calls
462+
if len(calls) > 0 {
463+
processedEntries := calls[0].Arguments.Get(1).([]*entry.Entry)
464+
if tc.expectProcessed {
465+
require.Len(t, processedEntries, 3, "expected all 3 entries to be sent in SendOnErrorQuiet mode")
466+
} else {
467+
require.Empty(t, processedEntries, "expected 0 entries to be sent in DropOnErrorQuiet mode")
468+
}
469+
}
470+
})
471+
}
472+
}
473+
474+
// TestBatchProcessingMixedSuccessFailureQuietMode tests batch processing with a mix of successful and failed entries in quiet mode.
475+
func TestBatchProcessingMixedSuccessFailureQuietMode(t *testing.T) {
476+
testCases := []struct {
477+
name string
478+
onError string
479+
expectError bool
480+
expectedProcessedCount int
481+
expectedLogLevel zapcore.Level
482+
}{
483+
{
484+
name: "DropOnErrorQuiet",
485+
onError: DropOnErrorQuiet,
486+
expectError: false,
487+
expectedProcessedCount: 2, // Only successful entries
488+
expectedLogLevel: zapcore.DebugLevel,
489+
},
490+
{
491+
name: "SendOnErrorQuiet",
492+
onError: SendOnErrorQuiet,
493+
expectError: false,
494+
expectedProcessedCount: 4, // All entries (2 successful + 2 failed)
495+
expectedLogLevel: zapcore.DebugLevel,
496+
},
497+
}
498+
499+
for _, tc := range testCases {
500+
t.Run(tc.name, func(t *testing.T) {
501+
output := &testutil.Operator{}
502+
output.On("ID").Return("test-output")
503+
output.On("ProcessBatch", mock.Anything, mock.Anything).Return(nil)
504+
505+
obs, logs := observer.New(zapcore.DebugLevel)
506+
set := componenttest.NewNopTelemetrySettings()
507+
set.Logger = zap.New(obs)
508+
509+
transformer := TransformerOperator{
510+
OnError: tc.onError,
511+
WriterOperator: WriterOperator{
512+
BasicOperator: BasicOperator{
513+
OperatorID: "test-id",
514+
OperatorType: "test-type",
515+
set: set,
516+
},
517+
OutputOperators: []operator.Operator{output},
518+
OutputIDs: []string{"test-output"},
519+
},
520+
}
521+
522+
ctx := t.Context()
523+
testEntry1 := entry.New()
524+
testEntry1.Body = "success1"
525+
testEntry2 := entry.New()
526+
testEntry2.Body = "fail1"
527+
testEntry3 := entry.New()
528+
testEntry3.Body = "success2"
529+
testEntry4 := entry.New()
530+
testEntry4.Body = "fail2"
531+
testEntries := []*entry.Entry{testEntry1, testEntry2, testEntry3, testEntry4}
532+
533+
// Transform function that fails for entries with "fail" in body
534+
transform := func(e *entry.Entry) error {
535+
if body, ok := e.Body.(string); ok && body[:4] == "fail" {
536+
return errors.New("transform failure")
537+
}
538+
return nil
539+
}
540+
541+
err := transformer.ProcessBatchWithTransform(ctx, testEntries, transform)
542+
543+
if tc.expectError {
544+
require.Error(t, err)
545+
} else {
546+
require.NoError(t, err, "expected no error in quiet mode")
547+
}
548+
549+
// Verify that logs were created for failed entries only
550+
require.Equal(t, 2, logs.Len(), "expected 2 log entries for 2 failed entries")
551+
for _, logEntry := range logs.All() {
552+
require.Equal(t, tc.expectedLogLevel, logEntry.Level, "expected log level to be %v", tc.expectedLogLevel)
553+
}
554+
555+
// Verify ProcessBatch was called
556+
output.AssertCalled(t, "ProcessBatch", mock.Anything, mock.Anything)
557+
558+
// Check the number of processed entries
559+
calls := output.Calls
560+
if len(calls) > 0 {
561+
processedEntries := calls[0].Arguments.Get(1).([]*entry.Entry)
562+
require.Len(t, processedEntries, tc.expectedProcessedCount,
563+
"expected %d entries to be processed", tc.expectedProcessedCount)
564+
}
565+
})
566+
}
567+
}
568+
569+
// TestBatchProcessingAllEntriesFailNonQuietMode tests that errors are properly returned in non-quiet mode.
570+
func TestBatchProcessingAllEntriesFailNonQuietMode(t *testing.T) {
571+
testCases := []struct {
572+
name string
573+
onError string
574+
expectProcessed bool
575+
expectError bool
576+
expectedLogLevel zapcore.Level
577+
}{
578+
{
579+
name: "DropOnError",
580+
onError: DropOnError,
581+
expectProcessed: false,
582+
expectError: true,
583+
expectedLogLevel: zapcore.ErrorLevel,
584+
},
585+
{
586+
name: "SendOnError",
587+
onError: SendOnError,
588+
expectProcessed: true,
589+
expectError: true,
590+
expectedLogLevel: zapcore.ErrorLevel,
591+
},
592+
}
593+
594+
for _, tc := range testCases {
595+
t.Run(tc.name, func(t *testing.T) {
596+
output := &testutil.Operator{}
597+
output.On("ID").Return("test-output")
598+
output.On("ProcessBatch", mock.Anything, mock.Anything).Return(nil)
599+
600+
obs, logs := observer.New(zapcore.WarnLevel)
601+
set := componenttest.NewNopTelemetrySettings()
602+
set.Logger = zap.New(obs)
603+
604+
transformer := TransformerOperator{
605+
OnError: tc.onError,
606+
WriterOperator: WriterOperator{
607+
BasicOperator: BasicOperator{
608+
OperatorID: "test-id",
609+
OperatorType: "test-type",
610+
set: set,
611+
},
612+
OutputOperators: []operator.Operator{output},
613+
OutputIDs: []string{"test-output"},
614+
},
615+
}
616+
617+
ctx := t.Context()
618+
testEntry1 := entry.New()
619+
testEntry2 := entry.New()
620+
testEntry3 := entry.New()
621+
testEntries := []*entry.Entry{testEntry1, testEntry2, testEntry3}
622+
623+
// Transform function that always fails
624+
transform := func(_ *entry.Entry) error {
625+
return errors.New("transform failure")
626+
}
627+
628+
err := transformer.ProcessBatchWithTransform(ctx, testEntries, transform)
629+
630+
if tc.expectError {
631+
require.Error(t, err, "expected error in non-quiet mode")
632+
// Verify we get multiple errors joined together
633+
require.Contains(t, err.Error(), "transform failure")
634+
} else {
635+
require.NoError(t, err)
636+
}
637+
638+
// Verify that logs were created at the expected level
639+
require.Equal(t, 3, logs.Len(), "expected 3 log entries for 3 failed entries")
640+
for _, logEntry := range logs.All() {
641+
require.Equal(t, tc.expectedLogLevel, logEntry.Level)
642+
}
643+
644+
// Verify ProcessBatch was called
645+
output.AssertCalled(t, "ProcessBatch", mock.Anything, mock.Anything)
646+
647+
// Check if entries were sent or dropped
648+
calls := output.Calls
649+
if len(calls) > 0 {
650+
processedEntries := calls[0].Arguments.Get(1).([]*entry.Entry)
651+
if tc.expectProcessed {
652+
require.Len(t, processedEntries, 3, "expected all 3 entries to be sent in SendOnError mode")
653+
} else {
654+
require.Empty(t, processedEntries, "expected 0 entries to be sent in DropOnError mode")
655+
}
656+
}
657+
})
658+
}
659+
}
660+
383661
func TestTransformerIf(t *testing.T) {
384662
cases := []struct {
385663
name string

0 commit comments

Comments
 (0)