Commit 5115697

feat (pkg/stanza): add sanitize operator to replace invalid UTF-8 cha… (open-telemetry#42029)
#### Description

With `utf-8-raw` encoding, the final output may contain invalid UTF-8 characters, which breaks compatibility. This change adds an operator that sanitizes invalid UTF-8 characters, so users can place it after the `recombine` operator. For example:

```yaml
encoding: utf-8-raw
operators:
  - type: container
  - type: recombine
    combine_field: body
    is_first_entry: body matches "^[0-9]+-[0-9]+-[0-9]+"
    max_log_size: 128kb
    combine_with: "\n"
    source_identifier: attributes["log.file.path"]
  - type: sanitize_utf8 # new operator that sanitizes invalid UTF-8 characters
    field: body
```

#### Link to tracking issue

Fixes open-telemetry#42028

#### Testing

Added unit tests, and tested in our k8s cluster.

#### Documentation

Added docs files in `pkg/stanza/docs/operators`

---------

Co-authored-by: Andrzej Stencel <[email protected]>
1 parent 6f3747d commit 5115697

9 files changed: +366 -0 lines changed

.chloggen/stanza_sanitize.yaml

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add `sanitize_utf8` operator to replace invalid UTF-8 characters.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [42028]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]

pkg/stanza/adapter/register.go

Lines changed: 1 addition & 0 deletions
@@ -30,5 +30,6 @@ import (
 	_ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/remove"
 	_ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/retain"
 	_ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/router"
+	_ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/sanitizeutf8"
 	_ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/unquote"
 )
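
The blank import above matters because the new package registers itself from an `init` function via `operator.Register`. A minimal sketch of that registration pattern follows; `Builder`, `Register`, `Lookup`, and `sanitizeConfig` here are simplified, hypothetical stand-ins and not the real `pkg/stanza/operator` API.

```go
package main

import "fmt"

// Builder is a hypothetical, reduced stand-in for operator.Builder.
type Builder interface {
	Type() string
}

// sanitizeConfig is a hypothetical config type used only for this sketch.
type sanitizeConfig struct{}

func (sanitizeConfig) Type() string { return "sanitize_utf8" }

// registry maps an operator type name to a factory for its config.
var registry = map[string]func() Builder{}

// Register stores a factory under the given operator type name.
func Register(operatorType string, newBuilder func() Builder) {
	registry[operatorType] = newBuilder
}

// Lookup returns the factory registered for operatorType, if any.
func Lookup(operatorType string) (func() Builder, bool) {
	newBuilder, ok := registry[operatorType]
	return newBuilder, ok
}

// In the commit, the equivalent init lives in the sanitizeutf8 package and
// runs only because register.go imports that package for its side effects.
func init() {
	Register("sanitize_utf8", func() Builder { return sanitizeConfig{} })
}

func main() {
	if newBuilder, ok := Lookup("sanitize_utf8"); ok {
		fmt.Println("registered:", newBuilder().Type())
	}
}
```

Without the import line, the package's `init` would never run and the `sanitize_utf8` type name would be unknown when a pipeline configuration is loaded.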

pkg/stanza/docs/operators/README.md

Lines changed: 1 addition & 0 deletions
@@ -45,5 +45,6 @@ General purpose:
 - [remove](./remove.md)
 - [retain](./retain.md)
 - [router](./router.md)
+- [sanitize_utf8](./sanitize_utf8.md)
 - [unquote](./unquote.md)
 - [assign_keys](./assign_keys.md)
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
## `sanitize_utf8` Operator
2+
3+
The `sanitize_utf8` operator is used to replace invalid UTF-8 characters in a specified field of a log entry. This is useful
4+
for ensuring that log data is properly encoded and can be processed by downstream systems.
5+
6+
Invalid UTF-8 byte sequences will be replaced with `\uFFFD` (``).
7+
8+
### Configuration Fields
9+
10+
| Field | Default | Description |
11+
|------------|------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
12+
| `id` | `sanitize_utf8` | A unique identifier for the operator. |
13+
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
14+
| `field` | `body` | The [field](../types/field.md) to sanitize. This must be a string field. |
15+
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](../types/on_error.md). |
16+
| `if` | | An [expression](../types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. |
17+
18+
### Example Configurations
19+
20+
#### Simple Configuration for String Body
21+
22+
```yaml
23+
- type: sanitize_utf8
24+
field: body
25+
```
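
The replacement behavior described in this document can be tried outside the collector. The following is a minimal sketch using only the Go standard library; `sanitize` is a hypothetical helper name, and it assumes the operator relies on `strings.ToValidUTF8` as the transformer in this commit does.

```go
package main

import (
	"fmt"
	"strings"
	"unicode/utf8"
)

// sanitize returns s unchanged when it is valid UTF-8. Otherwise it replaces
// each run of invalid bytes with a single U+FFFD replacement character.
func sanitize(s string) string {
	if utf8.ValidString(s) {
		return s
	}
	return strings.ToValidUTF8(s, string(utf8.RuneError))
}

func main() {
	inputs := []string{
		"This is a normal string",
		"one invalid byte \xfe",
		"two adjacent invalid bytes \xfe\xfe",
		"separated \xfe invalid bytes \xfe",
	}
	for _, in := range inputs {
		// %q prints the strings with escapes so invalid bytes stay visible.
		fmt.Printf("%q -> %q\n", in, sanitize(in))
	}
}
```

Note that two adjacent invalid bytes collapse into one replacement character, while invalid bytes separated by valid text are replaced individually.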
Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sanitizeutf8

import (
	"path/filepath"
	"testing"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest"
)

func TestUnmarshal(t *testing.T) {
	operatortest.ConfigUnmarshalTests{
		DefaultConfig: NewConfig(),
		TestsFile:     filepath.Join(".", "testdata", "config.yaml"),
		Tests: []operatortest.ConfigUnmarshalTest{
			{
				Name:               "default",
				ExpectUnmarshalErr: false,
				Expect:             NewConfig(),
			},
			{
				Name:               "sanitize_utf8_with_field",
				ExpectUnmarshalErr: false,
				Expect: func() *Config {
					cfg := NewConfig()
					cfg.Field = entry.NewBodyField()
					return cfg
				}(),
			},
			{
				Name:               "sanitize_utf8_with_invalid_field",
				ExpectUnmarshalErr: true,
			},
		},
	}.Run(t)
}
Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sanitizeutf8

import (
	"testing"

	"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}
Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
default:
  type: sanitize_utf8
sanitize_utf8_with_field:
  type: sanitize_utf8
  field: body
sanitize_utf8_with_invalid_field:
  type: sanitize_utf8
  field: invalid_field
Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sanitizeutf8 // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/sanitizeutf8"

import (
	"context"
	"fmt"
	"strings"
	"unicode/utf8"

	"go.opentelemetry.io/collector/component"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

const (
	operatorType           = "sanitize_utf8"
	invalidCharacterMarker = string(utf8.RuneError) // Unicode replacement character
)

func init() {
	operator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

// NewConfig creates a new sanitize_utf8 config with default values
func NewConfig() *Config {
	return NewConfigWithID(operatorType)
}

// NewConfigWithID creates a new sanitize_utf8 config with default values
func NewConfigWithID(operatorID string) *Config {
	return &Config{
		TransformerConfig: helper.NewTransformerConfig(operatorID, operatorType),
		Field:             entry.NewBodyField(),
	}
}

// Config is the configuration of a sanitize_utf8 operator
type Config struct {
	helper.TransformerConfig `mapstructure:",squash"`
	Field                    entry.Field `mapstructure:"field"`
}

// Build creates a new Transformer from a config
func (c *Config) Build(set component.TelemetrySettings) (operator.Operator, error) {
	transformer, err := c.TransformerConfig.Build(set)
	if err != nil {
		return nil, fmt.Errorf("failed to build transformer config: %w", err)
	}

	return &Transformer{
		TransformerOperator: transformer,
		field:               c.Field,
	}, nil
}

// Transformer is an operator that replaces invalid UTF-8 characters in a field
type Transformer struct {
	helper.TransformerOperator
	field entry.Field
}

func (t *Transformer) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
	return t.ProcessBatchWithTransform(ctx, entries, t.ProcessWith)
}

func (t *Transformer) Process(ctx context.Context, e *entry.Entry) error {
	return t.TransformerOperator.ProcessWith(ctx, e, t.ProcessWith)
}

func (t *Transformer) ProcessWith(e *entry.Entry) error {
	v, ok := t.field.Get(e)
	if !ok {
		return nil
	}
	s, ok := v.(string)
	if !ok {
		return fmt.Errorf("field '%s' is not a string, got %T", t.field.String(), v)
	}
	if utf8.ValidString(s) {
		return nil
	}
	if err := t.field.Set(e, strings.ToValidUTF8(s, invalidCharacterMarker)); err != nil {
		return fmt.Errorf("failed to set field %s: %w", t.field.String(), err)
	}
	return nil
}
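
The control flow of `ProcessWith` (skip a missing field, reject a non-string value, leave valid strings untouched, rewrite invalid ones) can be illustrated without the stanza types. This is only a sketch: `sanitizeKey` is a hypothetical helper that works on a plain `map[string]any` instead of an `entry.Entry` and `entry.Field`.

```go
package main

import (
	"fmt"
	"strings"
	"unicode/utf8"
)

// sanitizeKey is a hypothetical stand-in for Transformer.ProcessWith that
// operates on a plain map rather than a stanza entry.
func sanitizeKey(attrs map[string]any, key string) error {
	v, ok := attrs[key]
	if !ok {
		return nil // missing field: nothing to sanitize, not an error
	}
	s, ok := v.(string)
	if !ok {
		return fmt.Errorf("field '%s' is not a string, got %T", key, v)
	}
	if utf8.ValidString(s) {
		return nil // already valid: skip the rewrite entirely
	}
	attrs[key] = strings.ToValidUTF8(s, string(utf8.RuneError))
	return nil
}

func main() {
	attrs := map[string]any{"foo": "invalid \xfe", "count": 42}

	fmt.Println(sanitizeKey(attrs, "foo"))     // string value is sanitized in place
	fmt.Println(sanitizeKey(attrs, "count"))   // non-string value yields an error
	fmt.Println(sanitizeKey(attrs, "missing")) // absent key is silently ignored
	fmt.Printf("%q\n", attrs["foo"])
}
```

Checking `utf8.ValidString` first means the common case of already valid input returns before any new string is written back.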
Lines changed: 161 additions & 0 deletions
@@ -0,0 +1,161 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sanitizeutf8

import (
	"fmt"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/component/componenttest"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

type processTestCase struct {
	name   string
	op     *Config
	input  func() *entry.Entry
	output func() *entry.Entry
}

func TestSanitizeUTF8(t *testing.T) {
	now := time.Now()
	cases := []processTestCase{
		{
			name: "normal_string_body",
			op:   newCfgWithField("body"),
			input: func() *entry.Entry {
				e := entry.New()
				e.Body = "This is a normal string"
				e.ObservedTimestamp = now
				return e
			},
			output: func() *entry.Entry {
				e := entry.New()
				e.Body = "This is a normal string"
				e.ObservedTimestamp = now
				return e
			},
		},
		{
			name: "invalid_utf8_body",
			op:   newCfgWithField("body"),
			input: func() *entry.Entry {
				e := entry.New()
				e.Body = "This is an invalid utf8 string \xfe"
				e.ObservedTimestamp = now
				return e
			},
			output: func() *entry.Entry {
				e := entry.New()
				e.Body = "This is an invalid utf8 string \uFFFD"
				e.ObservedTimestamp = now
				return e
			},
		},
		{
			name: "consecutive_invalid_utf8_body",
			op:   newCfgWithField("body"),
			input: func() *entry.Entry {
				e := entry.New()
				e.Body = "This is an invalid utf8 string \xfe\xfe"
				e.ObservedTimestamp = now
				return e
			},
			output: func() *entry.Entry {
				e := entry.New()
				e.Body = "This is an invalid utf8 string \uFFFD"
				e.ObservedTimestamp = now
				return e
			},
		},
		{
			name: "unconsecutive_invalid_utf8_body",
			op:   newCfgWithField("body"),
			input: func() *entry.Entry {
				e := entry.New()
				e.Body = "This is an invalid utf8 string \xfe and another \xfe"
				e.ObservedTimestamp = now
				return e
			},
			output: func() *entry.Entry {
				e := entry.New()
				e.Body = "This is an invalid utf8 string \uFFFD and another \uFFFD"
				e.ObservedTimestamp = now
				return e
			},
		},
		{
			name: "normal_string_attribute",
			op:   newCfgWithField("attributes.foo"),
			input: func() *entry.Entry {
				e := entry.New()
				e.Attributes = map[string]any{
					"foo": "This is a normal string",
				}
				e.ObservedTimestamp = now
				return e
			},
			output: func() *entry.Entry {
				e := entry.New()
				e.Attributes = map[string]any{
					"foo": "This is a normal string",
				}
				e.ObservedTimestamp = now
				return e
			},
		},
		{
			name: "invalid_utf8_attribute",
			op:   newCfgWithField("attributes.foo"),
			input: func() *entry.Entry {
				e := entry.New()
				e.Attributes = map[string]any{
					"foo": "This is an invalid utf8 string \xfe",
				}
				e.ObservedTimestamp = now
				return e
			},
			output: func() *entry.Entry {
				e := entry.New()
				e.Attributes = map[string]any{
					"foo": "This is an invalid utf8 string \uFFFD",
				}
				e.ObservedTimestamp = now
				return e
			},
		},
	}
	for _, tc := range cases {
		t.Run("BuildandProcess/"+tc.name, func(t *testing.T) {
			cfg := tc.op
			cfg.OutputIDs = []string{"fake"}
			cfg.OnError = "drop"
			op, err := cfg.Build(componenttest.NewNopTelemetrySettings())
			require.NoError(t, err)

			sanitizeUTF8 := op.(*Transformer)
			fake := testutil.NewFakeOutput(t)
			require.NoError(t, sanitizeUTF8.SetOutputs([]operator.Operator{fake}))
			val := tc.input()
			err = sanitizeUTF8.Process(t.Context(), val)
			require.NoError(t, err)
			fake.ExpectEntry(t, tc.output())
		})
	}
}

func newCfgWithField(field string) *Config {
	entryField, err := entry.NewField(field)
	if err != nil {
		panic(fmt.Sprintf("failed to create field %s: %v", field, err))
	}
	config := NewConfigWithID("sanitize_utf8/test")
	config.Field = entryField
	return config
}
