Skip to content

Commit 4fcaa5b

Browse files
committed
[pkg/stanza] Fix syslog parser operator logging errors at ERROR level when on_error is set to quiet mode
Signed-off-by: Paulo Dias <[email protected]>
1 parent 0a048ee commit 4fcaa5b

File tree

3 files changed

+177
-4
lines changed

3 files changed

+177
-4
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: "bug_fix"
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: "pkg/stanza"
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Fix syslog parser operator logging errors at ERROR level when `on_error` is set to quiet mode"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [42646]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

pkg/stanza/operator/parser/syslog/parser.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,11 @@ func (p *Parser) ProcessBatch(ctx context.Context, entries []*entry.Entry) error
4747
for _, ent := range entries {
4848
skip, err := p.Skip(ctx, ent)
4949
if err != nil {
50-
errs = append(errs, p.HandleEntryErrorWithWrite(ctx, ent, err, write))
50+
if handleErr := p.HandleEntryErrorWithWrite(ctx, ent, err, write); handleErr != nil {
51+
if p.OnError != helper.DropOnErrorQuiet && p.OnError != helper.SendOnErrorQuiet {
52+
errs = append(errs, handleErr)
53+
}
54+
}
5155
continue
5256
}
5357
if skip {
@@ -61,7 +65,11 @@ func (p *Parser) ProcessBatch(ctx context.Context, entries []*entry.Entry) error
6165
var bytes []byte
6266
bytes, err = toBytes(ent.Body)
6367
if err != nil {
64-
errs = append(errs, p.HandleEntryErrorWithWrite(ctx, ent, err, write))
68+
if handleErr := p.HandleEntryErrorWithWrite(ctx, ent, err, write); handleErr != nil {
69+
if p.OnError != helper.DropOnErrorQuiet && p.OnError != helper.SendOnErrorQuiet {
70+
errs = append(errs, handleErr)
71+
}
72+
}
6573
continue
6674
}
6775
if p.shouldSkipPriorityValues(bytes) {
@@ -77,7 +85,11 @@ func (p *Parser) ProcessBatch(ctx context.Context, entries []*entry.Entry) error
7785
}
7886

7987
if err = callback(ent); err != nil {
80-
errs = append(errs, p.HandleEntryErrorWithWrite(ctx, ent, err, write))
88+
if handleErr := p.HandleEntryErrorWithWrite(ctx, ent, err, write); handleErr != nil {
89+
if p.OnError != helper.DropOnErrorQuiet && p.OnError != helper.SendOnErrorQuiet {
90+
errs = append(errs, handleErr)
91+
}
92+
}
8193
continue
8294
}
8395

@@ -94,7 +106,11 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) error {
94106
if !p.enableOctetCounting && p.allowSkipPriHeader {
95107
bytes, err := toBytes(entry.Body)
96108
if err != nil {
97-
return err
109+
handleErr := p.HandleEntryError(ctx, entry, err)
110+
if p.OnError == helper.DropOnErrorQuiet || p.OnError == helper.SendOnErrorQuiet {
111+
return nil
112+
}
113+
return handleErr
98114
}
99115

100116
if p.shouldSkipPriorityValues(bytes) {

pkg/stanza/operator/parser/syslog/parser_test.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,133 @@ func TestSyslogParserDoesNotSplitBatches(t *testing.T) {
172172
}))
173173
output.AssertNumberOfCalls(t, "ProcessBatch", 1)
174174
}
175+
176+
func TestSyslogQuietModeProcessBatch(t *testing.T) {
177+
testCases := []struct {
178+
name string
179+
onError string
180+
expectError bool
181+
}{
182+
{
183+
name: "DropOnErrorQuiet_ReturnsNoError",
184+
onError: "drop_quiet",
185+
expectError: false,
186+
},
187+
{
188+
name: "SendOnErrorQuiet_ReturnsNoError",
189+
onError: "send_quiet",
190+
expectError: false,
191+
},
192+
{
193+
name: "DropOnError_ReturnsError",
194+
onError: "drop",
195+
expectError: true,
196+
},
197+
{
198+
name: "SendOnError_ReturnsError",
199+
onError: "send",
200+
expectError: true,
201+
},
202+
}
203+
204+
for _, tc := range testCases {
205+
t.Run(tc.name, func(t *testing.T) {
206+
cfg := basicConfig()
207+
cfg.Protocol = syslog.RFC5424
208+
cfg.OnError = tc.onError
209+
210+
set := componenttest.NewNopTelemetrySettings()
211+
op, err := cfg.Build(set)
212+
require.NoError(t, err)
213+
214+
output := &testutil.Operator{}
215+
output.On("ID").Return("fake")
216+
output.On("CanProcess").Return(true)
217+
output.On("ProcessBatch", mock.Anything, mock.Anything).Return(nil)
218+
require.NoError(t, op.SetOutputs([]operator.Operator{output}))
219+
220+
ctx := t.Context()
221+
222+
// Create entries with invalid syslog messages that will cause parse errors
223+
entry1 := entry.New()
224+
entry1.Body = "invalid syslog message 1"
225+
226+
entry2 := entry.New()
227+
entry2.Body = "invalid syslog message 2"
228+
229+
entry3 := entry.New()
230+
entry3.Body = "invalid syslog message 3"
231+
232+
testEntries := []*entry.Entry{entry1, entry2, entry3}
233+
234+
err = op.ProcessBatch(ctx, testEntries)
235+
if tc.expectError {
236+
require.Error(t, err, "expected error in non-quiet mode")
237+
} else {
238+
require.NoError(t, err, "expected no error in quiet mode")
239+
}
240+
})
241+
}
242+
}
243+
244+
// TestSyslogQuietModeProcess tests quiet mode handling for toBytes error
245+
// in the Process method when allowSkipPriHeader is enabled
246+
func TestSyslogQuietModeProcess(t *testing.T) {
247+
testCases := []struct {
248+
name string
249+
onError string
250+
expectError bool
251+
}{
252+
{
253+
name: "DropOnErrorQuiet_ReturnsNoError",
254+
onError: "drop_quiet",
255+
expectError: false,
256+
},
257+
{
258+
name: "SendOnErrorQuiet_ReturnsNoError",
259+
onError: "send_quiet",
260+
expectError: false,
261+
},
262+
{
263+
name: "DropOnError_ReturnsError",
264+
onError: "drop",
265+
expectError: true,
266+
},
267+
{
268+
name: "SendOnError_ReturnsError",
269+
onError: "send",
270+
expectError: true,
271+
},
272+
}
273+
274+
for _, tc := range testCases {
275+
t.Run(tc.name, func(t *testing.T) {
276+
cfg := basicConfig()
277+
cfg.Protocol = syslog.RFC3164
278+
cfg.AllowSkipPriHeader = true // Enable this to trigger the toBytes path
279+
cfg.OnError = tc.onError
280+
281+
set := componenttest.NewNopTelemetrySettings()
282+
op, err := cfg.Build(set)
283+
require.NoError(t, err)
284+
285+
output := &testutil.Operator{}
286+
output.On("ID").Return("fake")
287+
output.On("CanProcess").Return(true)
288+
output.On("Process", mock.Anything, mock.Anything).Return(nil)
289+
require.NoError(t, op.SetOutputs([]operator.Operator{output}))
290+
291+
// Create entry with non-string body that will cause toBytes error
292+
e := entry.New()
293+
e.Body = 12345 // Non-string body causes toBytes to fail
294+
e.ObservedTimestamp = time.Now()
295+
296+
err = op.Process(t.Context(), e)
297+
if tc.expectError {
298+
require.Error(t, err, "expected error in non-quiet mode")
299+
} else {
300+
require.NoError(t, err, "expected no error in quiet mode")
301+
}
302+
})
303+
}
304+
}

0 commit comments

Comments
 (0)