Skip to content
Merged
35 changes: 18 additions & 17 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -1443,14 +1443,14 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
if err != nil {
if err != io.EOF {
si.err = err
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
if si.readBufValid == 0 { // Got EOF and no data in the buffer.
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
if si.readBufValid == lastAttempt { // Got EOF and could not parse same data last time.
si.err = fmt.Errorf("invalid data in chunk")
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
}
var l uint64
Expand All @@ -1465,7 +1465,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {

if lineSize >= maxLineLength {
si.err = fmt.Errorf("line too long %d, maximum %d", lineSize, maxLineLength)
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
// If the buffer is not yet initialize or too small, we get a new one.
if si.buf == nil || lineSize > cap(si.buf) {
Expand All @@ -1476,7 +1476,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
si.buf = BytesBufferPool.Get(lineSize).([]byte)
if lineSize > cap(si.buf) {
si.err = fmt.Errorf("could not get a line buffer of size %d, actual %d", lineSize, cap(si.buf))
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
}
si.buf = si.buf[:lineSize]
Expand All @@ -1496,7 +1496,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
continue
}
si.err = err
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
}

Expand All @@ -1505,7 +1505,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
if si.format < ChunkFormatV4 {
si.stats.AddDecompressedBytes(decompressedBytes)
si.stats.AddDecompressedLines(1)
return ts, si.buf[:lineSize], nil, true
return ts, si.buf[:lineSize], labels.EmptyLabels(), true
}

lastAttempt = 0
Expand All @@ -1516,14 +1516,14 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
if err != nil {
if err != io.EOF {
si.err = err
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
if si.readBufValid == 0 { // Got EOF and no data in the buffer.
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
if si.readBufValid == lastAttempt { // Got EOF and could not parse same data last time.
si.err = fmt.Errorf("invalid data in chunk")
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
}
var l uint64
Expand Down Expand Up @@ -1573,7 +1573,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
si.symbolsBuf = SymbolsPool.Get(nSymbols).([]symbol)
if nSymbols > cap(si.symbolsBuf) {
si.err = fmt.Errorf("could not get a symbols matrix of size %d, actual %d", nSymbols, cap(si.symbolsBuf))
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
}

Expand All @@ -1589,14 +1589,14 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
if err != nil {
if err != io.EOF {
si.err = err
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
if si.readBufValid == 0 { // Got EOF and no data in the buffer.
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
if si.readBufValid == lastAttempt { // Got EOF and could not parse same data last time.
si.err = fmt.Errorf("invalid data in chunk")
return 0, nil, nil, false
return 0, nil, labels.EmptyLabels(), false
}
}
sName, nWidth = binary.Uvarint(si.readBuf[:si.readBufValid])
Expand All @@ -1615,7 +1615,8 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
si.stats.AddDecompressedStructuredMetadataBytes(decompressedStructuredMetadataBytes)
si.stats.AddDecompressedBytes(decompressedBytes + decompressedStructuredMetadataBytes)

return ts, si.buf[:lineSize], si.symbolizer.Lookup(si.symbolsBuf[:nSymbols], si.currStructuredMetadata), true
labelsBuilder := log.NewBufferedLabelsBuilder(si.currStructuredMetadata)
return ts, si.buf[:lineSize], si.symbolizer.Lookup(si.symbolsBuf[:nSymbols], labelsBuilder), true
}

func (si *bufferedIterator) Err() error { return si.err }
Expand Down Expand Up @@ -1644,9 +1645,9 @@ func (si *bufferedIterator) close() {
si.symbolsBuf = nil
}

if si.currStructuredMetadata != nil {
if !si.currStructuredMetadata.IsEmpty() {
structuredMetadataPool.Put(si.currStructuredMetadata) // nolint:staticcheck
si.currStructuredMetadata = nil
si.currStructuredMetadata = labels.EmptyLabels() // TODO: maybe a reset would be better
}

si.origBytes = nil
Expand Down
1 change: 1 addition & 0 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) {
for iter.Next() {
_ = iter.At()
}
iter.Close()
}
})
}
Expand Down
29 changes: 16 additions & 13 deletions pkg/chunkenc/symbols.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/util"
)

Expand Down Expand Up @@ -58,16 +59,18 @@ func (s *symbolizer) Reset() {

// Add adds new labels pairs to the collection and returns back a symbol for each existing and new label pair
func (s *symbolizer) Add(lbls labels.Labels) symbols {
if len(lbls) == 0 {
if lbls.IsEmpty() {
return nil
}

syms := make([]symbol, len(lbls))
syms := make([]symbol, 0, lbls.Len())

for i, label := range lbls {
syms[i].Name = s.add(label.Name)
syms[i].Value = s.add(label.Value)
}
lbls.Range(func(label labels.Label) {
syms = append(syms, symbol{
Name: s.add(label.Name),
Value: s.add(label.Value),
})
})

return syms
}
Expand Down Expand Up @@ -97,20 +100,20 @@ func (s *symbolizer) add(lbl string) uint32 {
}

// Lookup coverts and returns labels pairs for the given symbols
func (s *symbolizer) Lookup(syms symbols, buf labels.Labels) labels.Labels {
func (s *symbolizer) Lookup(syms symbols, buf *log.BufferedLabelsBuilder) labels.Labels {
if len(syms) == 0 {
return nil
return labels.EmptyLabels()
}

if buf == nil {
buf = structuredMetadataPool.Get().(labels.Labels)
structuredMetadata := structuredMetadataPool.Get().(labels.Labels)
buf = log.NewBufferedLabelsBuilder(structuredMetadata)
}
buf = buf[:0]

for _, symbol := range syms {
buf = append(buf, labels.Label{Name: s.lookup(symbol.Name), Value: s.lookup(symbol.Value)})
buf.Add(labels.Label{Name: s.lookup(symbol.Name), Value: s.lookup(symbol.Value)})
}

return buf
return buf.Labels()
}

func (s *symbolizer) lookup(idx uint32) string {
Expand Down
71 changes: 22 additions & 49 deletions pkg/chunkenc/symbols_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,11 @@ func TestSymbolizer(t *testing.T) {
{
name: "no duplicate labels",
labelsToAdd: []labels.Labels{
{
labels.Label{
Name: "foo",
Value: "bar",
},
},
{
labels.Label{
Name: "fizz",
Value: "buzz",
},
labels.Label{
Name: "ping",
Value: "pong",
},
},
labels.FromStrings("foo", "bar"),
labels.FromStrings(
"fizz", "buzz",
"ping", "pong",
),
},
expectedSymbols: []symbols{
{
Expand All @@ -71,30 +60,15 @@ func TestSymbolizer(t *testing.T) {
{
name: "with duplicate labels",
labelsToAdd: []labels.Labels{
{
labels.Label{
Name: "foo",
Value: "bar",
},
{
Name: "bar",
Value: "foo",
},
},
{
labels.Label{
Name: "foo",
Value: "bar",
},
labels.Label{
Name: "fizz",
Value: "buzz",
},
labels.Label{
Name: "ping",
Value: "pong",
},
},
labels.FromStrings(
"foo", "bar",
"bar", "foo",
),
labels.FromStrings(
"foo", "bar",
"fizz", "buzz",
"ping", "pong",
),
},
expectedSymbols: []symbols{
{
Expand All @@ -108,14 +82,14 @@ func TestSymbolizer(t *testing.T) {
},
},
{
symbol{
Name: 0,
Value: 1,
},
symbol{
Name: 2,
Value: 3,
},
symbol{
Name: 1,
Value: 0,
},
symbol{
Name: 4,
Value: 5,
Expand All @@ -130,10 +104,10 @@ func TestSymbolizer(t *testing.T) {
for _, encoding := range testEncodings {
t.Run(fmt.Sprintf("%s - %s", tc.name, encoding), func(t *testing.T) {
s := newSymbolizer()
for i, labels := range tc.labelsToAdd {
symbols := s.Add(labels)
for i, lbls := range tc.labelsToAdd {
symbols := s.Add(lbls)
require.Equal(t, tc.expectedSymbols[i], symbols)
require.Equal(t, labels, s.Lookup(symbols, nil))
require.Equal(t, lbls, s.Lookup(symbols, nil))
}

// Test that Lookup returns empty labels if no symbols are provided.
Expand All @@ -144,8 +118,7 @@ func TestSymbolizer(t *testing.T) {
Value: 0,
},
}, nil)
require.Equal(t, "", ret[0].Name)
require.Equal(t, "", ret[0].Value)
require.Equal(t, `{""=""}`, ret.String())
}

require.Equal(t, tc.expectedNumLabels, len(s.labels))
Expand Down
26 changes: 12 additions & 14 deletions pkg/chunkenc/unordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (e *nsEntries) ValueAtDimension(_ uint64) int64 {
func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata labels.Labels) (bool, error) {
if hb.format < UnorderedWithStructuredMetadataHeadBlockFmt {
// structuredMetadata must be ignored for the previous head block formats
structuredMetadata = nil
structuredMetadata = labels.EmptyLabels()
}
// This is an allocation hack. The rangetree lib does not
// support the ability to pass a "mutate" function during an insert
Expand Down Expand Up @@ -155,17 +155,17 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata l
}

hb.size += len(line)
hb.size += len(structuredMetadata) * 2 * 4 // 4 bytes per label and value pair as structuredMetadataSymbols
hb.size += structuredMetadata.Len() * 2 * 4 // 4 bytes per label and value pair as structuredMetadataSymbols
hb.lines++

return false, nil
}

func metaLabelsLen(metaLabels labels.Labels) int {
length := 0
for _, label := range metaLabels {
metaLabels.Range(func(label labels.Label) {
length += len(label.Name) + len(label.Value)
}
})
return length
}

Expand Down Expand Up @@ -252,14 +252,15 @@ func (hb *unorderedHeadBlock) Iterator(ctx context.Context, direction logproto.D
// cutting of blocks.
streams := map[string]*logproto.Stream{}
baseHash := pipeline.BaseLabels().Hash()
var structuredMetadata labels.Labels
structuredMetadata := structuredMetadataPool.Get().(labels.Labels)
labelsBuilder := log.NewBufferedLabelsBuilder(structuredMetadata)
_ = hb.forEntries(
ctx,
direction,
mint,
maxt,
func(statsCtx *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error {
structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, structuredMetadata)
structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, labelsBuilder)
newLine, parsedLbs, matches := pipeline.ProcessString(ts, line, structuredMetadata)
if !matches {
return nil
Expand Down Expand Up @@ -298,9 +299,7 @@ func (hb *unorderedHeadBlock) Iterator(ctx context.Context, direction logproto.D
}

return iter.EntryIteratorWithClose(iter.NewStreamsIterator(streamsResult, direction), func() error {
if structuredMetadata != nil {
structuredMetadataPool.Put(structuredMetadata) // nolint:staticcheck
}
structuredMetadataPool.Put(structuredMetadata) // nolint:staticcheck
return nil
})
}
Expand All @@ -313,14 +312,15 @@ func (hb *unorderedHeadBlock) SampleIterator(
) iter.SampleIterator {
series := map[string]*logproto.Series{}
setQueryReferencedStructuredMetadata := false
var structuredMetadata labels.Labels
structuredMetadata := structuredMetadataPool.Get().(labels.Labels)
labelsBuilder := log.NewBufferedLabelsBuilder(structuredMetadata)
_ = hb.forEntries(
ctx,
logproto.FORWARD,
mint,
maxt,
func(statsCtx *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error {
structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, structuredMetadata)
structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, labelsBuilder)

for _, extractor := range extractor {
value, lbls, ok := extractor.ProcessString(ts, line, structuredMetadata)
Expand Down Expand Up @@ -372,9 +372,7 @@ func (hb *unorderedHeadBlock) SampleIterator(
for _, s := range series {
SamplesPool.Put(s.Samples)
}
if structuredMetadata != nil {
structuredMetadataPool.Put(structuredMetadata) // nolint:staticcheck
}
structuredMetadataPool.Put(structuredMetadata) // nolint:staticcheck
return nil
})
}
Expand Down
Loading
Loading