Skip to content

Commit 0b8f988

Browse files
kamalchaturvedi (Kamal Chaturvedi)
authored and committed
Utilizing otlp logger so error/info/debug logs are directed to otel logs sink. Also includes other logging related cleanup, as per code review comments.
1 parent 67c08be commit 0b8f988

File tree

3 files changed

+45
-27
lines changed

3 files changed

+45
-27
lines changed

internal/collector/logsgzipprocessor/processor.go

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"context"
1111
"fmt"
1212
"io"
13-
"log/slog"
1413
"sync"
1514

1615
"go.opentelemetry.io/collector/component"
@@ -19,6 +18,7 @@ import (
1918
"go.opentelemetry.io/collector/pdata/plog"
2019
"go.opentelemetry.io/collector/processor"
2120
"go.uber.org/multierr"
21+
"go.uber.org/zap"
2222
)
2323

2424
// nolint: ireturn
@@ -34,11 +34,14 @@ func NewFactory() processor.Factory {
3434

3535
// nolint: ireturn
3636
func createLogsGzipProcessor(_ context.Context,
37-
_ processor.Settings,
37+
settings processor.Settings,
3838
cfg component.Config,
3939
logs consumer.Logs,
4040
) (processor.Logs, error) {
41-
return newLogsGzipProcessor(logs), nil
41+
logger := settings.Logger
42+
logger.Info("Creating logs gzip processor")
43+
44+
return newLogsGzipProcessor(logs, settings), nil
4245
}
4346

4447
// logsGzipProcessor is a custom-processor implementation for compressing individual log records into
@@ -51,7 +54,8 @@ type logsGzipProcessor struct {
5154
// Otherwise, creating a new compressor for every log record would result in frequent memory allocations
5255
// and increased garbage collection overhead, especially under high-throughput workload like this one.
5356
// By pooling these objects, we minimize allocation churn, reduce GC pressure, and improve overall performance.
54-
pool *sync.Pool
57+
pool *sync.Pool
58+
settings processor.Settings
5559
}
5660

5761
type GzipWriter interface {
@@ -60,14 +64,15 @@ type GzipWriter interface {
6064
Reset(w io.Writer)
6165
}
6266

63-
func newLogsGzipProcessor(logs consumer.Logs) *logsGzipProcessor {
67+
func newLogsGzipProcessor(logs consumer.Logs, settings processor.Settings) *logsGzipProcessor {
6468
return &logsGzipProcessor{
6569
nextConsumer: logs,
6670
pool: &sync.Pool{
6771
New: func() any {
6872
return gzip.NewWriter(nil)
6973
},
7074
},
75+
settings: settings,
7176
}
7277
}
7378

@@ -77,7 +82,7 @@ func (p *logsGzipProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error
7782
for i := range resourceLogs.Len() {
7883
scopeLogs := resourceLogs.At(i).ScopeLogs()
7984
for j := range scopeLogs.Len() {
80-
err := p.processLogRecords(ctx, scopeLogs.At(j).LogRecords())
85+
err := p.processLogRecords(scopeLogs.At(j).LogRecords())
8186
if err != nil {
8287
errs = multierr.Append(errs, err)
8388
}
@@ -90,15 +95,19 @@ func (p *logsGzipProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error
9095
return p.nextConsumer.ConsumeLogs(ctx, ld)
9196
}
9297

93-
func (p *logsGzipProcessor) processLogRecords(ctx context.Context, logRecords plog.LogRecordSlice) error {
98+
func (p *logsGzipProcessor) processLogRecords(logRecords plog.LogRecordSlice) error {
9499
var errs error
95100
// Filter out unsupported data types in the log before processing
96101
logRecords.RemoveIf(func(lr plog.LogRecord) bool {
97102
body := lr.Body()
98103
// Keep only STRING or BYTES types
104+
if body.Type() != pcommon.ValueTypeStr &&
105+
body.Type() != pcommon.ValueTypeBytes {
106+
p.settings.Logger.Debug("Skipping log record with unsupported body type", zap.Any("type", body.Type()))
107+
return true
108+
}
99109

100-
return body.Type() != pcommon.ValueTypeStr &&
101-
body.Type() != pcommon.ValueTypeBytes
110+
return false
102111
})
103112
// Process remaining valid records
104113
for k := range logRecords.Len() {
@@ -114,15 +123,13 @@ func (p *logsGzipProcessor) processLogRecords(ctx context.Context, logRecords pl
114123
}
115124
gzipped, err := p.gzipCompress(data)
116125
if err != nil {
117-
slog.ErrorContext(ctx, "failed to compress log record", slog.Any("error", err))
118-
errs = multierr.Append(errs, err)
126+
errs = multierr.Append(errs, fmt.Errorf("failed to compress log record: %w", err))
119127

120128
continue
121129
}
122130
err = record.Body().FromRaw(gzipped)
123131
if err != nil {
124-
slog.ErrorContext(ctx, "failed to set gzipped data to log record body", slog.Any("error", err))
125-
errs = multierr.Append(errs, err)
132+
errs = multierr.Append(errs, fmt.Errorf("failed to set gzipped data to log record body: %w", err))
126133

127134
continue
128135
}
@@ -133,18 +140,21 @@ func (p *logsGzipProcessor) processLogRecords(ctx context.Context, logRecords pl
133140

134141
func (p *logsGzipProcessor) gzipCompress(data []byte) ([]byte, error) {
135142
var buf bytes.Buffer
143+
var err error
136144
wIface := p.pool.Get()
137145
w, ok := wIface.(GzipWriter)
138146
if !ok {
139147
return nil, fmt.Errorf("writer of type %T not supported", wIface)
140148
}
141149
w.Reset(&buf)
142150
defer func() {
143-
w.Close()
151+
if err = w.Close(); err != nil {
152+
p.settings.Logger.Error("Failed to close gzip writer", zap.Error(err))
153+
}
144154
p.pool.Put(w)
145155
}()
146156

147-
_, err := w.Write(data)
157+
_, err = w.Write(data)
148158
if err != nil {
149159
return nil, err
150160
}
@@ -162,11 +172,11 @@ func (p *logsGzipProcessor) Capabilities() consumer.Capabilities {
162172
}
163173

164174
func (p *logsGzipProcessor) Start(ctx context.Context, _ component.Host) error {
165-
slog.DebugContext(ctx, "starting logs gzip processor")
175+
p.settings.Logger.Info("Starting logs gzip processor")
166176
return nil
167177
}
168178

169179
func (p *logsGzipProcessor) Shutdown(ctx context.Context) error {
170-
slog.DebugContext(ctx, "shutting down logs gzip processor")
180+
p.settings.Logger.Info("Shutting down logs gzip processor")
171181
return nil
172182
}

internal/collector/logsgzipprocessor/processor_benchmark_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"go.opentelemetry.io/collector/consumer/consumertest"
1414
"go.opentelemetry.io/collector/pdata/plog"
15+
"go.opentelemetry.io/collector/processor"
1516
)
1617

1718
// Helper to generate logs with variable size and content
@@ -59,12 +60,12 @@ func BenchmarkGzipProcessor(b *testing.B) {
5960
b.Run(bm.name, func(b *testing.B) {
6061
b.ReportAllocs()
6162
consumer := &consumertest.LogsSink{}
62-
processor := newLogsGzipProcessor(consumer)
63+
p := newLogsGzipProcessor(consumer, processor.Settings{})
6364
logs := generateLogs(bm.numRecords, bm.recordSize)
6465

6566
b.ResetTimer()
6667
for i := 0; i < b.N; i++ {
67-
_ = processor.ConsumeLogs(context.Background(), logs)
68+
_ = p.ConsumeLogs(context.Background(), logs)
6869
}
6970
})
7071
}
@@ -76,13 +77,13 @@ func BenchmarkGzipProcessor_Concurrent(b *testing.B) {
7677
const workers = 8
7778
logs := generateLogs(1000, 1000)
7879
consumer := &consumertest.LogsSink{}
79-
processor := newLogsGzipProcessor(consumer)
80+
p := newLogsGzipProcessor(consumer, processor.Settings{})
8081

8182
b.ReportAllocs()
8283
b.ResetTimer()
8384
b.RunParallel(func(pb *testing.PB) {
8485
for pb.Next() {
85-
_ = processor.ConsumeLogs(context.Background(), logs)
86+
_ = p.ConsumeLogs(context.Background(), logs)
8687
}
8788
})
8889
}

internal/collector/logsgzipprocessor/processor_test.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"github.com/stretchr/testify/require"
1818
"go.opentelemetry.io/collector/consumer/consumertest"
1919
"go.opentelemetry.io/collector/pdata/plog"
20+
"go.opentelemetry.io/collector/processor/processortest"
21+
"go.uber.org/zap"
2022
)
2123

2224
var dummyInputStr = "hello world"
@@ -27,22 +29,24 @@ func TestGzipProcessor(t *testing.T) {
2729
name string
2830
}{
2931
{
30-
name: "string content",
32+
name: "Test 1: string content",
3133
input: dummyInputStr,
3234
},
3335
{
34-
name: "byte content",
36+
name: "Test 2: byte content",
3537
input: []byte("binary data"),
3638
},
3739
{
38-
name: "integer content",
40+
name: "Test 3: integer content",
3941
input: 12345,
4042
},
4143
}
4244

4345
for _, tc := range testCases {
4446
t.Run(tc.name, func(t *testing.T) {
4547
ctx := context.Background()
48+
settings := processortest.NewNopSettings(processortest.NopType)
49+
settings.Logger = zap.NewNop()
4650
// Setup: create a log record with the test case content
4751
logs := plog.NewLogs()
4852
logRecord := logs.ResourceLogs().AppendEmpty().
@@ -60,7 +64,7 @@ func TestGzipProcessor(t *testing.T) {
6064
}
6165

6266
next := &consumertest.LogsSink{}
63-
processor := newLogsGzipProcessor(next)
67+
processor := newLogsGzipProcessor(next, settings)
6468
require.NoError(t, processor.Start(ctx, nil))
6569

6670
capability := processor.Capabilities()
@@ -121,18 +125,20 @@ func TestGzipProcessorFailure(t *testing.T) {
121125
isGzipCloseError bool
122126
}{
123127
{
124-
name: "gzip write failure",
128+
name: "Test 1: gzip write failure",
125129
isGzipWriteError: true,
126130
},
127131
{
128-
name: "gzip writer close failure",
132+
name: "Test 2: gzip writer close failure",
129133
isGzipCloseError: true,
130134
},
131135
}
132136

133137
for _, tc := range testCases {
134138
t.Run(tc.name, func(t *testing.T) {
135139
ctx := context.Background()
140+
settings := processortest.NewNopSettings(processortest.NopType)
141+
settings.Logger = zap.NewNop()
136142
// Setup: create a log record with the test case content
137143
logs := plog.NewLogs()
138144
logRecord := logs.ResourceLogs().AppendEmpty().
@@ -151,6 +157,7 @@ func TestGzipProcessorFailure(t *testing.T) {
151157
return mockWriter
152158
},
153159
},
160+
settings: settings,
154161
}
155162
require.NoError(t, processor.Start(ctx, nil))
156163

0 commit comments

Comments (0)