Skip to content

Commit 8ea7acc

Browse files
authored
[loadgenreceiver] Add possibility to not limit signal size (#770)
* [loadgenreceiver] Add possibility to not limit signal size * Add license * max size -> max buffer size
1 parent 6dc7f87 commit 8ea7acc

File tree

12 files changed

+297
-23
lines changed

12 files changed

+297
-23
lines changed

receiver/loadgenreceiver/config.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"go.opentelemetry.io/collector/component"
2424
)
2525

26+
const maxScannerBufSize = 1024 * 1024
27+
2628
type (
2729
JsonlFile string
2830
)
@@ -46,16 +48,25 @@ type Config struct {
4648
DisablePdataReuse bool `mapstructure:"disable_pdata_reuse"`
4749
}
4850

49-
type MetricsConfig struct {
50-
// JsonlFile is an optional configuration option to specify the path to
51-
// get the base generated signals from.
52-
JsonlFile `mapstructure:"jsonl_file"`
53-
51+
type SignalConfig struct {
5452
// MaxReplay is an optional configuration to specify the number of times the file is replayed.
5553
MaxReplay int `mapstructure:"max_replay"`
54+
55+
// MaxBufferSize defines the maximum acceptable size for the file content. Set to 0 if you don't want
56+
// to set a limit.
57+
MaxBufferSize int `mapstructure:"max_buffer_size"`
58+
5659
// doneCh is only non-nil when the receiver is created with NewFactoryWithDone.
5760
// It is to notify the caller of collector that receiver finished replaying the file for MaxReplay number of times.
5861
doneCh chan Stats
62+
}
63+
64+
type MetricsConfig struct {
65+
// JsonlFile is an optional configuration option to specify the path to
66+
// get the base generated signals from.
67+
JsonlFile `mapstructure:"jsonl_file"`
68+
69+
SignalConfig `mapstructure:",squash"`
5970

6071
// AddCounterAttr, if true, adds a loadgenreceiver_counter resource attribute containing increasing counter value to the generated metrics.
6172
// It can be used to workaround timestamp precision and duplication detection of backends,
@@ -70,23 +81,15 @@ type LogsConfig struct {
7081
// get the base generated signals from.
7182
JsonlFile `mapstructure:"jsonl_file"`
7283

73-
// MaxReplay is an optional configuration to specify the number of times the file is replayed.
74-
MaxReplay int `mapstructure:"max_replay"`
75-
// doneCh is only non-nil when the receiver is created with NewFactoryWithDone.
76-
// It is to notify the caller of collector that receiver finished replaying the file for MaxReplay number of times.
77-
doneCh chan Stats
84+
SignalConfig `mapstructure:",squash"`
7885
}
7986

8087
type TracesConfig struct {
8188
// JsonlFile is an optional configuration option to specify the path to
8289
// get the base generated signals from.
8390
JsonlFile `mapstructure:"jsonl_file"`
8491

85-
// MaxReplay is an optional configuration to specify the number of times the file is replayed.
86-
MaxReplay int `mapstructure:"max_replay"`
87-
// doneCh is only non-nil when the receiver is created with NewFactoryWithDone.
88-
// It is to notify the caller of collector that receiver finished replaying the file for MaxReplay number of times.
89-
doneCh chan Stats
92+
SignalConfig `mapstructure:",squash"`
9093
}
9194

9295
var _ component.Config = (*Config)(nil)
@@ -102,5 +105,14 @@ func (cfg *Config) Validate() error {
102105
if cfg.Traces.MaxReplay < 0 {
103106
return fmt.Errorf("traces::max_replay must be >= 0")
104107
}
108+
if cfg.Logs.MaxBufferSize < 0 {
109+
return fmt.Errorf("logs::max_buffer_size must be >= 0")
110+
}
111+
if cfg.Metrics.MaxBufferSize < 0 {
112+
return fmt.Errorf("metrics::max_buffer_size must be >= 0")
113+
}
114+
if cfg.Traces.MaxBufferSize < 0 {
115+
return fmt.Errorf("traces::max_buffer_size must be >= 0")
116+
}
105117
return nil
106118
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package loadgenreceiver
19+
20+
import (
21+
"path/filepath"
22+
"testing"
23+
24+
"github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal/metadata"
25+
"github.com/stretchr/testify/assert"
26+
"go.opentelemetry.io/collector/component"
27+
"go.opentelemetry.io/collector/confmap/confmaptest"
28+
"go.opentelemetry.io/collector/confmap/xconfmap"
29+
)
30+
31+
func TestLoadConfig(t *testing.T) {
32+
tests := []struct {
33+
id component.ID
34+
expected component.Config
35+
expectedErrMessage string
36+
}{
37+
{
38+
id: component.NewID(metadata.Type),
39+
expected: &Config{
40+
Metrics: MetricsConfig{
41+
SignalConfig: SignalConfig{
42+
MaxBufferSize: maxScannerBufSize,
43+
},
44+
AddCounterAttr: true,
45+
},
46+
Logs: LogsConfig{
47+
SignalConfig: SignalConfig{
48+
MaxBufferSize: maxScannerBufSize,
49+
},
50+
},
51+
Traces: TracesConfig{
52+
SignalConfig: SignalConfig{
53+
MaxBufferSize: maxScannerBufSize,
54+
},
55+
},
56+
Concurrency: 1,
57+
},
58+
},
59+
{
60+
id: component.NewIDWithName(metadata.Type, "logs_invalid_max_replay"),
61+
expectedErrMessage: "logs::max_replay must be >= 0",
62+
},
63+
{
64+
id: component.NewIDWithName(metadata.Type, "metrics_invalid_max_replay"),
65+
expectedErrMessage: "metrics::max_replay must be >= 0",
66+
},
67+
{
68+
id: component.NewIDWithName(metadata.Type, "traces_invalid_max_replay"),
69+
expectedErrMessage: "traces::max_replay must be >= 0",
70+
},
71+
{
72+
id: component.NewIDWithName(metadata.Type, "logs_invalid_max_buffer_size"),
73+
expectedErrMessage: "logs::max_buffer_size must be >= 0",
74+
},
75+
{
76+
id: component.NewIDWithName(metadata.Type, "metrics_invalid_max_buffer_size"),
77+
expectedErrMessage: "metrics::max_buffer_size must be >= 0",
78+
},
79+
{
80+
id: component.NewIDWithName(metadata.Type, "traces_invalid_max_buffer_size"),
81+
expectedErrMessage: "traces::max_buffer_size must be >= 0",
82+
},
83+
}
84+
for _, tt := range tests {
85+
t.Run(tt.id.String(), func(t *testing.T) {
86+
t.Parallel()
87+
88+
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
89+
assert.NoError(t, err)
90+
91+
factory := NewFactory()
92+
cfg := factory.CreateDefaultConfig()
93+
sub, err := cm.Sub(tt.id.String())
94+
assert.NoError(t, err)
95+
assert.NoError(t, sub.Unmarshal(cfg))
96+
97+
err = xconfmap.Validate(cfg)
98+
if tt.expectedErrMessage != "" {
99+
assert.EqualError(t, err, tt.expectedErrMessage)
100+
return
101+
}
102+
103+
assert.NoError(t, err)
104+
assert.Equal(t, tt.expected, cfg)
105+
})
106+
}
107+
108+
}

receiver/loadgenreceiver/factory.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,23 @@ func NewFactory() receiver.Factory {
3131
func createDefaultReceiverConfig(logsDone, metricsDone, tracesDone chan Stats) component.Config {
3232
return &Config{
3333
Logs: LogsConfig{
34-
doneCh: logsDone,
34+
SignalConfig: SignalConfig{
35+
doneCh: logsDone,
36+
MaxBufferSize: maxScannerBufSize,
37+
},
3538
},
3639
Metrics: MetricsConfig{
37-
doneCh: metricsDone,
40+
SignalConfig: SignalConfig{
41+
doneCh: metricsDone,
42+
MaxBufferSize: maxScannerBufSize,
43+
},
3844
AddCounterAttr: true,
3945
},
4046
Traces: TracesConfig{
41-
doneCh: tracesDone,
47+
SignalConfig: SignalConfig{
48+
doneCh: tracesDone,
49+
MaxBufferSize: maxScannerBufSize,
50+
},
4251
},
4352
Concurrency: 1,
4453
}

receiver/loadgenreceiver/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
go.opentelemetry.io/collector/component v1.41.0
88
go.opentelemetry.io/collector/component/componenttest v0.135.0
99
go.opentelemetry.io/collector/confmap v1.41.0
10+
go.opentelemetry.io/collector/confmap/xconfmap v0.135.0
1011
go.opentelemetry.io/collector/consumer v1.41.0
1112
go.opentelemetry.io/collector/consumer/consumertest v0.135.0
1213
go.opentelemetry.io/collector/pdata v1.41.0

receiver/loadgenreceiver/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ go.opentelemetry.io/collector/component/componenttest v0.135.0 h1:OB6OmCWE1EwHwv
6363
go.opentelemetry.io/collector/component/componenttest v0.135.0/go.mod h1:9epxwkJW7ZXB1mTmCVF3JzfIoM0uhtnBTC2YWxrXczk=
6464
go.opentelemetry.io/collector/confmap v1.41.0 h1:m2Z7uZ1W4KpUdIWmps3vSv9jAvKFIr4EO/yYdSZ4+lE=
6565
go.opentelemetry.io/collector/confmap v1.41.0/go.mod h1:0nVs/u8BR6LZUjkMSOszBv1CSu4AGMoWv4c8zqu0ui0=
66+
go.opentelemetry.io/collector/confmap/xconfmap v0.135.0 h1:gPVQ5kkcgfdsyAkXttuj1/+SyweFkgSa0V5Bpd+FClQ=
67+
go.opentelemetry.io/collector/confmap/xconfmap v0.135.0/go.mod h1:10wDpK0GfVm1DNFDgkMSO0QAreSUX2jkvZoXRmQiuac=
6668
go.opentelemetry.io/collector/consumer v1.41.0 h1:sV77khNsZd5YR+vNtHIJaRcTXIlszNX7ZePpXRpm9PA=
6769
go.opentelemetry.io/collector/consumer v1.41.0/go.mod h1:fDB3ZjVCv2+zFsF/6WSYBSX3pkux/qAYf2Tk/P6b9yA=
6870
go.opentelemetry.io/collector/consumer/consumererror v0.135.0 h1:OTu0rLPWxWc03sqeYHdWGJFUA3W2DfgC1sHLZx8NMXI=

receiver/loadgenreceiver/logs.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,14 @@ func createLogsReceiver(
7474
}
7575
}
7676

77+
maxBufferSize := genConfig.Logs.MaxBufferSize
78+
if maxBufferSize == 0 {
79+
maxBufferSize = len(sampleLogs) + 10 // add some margin
80+
}
81+
7782
var items []plog.Logs
7883
scanner := bufio.NewScanner(bytes.NewReader(sampleLogs))
79-
scanner.Buffer(make([]byte, 0, maxScannerBufSize), maxScannerBufSize)
84+
scanner.Buffer(make([]byte, 0, maxBufferSize), maxBufferSize)
8085
for scanner.Scan() {
8186
logBytes := scanner.Bytes()
8287
lineLogs, err := parser.UnmarshalLogs(logBytes)

receiver/loadgenreceiver/logs_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,17 @@
1818
package loadgenreceiver // import "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver"
1919

2020
import (
21+
"bufio"
2122
"bytes"
2223
"context"
2324
"fmt"
25+
"os"
26+
"path/filepath"
27+
"strings"
2428
"testing"
2529

2630
"github.com/stretchr/testify/assert"
31+
"github.com/stretchr/testify/require"
2732
"go.opentelemetry.io/collector/component"
2833
"go.opentelemetry.io/collector/component/componenttest"
2934
"go.opentelemetry.io/collector/consumer/consumertest"
@@ -60,3 +65,33 @@ func TestLogsGenerator_doneCh(t *testing.T) {
6065
})
6166
}
6267
}
68+
69+
func TestLogsGenerator_MaxBufferSizeAttr(t *testing.T) {
70+
dummyData := `{"resourceLogs":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"my.service"}}]},"scopeLogs":[{"logRecords":[{"timeUnixNano":"1727411470107912000","body":{"stringValue":"Example log record"}}]}]}]}`
71+
for _, maxBufferSize := range []int{0, 10} {
72+
t.Run(fmt.Sprintf("max_buffer_size=%d", maxBufferSize), func(t *testing.T) {
73+
dir := t.TempDir()
74+
filePath := filepath.Join(dir, strings.ReplaceAll(t.Name(), "/", "_")+".jsonl")
75+
content := []byte(dummyData)
76+
require.NoError(t, os.WriteFile(filePath, content, 0644))
77+
78+
doneCh := make(chan Stats)
79+
cfg := createDefaultReceiverConfig(nil, doneCh, nil)
80+
cfg.(*Config).Logs.MaxBufferSize = maxBufferSize
81+
cfg.(*Config).Logs.JsonlFile = JsonlFile(filePath)
82+
83+
_, err := createLogsReceiver(context.Background(), receiver.Settings{
84+
ID: component.ID{},
85+
TelemetrySettings: component.TelemetrySettings{
86+
Logger: zap.NewNop(),
87+
},
88+
BuildInfo: component.BuildInfo{},
89+
}, cfg, consumertest.NewNop())
90+
if maxBufferSize == 0 {
91+
require.NoError(t, err)
92+
} else {
93+
require.EqualError(t, err, bufio.ErrTooLong.Error())
94+
}
95+
})
96+
}
97+
}

receiver/loadgenreceiver/metrics.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,14 @@ func createMetricsReceiver(
8181
}
8282
}
8383

84+
maxBufferSize := genConfig.Metrics.MaxBufferSize
85+
if maxBufferSize == 0 {
86+
maxBufferSize = len(sampleMetrics) + 10 // add some margin
87+
}
88+
8489
var items []pmetric.Metrics
8590
scanner := bufio.NewScanner(bytes.NewReader(sampleMetrics))
86-
scanner.Buffer(make([]byte, 0, maxScannerBufSize), maxScannerBufSize)
91+
scanner.Buffer(make([]byte, 0, maxBufferSize), maxBufferSize)
8792
for scanner.Scan() {
8893
metricBytes := scanner.Bytes()
8994
lineMetrics, err := parser.UnmarshalMetrics(metricBytes)

receiver/loadgenreceiver/metrics_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@
1818
package loadgenreceiver // import "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver"
1919

2020
import (
21+
"bufio"
2122
"bytes"
2223
"context"
2324
"fmt"
25+
"os"
26+
"path/filepath"
27+
"strings"
2428
"testing"
2529

2630
"github.com/stretchr/testify/assert"
@@ -108,3 +112,33 @@ func TestMetricsGenerator_addCounterAttr(t *testing.T) {
108112
})
109113
}
110114
}
115+
116+
func TestMetricsGenerator_MaxBufferSizeAttr(t *testing.T) {
117+
dummyData := `{"resourceMetrics":[{"resource":{"attributes":[]},"scopeMetrics":[{"scope":{"name":"app.currency","version":"1.11.1"},"metrics":[{"name":"app.currency_counter","sum":{"dataPoints":[{"attributes":[{"key":"currency_code","value":{"stringValue":"USD"}}],"startTimeUnixNano":"1732918744262776863","timeUnixNano":"1732919164268598041","asInt":"30"},{"attributes":[{"key":"currency_code","value":{"stringValue":"CAD"}}],"startTimeUnixNano":"1732918744262776863","timeUnixNano":"1732919164268598041","asInt":"4"},{"attributes":[{"key":"currency_code","value":{"stringValue":"CHF"}}],"startTimeUnixNano":"1732918744262776863","timeUnixNano":"1732919164268598041","asInt":"380"}],"aggregationTemporality":2,"isMonotonic":true}}]}]}]}`
118+
for _, maxBufferSize := range []int{0, 10} {
119+
t.Run(fmt.Sprintf("max_buffer_size=%d", maxBufferSize), func(t *testing.T) {
120+
dir := t.TempDir()
121+
filePath := filepath.Join(dir, strings.ReplaceAll(t.Name(), "/", "_")+".jsonl")
122+
content := []byte(dummyData)
123+
require.NoError(t, os.WriteFile(filePath, content, 0644))
124+
125+
doneCh := make(chan Stats)
126+
cfg := createDefaultReceiverConfig(nil, doneCh, nil)
127+
cfg.(*Config).Metrics.MaxBufferSize = maxBufferSize
128+
cfg.(*Config).Metrics.JsonlFile = JsonlFile(filePath)
129+
130+
_, err := createMetricsReceiver(context.Background(), receiver.Settings{
131+
ID: component.ID{},
132+
TelemetrySettings: component.TelemetrySettings{
133+
Logger: zap.NewNop(),
134+
},
135+
BuildInfo: component.BuildInfo{},
136+
}, cfg, consumertest.NewNop())
137+
if maxBufferSize == 0 {
138+
require.NoError(t, err)
139+
} else {
140+
require.EqualError(t, err, bufio.ErrTooLong.Error())
141+
}
142+
})
143+
}
144+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
loadgen:
2+
3+
loadgen/logs_invalid_max_replay:
4+
logs:
5+
max_replay: -1
6+
7+
loadgen/metrics_invalid_max_replay:
8+
metrics:
9+
max_replay: -1
10+
11+
loadgen/traces_invalid_max_replay:
12+
traces:
13+
max_replay: -1
14+
15+
loadgen/logs_invalid_max_buffer_size:
16+
logs:
17+
max_buffer_size: -1
18+
19+
loadgen/metrics_invalid_max_buffer_size:
20+
metrics:
21+
max_buffer_size: -1
22+
23+
loadgen/traces_invalid_max_buffer_size:
24+
traces:
25+
max_buffer_size: -1

0 commit comments

Comments
 (0)