Skip to content

refactor(stringlabels): Support stringlabels in chunkenc #17263

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 19 additions & 17 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,7 @@ func newBufferedIterator(ctx context.Context, pool compression.ReaderPool, b []b
pool: pool,
format: format,
symbolizer: symbolizer,
currStructuredMetadata: structuredMetadataPool.Get().(labels.Labels),
}
}

Expand Down Expand Up @@ -1443,14 +1444,14 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
if err != nil {
if err != io.EOF {
si.err = err
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
if si.readBufValid == 0 { // Got EOF and no data in the buffer.
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
if si.readBufValid == lastAttempt { // Got EOF and could not parse same data last time.
si.err = fmt.Errorf("invalid data in chunk")
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
}
var l uint64
Expand All @@ -1465,7 +1466,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {

if lineSize >= maxLineLength {
si.err = fmt.Errorf("line too long %d, maximum %d", lineSize, maxLineLength)
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
// If the buffer is not yet initialize or too small, we get a new one.
if si.buf == nil || lineSize > cap(si.buf) {
Expand All @@ -1476,7 +1477,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
si.buf = BytesBufferPool.Get(lineSize).([]byte)
if lineSize > cap(si.buf) {
si.err = fmt.Errorf("could not get a line buffer of size %d, actual %d", lineSize, cap(si.buf))
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
}
si.buf = si.buf[:lineSize]
Expand All @@ -1496,7 +1497,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
continue
}
si.err = err
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
}

Expand All @@ -1505,7 +1506,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
if si.format < ChunkFormatV4 {
si.stats.AddDecompressedBytes(decompressedBytes)
si.stats.AddDecompressedLines(1)
return ts, si.buf[:lineSize], nil, true
return ts, si.buf[:lineSize], labels.EmptyLabels(), true
}

lastAttempt = 0
Expand All @@ -1516,14 +1517,14 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
if err != nil {
if err != io.EOF {
si.err = err
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
if si.readBufValid == 0 { // Got EOF and no data in the buffer.
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
if si.readBufValid == lastAttempt { // Got EOF and could not parse same data last time.
si.err = fmt.Errorf("invalid data in chunk")
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
}
var l uint64
Expand Down Expand Up @@ -1573,7 +1574,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
si.symbolsBuf = SymbolsPool.Get(nSymbols).([]symbol)
if nSymbols > cap(si.symbolsBuf) {
si.err = fmt.Errorf("could not get a symbols matrix of size %d, actual %d", nSymbols, cap(si.symbolsBuf))
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
}

Expand All @@ -1589,14 +1590,14 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
if err != nil {
if err != io.EOF {
si.err = err
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
if si.readBufValid == 0 { // Got EOF and no data in the buffer.
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
if si.readBufValid == lastAttempt { // Got EOF and could not parse same data last time.
si.err = fmt.Errorf("invalid data in chunk")
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
}
sName, nWidth = binary.Uvarint(si.readBuf[:si.readBufValid])
Expand All @@ -1615,7 +1616,8 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
si.stats.AddDecompressedStructuredMetadataBytes(decompressedStructuredMetadataBytes)
si.stats.AddDecompressedBytes(decompressedBytes + decompressedStructuredMetadataBytes)

return ts, si.buf[:lineSize], si.symbolizer.Lookup(si.symbolsBuf[:nSymbols], si.currStructuredMetadata), true
labelsBuilder := log.NewBufferedLabelsBuilder(si.currStructuredMetadata)
return ts, si.buf[:lineSize], si.symbolizer.Lookup(si.symbolsBuf[:nSymbols], labelsBuilder), true
}

func (si *bufferedIterator) Err() error { return si.err }
Expand Down Expand Up @@ -1644,9 +1646,9 @@ func (si *bufferedIterator) close() {
si.symbolsBuf = nil
}

if si.currStructuredMetadata != nil {
if !si.currStructuredMetadata.IsEmpty() {
structuredMetadataPool.Put(si.currStructuredMetadata) // nolint:staticcheck
si.currStructuredMetadata = nil
si.currStructuredMetadata = labels.EmptyLabels() // TODO: maybe a reset would be better
}

si.origBytes = nil
Expand Down
1 change: 1 addition & 0 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) {
for iter.Next() {
_ = iter.At()
}
iter.Close()
}
})
}
Expand Down
29 changes: 16 additions & 13 deletions pkg/chunkenc/symbols.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/util"
)

Expand Down Expand Up @@ -58,16 +59,18 @@ func (s *symbolizer) Reset() {

// Add adds new labels pairs to the collection and returns back a symbol for each existing and new label pair
func (s *symbolizer) Add(lbls labels.Labels) symbols {
if len(lbls) == 0 {
if lbls.IsEmpty() {
return nil
}

syms := make([]symbol, len(lbls))
syms := make([]symbol, 0, lbls.Len())

for i, label := range lbls {
syms[i].Name = s.add(label.Name)
syms[i].Value = s.add(label.Value)
}
lbls.Range(func(label labels.Label) {
syms = append(syms, symbol{
Name: s.add(label.Name),
Value: s.add(label.Value),
})
})

return syms
}
Expand Down Expand Up @@ -97,20 +100,20 @@ func (s *symbolizer) add(lbl string) uint32 {
}

// Lookup coverts and returns labels pairs for the given symbols
func (s *symbolizer) Lookup(syms symbols, buf labels.Labels) labels.Labels {
func (s *symbolizer) Lookup(syms symbols, buf *log.BufferedLabelsBuilder) labels.Labels {
if len(syms) == 0 {
return nil
return labels.EmptyLabels()
}

if buf == nil {
buf = structuredMetadataPool.Get().(labels.Labels)
structuredMetadata := structuredMetadataPool.Get().(labels.Labels)
buf = log.NewBufferedLabelsBuilder(structuredMetadata)
}
buf = buf[:0]

for _, symbol := range syms {
buf = append(buf, labels.Label{Name: s.lookup(symbol.Name), Value: s.lookup(symbol.Value)})
buf.Add(labels.Label{Name: s.lookup(symbol.Name), Value: s.lookup(symbol.Value)})
}

return buf
return buf.Labels()
}

func (s *symbolizer) lookup(idx uint32) string {
Expand Down
71 changes: 22 additions & 49 deletions pkg/chunkenc/symbols_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,11 @@ func TestSymbolizer(t *testing.T) {
{
name: "no duplicate labels",
labelsToAdd: []labels.Labels{
{
labels.Label{
Name: "foo",
Value: "bar",
},
},
{
labels.Label{
Name: "fizz",
Value: "buzz",
},
labels.Label{
Name: "ping",
Value: "pong",
},
},
labels.FromStrings("foo", "bar"),
labels.FromStrings(
"fizz", "buzz",
"ping", "pong",
),
},
expectedSymbols: []symbols{
{
Expand All @@ -71,30 +60,15 @@ func TestSymbolizer(t *testing.T) {
{
name: "with duplicate labels",
labelsToAdd: []labels.Labels{
{
labels.Label{
Name: "foo",
Value: "bar",
},
{
Name: "bar",
Value: "foo",
},
},
{
labels.Label{
Name: "foo",
Value: "bar",
},
labels.Label{
Name: "fizz",
Value: "buzz",
},
labels.Label{
Name: "ping",
Value: "pong",
},
},
labels.FromStrings(
"foo", "bar",
"bar", "foo",
),
labels.FromStrings(
"foo", "bar",
"fizz", "buzz",
"ping", "pong",
),
},
expectedSymbols: []symbols{
{
Expand All @@ -108,14 +82,14 @@ func TestSymbolizer(t *testing.T) {
},
},
{
symbol{
Name: 0,
Value: 1,
},
symbol{
Name: 2,
Value: 3,
},
symbol{
Name: 1,
Value: 0,
},
symbol{
Name: 4,
Value: 5,
Expand All @@ -130,10 +104,10 @@ func TestSymbolizer(t *testing.T) {
for _, encoding := range testEncodings {
t.Run(fmt.Sprintf("%s - %s", tc.name, encoding), func(t *testing.T) {
s := newSymbolizer()
for i, labels := range tc.labelsToAdd {
symbols := s.Add(labels)
for i, lbls := range tc.labelsToAdd {
symbols := s.Add(lbls)
require.Equal(t, tc.expectedSymbols[i], symbols)
require.Equal(t, labels, s.Lookup(symbols, nil))
require.Equal(t, lbls, s.Lookup(symbols, nil))
}

// Test that Lookup returns empty labels if no symbols are provided.
Expand All @@ -144,8 +118,7 @@ func TestSymbolizer(t *testing.T) {
Value: 0,
},
}, nil)
require.Equal(t, "", ret[0].Name)
require.Equal(t, "", ret[0].Value)
require.Equal(t, `{""=""}`, ret.String())
}

require.Equal(t, tc.expectedNumLabels, len(s.labels))
Expand Down
26 changes: 12 additions & 14 deletions pkg/chunkenc/unordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (e *nsEntries) ValueAtDimension(_ uint64) int64 {
func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata labels.Labels) (bool, error) {
if hb.format < UnorderedWithStructuredMetadataHeadBlockFmt {
// structuredMetadata must be ignored for the previous head block formats
structuredMetadata = nil
structuredMetadata = labels.EmptyLabels()
}
// This is an allocation hack. The rangetree lib does not
// support the ability to pass a "mutate" function during an insert
Expand Down Expand Up @@ -155,17 +155,17 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata l
}

hb.size += len(line)
hb.size += len(structuredMetadata) * 2 * 4 // 4 bytes per label and value pair as structuredMetadataSymbols
hb.size += structuredMetadata.Len() * 2 * 4 // 4 bytes per label and value pair as structuredMetadataSymbols
hb.lines++

return false, nil
}

func metaLabelsLen(metaLabels labels.Labels) int {
length := 0
for _, label := range metaLabels {
metaLabels.Range(func(label labels.Label) {
length += len(label.Name) + len(label.Value)
}
})
return length
}

Expand Down Expand Up @@ -252,14 +252,15 @@ func (hb *unorderedHeadBlock) Iterator(ctx context.Context, direction logproto.D
// cutting of blocks.
streams := map[string]*logproto.Stream{}
baseHash := pipeline.BaseLabels().Hash()
var structuredMetadata labels.Labels
structuredMetadata := structuredMetadataPool.Get().(labels.Labels)
labelsBuilder := log.NewBufferedLabelsBuilder(structuredMetadata)
_ = hb.forEntries(
ctx,
direction,
mint,
maxt,
func(statsCtx *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error {
structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, structuredMetadata)
structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, labelsBuilder)
newLine, parsedLbs, matches := pipeline.ProcessString(ts, line, structuredMetadata)
if !matches {
return nil
Expand Down Expand Up @@ -298,9 +299,7 @@ func (hb *unorderedHeadBlock) Iterator(ctx context.Context, direction logproto.D
}

return iter.EntryIteratorWithClose(iter.NewStreamsIterator(streamsResult, direction), func() error {
if structuredMetadata != nil {
structuredMetadataPool.Put(structuredMetadata) // nolint:staticcheck
}
structuredMetadataPool.Put(structuredMetadata) // nolint:staticcheck
return nil
})
}
Expand All @@ -313,14 +312,15 @@ func (hb *unorderedHeadBlock) SampleIterator(
) iter.SampleIterator {
series := map[string]*logproto.Series{}
setQueryReferencedStructuredMetadata := false
var structuredMetadata labels.Labels
structuredMetadata := structuredMetadataPool.Get().(labels.Labels)
labelsBuilder := log.NewBufferedLabelsBuilder(structuredMetadata)
_ = hb.forEntries(
ctx,
logproto.FORWARD,
mint,
maxt,
func(statsCtx *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error {
structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, structuredMetadata)
structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, labelsBuilder)

for _, extractor := range extractor {
value, lbls, ok := extractor.ProcessString(ts, line, structuredMetadata)
Expand Down Expand Up @@ -372,9 +372,7 @@ func (hb *unorderedHeadBlock) SampleIterator(
for _, s := range series {
SamplesPool.Put(s.Samples)
}
if structuredMetadata != nil {
structuredMetadataPool.Put(structuredMetadata) // nolint:staticcheck
}
structuredMetadataPool.Put(structuredMetadata) // nolint:staticcheck
return nil
})
}
Expand Down
Loading
Loading