Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* [CHANGE] Ingester: Renamed `cortex_ingest_storage_writer_buffered_produce_bytes` metric to `cortex_ingest_storage_writer_buffered_produce_bytes_distribution` (Prometheus summary), and added `cortex_ingest_storage_writer_buffered_produce_bytes` metric that exports the buffer size as a Prometheus Gauge. #13414
* [CHANGE] Querier and query-frontend: Removed support for per-step stats when MQE is enabled. #13582
* [CHANGE] Compactor: Require that uploaded TSDB blocks use v2 of the index file format. #13815
* [CHANGE] Store-gateway: Remove support for generating index-headers from TSDB blocks that use v1 of the index file format. #13824
* [CHANGE] Query-frontend: Removed support for calculating 'cache-adjusted samples processed' query statistic. The `-query-frontend.cache-samples-processed-stats` CLI flag has been deprecated and will be removed in a future release. Setting it now has no effect. #13582
* [CHANGE] Querier: Renamed experimental flag `-querier.prefer-availability-zone` to `-querier.prefer-availability-zones` and changed it to accept a comma-separated list of availability zones. All zones in the list are given equal priority when querying ingesters and store-gateways. #13756 #13758
* [CHANGE] Ingester: Stabilize experimental flag `-ingest-storage.write-logs-fsync-before-kafka-commit-concurrency` to fsync write logs before the offset is committed to Kafka. Remove `-ingest-storage.write-logs-fsync-before-kafka-commit-enabled` since this is always enabled now. #13591
Expand Down
3 changes: 1 addition & 2 deletions pkg/storage/indexheader/binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ func newChunkedIndexReader(ctx context.Context, bkt objstore.BucketReader, id ul
}

version := int(b[4:5][0])

if version != index.FormatV1 && version != index.FormatV2 {
if version != index.FormatV2 {
return nil, 0, errors.Errorf("not supported index file version %d of %s", version, indexFilepath)
}

Expand Down
20 changes: 3 additions & 17 deletions pkg/storage/indexheader/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,25 +89,11 @@ func TestReadersComparedToIndexHeader(t *testing.T) {
_, err = block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, idIndexV2.String()), nil)
require.NoError(t, err)

metaIndexV1, err := block.ReadMetaFromDir("./testdata/index_format_v1")
require.NoError(t, err)
test.Copy(t, "./testdata/index_format_v1", filepath.Join(tmpDir, metaIndexV1.ULID.String()))

_, err = block.InjectThanosMeta(log.NewNopLogger(), filepath.Join(tmpDir, metaIndexV1.ULID.String()), block.ThanosMeta{
Labels: labels.FromStrings("ext1", "1").Map(),
Source: block.TestSource,
}, &metaIndexV1.BlockMeta)

require.NoError(t, err)
_, err = block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, metaIndexV1.ULID.String()), nil)
require.NoError(t, err)

for _, testBlock := range []struct {
version string
id ulid.ULID
}{
{version: "v2", id: idIndexV2},
{version: "v1", id: metaIndexV1.ULID},
} {
t.Run(testBlock.version, func(t *testing.T) {
id := testBlock.id
Expand Down Expand Up @@ -241,8 +227,8 @@ func Test_DownsampleSparseIndexHeader(t *testing.T) {
// label names are equal between original and downsampled sparse index-headers
require.ElementsMatch(t, downsampleLabelNames, origLabelNames)

origIdxpbTbl := br1.postingsOffsetTable.NewSparsePostingOffsetTable()
downsampleIdxpbTbl := br2.postingsOffsetTable.NewSparsePostingOffsetTable()
origIdxpbTbl := br1.postingsOffsetTable.ToSparsePostingOffsetTable()
downsampleIdxpbTbl := br2.postingsOffsetTable.ToSparsePostingOffsetTable()

for name, vals := range origIdxpbTbl.Postings {
downsampledOffsets := downsampleIdxpbTbl.Postings[name].Offsets
Expand Down Expand Up @@ -290,7 +276,7 @@ func compareIndexToHeaderPostings(t *testing.T, indexByteSlice index.ByteSlice,
})
require.NoError(t, err)

tbl := sbr.postingsOffsetTable.NewSparsePostingOffsetTable()
tbl := sbr.postingsOffsetTable.ToSparsePostingOffsetTable()

expLabelNames, err := ir.LabelNames(context.Background())
require.NoError(t, err)
Expand Down
144 changes: 4 additions & 140 deletions pkg/storage/indexheader/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type PostingOffsetTable interface {
// LabelNames returns a sorted list of all label names in this table.
LabelNames() ([]string, error)

NewSparsePostingOffsetTable() (table *indexheaderpb.PostingOffsetTable)
ToSparsePostingOffsetTable() (table *indexheaderpb.PostingOffsetTable)

// PostingOffsetInMemSampling returns the inverse of the fraction of postings held in memory. A lower value indicates
// postings are sample more frequently.
Expand All @@ -58,60 +58,13 @@ type PostingListOffset struct {
Off index.Range
}

// PostingOffsetTableV1 is the PostingOffsetTable implementation for index file
// format v1. The v1 format has no sorted postings offset table on disk, so the
// whole label name -> label value -> posting range mapping is held in memory.
type PostingOffsetTableV1 struct {
	// For the v1 format, labelname -> labelvalue -> offset.
	postings map[string]map[string]index.Range
}

// NewPostingOffsetTable returns the PostingOffsetTable implementation matching
// the given index file format version, reading the table starting at
// tableOffset, or an error if the version is not recognized.
func NewPostingOffsetTable(factory *streamencoding.DecbufFactory, tableOffset int, indexVersion int, indexLastPostingListEndBound uint64, postingOffsetsInMemSampling int, doChecksum bool) (PostingOffsetTable, error) {
	switch indexVersion {
	case index.FormatV1:
		return newV1PostingOffsetTable(factory, tableOffset, indexLastPostingListEndBound)
	case index.FormatV2:
		return newV2PostingOffsetTable(factory, tableOffset, indexLastPostingListEndBound, postingOffsetsInMemSampling, doChecksum)
	}

	return nil, fmt.Errorf("unknown index version %v", indexVersion)
}

func newV1PostingOffsetTable(factory *streamencoding.DecbufFactory, tableOffset int, indexLastPostingListEndBound uint64) (*PostingOffsetTableV1, error) {
t := PostingOffsetTableV1{
postings: map[string]map[string]index.Range{},
}

// Earlier V1 formats don't have a sorted postings offset table, so
// load the whole offset table into memory.
var lastKey string
var lastValue string
var prevRng index.Range

if err := readOffsetTable(factory, tableOffset, func(key string, value string, off uint64) error {
if len(t.postings) > 0 {
prevRng.End = int64(off - crc32.Size)
t.postings[lastKey][lastValue] = prevRng
}

if _, ok := t.postings[key]; !ok {
t.postings[key] = map[string]index.Range{}
}

lastKey = key
lastValue = value
prevRng = index.Range{Start: int64(off + postingLengthFieldSize)}
return nil
}); err != nil {
return nil, errors.Wrap(err, "read postings table")
}

if len(t.postings) > 0 {
// In case lastValOffset is unknown as we don't have next posting anymore. Guess from the index table of contents.
// The last posting list ends before the label offset table.
// In worst case we will overfetch a few bytes.
prevRng.End = int64(indexLastPostingListEndBound) - crc32.Size
t.postings[lastKey][lastValue] = prevRng
}

return &t, nil
return nil, fmt.Errorf("unknown or unsupported index version %v", indexVersion)
}

func newV2PostingOffsetTable(factory *streamencoding.DecbufFactory, tableOffset int, indexLastPostingListEndBound uint64, postingOffsetsInMemSampling int, doChecksum bool) (table *PostingOffsetTableV2, err error) {
Expand Down Expand Up @@ -278,95 +231,6 @@ func NewPostingOffsetTableFromSparseHeader(factory *streamencoding.DecbufFactory
return &t, err
}

// readOffsetTable reads an offset table at the given position and invokes f
// once per entry with the label name, label value, and offset. Decoding stops
// as soon as f returns an error, and that error is propagated to the caller.
func readOffsetTable(factory *streamencoding.DecbufFactory, tableOffset int, f func(string, string, uint64) error) (err error) {
	dec := factory.NewDecbufAtChecked(tableOffset, castagnoliTable)
	defer runutil.CloseWithErrCapture(&err, &dec, "read offset table")

	remaining := dec.Be32()
	for remaining > 0 && dec.Len() > 0 && dec.Err() == nil {
		// Every postings offset table entry carries exactly two keys: the
		// label name followed by the label value.
		if keyCount := dec.Uvarint(); keyCount != 2 {
			return errors.Errorf("unexpected key length for posting table %d", keyCount)
		}

		name := dec.UvarintStr()
		value := dec.UvarintStr()
		offset := dec.Uvarint64()
		if dec.Err() != nil {
			break
		}
		if cbErr := f(name, value, offset); cbErr != nil {
			return cbErr
		}
		remaining--
	}
	return dec.Err()
}

// PostingsOffset returns the byte range of the posting list for the given
// label name/value pair. The boolean result is false when the pair is not
// present in the table.
func (t *PostingOffsetTableV1) PostingsOffset(name string, value string) (index.Range, bool, error) {
	if valueRanges, ok := t.postings[name]; ok {
		if rng, ok := valueRanges[value]; ok {
			return rng, true, nil
		}
	}
	return index.Range{}, false, nil
}

// LabelValuesOffsets returns the posting list offsets, sorted by label value,
// for all values of the given label name that start with prefix and pass the
// optional filter. The context is checked periodically so long scans can be
// cancelled.
func (t *PostingOffsetTableV1) LabelValuesOffsets(ctx context.Context, name, prefix string, filter func(string) bool) ([]PostingListOffset, error) {
	valueRanges, ok := t.postings[name]
	if !ok {
		return nil, nil
	}

	offsets := make([]PostingListOffset, 0, len(valueRanges))
	iteration := 1
	for value, rng := range valueRanges {
		// Checking the context on every iteration would be wasteful, so only
		// do it once every CheckContextEveryNIterations iterations.
		if iteration%CheckContextEveryNIterations == 0 && ctx.Err() != nil {
			return nil, ctx.Err()
		}
		iteration++

		if !strings.HasPrefix(value, prefix) {
			continue
		}
		if filter != nil && !filter(value) {
			continue
		}
		offsets = append(offsets, PostingListOffset{LabelValue: value, Off: rng})
	}

	slices.SortFunc(offsets, func(a, b PostingListOffset) int {
		return strings.Compare(a.LabelValue, b.LabelValue)
	})
	return offsets, nil
}

// LabelNames returns a sorted list of all label names in this table, excluding
// the special all-postings key.
func (t *PostingOffsetTableV1) LabelNames() ([]string, error) {
	allPostingsKeyName, _ := index.AllPostingsKey()

	names := make([]string, 0, len(t.postings))
	for name := range t.postings {
		if name != allPostingsKeyName {
			names = append(names, name)
		}
	}

	slices.Sort(names)

	return names, nil
}

// PostingOffsetInMemSampling implements PostingOffsetTable. The v1 table loads
// every posting offset into memory, so there is no sampling and 0 is returned.
func (t *PostingOffsetTableV1) PostingOffsetInMemSampling() int {
	return 0
}

// NewSparsePostingOffsetTable implements PostingOffsetTable. For the v1 format
// it returns an empty protobuf table (no sparse representation is built).
func (t *PostingOffsetTableV1) NewSparsePostingOffsetTable() (table *indexheaderpb.PostingOffsetTable) {
	return &indexheaderpb.PostingOffsetTable{}
}

type PostingOffsetTableV2 struct {
// Map of LabelName to a list of some LabelValues's position in the offset table.
// The first and last values for each name are always present, we keep only 1/postingOffsetsInMemSampling of the rest.
Expand Down Expand Up @@ -648,8 +512,8 @@ func (t *PostingOffsetTableV2) PostingOffsetInMemSampling() int {
return 0
}

// NewSparsePostingOffsetTable loads all postings offset table data into a sparse index-header to be persisted to disk
func (t *PostingOffsetTableV2) NewSparsePostingOffsetTable() (table *indexheaderpb.PostingOffsetTable) {
// ToSparsePostingOffsetTable loads all postings offset table data into a sparse index-header to be persisted to disk
func (t *PostingOffsetTableV2) ToSparsePostingOffsetTable() (table *indexheaderpb.PostingOffsetTable) {
sparseHeaders := &indexheaderpb.PostingOffsetTable{
Postings: make(map[string]*indexheaderpb.PostingValueOffsets, len(t.postings)),
PostingOffsetInMemorySampling: int64(t.postingOffsetsInMemSampling),
Expand Down
Loading