diff --git a/pkg/otel/arrow_record/producer.go b/pkg/otel/arrow_record/producer.go
index fe4c57d6..ed141c92 100644
--- a/pkg/otel/arrow_record/producer.go
+++ b/pkg/otel/arrow_record/producer.go
@@ -28,6 +28,7 @@ import (
     "go.opentelemetry.io/collector/pdata/ptrace"
 
     colarspb "github.com/f5/otel-arrow-adapter/api/collector/arrow/v1"
+    "github.com/f5/otel-arrow-adapter/pkg/otel/common"
     acommon "github.com/f5/otel-arrow-adapter/pkg/otel/common/arrow"
     logsarrow "github.com/f5/otel-arrow-adapter/pkg/otel/logs/arrow"
     metricsarrow "github.com/f5/otel-arrow-adapter/pkg/otel/metrics/arrow"
@@ -53,6 +54,9 @@ type Producer struct {
     metricsSchema *acommon.AdaptiveSchema
     logsSchema    *acommon.AdaptiveSchema
     tracesSchema  *acommon.AdaptiveSchema
+    // CLP-like log optimization
+    // see [CLP: Efficient and Scalable Search on Compressed Text Logs](https://www.usenix.org/system/files/osdi21-rodrigues.pdf)
+    logOptimization *common.LogConfig
 }
 
 type streamProducer struct {
@@ -65,6 +69,7 @@ type Config struct {
     pool           memory.Allocator
     initIndexSize  uint64
     limitIndexSize uint64
+    logConfig      *common.LogConfig
 }
 
 type Option func(*Config)
@@ -84,6 +89,7 @@ func NewProducerWithOptions(options ...Option) *Producer {
         pool:           memory.NewGoAllocator(),
         initIndexSize:  math.MaxUint16,
         limitIndexSize: math.MaxUint16,
+        logConfig:      common.DefaultLogConfig(),
     }
     for _, opt := range options {
         opt(cfg)
@@ -104,6 +110,7 @@ func NewProducerWithOptions(options ...Option) *Producer {
             tracesarrow.Schema,
             acommon.WithDictInitIndexSize(cfg.initIndexSize),
             acommon.WithDictLimitIndexSize(cfg.limitIndexSize)),
+        logOptimization: cfg.logConfig,
     }
 }
 
@@ -134,7 +141,7 @@ func (p *Producer) BatchArrowRecordsFromLogs(ls plog.Logs) (*colarspb.BatchArrow
     // Note: The record returned is wrapped into a RecordMessage and will
     // be released by the Producer.Produce method.
     record, err := RecordBuilder[plog.Logs](func() (acommon.EntityBuilder[plog.Logs], error) {
-        return logsarrow.NewLogsBuilder(p.pool, p.logsSchema)
+        return logsarrow.NewLogsBuilder(p.pool, p.logsSchema, p.logOptimization)
     }, ls)
     if err != nil {
         return nil, err
     }
@@ -361,3 +368,15 @@ func WithUint64LimitDictIndex() Option {
         cfg.limitIndexSize = math.MaxUint64
     }
 }
+
+func WithLogOptimization(logConfig *common.LogConfig) Option {
+    return func(cfg *Config) {
+        cfg.logConfig = logConfig
+    }
+}
+
+func WithNoLogOptimization() Option {
+    return func(cfg *Config) {
+        cfg.logConfig = nil
+    }
+}
diff --git a/pkg/otel/common/arrow/any_value.go b/pkg/otel/common/arrow/any_value.go
index c8baf967..2e82b6b7 100644
--- a/pkg/otel/common/arrow/any_value.go
+++ b/pkg/otel/common/arrow/any_value.go
@@ -110,6 +110,10 @@ func (b *AnyValueBuilder) Append(av pcommon.Value) error {
     return err
 }
 
+func (b *AnyValueBuilder) AppendNull() {
+    b.builder.AppendNull()
+}
+
 // Release releases the memory allocated by the builder.
 func (b *AnyValueBuilder) Release() {
     if !b.released {
diff --git a/pkg/otel/common/arrow/schema.go b/pkg/otel/common/arrow/schema.go
index 3980b207..46826463 100644
--- a/pkg/otel/common/arrow/schema.go
+++ b/pkg/otel/common/arrow/schema.go
@@ -381,7 +381,14 @@ func updateField(f *arrow.Field, dictMap map[*arrow.DictionaryType]*arrow.Dictio
 
 func getDictionaryArray(arr arrow.Array, ids []int) *array.Dictionary {
     if len(ids) == 0 {
-        return arr.(*array.Dictionary)
+        switch b := arr.(type) {
+        case *array.Dictionary:
+            return b
+        case *array.List:
+            return getDictionaryArray(b.ListValues(), ids)
+        default:
+            panic("getDictionaryArray: unsupported array type `" + arr.DataType().Name() + "`")
+        }
     }
 
     switch arr := arr.(type) {
@@ -409,6 +416,14 @@ func getDictionaryArray(arr arrow.Array, ids []int) *array.Dictionary {
 
 func getDictionaryBuilder(builder array.Builder, ids []int) array.DictionaryBuilder {
     if len(ids) == 0 {
+        switch b := builder.(type) {
+        case array.DictionaryBuilder:
+            return b
+        case *array.ListBuilder:
+            return getDictionaryBuilder(b.ValueBuilder(), ids)
+        default:
+            panic("getDictionaryBuilder: unsupported builder type `" + b.Type().Name() + "`")
+        }
         return builder.(array.DictionaryBuilder)
     }
 
diff --git a/pkg/otel/common/log.go b/pkg/otel/common/log.go
new file mode 100644
index 00000000..6f000a91
--- /dev/null
+++ b/pkg/otel/common/log.go
@@ -0,0 +1,50 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package common
+
+// LogConfig is the configuration of the log compressor (CLP-inspired log optimization).
+// See [CLP: Efficient and Scalable Search on Compressed Text Logs](https://www.usenix.org/system/files/osdi21-rodrigues.pdf)
+type LogConfig struct {
+    Delimiter string
+    DictVars  []string
+}
+
+func DefaultLogConfig() *LogConfig {
+    return &LogConfig{
+        Delimiter: " \\t\\r\\n!\"#$%&'\\(\\)\\*,:;<>?@\\[\\]\\^_`\\{\\|\\}~",
+        DictVars: []string{
+            "0x[0-9a-fA-F]+", // hexadecimal identifier
+            "\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}", // IPv4 address
+            "\\[([0-9a-fA-F]{1,4}:){7}([0-9a-fA-F]){1,4}\\]", // IPv6 address
+            ".+\\d.+", // words and identifiers containing a number (e.g. task-123, container-123)
+            ".*=.*[a-zA-Z0-9].*", // key=value pair with an alphanumeric value
+        },
+    }
+}
+
+// EncodedLog represents a log record split into 4 parts:
+// - LogType: the static pattern detected by the compressor (highly repetitive).
+// - DictVars: the dictionary variables, i.e. variable values that repeat across logs.
+// - IntVars: non-dictionary variables that are int64 numbers (potentially high-cardinality values).
+// - FloatVars: non-dictionary variables that are float64 numbers (potentially high-cardinality values).
+type EncodedLog struct {
+    LogType   string
+    DictVars  []string
+    IntVars   []int64
+    FloatVars []float64
+}
diff --git a/pkg/otel/constants/constants.go b/pkg/otel/constants/constants.go
index 7e1d5525..3d3e5d1f 100644
--- a/pkg/otel/constants/constants.go
+++ b/pkg/otel/constants/constants.go
@@ -52,6 +52,7 @@ const NAME string = "name"
 const KIND string = "kind"
 const VERSION string = "version"
 const BODY string = "body"
+const ENCODED_STR_BODY string = "encoded_str_body"
 const STATUS string = "status"
 const DESCRIPTION string = "description"
 const UNIT string = "unit"
diff --git a/pkg/otel/logs/arrow/all_test.go b/pkg/otel/logs/arrow/all_test.go
index 0dd6d794..37c9aaff 100644
--- a/pkg/otel/logs/arrow/all_test.go
+++ b/pkg/otel/logs/arrow/all_test.go
@@ -7,6 +7,7 @@ import (
     "github.com/stretchr/testify/require"
     "go.opentelemetry.io/collector/pdata/plog"
 
+    "github.com/f5/otel-arrow-adapter/pkg/otel/common"
     acommon "github.com/f5/otel-arrow-adapter/pkg/otel/common/arrow"
     "github.com/f5/otel-arrow-adapter/pkg/otel/internal"
 )
@@ -16,7 +17,7 @@ func TestLogRecord(t *testing.T) {
     pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
     defer pool.AssertSize(t, 0)
 
-    sb := NewLogRecordBuilder(pool)
+    sb := NewLogRecordBuilder(pool, nil)
 
     if err := sb.Append(LogRecord1()); err != nil {
         t.Fatal(err)
     }
@@ -47,7 +48,7 @@ func TestScopeLogs(t *testing.T) {
     pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
     defer pool.AssertSize(t, 0)
 
-    ssb := NewScopeLogsBuilder(pool)
+    ssb := NewScopeLogsBuilder(pool, nil)
 
     if err := ssb.Append(ScopeLogs1()); err != nil {
         t.Fatal(err)
     }
@@ -78,7 +79,7 @@ func TestResourceLogs(t *testing.T) {
     pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
     defer pool.AssertSize(t, 0)
 
-    rsb := NewResourceLogsBuilder(pool)
+    rsb := NewResourceLogsBuilder(pool, nil)
 
     if err := rsb.Append(ResourceLogs1()); err != nil {
         t.Fatal(err)
     }
@@ -111,7 +112,7 @@ func TestLogs(t *testing.T) {
     defer pool.AssertSize(t, 0)
     logsSchema := acommon.NewAdaptiveSchema(Schema)
     defer logsSchema.Release()
-    tb, err := NewLogsBuilder(pool, logsSchema)
+    tb, err := NewLogsBuilder(pool, logsSchema, common.DefaultLogConfig())
     require.NoError(t, err)
 
     err = tb.Append(Logs())
diff --git a/pkg/otel/logs/arrow/compress.go b/pkg/otel/logs/arrow/compress.go
new file mode 100644
index 00000000..1bcd9721
--- /dev/null
+++ b/pkg/otel/logs/arrow/compress.go
@@ -0,0 +1,164 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package arrow
+
+import (
+    "fmt"
+    "regexp"
+    "strconv"
+    "strings"
+
+    "github.com/f5/otel-arrow-adapter/pkg/otel/common"
+)
+
+type LogCompressor struct {
+    config      *common.LogConfig
+    delimiter   *regexp.Regexp
+    dictVars    []*regexp.Regexp
+    nonDictVars []*regexp.Regexp
+}
+
+func NewLogCompressor(config *common.LogConfig) *LogCompressor {
+    var (
+        delimiter *regexp.Regexp
+        dictVars  []*regexp.Regexp
+    )
+
+    if config != nil {
+        delimiter = regexp.MustCompile(fmt.Sprintf("[%s]+", config.Delimiter))
+
+        dictVars = make([]*regexp.Regexp, len(config.DictVars))
+        for i, pattern := range config.DictVars {
+            dictVars[i] = regexp.MustCompile(fmt.Sprintf("^%s$", pattern))
+        }
+    }
+
+    return &LogCompressor{
+        config:    config,
+        delimiter: delimiter,
+        dictVars:  dictVars,
+    }
+}
+
+func (lc *LogCompressor) Config() *common.LogConfig {
+    return lc.config
+}
+
+func (lc *LogCompressor) Compress(log string) *common.EncodedLog {
+    if lc.config == nil {
+        return &common.EncodedLog{
+            LogType: log,
+        }
+    }
+
+    var (
+        logTypeBuf strings.Builder
+        tokenBuf   strings.Builder
+        dictVars   []string
+        intVars    []int64
+        floatVars  []float64
+    )
+
+    delimiterIndices := lc.delimiter.FindAllStringIndex(log, -1)
+
+    if len(delimiterIndices) == 0 {
+        // No delimiters found, return the whole log as a log type
+        return &common.EncodedLog{
+            LogType: log,
+        }
+    }
+
+    // Add an artificial delimiter at the end of the log to simplify
+    // the logic of the main loop.
+    delimiterIndices = append(delimiterIndices, []int{len(log), len(log)})
+
+    curDelimiter := 0
+    inToken := true
+    if delimiterIndices[curDelimiter][0] == 0 {
+        inToken = false
+    }
+
+    // Iterate over the log characters (single pass)
+    for pos, c := range log {
+        if c == '\x11' || c == '\x12' || c == '\x13' {
+            // Skip the characters reserved as variable placeholders
+            continue
+        }
+
+        if pos == delimiterIndices[curDelimiter][0] { // left delimiter
+            token := tokenBuf.String()
+            tokenBuf.Reset()
+            if varFound := lc.extractVariable(&token, &logTypeBuf, &intVars, &floatVars, &dictVars); !varFound {
+                logTypeBuf.WriteString(token)
+            }
+            inToken = false
+        } else if pos == delimiterIndices[curDelimiter][1] { // right delimiter
+            curDelimiter++
+            inToken = true
+        }
+
+        // Write current character to the current segment (token or log type)
+        if inToken {
+            tokenBuf.WriteRune(c)
+        } else {
+            logTypeBuf.WriteRune(c)
+        }
+    }
+
+    // In case of a trailing token
+    if tokenBuf.Len() > 0 {
+        token := tokenBuf.String()
+        if varFound := lc.extractVariable(&token, &logTypeBuf, &intVars, &floatVars, &dictVars); !varFound {
+            logTypeBuf.WriteString(token)
+        }
+    }
+
+    return &common.EncodedLog{
+        LogType:   logTypeBuf.String(),
+        DictVars:  dictVars,
+        IntVars:   intVars,
+        FloatVars: floatVars,
+    }
+}
+
+func (lc *LogCompressor) extractVariable(text *string, logType *strings.Builder, intVars *[]int64, floatVars *[]float64, dictVars *[]string) bool {
+    if i64V, err := strconv.ParseInt(*text, 10, 64); err == nil {
+        logType.WriteRune('\x12')
+        logType.WriteRune(rune(len(*intVars)))
+        *intVars = append(*intVars, i64V)
+        return true
+    }
+
+    if f64V, err := strconv.ParseFloat(*text, 64); err == nil {
+        logType.WriteRune('\x13')
+        logType.WriteRune(rune(len(*floatVars)))
+        *floatVars = append(*floatVars, f64V)
+        return true
+    }
+
+    for _, regex := range lc.dictVars {
+        if regex.Match([]byte(*text)) {
+            logType.WriteRune('\x11')
+            logType.WriteRune(rune(len(*dictVars)))
+            *dictVars = append(*dictVars, *text)
+            return true
+        }
+    }
+
+    return false
+}
diff --git a/pkg/otel/logs/arrow/compress_test.go b/pkg/otel/logs/arrow/compress_test.go
new file mode 100644
index 00000000..cbe21672
--- /dev/null
+++ b/pkg/otel/logs/arrow/compress_test.go
@@ -0,0 +1,88 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package arrow
+
+import (
+    "testing"
+
+    "github.com/davecgh/go-spew/spew"
+    "github.com/stretchr/testify/require"
+
+    "github.com/f5/otel-arrow-adapter/pkg/otel/common"
+)
+
+func TestCompress(t *testing.T) {
+    compressor := NewLogCompressor(common.DefaultLogConfig())
+
+    encodedLog := compressor.Compress("duration: 134")
+    require.Equal(t, "duration: \x12\x00", encodedLog.LogType)
+    require.Equal(t, 0, len(encodedLog.DictVars))
+    require.Equal(t, 1, len(encodedLog.IntVars))
+    require.EqualValues(t, []int64{134}, encodedLog.IntVars)
+    require.Equal(t, 0, len(encodedLog.FloatVars))
+
+    encodedLog = compressor.Compress("duration: 134.67")
+    require.Equal(t, "duration: \x13\x00", encodedLog.LogType)
+    require.Equal(t, 0, len(encodedLog.DictVars))
+    require.Equal(t, 0, len(encodedLog.IntVars))
+    require.Equal(t, 1, len(encodedLog.FloatVars))
+    require.EqualValues(t, []float64{134.67}, encodedLog.FloatVars)
+
+    encodedLog = compressor.Compress("service `abc` (method: GET, duration: 134.67, status: 200, task: task-123, uuid: 12345678-1234-1234-1234-123456789012)")
+    require.Equal(t, "service `abc` (method: GET, duration: \x13\x00, status: \x12\x00, task: \x11\x00, uuid: \x11\x01)", encodedLog.LogType)
+    require.Equal(t, 2, len(encodedLog.DictVars))
+    require.EqualValues(t, []string{"task-123", "12345678-1234-1234-1234-123456789012"}, encodedLog.DictVars)
+    require.Equal(t, 1, len(encodedLog.IntVars))
+    require.EqualValues(t, []int64{200}, encodedLog.IntVars)
+    require.Equal(t, 1, len(encodedLog.FloatVars))
+    require.EqualValues(t, []float64{134.67}, encodedLog.FloatVars)
+
+    encodedLog = compressor.Compress("service `abc` ()")
+    require.Equal(t, "service `abc` ()", encodedLog.LogType)
+    require.Equal(t, 0, len(encodedLog.DictVars))
+    require.Equal(t, 0, len(encodedLog.IntVars))
+    require.Equal(t, 0, len(encodedLog.FloatVars))
+
+    encodedLog = compressor.Compress("")
+    require.Equal(t, "", encodedLog.LogType)
+    require.Equal(t, 0, len(encodedLog.DictVars))
+    require.Equal(t, 0, len(encodedLog.IntVars))
+    require.Equal(t, 0, len(encodedLog.FloatVars))
+
+    encodedLog = compressor.Compress(" \t\n")
+    require.Equal(t, " \t\n", encodedLog.LogType)
+    require.Equal(t, 0, len(encodedLog.DictVars))
+    require.Equal(t, 0, len(encodedLog.IntVars))
+    require.Equal(t, 0, len(encodedLog.FloatVars))
+
+    encodedLog = compressor.Compress(" 134 123.0 1234.")
+    spew.Dump(encodedLog)
+    require.Equal(t, " \\x12\\x00 \\x13\\x00 \\x13\\x01.", encodedLog.LogType)
+    require.Equal(t, 0, len(encodedLog.DictVars))
+    require.Equal(t, 2, len(encodedLog.IntVars))
+    require.EqualValues(t, []int64{134, 1234}, encodedLog.IntVars)
+    require.Equal(t, 1, len(encodedLog.FloatVars))
+    require.EqualValues(t, []float64{123.0}, encodedLog.FloatVars)
+
+    // TODO
+    // - Can't we just extract integers?
+    // - Don't think that the number behind the \x13, \x12 is required.
+    // - Complete the test and the integration on both sides (Compress and Decompress).
+    // - Rename Compress/Decompress to something that expresses the concept of static pattern extraction.
+    // - Measure the performance of the compression.
+}
diff --git a/pkg/otel/logs/arrow/encoded_log.go b/pkg/otel/logs/arrow/encoded_log.go
new file mode 100644
index 00000000..ec4436cf
--- /dev/null
+++ b/pkg/otel/logs/arrow/encoded_log.go
@@ -0,0 +1,145 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package arrow
+
+import (
+    "fmt"
+
+    "github.com/apache/arrow/go/v11/arrow"
+    "github.com/apache/arrow/go/v11/arrow/array"
+
+    "github.com/f5/otel-arrow-adapter/pkg/otel/common"
+    acommon "github.com/f5/otel-arrow-adapter/pkg/otel/common/arrow"
+)
+
+var (
+    // EncodedLogDT is the Arrow data type of an encoded log body.
+    EncodedLogDT = arrow.StructOf([]arrow.Field{
+        {Name: "log_type", Type: acommon.DefaultDictBinary},
+        {Name: "str_vars", Type: arrow.ListOf(acommon.DefaultDictBinary)},
+        {Name: "i64_vars", Type: arrow.ListOf(arrow.PrimitiveTypes.Int64)},
+        {Name: "f64_vars", Type: arrow.ListOf(arrow.PrimitiveTypes.Float64)},
+    }...)
+)
+
+// EncodedLogBuilder is a helper to build an Arrow array containing a collection of encoded logs.
+type EncodedLogBuilder struct {
+    released bool
+
+    builder *array.StructBuilder // encoded log value builder
+
+    logTypeBuilder *acommon.AdaptiveDictionaryBuilder // log type builder
+    svlb           *array.ListBuilder                 // str variable lists builder
+    svsb           *acommon.AdaptiveDictionaryBuilder // str variable builder
+    ivlb           *array.ListBuilder                 // i64 variable lists builder
+    ivsb           *array.Int64Builder                // i64 variable builder
+    fvlb           *array.ListBuilder                 // f64 variable lists builder
+    fvsb           *array.Float64Builder              // f64 variable builder
+
+    logCompressor *LogCompressor
+}
+
+// EncodedLogBuilderFrom creates a new EncodedLogBuilder from an existing StructBuilder.
+func EncodedLogBuilderFrom(elb *array.StructBuilder, cfg *common.LogConfig) *EncodedLogBuilder {
+    return &EncodedLogBuilder{
+        released:       false,
+        builder:        elb,
+        logTypeBuilder: acommon.AdaptiveDictionaryBuilderFrom(elb.FieldBuilder(0)),
+        svlb:           elb.FieldBuilder(1).(*array.ListBuilder),
+        svsb:           acommon.AdaptiveDictionaryBuilderFrom(elb.FieldBuilder(1).(*array.ListBuilder).ValueBuilder()),
+        ivlb:           elb.FieldBuilder(2).(*array.ListBuilder),
+        ivsb:           elb.FieldBuilder(2).(*array.ListBuilder).ValueBuilder().(*array.Int64Builder),
+        fvlb:           elb.FieldBuilder(3).(*array.ListBuilder),
+        fvsb:           elb.FieldBuilder(3).(*array.ListBuilder).ValueBuilder().(*array.Float64Builder),
+        logCompressor:  NewLogCompressor(cfg),
+    }
+}
+
+// Build builds the "encoded log" Arrow array.
+//
+// Once the returned array is no longer needed, Release() must be called to free the
+// memory allocated by the array.
+func (b *EncodedLogBuilder) Build() (*array.Struct, error) {
+    if b.released {
+        return nil, fmt.Errorf("encoded log builder already released")
+    }
+
+    defer b.Release()
+    return b.builder.NewStructArray(), nil
+}
+
+func (b *EncodedLogBuilder) AppendNull() (err error) {
+    if b.released {
+        err = fmt.Errorf("encoded log builder already released")
+        return
+    }
+
+    b.builder.AppendNull()
+    return
+}
+
+// Append compresses the given log body and appends the resulting encoded log
+// (log type plus variables) to the builder.
+func (b *EncodedLogBuilder) Append(log string) (err error) {
+    if b.released {
+        return fmt.Errorf("encoded log builder already released")
+    }
+
+    b.builder.Append(true)
+
+    encodedLog := b.logCompressor.Compress(log)
+    if len(encodedLog.LogType) == 0 {
+        b.logTypeBuilder.AppendNull()
+    } else {
+        err = b.logTypeBuilder.AppendString(encodedLog.LogType)
+        if err != nil {
+            return
+        }
+    }
+
+    if len(encodedLog.DictVars) == 0 {
+        b.svlb.AppendNull()
+    } else {
+        b.svlb.Append(true)
+        b.svlb.Reserve(len(encodedLog.DictVars))
+        for _, v := range encodedLog.DictVars {
+            err = b.svsb.AppendString(v)
+            if err != nil {
+                return
+            }
+        }
+    }
+
+    // ToDo - add i64 and f64 vars
+    b.ivlb.AppendNull()
+    b.fvlb.AppendNull()
+
+    return err
+}
+
+// Release releases the memory allocated by the builder.
+func (b *EncodedLogBuilder) Release() {
+    if !b.released {
+        b.builder.Release()
+
+        b.released = true
+    }
+}
+
+func (b *EncodedLogBuilder) LogConfig() *common.LogConfig {
+    return b.logCompressor.Config()
+}
diff --git a/pkg/otel/logs/arrow/log_record.go b/pkg/otel/logs/arrow/log_record.go
index f517f4c5..717044a3 100644
--- a/pkg/otel/logs/arrow/log_record.go
+++ b/pkg/otel/logs/arrow/log_record.go
@@ -6,8 +6,10 @@ import (
     "github.com/apache/arrow/go/v11/arrow"
     "github.com/apache/arrow/go/v11/arrow/array"
     "github.com/apache/arrow/go/v11/arrow/memory"
+    "go.opentelemetry.io/collector/pdata/pcommon"
     "go.opentelemetry.io/collector/pdata/plog"
 
+    "github.com/f5/otel-arrow-adapter/pkg/otel/common"
     acommon "github.com/f5/otel-arrow-adapter/pkg/otel/common/arrow"
     "github.com/f5/otel-arrow-adapter/pkg/otel/constants"
 )
@@ -26,6 +28,7 @@ var (
     {Name: constants.ATTRIBUTES, Type: acommon.AttributesDT},
     {Name: constants.DROPPED_ATTRIBUTES_COUNT, Type: arrow.PrimitiveTypes.Uint32},
     {Name: constants.FLAGS, Type: arrow.PrimitiveTypes.Uint32},
+    {Name: constants.ENCODED_STR_BODY, Type: EncodedLogDT},
 }...)
 )
 
@@ -45,18 +48,19 @@ type LogRecordBuilder struct {
     ab   *acommon.AttributesBuilder // attributes builder
     dacb *array.Uint32Builder       // dropped attributes count builder
     fb   *array.Uint32Builder       // flags builder
+    elb  *EncodedLogBuilder         // encoded log body builder
 }
 
 // NewLogRecordBuilder creates a new LogRecordBuilder with a given allocator.
 //
 // Once the builder is no longer needed, Release() must be called to free the
 // memory allocated by the builder.
-func NewLogRecordBuilder(pool memory.Allocator) *LogRecordBuilder {
+func NewLogRecordBuilder(pool memory.Allocator, logConfig *common.LogConfig) *LogRecordBuilder {
     sb := array.NewStructBuilder(pool, LogRecordDT)
-    return LogRecordBuilderFrom(sb)
+    return LogRecordBuilderFrom(sb, logConfig)
 }
 
-func LogRecordBuilderFrom(sb *array.StructBuilder) *LogRecordBuilder {
+func LogRecordBuilderFrom(sb *array.StructBuilder, logConfig *common.LogConfig) *LogRecordBuilder {
     return &LogRecordBuilder{
         released: false,
         builder:  sb,
@@ -70,6 +74,7 @@ func LogRecordBuilderFrom(sb *array.StructBuilder) *LogRecordBuilder {
         ab:   acommon.AttributesBuilderFrom(sb.FieldBuilder(7).(*array.MapBuilder)),
         dacb: sb.FieldBuilder(8).(*array.Uint32Builder),
         fb:   sb.FieldBuilder(9).(*array.Uint32Builder),
+        elb:  EncodedLogBuilderFrom(sb.FieldBuilder(10).(*array.StructBuilder), logConfig),
     }
 }
 
@@ -112,9 +117,25 @@ func (b *LogRecordBuilder) Append(log plog.LogRecord) error {
             return err
         }
     }
-    if err := b.bb.Append(log.Body()); err != nil {
-        return err
+
+    // If the log body is a string and the log compressor is enabled, we encode
+    // the log body and store it in the encoded log body field; otherwise, we
+    // store the log body in the body field.
+    body := log.Body()
+    if body.Type() == pcommon.ValueTypeStr && b.elb.LogConfig() != nil {
+        b.bb.AppendNull()
+        if err := b.elb.Append(body.AsString()); err != nil {
+            return err
+        }
+    } else {
+        if err := b.bb.Append(body); err != nil {
+            return err
+        }
+        if err := b.elb.AppendNull(); err != nil {
+            return err
+        }
     }
+
     if err := b.ab.Append(log.Attributes()); err != nil {
         return err
     }
diff --git a/pkg/otel/logs/arrow/logs.go b/pkg/otel/logs/arrow/logs.go
index 5cb03085..ae744c9f 100644
--- a/pkg/otel/logs/arrow/logs.go
+++ b/pkg/otel/logs/arrow/logs.go
@@ -8,6 +8,7 @@ import (
     "github.com/apache/arrow/go/v11/arrow/memory"
     "go.opentelemetry.io/collector/pdata/plog"
 
+    "github.com/f5/otel-arrow-adapter/pkg/otel/common"
     acommon "github.com/f5/otel-arrow-adapter/pkg/otel/common/arrow"
     "github.com/f5/otel-arrow-adapter/pkg/otel/constants"
 )
@@ -27,10 +28,12 @@ type LogsBuilder struct {
     builder *array.RecordBuilder // Record builder
     rlb     *array.ListBuilder   // ResourceLogs list builder
     rlp     *ResourceLogsBuilder // resource logs builder
+
+    config *common.LogConfig
 }
 
 // NewLogsBuilder creates a new LogsBuilder with a given allocator.
-func NewLogsBuilder(pool memory.Allocator, schema *acommon.AdaptiveSchema) (*LogsBuilder, error) {
+func NewLogsBuilder(pool memory.Allocator, schema *acommon.AdaptiveSchema, logConfig *common.LogConfig) (*LogsBuilder, error) {
     builder := array.NewRecordBuilder(pool, schema.Schema())
     err := schema.InitDictionaryBuilders(builder)
     if err != nil {
@@ -45,7 +48,8 @@ func NewLogsBuilder(pool memory.Allocator, schema *acommon.AdaptiveSchema) (*Log
         schema:  schema,
         builder: builder,
         rlb:     rlb,
-        rlp:     ResourceLogsBuilderFrom(rlb.ValueBuilder().(*array.StructBuilder)),
+        rlp:     ResourceLogsBuilderFrom(rlb.ValueBuilder().(*array.StructBuilder), logConfig),
+        config:  logConfig,
     }, nil
 }
 
diff --git a/pkg/otel/logs/arrow/resource_logs.go b/pkg/otel/logs/arrow/resource_logs.go
index 9b394de1..23d41864 100644
--- a/pkg/otel/logs/arrow/resource_logs.go
+++ b/pkg/otel/logs/arrow/resource_logs.go
@@ -8,6 +8,7 @@ import (
     "github.com/apache/arrow/go/v11/arrow/memory"
     "go.opentelemetry.io/collector/pdata/plog"
 
+    "github.com/f5/otel-arrow-adapter/pkg/otel/common"
     acommon "github.com/f5/otel-arrow-adapter/pkg/otel/common/arrow"
     "github.com/f5/otel-arrow-adapter/pkg/otel/constants"
 )
@@ -36,20 +37,20 @@ type ResourceLogsBuilder struct {
 //
 // Once the builder is no longer needed, Build() or Release() must be called to free the
 // memory allocated by the builder.
-func NewResourceLogsBuilder(pool memory.Allocator) *ResourceLogsBuilder {
+func NewResourceLogsBuilder(pool memory.Allocator, logConfig *common.LogConfig) *ResourceLogsBuilder {
     builder := array.NewStructBuilder(pool, ResourceLogsDT)
-    return ResourceLogsBuilderFrom(builder)
+    return ResourceLogsBuilderFrom(builder, logConfig)
 }
 
 // ResourceLogsBuilderFrom creates a new ResourceLogsBuilder from an existing builder.
-func ResourceLogsBuilderFrom(builder *array.StructBuilder) *ResourceLogsBuilder {
+func ResourceLogsBuilderFrom(builder *array.StructBuilder, logConfig *common.LogConfig) *ResourceLogsBuilder {
     return &ResourceLogsBuilder{
         released: false,
         builder:  builder,
         rb:       acommon.ResourceBuilderFrom(builder.FieldBuilder(0).(*array.StructBuilder)),
         schb:     acommon.AdaptiveDictionaryBuilderFrom(builder.FieldBuilder(1)),
         slsb:     builder.FieldBuilder(2).(*array.ListBuilder),
-        slb:      ScopeLogsBuilderFrom(builder.FieldBuilder(2).(*array.ListBuilder).ValueBuilder().(*array.StructBuilder)),
+        slb:      ScopeLogsBuilderFrom(builder.FieldBuilder(2).(*array.ListBuilder).ValueBuilder().(*array.StructBuilder), logConfig),
     }
 }
 
diff --git a/pkg/otel/logs/arrow/scope_logs.go b/pkg/otel/logs/arrow/scope_logs.go
index 4a8e96c5..d0486565 100644
--- a/pkg/otel/logs/arrow/scope_logs.go
+++ b/pkg/otel/logs/arrow/scope_logs.go
@@ -8,6 +8,7 @@ import (
     "github.com/apache/arrow/go/v11/arrow/memory"
     "go.opentelemetry.io/collector/pdata/plog"
 
+    "github.com/f5/otel-arrow-adapter/pkg/otel/common"
     acommon "github.com/f5/otel-arrow-adapter/pkg/otel/common/arrow"
     "github.com/f5/otel-arrow-adapter/pkg/otel/constants"
 )
@@ -37,19 +38,19 @@ type ScopeLogsBuilder struct {
 //
 // Once the builder is no longer needed, Release() must be called to free the
 // memory allocated by the builder.
-func NewScopeLogsBuilder(pool memory.Allocator) *ScopeLogsBuilder {
+func NewScopeLogsBuilder(pool memory.Allocator, logConfig *common.LogConfig) *ScopeLogsBuilder {
     builder := array.NewStructBuilder(pool, ScopeLogsDT)
-    return ScopeLogsBuilderFrom(builder)
+    return ScopeLogsBuilderFrom(builder, logConfig)
 }
 
-func ScopeLogsBuilderFrom(builder *array.StructBuilder) *ScopeLogsBuilder {
+func ScopeLogsBuilderFrom(builder *array.StructBuilder, logConfig *common.LogConfig) *ScopeLogsBuilder {
     return &ScopeLogsBuilder{
         released: false,
         builder:  builder,
         scb:      acommon.ScopeBuilderFrom(builder.FieldBuilder(0).(*array.StructBuilder)),
         schb:     acommon.AdaptiveDictionaryBuilderFrom(builder.FieldBuilder(1)),
         lrsb:     builder.FieldBuilder(2).(*array.ListBuilder),
-        lrb:      LogRecordBuilderFrom(builder.FieldBuilder(2).(*array.ListBuilder).ValueBuilder().(*array.StructBuilder)),
+        lrb:      LogRecordBuilderFrom(builder.FieldBuilder(2).(*array.ListBuilder).ValueBuilder().(*array.StructBuilder), logConfig),
     }
 }
 
diff --git a/pkg/otel/logs/otlp/decompress.go b/pkg/otel/logs/otlp/decompress.go
new file mode 100644
index 00000000..d201ea81
--- /dev/null
+++ b/pkg/otel/logs/otlp/decompress.go
@@ -0,0 +1,60 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package otlp
+
+import (
+    "fmt"
+    "strconv"
+    "strings"
+
+    "github.com/f5/otel-arrow-adapter/pkg/otel/common"
+)
+
+func Decompress(seg *common.EncodedLog) string {
+    var log strings.Builder
+    x11 := false
+    x12 := false
+    x13 := false
+
+    for _, c := range seg.LogType {
+        if !x11 && !x12 && c == '\x11' {
+            x11 = true
+            continue
+        } else if !x11 && !x12 && c == '\x12' {
+            x12 = true
+            continue
+        } else if !x11 && !x12 && c == '\x13' {
+            x13 = true
+            continue
+        } else {
+            if x11 {
+                log.WriteString(seg.DictVars[int(c)])
+                x11 = false
+            } else if x12 {
+                log.WriteString(strconv.FormatInt(seg.IntVars[int(c)], 10))
+                x12 = false
+            } else if x13 {
+                log.WriteString(fmt.Sprintf("%f", seg.FloatVars[int(c)]))
+                x13 = false
+            } else {
+                log.WriteRune(c)
+            }
+        }
+    }
+    return log.String()
+}
diff --git a/pkg/otel/logs/validation_test.go b/pkg/otel/logs/validation_test.go
index 8e765cde..eda6c21c 100644
--- a/pkg/otel/logs/validation_test.go
+++ b/pkg/otel/logs/validation_test.go
@@ -25,6 +25,7 @@ import (
 
     "github.com/f5/otel-arrow-adapter/pkg/datagen"
     "github.com/f5/otel-arrow-adapter/pkg/otel/assert"
+    "github.com/f5/otel-arrow-adapter/pkg/otel/common"
     acommon "github.com/f5/otel-arrow-adapter/pkg/otel/common/arrow"
     logsarrow "github.com/f5/otel-arrow-adapter/pkg/otel/logs/arrow"
     logsotlp "github.com/f5/otel-arrow-adapter/pkg/otel/logs/otlp"
@@ -48,7 +49,7 @@ func TestConversionFromSyntheticData(t *testing.T) {
     defer pool.AssertSize(t, 0)
     logsSchema := acommon.NewAdaptiveSchema(logsarrow.Schema)
     defer logsSchema.Release()
-    lb, err := logsarrow.NewLogsBuilder(pool, logsSchema)
+    lb, err := logsarrow.NewLogsBuilder(pool, logsSchema, common.DefaultLogConfig())
     require.NoError(t, err)
     err = lb.Append(expectedRequest.Logs())
     require.NoError(t, err)
diff --git a/tools/logs_benchmark/main.go b/tools/logs_benchmark/main.go
index be987071..55711aa3 100644
--- a/tools/logs_benchmark/main.go
+++ b/tools/logs_benchmark/main.go
@@ -50,7 +50,7 @@ func main() {
     for i := range inputFiles {
         // Compare the performance between the standard OTLP representation and the OTLP Arrow representation.
         //profiler := benchmark.NewProfiler([]int{1000, 5000, 10000, 25000})
-        profiler := benchmark.NewProfiler([]int{10000}, "output/logs_benchmark.log", 2)
+        profiler := benchmark.NewProfiler([]int{1000, 2000}, "output/logs_benchmark.log", 2)
         compressionAlgo := benchmark.Zstd()
         maxIter := uint64(3)
         profiler.Printf("Dataset '%s'\n", inputFiles[i])
diff --git a/tools/logs_gen/main.go b/tools/logs_gen/main.go
index 6c786db6..fcf07ed1 100644
--- a/tools/logs_gen/main.go
+++ b/tools/logs_gen/main.go
@@ -33,7 +33,7 @@ import (
 var help = flag.Bool("help", false, "Show help")
 
 var outputFile = "./data/otlp_logs.pb"
-var batchSize = 1000
+var batchSize = 10000
 
 func main() {
     // Define the flags.
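
Note (not part of the patch): the minimal sketch below illustrates how the pieces introduced above fit together — the CLP-inspired compressor splits a string log body into a static log type plus dictionary/int64/float64 variables, and the OTLP side rebuilds the original string. The main() wrapper and import aliases are illustrative assumptions; NewLogCompressor, Compress, Decompress, DefaultLogConfig, and the WithLogOptimization / WithNoLogOptimization producer options are the APIs added by this patch.

package main

import (
    "fmt"

    "github.com/f5/otel-arrow-adapter/pkg/otel/common"
    logsarrow "github.com/f5/otel-arrow-adapter/pkg/otel/logs/arrow"
    logsotlp "github.com/f5/otel-arrow-adapter/pkg/otel/logs/otlp"
)

func main() {
    // Build a compressor with the default configuration (delimiter set and
    // dictionary-variable patterns from common.DefaultLogConfig).
    compressor := logsarrow.NewLogCompressor(common.DefaultLogConfig())

    // Split a log line into a static log type plus its variables. The log type
    // keeps placeholders (\x11 dict, \x12 int64, \x13 float64) followed by the
    // variable index, e.g. DictVars=["task-123"], IntVars=[200] for this input.
    encoded := compressor.Compress("service `abc` (status: 200, task: task-123)")
    fmt.Printf("log type: %q\n", encoded.LogType)
    fmt.Printf("dict vars: %v, int vars: %v, float vars: %v\n",
        encoded.DictVars, encoded.IntVars, encoded.FloatVars)

    // Rebuild the original log body from the encoded form.
    fmt.Println(logsotlp.Decompress(encoded))

    // On the producer side, the optimization is toggled through the new options
    // (assuming the package name arrow_record for pkg/otel/arrow_record):
    //   producer := arrow_record.NewProducerWithOptions(
    //       arrow_record.WithLogOptimization(common.DefaultLogConfig()))
    //   producer := arrow_record.NewProducerWithOptions(arrow_record.WithNoLogOptimization())
}

The round trip works because Compress writes both the placeholder marker and the variable index into the log type, which is exactly what Decompress reads back to substitute the stored variables.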