Skip to content

Commit 16b08c0

Browse files
committed
Merge branch 'add-auxiliary-command-server-proto' into add-auxiliary-command-server-to-agent-config
2 parents 3f39fbf + 44f1e1e commit 16b08c0

File tree

14 files changed

+584
-4
lines changed

14 files changed

+584
-4
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ require (
6666
go.opentelemetry.io/collector/processor v1.30.0
6767
go.opentelemetry.io/collector/processor/batchprocessor v0.124.0
6868
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.124.0
69+
go.opentelemetry.io/collector/processor/processortest v0.124.0
6970
go.opentelemetry.io/collector/receiver v1.30.0
7071
go.opentelemetry.io/collector/receiver/otlpreceiver v0.124.0
7172
go.opentelemetry.io/collector/receiver/receivertest v0.124.0
@@ -74,6 +75,7 @@ require (
7475
go.opentelemetry.io/collector/scraper/scrapertest v0.124.0
7576
go.opentelemetry.io/otel v1.35.0
7677
go.uber.org/goleak v1.3.0
78+
go.uber.org/multierr v1.11.0
7779
go.uber.org/zap v1.27.0
7880
golang.org/x/mod v0.23.0
7981
golang.org/x/sync v0.13.0
@@ -260,7 +262,6 @@ require (
260262
go.opentelemetry.io/collector/pipeline/xpipeline v0.124.0 // indirect
261263
go.opentelemetry.io/collector/processor/processorhelper v0.124.0 // indirect
262264
go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.124.0 // indirect
263-
go.opentelemetry.io/collector/processor/processortest v0.124.0 // indirect
264265
go.opentelemetry.io/collector/processor/xprocessor v0.124.0 // indirect
265266
go.opentelemetry.io/collector/receiver/receiverhelper v0.124.0 // indirect
266267
go.opentelemetry.io/collector/receiver/xreceiver v0.124.0 // indirect
@@ -289,7 +290,6 @@ require (
289290
go.opentelemetry.io/otel/sdk/log v0.11.0 // indirect
290291
go.opentelemetry.io/otel/trace v1.35.0 // indirect
291292
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
292-
go.uber.org/multierr v1.11.0 // indirect
293293
golang.org/x/arch v0.12.0 // indirect
294294
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac // indirect
295295
golang.org/x/tools v0.30.0 // indirect

internal/collector/factories.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package collector
77

88
import (
99
"github.com/nginx/agent/v3/internal/collector/containermetricsreceiver"
10+
"github.com/nginx/agent/v3/internal/collector/logsgzipprocessor"
1011
nginxreceiver "github.com/nginx/agent/v3/internal/collector/nginxossreceiver"
1112
"github.com/nginx/agent/v3/internal/collector/nginxplusreceiver"
1213

@@ -104,6 +105,7 @@ func createProcessorFactories() map[component.Type]processor.Factory {
104105
redactionprocessor.NewFactory(),
105106
resourceprocessor.NewFactory(),
106107
transformprocessor.NewFactory(),
108+
logsgzipprocessor.NewFactory(),
107109
}
108110

109111
processors := make(map[component.Type]processor.Factory)

internal/collector/factories_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func TestOTelComponentFactoriesDefault(t *testing.T) {
1919
assert.NotNil(t, factories, "factories should not be nil")
2020

2121
assert.Len(t, factories.Receivers, 6)
22-
assert.Len(t, factories.Processors, 8)
22+
assert.Len(t, factories.Processors, 9)
2323
assert.Len(t, factories.Exporters, 4)
2424
assert.Len(t, factories.Extensions, 3)
2525
assert.Empty(t, factories.Connectors)
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# Logs gzip processor
2+
3+
The Logs gzip processor gzips the input log record body, updating the log record in-place.
4+
5+
For metrics and traces, this will just be a pass-through as it does not implement related interfaces.
6+
7+
## Configuration
8+
9+
No configuration needed.
10+
11+
## Benchmarking
12+
13+
We performed benchmark measuring the performance of serial and concurrent operations (more practical) of this processor, with and without the `sync.Pool`. Here are the results:
14+
15+
```
16+
Concurrent Run: Without Sync Pool
17+
goos: darwin
18+
goarch: arm64
19+
pkg: github.com/nginx/agent/v3/internal/collector/logsgzipprocessor
20+
cpu: Apple M2 Pro
21+
BenchmarkGzipProcessor_Concurrent-12 24 45279866 ns/op 817791582 B/op 24727 allocs/op
22+
PASS
23+
ok github.com/nginx/agent/v3/internal/collector/logsgzipprocessor 1.939s
24+
25+
Concurrent Run: With Sync Pool
26+
27+
goos: darwin
28+
goarch: arm64
29+
pkg: github.com/nginx/agent/v3/internal/collector/logsgzipprocessor
30+
cpu: Apple M2 Pro
31+
BenchmarkGzipProcessor_Concurrent-12 147 9383213 ns/op 10948640 B/op 7820 allocs/op
32+
PASS
33+
ok github.com/nginx/agent/v3/internal/collector/logsgzipprocessor 2.026s
34+
35+
————
36+
37+
Serial Run: Without Sync Pool
38+
39+
goos: darwin
40+
goarch: arm64
41+
pkg: github.com/nginx/agent/v3/internal/collector/logsgzipprocessor
42+
cpu: Apple M2 Pro
43+
BenchmarkGzipProcessor/SmallRecords-12 100 12048268 ns/op 81898890 B/op 2537 allocs/op
44+
BenchmarkGzipProcessor/MediumRecords-12 100 13143269 ns/op 82027307 B/op 2541 allocs/op
45+
BenchmarkGzipProcessor/LargeRecords-12 91 15912399 ns/op 83198992 B/op 2580 allocs/op
46+
BenchmarkGzipProcessor/ManySmallRecords-12 2 807707542 ns/op 8143237656 B/op 243348 allocs/op
47+
48+
49+
Serial Run: With Sync Pool
50+
51+
goos: darwin
52+
goarch: arm64
53+
pkg: github.com/nginx/agent/v3/internal/collector/logsgzipprocessor
54+
cpu: Apple M2 Pro
55+
BenchmarkGzipProcessor/SmallRecords-12 205 7304839 ns/op 1027942 B/op 783 allocs/op
56+
BenchmarkGzipProcessor/MediumRecords-12 182 7336266 ns/op 1078050 B/op 784 allocs/op
57+
BenchmarkGzipProcessor/LargeRecords-12 132 9646940 ns/op 2057059 B/op 815 allocs/op
58+
BenchmarkGzipProcessor/ManySmallRecords-12 5 239726258 ns/op 6883977 B/op 73679 allocs/op
59+
PASS
60+
```
61+
62+
63+
To run this benchmark yourself with syncpool implementation, you can run the tests in `processor_benchmark_test.go` in with the `sync.Pool` mode.
64+
65+
To compare benchmark without syncpool, you can use this code block in `processor.go` and comment the existing `gzipCompress` function, and run `processor_benchmark_test.go` :
66+
67+
```
68+
func (p *logsGzipProcessor) gzipCompress(data []byte) ([]byte, error) {
69+
var buf bytes.Buffer
70+
w := gzip.NewWriter(&buf)
71+
_, err := w.Write(data)
72+
if err != nil {
73+
return nil, err
74+
}
75+
if err = w.Close(); err != nil {
76+
return nil, err
77+
}
78+
79+
return buf.Bytes(), nil
80+
}
81+
```
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// Copyright (c) F5, Inc.
2+
//
3+
// This source code is licensed under the Apache License, Version 2.0 license found in the
4+
// LICENSE file in the root directory of this source tree.
5+
package logsgzipprocessor
6+
7+
import (
8+
"bytes"
9+
"compress/gzip"
10+
"context"
11+
"fmt"
12+
"io"
13+
"sync"
14+
15+
"go.opentelemetry.io/collector/component"
16+
"go.opentelemetry.io/collector/consumer"
17+
"go.opentelemetry.io/collector/pdata/pcommon"
18+
"go.opentelemetry.io/collector/pdata/plog"
19+
"go.opentelemetry.io/collector/processor"
20+
"go.uber.org/multierr"
21+
"go.uber.org/zap"
22+
)
23+
24+
// nolint: ireturn
25+
func NewFactory() processor.Factory {
26+
return processor.NewFactory(
27+
component.MustNewType("logsgzip"),
28+
func() component.Config {
29+
return &struct{}{}
30+
},
31+
processor.WithLogs(createLogsGzipProcessor, component.StabilityLevelBeta),
32+
)
33+
}
34+
35+
// nolint: ireturn
36+
func createLogsGzipProcessor(_ context.Context,
37+
settings processor.Settings,
38+
cfg component.Config,
39+
logs consumer.Logs,
40+
) (processor.Logs, error) {
41+
logger := settings.Logger
42+
logger.Info("Creating logs gzip processor")
43+
44+
return newLogsGzipProcessor(logs, settings), nil
45+
}
46+
47+
// logsGzipProcessor is a custom-processor implementation for compressing individual log records into
48+
// gzip format. This can be used to reduce the size of log records and improve performance when processing
49+
// large log volumes. This processor will be used by default for agent interacting with NGINX One
50+
// console (https://docs.nginx.com/nginx-one/about/).
51+
type logsGzipProcessor struct {
52+
nextConsumer consumer.Logs
53+
// We use sync.Pool to efficiently manage and reuse gzip.Writer instances within this processor.
54+
// Otherwise, creating a new compressor for every log record would result in frequent memory allocations
55+
// and increased garbage collection overhead, especially under high-throughput workload like this one.
56+
// By pooling these objects, we minimize allocation churn, reduce GC pressure, and improve overall performance.
57+
pool *sync.Pool
58+
settings processor.Settings
59+
}
60+
61+
type GzipWriter interface {
62+
Write(p []byte) (int, error)
63+
Close() error
64+
Reset(w io.Writer)
65+
}
66+
67+
func newLogsGzipProcessor(logs consumer.Logs, settings processor.Settings) *logsGzipProcessor {
68+
return &logsGzipProcessor{
69+
nextConsumer: logs,
70+
pool: &sync.Pool{
71+
New: func() any {
72+
return gzip.NewWriter(nil)
73+
},
74+
},
75+
settings: settings,
76+
}
77+
}
78+
79+
func (p *logsGzipProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
80+
var errs error
81+
resourceLogs := ld.ResourceLogs()
82+
for i := range resourceLogs.Len() {
83+
scopeLogs := resourceLogs.At(i).ScopeLogs()
84+
for j := range scopeLogs.Len() {
85+
err := p.processLogRecords(scopeLogs.At(j).LogRecords())
86+
if err != nil {
87+
errs = multierr.Append(errs, err)
88+
}
89+
}
90+
}
91+
if errs != nil {
92+
return fmt.Errorf("failed processing log records: %w", errs)
93+
}
94+
95+
return p.nextConsumer.ConsumeLogs(ctx, ld)
96+
}
97+
98+
func (p *logsGzipProcessor) processLogRecords(logRecords plog.LogRecordSlice) error {
99+
var errs error
100+
// Filter out unsupported data types in the log before processing
101+
logRecords.RemoveIf(func(lr plog.LogRecord) bool {
102+
body := lr.Body()
103+
// Keep only STRING or BYTES types
104+
if body.Type() != pcommon.ValueTypeStr &&
105+
body.Type() != pcommon.ValueTypeBytes {
106+
p.settings.Logger.Debug("Skipping log record with unsupported body type", zap.Any("type", body.Type()))
107+
return true
108+
}
109+
110+
return false
111+
})
112+
// Process remaining valid records
113+
for k := range logRecords.Len() {
114+
record := logRecords.At(k)
115+
body := record.Body()
116+
var data []byte
117+
//nolint:exhaustive // Already filtered out other types with RemoveIf
118+
switch body.Type() {
119+
case pcommon.ValueTypeStr:
120+
data = []byte(body.Str())
121+
case pcommon.ValueTypeBytes:
122+
data = body.Bytes().AsRaw()
123+
}
124+
gzipped, err := p.gzipCompress(data)
125+
if err != nil {
126+
errs = multierr.Append(errs, fmt.Errorf("failed to compress log record: %w", err))
127+
128+
continue
129+
}
130+
err = record.Body().FromRaw(gzipped)
131+
if err != nil {
132+
errs = multierr.Append(errs, fmt.Errorf("failed to set gzipped data to log record body: %w", err))
133+
134+
continue
135+
}
136+
}
137+
138+
return errs
139+
}
140+
141+
func (p *logsGzipProcessor) gzipCompress(data []byte) ([]byte, error) {
142+
var buf bytes.Buffer
143+
var err error
144+
wIface := p.pool.Get()
145+
w, ok := wIface.(GzipWriter)
146+
if !ok {
147+
return nil, fmt.Errorf("writer of type %T not supported", wIface)
148+
}
149+
w.Reset(&buf)
150+
defer func() {
151+
if err = w.Close(); err != nil {
152+
p.settings.Logger.Error("Failed to close gzip writer", zap.Error(err))
153+
}
154+
p.pool.Put(w)
155+
}()
156+
157+
_, err = w.Write(data)
158+
if err != nil {
159+
return nil, err
160+
}
161+
if err = w.Close(); err != nil {
162+
return nil, err
163+
}
164+
165+
return buf.Bytes(), nil
166+
}
167+
168+
func (p *logsGzipProcessor) Capabilities() consumer.Capabilities {
169+
return consumer.Capabilities{
170+
MutatesData: true,
171+
}
172+
}
173+
174+
func (p *logsGzipProcessor) Start(ctx context.Context, _ component.Host) error {
175+
p.settings.Logger.Info("Starting logs gzip processor")
176+
return nil
177+
}
178+
179+
func (p *logsGzipProcessor) Shutdown(ctx context.Context) error {
180+
p.settings.Logger.Info("Shutting down logs gzip processor")
181+
return nil
182+
}

0 commit comments

Comments
 (0)