Skip to content

CLP-inspired log compression/decompression #69

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion pkg/otel/arrow_record/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"

colarspb "github.com/f5/otel-arrow-adapter/api/collector/arrow/v1"
"github.com/f5/otel-arrow-adapter/pkg/otel/common"
acommon "github.com/f5/otel-arrow-adapter/pkg/otel/common/arrow"
logsarrow "github.com/f5/otel-arrow-adapter/pkg/otel/logs/arrow"
metricsarrow "github.com/f5/otel-arrow-adapter/pkg/otel/metrics/arrow"
Expand All @@ -53,6 +54,9 @@ type Producer struct {
metricsSchema *acommon.AdaptiveSchema
logsSchema *acommon.AdaptiveSchema
tracesSchema *acommon.AdaptiveSchema
// CLP-like log optimization.
// see [CLP: Efficient and Scalable Search on Compressed Text Logs](https://www.usenix.org/system/files/osdi21-rodrigues.pdf)
logOptimization *common.LogConfig
}

type streamProducer struct {
Expand All @@ -65,6 +69,7 @@ type Config struct {
pool memory.Allocator
initIndexSize uint64
limitIndexSize uint64
logConfig *common.LogConfig
}

type Option func(*Config)
Expand All @@ -84,6 +89,7 @@ func NewProducerWithOptions(options ...Option) *Producer {
pool: memory.NewGoAllocator(),
initIndexSize: math.MaxUint16,
limitIndexSize: math.MaxUint16,
logConfig: common.DefaultLogConfig(),
}
for _, opt := range options {
opt(cfg)
Expand All @@ -104,6 +110,7 @@ func NewProducerWithOptions(options ...Option) *Producer {
tracesarrow.Schema,
acommon.WithDictInitIndexSize(cfg.initIndexSize),
acommon.WithDictLimitIndexSize(cfg.limitIndexSize)),
logOptimization: cfg.logConfig,
}
}

Expand Down Expand Up @@ -134,7 +141,7 @@ func (p *Producer) BatchArrowRecordsFromLogs(ls plog.Logs) (*colarspb.BatchArrow
// Note: The record returned is wrapped into a RecordMessage and will
// be released by the Producer.Produce method.
record, err := RecordBuilder[plog.Logs](func() (acommon.EntityBuilder[plog.Logs], error) {
return logsarrow.NewLogsBuilder(p.pool, p.logsSchema)
return logsarrow.NewLogsBuilder(p.pool, p.logsSchema, p.logOptimization)
}, ls)
if err != nil {
return nil, err
Expand Down Expand Up @@ -361,3 +368,15 @@ func WithUint64LimitDictIndex() Option {
cfg.limitIndexSize = math.MaxUint64
}
}

// WithLogOptimization returns an Option that stores logConfig as the log
// optimization configuration of the producer being built. Passing nil has the
// same effect as WithNoLogOptimization.
func WithLogOptimization(logConfig *common.LogConfig) Option {
	apply := func(target *Config) {
		target.logConfig = logConfig
	}
	return apply
}

// WithNoLogOptimization returns an Option that clears the log optimization
// configuration (sets it to nil), replacing the default installed by
// NewProducerWithOptions.
func WithNoLogOptimization() Option {
	disable := func(target *Config) {
		target.logConfig = nil
	}
	return disable
}
4 changes: 4 additions & 0 deletions pkg/otel/common/arrow/any_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func (b *AnyValueBuilder) Append(av pcommon.Value) error {
return err
}

// AppendNull appends a null entry by delegating to the underlying builder.
// NOTE(review): unlike Release, this does not consult b.released — confirm
// whether calling it on a released builder needs to be guarded.
func (b *AnyValueBuilder) AppendNull() {
b.builder.AppendNull()
}

// Release releases the memory allocated by the builder.
func (b *AnyValueBuilder) Release() {
if !b.released {
Expand Down
17 changes: 16 additions & 1 deletion pkg/otel/common/arrow/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,14 @@ func updateField(f *arrow.Field, dictMap map[*arrow.DictionaryType]*arrow.Dictio

func getDictionaryArray(arr arrow.Array, ids []int) *array.Dictionary {
if len(ids) == 0 {
return arr.(*array.Dictionary)
switch b := arr.(type) {
case *array.Dictionary:
return b
case *array.List:
return getDictionaryArray(b.ListValues(), ids)
default:
panic("getDictionaryArray: unsupported array type `" + arr.DataType().Name() + "`")
}
}

switch arr := arr.(type) {
Expand Down Expand Up @@ -409,6 +416,14 @@ func getDictionaryArray(arr arrow.Array, ids []int) *array.Dictionary {

func getDictionaryBuilder(builder array.Builder, ids []int) array.DictionaryBuilder {
if len(ids) == 0 {
switch b := builder.(type) {
case array.DictionaryBuilder:
return b
case *array.ListBuilder:
return getDictionaryBuilder(b.ValueBuilder(), ids)
default:
panic("getDictionaryBuilder: unsupported builder type `" + b.Type().Name() + "`")
}
return builder.(array.DictionaryBuilder)
}

Expand Down
50 changes: 50 additions & 0 deletions pkg/otel/common/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package common

// LogConfig is a configuration for the log compressor (CLP inspired log optimization).
// See [CLP: Efficient and Scalable Search on Compressed Text Logs](https://www.usenix.org/system/files/osdi21-rodrigues.pdf)
type LogConfig struct {
// Delimiter is the content of a regular-expression character class (without
// the surrounding brackets). The log compressor wraps it as `[<Delimiter>]+`,
// so every run of these characters separates two tokens. Characters that are
// special inside a character class must already be escaped.
Delimiter string
// DictVars is a list of regular expressions. The log compressor anchors each
// one so that it has to match a whole token; a non-numeric token matching any
// of them is stored as a dictionary variable.
DictVars []string
}

// DefaultLogConfig returns a freshly allocated LogConfig holding the built-in
// delimiter set (whitespace plus common punctuation) and the built-in
// dictionary-variable patterns.
func DefaultLogConfig() *LogConfig {
	// Patterns are written as raw strings so the regular expressions read
	// exactly as the regexp package will see them.
	patterns := []string{
		// Hexadecimal identifier.
		`0x[0-9a-fA-F]+`,
		// IPv4 address.
		`\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}`,
		// Bracketed IPv6 address.
		// NOTE(review): '[', ']' and ':' are all part of the default Delimiter,
		// so a token never contains them and this pattern looks unreachable
		// with the default configuration — confirm.
		`\[([0-9a-fA-F]{1,4}:){7}([0-9a-fA-F]){1,4}\]`,
		// Token containing a digit with at least one character on each side
		// (e.g. task-123, container-123). A token ending in a single digit
		// such as task-1 does not match.
		`.+\d.+`,
		// key=value pair whose value contains an alphanumeric character.
		`.*=.*[a-zA-Z0-9].*`,
	}

	cfg := new(LogConfig)
	cfg.Delimiter = " \\t\\r\\n!\"#$%&'\\(\\)\\*,:;<>?@\\[\\]\\^_`\\{\\|\\}~"
	cfg.DictVars = patterns
	return cfg
}

// EncodedLog represents a log record split into 4 parts:
// - LogType: the log type, which is the static pattern detected by the compressor (highly repetitive).
// - DictVars: the dictionary variables, which are the variables that are repeated in the log.
// - IntVars: non-dictionary variables which are int 64 numbers (potentially high cardinality values).
// - FloatVars: non-dictionary variables which are float 64 numbers (potentially high cardinality values).
type EncodedLog struct {
LogType string
DictVars []string
IntVars []int64
FloatVars []float64
}
1 change: 1 addition & 0 deletions pkg/otel/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const NAME string = "name"
const KIND string = "kind"
const VERSION string = "version"
const BODY string = "body"
// ENCODED_STR_BODY is the name used for the encoded form of a string log body.
// NOTE(review): presumably an Arrow field name like the neighbouring constants — confirm against the logs schema.
const ENCODED_STR_BODY string = "encoded_str_body"
const STATUS string = "status"
const DESCRIPTION string = "description"
const UNIT string = "unit"
Expand Down
9 changes: 5 additions & 4 deletions pkg/otel/logs/arrow/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/f5/otel-arrow-adapter/pkg/otel/common"
acommon "github.com/f5/otel-arrow-adapter/pkg/otel/common/arrow"
"github.com/f5/otel-arrow-adapter/pkg/otel/internal"
)
Expand All @@ -16,7 +17,7 @@ func TestLogRecord(t *testing.T) {

pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer pool.AssertSize(t, 0)
sb := NewLogRecordBuilder(pool)
sb := NewLogRecordBuilder(pool, nil)

if err := sb.Append(LogRecord1()); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -47,7 +48,7 @@ func TestScopeLogs(t *testing.T) {

pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer pool.AssertSize(t, 0)
ssb := NewScopeLogsBuilder(pool)
ssb := NewScopeLogsBuilder(pool, nil)

if err := ssb.Append(ScopeLogs1()); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -78,7 +79,7 @@ func TestResourceLogs(t *testing.T) {

pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer pool.AssertSize(t, 0)
rsb := NewResourceLogsBuilder(pool)
rsb := NewResourceLogsBuilder(pool, nil)

if err := rsb.Append(ResourceLogs1()); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -111,7 +112,7 @@ func TestLogs(t *testing.T) {
defer pool.AssertSize(t, 0)
logsSchema := acommon.NewAdaptiveSchema(Schema)
defer logsSchema.Release()
tb, err := NewLogsBuilder(pool, logsSchema)
tb, err := NewLogsBuilder(pool, logsSchema, common.DefaultLogConfig())
require.NoError(t, err)

err = tb.Append(Logs())
Expand Down
164 changes: 164 additions & 0 deletions pkg/otel/logs/arrow/compress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package arrow

import (
"fmt"
"regexp"
"strconv"
"strings"

"github.com/f5/otel-arrow-adapter/pkg/otel/common"
)

// LogCompressor encodes a log line into a common.EncodedLog using the
// regular expressions compiled from a common.LogConfig.
type LogCompressor struct {
// config is the configuration passed to NewLogCompressor; when nil, Compress
// returns its input unchanged as the log type.
config *common.LogConfig
// delimiter matches one run of delimiter characters; nil when config is nil.
delimiter *regexp.Regexp
// dictVars holds one anchored expression per config.DictVars entry.
dictVars []*regexp.Regexp
// nonDictVars is neither populated nor read anywhere in this file.
nonDictVars []*regexp.Regexp
}

// NewLogCompressor builds a LogCompressor from config.
//
// A nil config yields a compressor whose Compress method returns its input
// unchanged. Otherwise config.Delimiter is compiled as the character class
// `[<Delimiter>]+` and every entry of config.DictVars is compiled so that it
// must match an entire token.
//
// The function panics (via regexp.MustCompile) when the delimiter class or a
// dictionary pattern is not a valid regular expression; this includes an
// empty Delimiter, because `[]+` is not a valid character class.
func NewLogCompressor(config *common.LogConfig) *LogCompressor {
	if config == nil {
		return &LogCompressor{}
	}

	delimiter := regexp.MustCompile(fmt.Sprintf("[%s]+", config.Delimiter))

	dictVars := make([]*regexp.Regexp, len(config.DictVars))
	for i, pattern := range config.DictVars {
		// The non-capturing group keeps both anchors bound to the whole
		// pattern. Without it a pattern with top-level alternation such as
		// `foo|bar` would compile to `^foo|bar$`, i.e. "starts with foo OR
		// ends with bar", and would accept partial tokens.
		dictVars[i] = regexp.MustCompile(fmt.Sprintf("^(?:%s)$", pattern))
	}

	return &LogCompressor{
		config:    config,
		delimiter: delimiter,
		dictVars:  dictVars,
	}
}

// Config returns the configuration this compressor was created with; it is
// nil when the compressor was created without a configuration.
func (lc *LogCompressor) Config() *common.LogConfig {
return lc.config
}

// Compress encodes log into a common.EncodedLog.
//
// The line is cut into tokens separated by runs of delimiter characters. Each
// token is offered to extractVariable: when it is recognised as a variable, a
// two-rune placeholder (marker + index) is written to the log type and the
// value is stored in the matching variable list; otherwise the token is copied
// to the log type verbatim. Delimiter runs are always copied verbatim.
//
// The marker bytes 0x11, 0x12 and 0x13 are reserved for placeholders, so any
// occurrence of them in the input is dropped before encoding.
//
// When the compressor has no configuration, or when log contains no delimiter
// at all, log is returned unchanged as the log type and no variable is
// extracted.
// NOTE(review): on those two paths reserved marker bytes are NOT stripped —
// confirm that the decoding side tolerates that.
func (lc *LogCompressor) Compress(log string) *common.EncodedLog {
	if lc.config == nil {
		return &common.EncodedLog{LogType: log}
	}

	delimiterIndices := lc.delimiter.FindAllStringIndex(log, -1)
	if len(delimiterIndices) == 0 {
		// No delimiters found, return the whole log as a log type.
		return &common.EncodedLog{LogType: log}
	}

	var (
		logTypeBuf strings.Builder
		dictVars   []string
		intVars    []int64
		floatVars  []float64
	)
	logTypeBuf.Grow(len(log))

	// emitToken writes one token to the log type, either as a placeholder or
	// as static text. Empty tokens (leading delimiter, or a token made only of
	// reserved bytes) are ignored.
	emitToken := func(token string) {
		token = stripVarMarkers(token)
		if token == "" {
			return
		}
		if !lc.extractVariable(&token, &logTypeBuf, &intVars, &floatVars, &dictVars) {
			logTypeBuf.WriteString(token)
		}
	}

	// Walk the line segment by segment: the text before each delimiter run is
	// a token, the run itself is static text. Slicing by byte offsets keeps
	// every input byte intact (including invalid UTF-8) and cannot lose track
	// of delimiter boundaries when reserved bytes are dropped.
	tokenStart := 0
	for _, d := range delimiterIndices {
		emitToken(log[tokenStart:d[0]])
		logTypeBuf.WriteString(stripVarMarkers(log[d[0]:d[1]]))
		tokenStart = d[1]
	}
	// Trailing token after the last delimiter run, if any.
	emitToken(log[tokenStart:])

	return &common.EncodedLog{
		LogType:   logTypeBuf.String(),
		DictVars:  dictVars,
		IntVars:   intVars,
		FloatVars: floatVars,
	}
}

// stripVarMarkers removes the reserved placeholder marker bytes (0x11, 0x12,
// 0x13) from s. It returns s itself when none is present.
func stripVarMarkers(s string) string {
	if !strings.ContainsAny(s, "\x11\x12\x13") {
		return s
	}

	var cleaned strings.Builder
	cleaned.Grow(len(s))
	// The markers are single ASCII bytes, so a byte-wise scan never splits a
	// multi-byte UTF-8 sequence.
	for i := 0; i < len(s); i++ {
		switch s[i] {
		case '\x11', '\x12', '\x13':
			// reserved: drop
		default:
			cleaned.WriteByte(s[i])
		}
	}
	return cleaned.String()
}

// extractVariable tries to classify the token *text as a variable.
//
// On success it appends the value to the matching list, writes a two-rune
// placeholder to logType (a marker followed by the value's index in its list)
// and returns true. Markers are 0x12 for integers, 0x13 for floats and 0x11
// for dictionary variables. On failure nothing is written and false is
// returned; the caller is expected to copy the token to the log type itself.
//
// A token is stored as a number only when formatting the parsed value yields
// the token again (strconv.FormatInt base 10, strconv.FormatFloat 'f' with
// shortest precision). Tokens such as "05", "+5", "1.10" or "1e3" would lose
// their exact spelling as numbers, so they are left to the dictionary patterns
// or to the static text.
// NOTE(review): this assumes the decoding side renders numbers with the same
// two formatters — confirm.
//
// The index is written as a single rune, so indices that are not valid runes
// (the surrogate range 0xD800-0xDFFF, or above 0x10FFFF) cannot be represented.
func (lc *LogCompressor) extractVariable(text *string, logType *strings.Builder, intVars *[]int64, floatVars *[]float64, dictVars *[]string) bool {
	token := *text

	if v, err := strconv.ParseInt(token, 10, 64); err == nil && strconv.FormatInt(v, 10) == token {
		logType.WriteRune('\x12')
		logType.WriteRune(rune(len(*intVars)))
		*intVars = append(*intVars, v)
		return true
	}

	if v, err := strconv.ParseFloat(token, 64); err == nil && strconv.FormatFloat(v, 'f', -1, 64) == token {
		logType.WriteRune('\x13')
		logType.WriteRune(rune(len(*floatVars)))
		*floatVars = append(*floatVars, v)
		return true
	}

	for _, pattern := range lc.dictVars {
		// MatchString avoids copying the token into a byte slice per pattern.
		if pattern.MatchString(token) {
			logType.WriteRune('\x11')
			logType.WriteRune(rune(len(*dictVars)))
			*dictVars = append(*dictVars, token)
			return true
		}
	}

	return false
}
Loading