Skip to content

Commit fcefb9b

Browse files
[pkg/stanza] Ensure filter operator does not split batches of entries (#45161)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.-->
#### Description

Updates the `filter` operator to process batches of entries without splitting them. The operator now evaluates every entry in the incoming batch and writes all entries that pass the filter as a single batch, rather than processing and writing them individually.

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue

Fixes #42391

<!--Describe what testing was performed and which tests were added.-->
#### Testing

Added unit tests to ensure the batches are not split.

---------

Signed-off-by: Paulo Dias <paulodias.gm@gmail.com>
Co-authored-by: Andrzej Stencel <andrzej.stencel@elastic.co>
1 parent c95ef7e commit fcefb9b

4 files changed

Lines changed: 106 additions & 6 deletions

File tree

.chloggen/fix_42391.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: "enhancement"
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: "pkg/stanza"
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Ensure filter operator does not split batches of entries"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [42391]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

pkg/stanza/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ Operators that support batching:
9393
- `assign_keys`
9494
- `container`
9595
- `copy`
96+
- `filter`
9697
- `flatten`
9798
- `json_array_parser`
9899
- `json_parser`
@@ -114,7 +115,6 @@ Operators that support batching:
114115
Operators that do not support batching:
115116

116117
- `csv_parser`
117-
- `filter`
118118
- `router`
119119

120120
### FAQ

pkg/stanza/operator/transformer/filter/transformer.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package filter // import "github.com/open-telemetry/opentelemetry-collector-cont
66
import (
77
"context"
88
"crypto/rand"
9+
"errors"
910
"math/big"
1011

1112
"github.com/expr-lang/expr/vm"
@@ -23,7 +24,42 @@ type Transformer struct {
2324
}
2425

2526
func (t *Transformer) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
26-
return t.ProcessBatchWith(ctx, entries, t.Process)
27+
filteredEntries := make([]*entry.Entry, 0, len(entries))
28+
var errs []error
29+
for _, ent := range entries {
30+
env := helper.GetExprEnv(ent)
31+
matches, err := vm.Run(t.expression, env)
32+
helper.PutExprEnv(env)
33+
34+
if err != nil {
35+
t.Logger().Error("Running expressing returned an error", zap.Error(err))
36+
continue
37+
}
38+
39+
filtered, ok := matches.(bool)
40+
if !ok {
41+
t.Logger().Error("Expression did not compile as a boolean")
42+
continue
43+
}
44+
45+
if !filtered {
46+
filteredEntries = append(filteredEntries, ent)
47+
continue
48+
}
49+
50+
i, err := randInt(rand.Reader, upperBound)
51+
if err != nil {
52+
errs = append(errs, err)
53+
continue
54+
}
55+
56+
if i.Cmp(t.dropCutoff) >= 0 {
57+
filteredEntries = append(filteredEntries, ent)
58+
}
59+
}
60+
61+
errs = append(errs, t.WriteBatch(ctx, filteredEntries))
62+
return errors.Join(errs...)
2763
}
2864

2965
// Process will drop incoming entries that match the filter expression

pkg/stanza/operator/transformer/filter/transformer_test.go

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,11 @@ func TestTransformer(t *testing.T) {
185185

186186
filtered := true
187187
mockOutput := testutil.NewMockOperator("output")
188-
mockOutput.On("Process", mock.Anything, mock.Anything).Return(nil).Run(func(_ mock.Arguments) {
189-
filtered = false
188+
mockOutput.On("ProcessBatch", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
189+
entries := args.Get(1).([]*entry.Entry)
190+
if len(entries) > 0 {
191+
filtered = false
192+
}
190193
})
191194

192195
op.(*Transformer).OutputOperators = []operator.Operator{mockOutput}
@@ -208,8 +211,9 @@ func TestFilterDropRatio(t *testing.T) {
208211

209212
processedEntries := 0
210213
mockOutput := testutil.NewMockOperator("output")
211-
mockOutput.On("Process", mock.Anything, mock.Anything).Return(nil).Run(func(_ mock.Arguments) {
212-
processedEntries++
214+
mockOutput.On("ProcessBatch", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
215+
entries := args.Get(1).([]*entry.Entry)
216+
processedEntries += len(entries)
213217
})
214218

215219
filterOperator, ok := op.(*Transformer)
@@ -243,3 +247,36 @@ func TestFilterDropRatio(t *testing.T) {
243247

244248
require.Equal(t, 10, processedEntries)
245249
}
250+
251+
func TestFilterDoesNotSplitBatches(t *testing.T) {
252+
cfg := NewConfigWithID("test")
253+
cfg.Expression = `body.message == "drop_me"`
254+
set := componenttest.NewNopTelemetrySettings()
255+
op, err := cfg.Build(set)
256+
require.NoError(t, err)
257+
258+
mockOutput := testutil.NewMockOperator("output")
259+
mockOutput.On("ProcessBatch", mock.Anything, mock.Anything).Return(nil)
260+
261+
filterOperator := op.(*Transformer)
262+
filterOperator.OutputOperators = []operator.Operator{mockOutput}
263+
264+
testEntries := []*entry.Entry{
265+
{Body: map[string]any{"message": "keep_me"}},
266+
{Body: map[string]any{"message": "keep_me_too"}},
267+
{Body: map[string]any{"message": "drop_me"}},
268+
{Body: map[string]any{"message": "also_keep"}},
269+
}
270+
271+
err = filterOperator.ProcessBatch(t.Context(), testEntries)
272+
require.NoError(t, err)
273+
274+
// Verify that ProcessBatch was called exactly once, proving the batch was not split
275+
mockOutput.AssertNumberOfCalls(t, "ProcessBatch", 1)
276+
277+
// Verify that 3 entries were passed (one was filtered out)
278+
calls := mockOutput.Calls
279+
require.Len(t, calls, 1)
280+
passedEntries := calls[0].Arguments.Get(1).([]*entry.Entry)
281+
require.Len(t, passedEntries, 3)
282+
}

0 commit comments

Comments
 (0)