Skip to content

Commit 938cc96

Browse files
committed
store-gateway: remove support for reading TSDB index v1
This change removes support for generating index-headers and reading the v1 format of TSDB index files. The v1 format was only the default in Prometheus 2.0 and 2.1. Prometheus 2.2 and later default to creating blocks that use the v2 format. The v1 format has never been the default in Mimir. Note that this change does not refactor or change index-header code beyond removing v1 support. Further cleanups will be done in follow-up PRs. Part of #13808 Signed-off-by: Nick Pillitteri <[email protected]>
1 parent 22b5f87 commit 938cc96

File tree

10 files changed

+40
-392
lines changed

10 files changed

+40
-392
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
* [CHANGE] Ingester: Renamed `cortex_ingest_storage_writer_buffered_produce_bytes` metric to `cortex_ingest_storage_writer_buffered_produce_bytes_distribution` (Prometheus summary), and added `cortex_ingest_storage_writer_buffered_produce_bytes` metric that exports the buffer size as a Prometheus Gauge. #13414
2929
* [CHANGE] Querier and query-frontend: Removed support for per-step stats when MQE is enabled. #13582
3030
* [CHANGE] Compactor: Require that uploaded TSDB blocks use v2 of the index file format. #13815
31+
* [CHANGE] Store-gateway: Remove support for generating index-headers from TSDB blocks that use v1 of the index file format. #13824
3132
* [CHANGE] Query-frontend: Removed support for calculating 'cache-adjusted samples processed' query statistic. The `-query-frontend.cache-samples-processed-stats` CLI flag has been deprecated and will be removed in a future release. Setting it now has no effect. #13582
3233
* [CHANGE] Querier: Renamed experimental flag `-querier.prefer-availability-zone` to `-querier.prefer-availability-zones` and changed it to accept a comma-separated list of availability zones. All zones in the list are given equal priority when querying ingesters and store-gateways. #13756 #13758
3334
* [CHANGE] Ingester: Stabilize experimental flag `-ingest-storage.write-logs-fsync-before-kafka-commit-concurrency` to fsync write logs before the offset is committed to Kafka. Remove `-ingest-storage.write-logs-fsync-before-kafka-commit-enabled` since this is always enabled now. #13591

pkg/storage/indexheader/binary_reader.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,7 @@ func newChunkedIndexReader(ctx context.Context, bkt objstore.BucketReader, id ul
160160
}
161161

162162
version := int(b[4:5][0])
163-
164-
if version != index.FormatV1 && version != index.FormatV2 {
163+
if version != index.FormatV2 {
165164
return nil, 0, errors.Errorf("not supported index file version %d of %s", version, indexFilepath)
166165
}
167166

pkg/storage/indexheader/header_test.go

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -89,25 +89,11 @@ func TestReadersComparedToIndexHeader(t *testing.T) {
8989
_, err = block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, idIndexV2.String()), nil)
9090
require.NoError(t, err)
9191

92-
metaIndexV1, err := block.ReadMetaFromDir("./testdata/index_format_v1")
93-
require.NoError(t, err)
94-
test.Copy(t, "./testdata/index_format_v1", filepath.Join(tmpDir, metaIndexV1.ULID.String()))
95-
96-
_, err = block.InjectThanosMeta(log.NewNopLogger(), filepath.Join(tmpDir, metaIndexV1.ULID.String()), block.ThanosMeta{
97-
Labels: labels.FromStrings("ext1", "1").Map(),
98-
Source: block.TestSource,
99-
}, &metaIndexV1.BlockMeta)
100-
101-
require.NoError(t, err)
102-
_, err = block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, metaIndexV1.ULID.String()), nil)
103-
require.NoError(t, err)
104-
10592
for _, testBlock := range []struct {
10693
version string
10794
id ulid.ULID
10895
}{
10996
{version: "v2", id: idIndexV2},
110-
{version: "v1", id: metaIndexV1.ULID},
11197
} {
11298
t.Run(testBlock.version, func(t *testing.T) {
11399
id := testBlock.id
@@ -241,8 +227,8 @@ func Test_DownsampleSparseIndexHeader(t *testing.T) {
241227
// label names are equal between original and downsampled sparse index-headers
242228
require.ElementsMatch(t, downsampleLabelNames, origLabelNames)
243229

244-
origIdxpbTbl := br1.postingsOffsetTable.NewSparsePostingOffsetTable()
245-
downsampleIdxpbTbl := br2.postingsOffsetTable.NewSparsePostingOffsetTable()
230+
origIdxpbTbl := br1.postingsOffsetTable.ToSparsePostingOffsetTable()
231+
downsampleIdxpbTbl := br2.postingsOffsetTable.ToSparsePostingOffsetTable()
246232

247233
for name, vals := range origIdxpbTbl.Postings {
248234
downsampledOffsets := downsampleIdxpbTbl.Postings[name].Offsets
@@ -290,7 +276,7 @@ func compareIndexToHeaderPostings(t *testing.T, indexByteSlice index.ByteSlice,
290276
})
291277
require.NoError(t, err)
292278

293-
tbl := sbr.postingsOffsetTable.NewSparsePostingOffsetTable()
279+
tbl := sbr.postingsOffsetTable.ToSparsePostingOffsetTable()
294280

295281
expLabelNames, err := ir.LabelNames(context.Background())
296282
require.NoError(t, err)

pkg/storage/indexheader/index/postings.go

Lines changed: 4 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type PostingOffsetTable interface {
4242
// LabelNames returns a sorted list of all label names in this table.
4343
LabelNames() ([]string, error)
4444

45-
NewSparsePostingOffsetTable() (table *indexheaderpb.PostingOffsetTable)
45+
ToSparsePostingOffsetTable() (table *indexheaderpb.PostingOffsetTable)
4646

4747
// PostingOffsetInMemSampling returns the inverse of the fraction of postings held in memory. A lower value indicates
4848
// postings are sampled more frequently.
@@ -58,60 +58,13 @@ type PostingListOffset struct {
5858
Off index.Range
5959
}
6060

61-
type PostingOffsetTableV1 struct {
62-
// For the v1 format, labelname -> labelvalue -> offset.
63-
postings map[string]map[string]index.Range
64-
}
65-
6661
func NewPostingOffsetTable(factory *streamencoding.DecbufFactory, tableOffset int, indexVersion int, indexLastPostingListEndBound uint64, postingOffsetsInMemSampling int, doChecksum bool) (PostingOffsetTable, error) {
6762
switch indexVersion {
68-
case index.FormatV1:
69-
return newV1PostingOffsetTable(factory, tableOffset, indexLastPostingListEndBound)
7063
case index.FormatV2:
7164
return newV2PostingOffsetTable(factory, tableOffset, indexLastPostingListEndBound, postingOffsetsInMemSampling, doChecksum)
7265
}
7366

74-
return nil, fmt.Errorf("unknown index version %v", indexVersion)
75-
}
76-
77-
func newV1PostingOffsetTable(factory *streamencoding.DecbufFactory, tableOffset int, indexLastPostingListEndBound uint64) (*PostingOffsetTableV1, error) {
78-
t := PostingOffsetTableV1{
79-
postings: map[string]map[string]index.Range{},
80-
}
81-
82-
// Earlier V1 formats don't have a sorted postings offset table, so
83-
// load the whole offset table into memory.
84-
var lastKey string
85-
var lastValue string
86-
var prevRng index.Range
87-
88-
if err := readOffsetTable(factory, tableOffset, func(key string, value string, off uint64) error {
89-
if len(t.postings) > 0 {
90-
prevRng.End = int64(off - crc32.Size)
91-
t.postings[lastKey][lastValue] = prevRng
92-
}
93-
94-
if _, ok := t.postings[key]; !ok {
95-
t.postings[key] = map[string]index.Range{}
96-
}
97-
98-
lastKey = key
99-
lastValue = value
100-
prevRng = index.Range{Start: int64(off + postingLengthFieldSize)}
101-
return nil
102-
}); err != nil {
103-
return nil, errors.Wrap(err, "read postings table")
104-
}
105-
106-
if len(t.postings) > 0 {
107-
// In case lastValOffset is unknown as we don't have next posting anymore. Guess from the index table of contents.
108-
// The last posting list ends before the label offset table.
109-
// In worst case we will overfetch a few bytes.
110-
prevRng.End = int64(indexLastPostingListEndBound) - crc32.Size
111-
t.postings[lastKey][lastValue] = prevRng
112-
}
113-
114-
return &t, nil
67+
return nil, fmt.Errorf("unknown or unsupported index version %v", indexVersion)
11568
}
11669

11770
func newV2PostingOffsetTable(factory *streamencoding.DecbufFactory, tableOffset int, indexLastPostingListEndBound uint64, postingOffsetsInMemSampling int, doChecksum bool) (table *PostingOffsetTableV2, err error) {
@@ -278,95 +231,6 @@ func NewPostingOffsetTableFromSparseHeader(factory *streamencoding.DecbufFactory
278231
return &t, err
279232
}
280233

281-
// readOffsetTable reads an offset table and at the given position calls f for each
282-
// found entry. If f returns an error it stops decoding and returns the received error.
283-
func readOffsetTable(factory *streamencoding.DecbufFactory, tableOffset int, f func(string, string, uint64) error) (err error) {
284-
d := factory.NewDecbufAtChecked(tableOffset, castagnoliTable)
285-
defer runutil.CloseWithErrCapture(&err, &d, "read offset table")
286-
287-
cnt := d.Be32()
288-
289-
for d.Err() == nil && d.Len() > 0 && cnt > 0 {
290-
keyCount := d.Uvarint()
291-
292-
// The Postings offset table takes only 2 keys per entry (name and value of label).
293-
if keyCount != 2 {
294-
return errors.Errorf("unexpected key length for posting table %d", keyCount)
295-
}
296-
297-
key := d.UvarintStr()
298-
value := d.UvarintStr()
299-
o := d.Uvarint64()
300-
if d.Err() != nil {
301-
break
302-
}
303-
if err := f(key, value, o); err != nil {
304-
return err
305-
}
306-
cnt--
307-
}
308-
return d.Err()
309-
}
310-
311-
func (t *PostingOffsetTableV1) PostingsOffset(name string, value string) (index.Range, bool, error) {
312-
e, ok := t.postings[name]
313-
if !ok {
314-
return index.Range{}, false, nil
315-
}
316-
rng, ok := e[value]
317-
if !ok {
318-
return index.Range{}, false, nil
319-
}
320-
return rng, true, nil
321-
}
322-
323-
func (t *PostingOffsetTableV1) LabelValuesOffsets(ctx context.Context, name, prefix string, filter func(string) bool) ([]PostingListOffset, error) {
324-
e, ok := t.postings[name]
325-
if !ok {
326-
return nil, nil
327-
}
328-
values := make([]PostingListOffset, 0, len(e))
329-
count := 1
330-
for k, r := range e {
331-
if count%CheckContextEveryNIterations == 0 && ctx.Err() != nil {
332-
return nil, ctx.Err()
333-
}
334-
count++
335-
if strings.HasPrefix(k, prefix) && (filter == nil || filter(k)) {
336-
values = append(values, PostingListOffset{LabelValue: k, Off: r})
337-
}
338-
}
339-
slices.SortFunc(values, func(a, b PostingListOffset) int {
340-
return strings.Compare(a.LabelValue, b.LabelValue)
341-
})
342-
return values, nil
343-
}
344-
345-
func (t *PostingOffsetTableV1) LabelNames() ([]string, error) {
346-
labelNames := make([]string, 0, len(t.postings))
347-
allPostingsKeyName, _ := index.AllPostingsKey()
348-
349-
for name := range t.postings {
350-
if name == allPostingsKeyName {
351-
continue
352-
}
353-
354-
labelNames = append(labelNames, name)
355-
}
356-
357-
slices.Sort(labelNames)
358-
359-
return labelNames, nil
360-
}
361-
362-
func (t *PostingOffsetTableV1) PostingOffsetInMemSampling() int {
363-
return 0
364-
}
365-
366-
func (t *PostingOffsetTableV1) NewSparsePostingOffsetTable() (table *indexheaderpb.PostingOffsetTable) {
367-
return &indexheaderpb.PostingOffsetTable{}
368-
}
369-
370234
type PostingOffsetTableV2 struct {
371235
// Map of LabelName to a list of some LabelValues' positions in the offset table.
372236
// The first and last values for each name are always present, we keep only 1/postingOffsetsInMemSampling of the rest.
@@ -648,8 +512,8 @@ func (t *PostingOffsetTableV2) PostingOffsetInMemSampling() int {
648512
return 0
649513
}
650514

651-
// NewSparsePostingOffsetTable loads all postings offset table data into a sparse index-header to be persisted to disk
652-
func (t *PostingOffsetTableV2) NewSparsePostingOffsetTable() (table *indexheaderpb.PostingOffsetTable) {
515+
// ToSparsePostingOffsetTable loads all postings offset table data into a sparse index-header to be persisted to disk
516+
func (t *PostingOffsetTableV2) ToSparsePostingOffsetTable() (table *indexheaderpb.PostingOffsetTable) {
653517
sparseHeaders := &indexheaderpb.PostingOffsetTable{
654518
Postings: make(map[string]*indexheaderpb.PostingValueOffsets, len(t.postings)),
655519
PostingOffsetInMemorySampling: int64(t.postingOffsetsInMemSampling),

0 commit comments

Comments
 (0)