Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions pkg/dataobj/metastore/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy"
"github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers"
"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
"github.com/grafana/loki/v3/pkg/dataobj/sections/postings"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/xcap"
Expand Down Expand Up @@ -246,6 +247,10 @@ func (m *ObjectMetastore) DataObjects(ctx context.Context, start, end time.Time,
}

func (m *ObjectMetastore) Labels(ctx context.Context, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) {
if m.usePostingsSections {
return m.labelsFromPostings(ctx, start, end, matchers...)
}

uniqueLabels := map[string]struct{}{}

err := m.forEachLabel(ctx, start, end, func(label labels.Label) {
Expand All @@ -258,6 +263,10 @@ func (m *ObjectMetastore) Labels(ctx context.Context, start, end time.Time, matc
}

func (m *ObjectMetastore) Values(ctx context.Context, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) {
if m.usePostingsSections {
return m.valuesFromPostings(ctx, start, end, matchers...)
}

values := map[string]struct{}{}

err := m.forEachLabel(ctx, start, end, func(label labels.Label) {
Expand All @@ -269,6 +278,144 @@ func (m *ObjectMetastore) Values(ctx context.Context, start, end time.Time, matc
return slices.Collect(maps.Keys(values)), err
}

// labelsFromPostings collects label names for the given time range and
// matchers by delegating to lookupLabelsFromPostings. Objects that expose a
// postings reader are answered with reader.ResolveLabelNames; for the
// stream-section path the Name of each stream label is used instead.
func (m *ObjectMetastore) labelsFromPostings(ctx context.Context, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) {
	// Both callbacks are handed to lookupLabelsFromPostings; resolveNames
	// closes over this function's ctx.
	resolveNames := func(r *postings.Reader, refs map[postings.StreamRef]struct{}) ([]string, error) {
		return r.ResolveLabelNames(ctx, refs)
	}
	nameOf := func(l labels.Label) string {
		return l.Name
	}

	return m.lookupLabelsFromPostings(ctx, start, end, resolveNames, nameOf, matchers...)
}

// valuesFromPostings collects label values for the given time range and
// matchers by delegating to lookupLabelsFromPostings. Objects that expose a
// postings reader are answered with reader.ResolveLabelValues; for the
// stream-section path the Value of each stream label is used instead.
func (m *ObjectMetastore) valuesFromPostings(ctx context.Context, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) {
	// Both callbacks are handed to lookupLabelsFromPostings; resolveValues
	// closes over this function's ctx.
	resolveValues := func(r *postings.Reader, refs map[postings.StreamRef]struct{}) ([]string, error) {
		return r.ResolveLabelValues(ctx, refs)
	}
	valueOf := func(l labels.Label) string {
		return l.Value
	}

	return m.lookupLabelsFromPostings(ctx, start, end, resolveValues, valueOf, matchers...)
}

// lookupLabelsFromPostings gathers a de-duplicated set of strings (label
// names or label values, depending on the callbacks) from every object listed
// in the tables of contents that cover [start, end].
//
// For each object it tries to open a postings reader for the tenant found in
// ctx:
//   - if a reader is available, lookupPostings produces the strings for that
//     object; when matchers are supplied, the stream refs resolved from them
//     are passed along, otherwise a nil map is passed;
//   - if no reader is available, the object's streams are scanned with a
//     predicate built from start, end and matchers, and labelSelector picks
//     the string to record from every label of every visited stream.
//
// Objects are processed concurrently, bounded by m.parallelism. The first
// error aborts the lookup and is returned. The order of the returned slice is
// unspecified because it is collected from map keys.
func (m *ObjectMetastore) lookupLabelsFromPostings(
	ctx context.Context,
	start, end time.Time,
	lookupPostings func(*postings.Reader, map[postings.StreamRef]struct{}) ([]string, error),
	labelSelector func(labels.Label) string,
	matchers ...*labels.Matcher,
) ([]string, error) {
	tenant, err := user.ExtractOrgID(ctx)
	if err != nil {
		return nil, fmt.Errorf("extracting org ID: %w", err)
	}

	var tocPaths []string
	for tocPath := range IterTableOfContentsPaths(start, end) {
		tocPaths = append(tocPaths, tocPath)
	}

	entries, err := m.listObjectsFromTables(ctx, tocPaths, start, end)
	if err != nil {
		return nil, err
	}

	var (
		mu      sync.Mutex // guards results
		results = make(map[string]struct{})
		// predicate is only consulted on the stream-section fallback path.
		predicate = streamPredicateFromMatchers(start, end, matchers...)
	)

	// From here on ctx is the errgroup-derived context.
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(m.parallelism)
	for _, entry := range entries {
		g.Go(func() error {
			object, err := dataobj.FromBucket(ctx, m.bucket, entry.Path, 0)
			if err != nil {
				return fmt.Errorf("getting object %s from bucket: %w", entry.Path, err)
			}

			reader, err := openPostingsReaderForObject(ctx, entry.Path, tenant, object)
			if err != nil {
				return err
			}

			if reader == nil {
				// No postings reader for this tenant in this object: fall
				// back to walking the stream sections.
				seen := map[string]struct{}{}
				err = forEachStream(ctx, object, predicate, func(stream streams.Stream) {
					stream.Labels.Range(func(l labels.Label) {
						seen[labelSelector(l)] = struct{}{}
					})
				})
				if err != nil {
					return fmt.Errorf("resolving stream labels from stream section %s: %w", entry.Path, err)
				}

				mu.Lock()
				for item := range seen {
					results[item] = struct{}{}
				}
				mu.Unlock()
				return nil
			}
			defer reader.Close()

			// streamRefs stays nil when the caller supplied no matchers.
			// NOTE(review): presumably lookupPostings treats a nil map as
			// "no stream filter" — confirm against postings.Reader.
			var streamRefs map[postings.StreamRef]struct{}
			if len(matchers) > 0 {
				streamRefs, _, err = reader.ResolveMatchingStreamRefs(ctx, matchers)
				if err != nil {
					return fmt.Errorf("resolving matching streams from postings %s: %w", entry.Path, err)
				}
			}

			found, err := lookupPostings(reader, streamRefs)
			if err != nil {
				return fmt.Errorf("resolving postings labels from %s: %w", entry.Path, err)
			}

			mu.Lock()
			for _, item := range found {
				results[item] = struct{}{}
			}
			mu.Unlock()
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}

	return slices.Collect(maps.Keys(results)), nil
}

// openPostingsReaderForObject looks for the postings section that belongs to
// targetTenant inside object and returns an opened reader over its columns.
//
// It returns (nil, nil) when the object has no postings section for the
// tenant, and an error when more than one such section exists or when the
// section or the reader cannot be opened. objectPath is only used to give
// error messages context. On success the caller owns the returned reader.
func openPostingsReaderForObject(
	ctx context.Context,
	objectPath string,
	targetTenant string,
	object *dataobj.Object,
) (*postings.Reader, error) {
	var match *dataobj.Section

	for _, candidate := range object.Sections() {
		if candidate.Tenant == targetTenant && postings.CheckSection(candidate) {
			// A second hit for the same tenant is treated as an error.
			if match != nil {
				return nil, fmt.Errorf("multiple postings sections found for tenant %s in %s", targetTenant, objectPath)
			}
			match = candidate
		}
	}

	if match == nil {
		return nil, nil
	}

	opened, err := postings.Open(ctx, match)
	if err != nil {
		return nil, fmt.Errorf("opening postings section from %s: %w", objectPath, err)
	}

	opts := postings.ReaderOptions{
		Columns: opened.Columns(),
	}
	reader := postings.NewReader(opts)
	if err := reader.Open(ctx); err != nil {
		return nil, fmt.Errorf("opening postings reader from %s: %w", objectPath, err)
	}
	return reader, nil
}

func (m *ObjectMetastore) forEachLabel(ctx context.Context, start, end time.Time, foreach func(labels.Label), matchers ...*labels.Matcher) error {
streams, err := m.streams(ctx, start, end, matchers...)
if err != nil {
Expand Down
151 changes: 149 additions & 2 deletions pkg/dataobj/metastore/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
"github.com/grafana/loki/v3/pkg/dataobj/index/indexobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
"github.com/grafana/loki/v3/pkg/dataobj/sections/postings"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
"github.com/grafana/loki/v3/pkg/dataobj/uploader"
"github.com/grafana/loki/v3/pkg/logproto"
Expand Down Expand Up @@ -152,6 +153,19 @@ func TestLabelsEmptyMatcher(t *testing.T) {
})
}

func TestLabels_UsePostingsSections_FallsBackToStreams(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
}

queryMetastoreWithConfig(t, tenantID, Config{UsePostingsSections: true}, func(ctx context.Context, start, end time.Time, mstore Metastore) {
matchedLabels, err := mstore.Labels(ctx, start, end, matchers...)
require.NoError(t, err)
require.Len(t, matchedLabels, len(matchers))
})
}

func TestValues(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
Expand Down Expand Up @@ -214,6 +228,94 @@ func TestValuesEmptyMatcher(t *testing.T) {
})
}

func TestLabels_UsePostingsSections_CombinedObjectPrefersPostings(t *testing.T) {
queryMetastoreWithIndexBuilder(t, tenantID, Config{UsePostingsSections: true}, func(builder *indexobj.Builder) {
_, err := builder.AppendStream(tenantID, streams.Stream{
ID: 1,
Labels: labels.New(
labels.Label{Name: "app", Value: "foo"},
labels.Label{Name: "streams_only", Value: "streams"},
),
MinTimestamp: now.Add(-3 * time.Hour),
MaxTimestamp: now.Add(-2 * time.Hour),
UncompressedSize: 5,
Rows: 1,
})
require.NoError(t, err)
require.NoError(t, builder.ObserveLogLine(tenantID, "test-path", 0, 1, 1, now.Add(-3*time.Hour), 5))

builder.ObserveLabelPosting(tenantID, postings.LabelObservation{
ObjectPath: "test-path",
SectionIndex: 0,
ColumnName: "app",
LabelValue: "foo",
StreamID: 1,
Timestamp: now.Add(-3 * time.Hour),
UncompressedSize: 5,
})
builder.ObserveLabelPosting(tenantID, postings.LabelObservation{
ObjectPath: "test-path",
SectionIndex: 0,
ColumnName: "postings_only",
LabelValue: "postings",
StreamID: 1,
Timestamp: now.Add(-3 * time.Hour),
UncompressedSize: 5,
})
}, func(ctx context.Context, start, end time.Time, mstore Metastore) {
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "app", "foo")}
matchedLabels, err := mstore.Labels(ctx, start, end, matchers...)
require.NoError(t, err)
require.Contains(t, matchedLabels, "app")
require.Contains(t, matchedLabels, "postings_only")
require.NotContains(t, matchedLabels, "streams_only")
})
}

func TestValues_UsePostingsSections_CombinedObjectPrefersPostings(t *testing.T) {
queryMetastoreWithIndexBuilder(t, tenantID, Config{UsePostingsSections: true}, func(builder *indexobj.Builder) {
_, err := builder.AppendStream(tenantID, streams.Stream{
ID: 1,
Labels: labels.New(
labels.Label{Name: "app", Value: "foo"},
labels.Label{Name: "streams_only", Value: "streams"},
),
MinTimestamp: now.Add(-3 * time.Hour),
MaxTimestamp: now.Add(-2 * time.Hour),
UncompressedSize: 5,
Rows: 1,
})
require.NoError(t, err)
require.NoError(t, builder.ObserveLogLine(tenantID, "test-path", 0, 1, 1, now.Add(-3*time.Hour), 5))

builder.ObserveLabelPosting(tenantID, postings.LabelObservation{
ObjectPath: "test-path",
SectionIndex: 0,
ColumnName: "app",
LabelValue: "foo",
StreamID: 1,
Timestamp: now.Add(-3 * time.Hour),
UncompressedSize: 5,
})
builder.ObserveLabelPosting(tenantID, postings.LabelObservation{
ObjectPath: "test-path",
SectionIndex: 0,
ColumnName: "postings_only",
LabelValue: "postings",
StreamID: 1,
Timestamp: now.Add(-3 * time.Hour),
UncompressedSize: 5,
})
}, func(ctx context.Context, start, end time.Time, mstore Metastore) {
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "app", "foo")}
matchedValues, err := mstore.Values(ctx, start, end, matchers...)
require.NoError(t, err)
require.Contains(t, matchedValues, "foo")
require.Contains(t, matchedValues, "postings")
require.NotContains(t, matchedValues, "streams")
})
}

func TestSectionsForStreamMatchers(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), tenantID)

Expand Down Expand Up @@ -773,6 +875,47 @@ func TestDataobjSectionDescriptorMerge_NilMapPanic(t *testing.T) {
}

// queryMetastore runs mfunc against a test metastore for tenant using the
// zero-value Config. It is a convenience wrapper around
// queryMetastoreWithConfig.
func queryMetastore(t *testing.T, tenant string, mfunc func(context.Context, time.Time, time.Time, Metastore)) {
queryMetastoreWithConfig(t, tenant, Config{}, mfunc)
}

// queryMetastoreWithIndexBuilder is a test harness for metastore queries over
// a single, custom-built index object.
//
// It creates a fixture builder, lets setupBuilder populate it, flushes the
// result, uploads it to an in-memory bucket, and registers the uploaded path
// in the table of contents using the builder's time ranges. It then constructs
// a metastore over that bucket with cfg and calls mfunc with a context that
// carries tenant and a query window of five hours either side of now. The
// bucket is closed when the function returns.
func queryMetastoreWithIndexBuilder(
	t *testing.T,
	tenant string,
	cfg Config,
	setupBuilder func(builder *indexobj.Builder),
	mfunc func(context.Context, time.Time, time.Time, Metastore),
) {
	const window = 5 * time.Hour
	start, end := now.Add(-window), now.Add(window)

	builder := newFixtureBuilder(t)
	setupBuilder(builder)

	// The time ranges must be non-empty; they are written to the table of
	// contents entry below.
	timeRanges := builder.TimeRanges()
	require.NotEmpty(t, timeRanges)

	obj := flushFixture(t, builder)

	bucket := objstore.NewInMemBucket()

	// Named objUploader so the local does not shadow the uploader package.
	objUploader := uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, log.NewNopLogger())
	require.NoError(t, objUploader.RegisterMetrics(prometheus.NewPedanticRegistry()))
	objectPath, err := objUploader.Upload(context.Background(), obj)
	require.NoError(t, err)

	tocWriter := NewTableOfContentsWriter(bucket, log.NewNopLogger())
	require.NoError(t, tocWriter.RegisterMetrics(prometheus.NewPedanticRegistry()))
	require.NoError(t, tocWriter.WriteEntry(context.Background(), objectPath, timeRanges))

	mstore := newTestObjectMetastoreWithConfig(bucket, cfg)
	defer func() {
		require.NoError(t, mstore.bucket.Close())
	}()

	tenantCtx := user.InjectOrgID(context.Background(), tenant)
	mfunc(tenantCtx, start, end, mstore)
}

func queryMetastoreWithConfig(t *testing.T, tenant string, cfg Config, mfunc func(context.Context, time.Time, time.Time, Metastore)) {
now := time.Now().UTC()
start := now.Add(-time.Hour * 5)
end := now.Add(time.Hour * 5)
Expand All @@ -783,7 +926,7 @@ func queryMetastore(t *testing.T, tenant string, mfunc func(context.Context, tim
builder.addStreamAndFlush(tenant, stream)
}

mstore := newTestObjectMetastore(builder.bucket)
mstore := newTestObjectMetastoreWithConfig(builder.bucket, cfg)
defer func() {
require.NoError(t, mstore.bucket.Close())
}()
Expand Down Expand Up @@ -826,5 +969,9 @@ func newTestDataBuilder(t testing.TB) *testDataBuilder {
}

// newTestObjectMetastore builds an ObjectMetastore over bucket with the
// zero-value Config. It delegates to newTestObjectMetastoreWithConfig so the
// logger and metrics wiring lives in one place.
func newTestObjectMetastore(bucket objstore.Bucket) *ObjectMetastore {
	return newTestObjectMetastoreWithConfig(bucket, Config{})
}

// newTestObjectMetastoreWithConfig builds an ObjectMetastore over bucket with
// the supplied cfg, a no-op logger, and metrics created on a fresh Prometheus
// registry.
func newTestObjectMetastoreWithConfig(bucket objstore.Bucket, cfg Config) *ObjectMetastore {
	logger := log.NewNopLogger()
	metrics := NewObjectMetastoreMetrics(prometheus.NewRegistry())
	return NewObjectMetastore(bucket, cfg, logger, metrics)
}
Loading
Loading