diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index 82eb203faafba..c8261aca8de3f 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -339,6 +339,10 @@ query_engine:
   # CLI flag: -query-engine.range-reads.min-range-size
   [min_range_size: <int> | default = 1048576]
 
+  # Experimental: Enable ahead-of-time catalog lookups.
+  # CLI flag: -query-engine.ahead-of-time-catalog-lookups-enabled
+  [ahead_of_time_catalog_lookups_enabled: <boolean> | default = false]
+
   # Experimental: Number of worker threads to spawn. Each worker thread runs one
   # task at a time. 0 means to use GOMAXPROCS value.
   # CLI flag: -query-engine.worker-threads
diff --git a/pkg/dataobj/metastore/iter.go b/pkg/dataobj/metastore/iter.go
index 64c26f4f27c38..810d94eb39d78 100644
--- a/pkg/dataobj/metastore/iter.go
+++ b/pkg/dataobj/metastore/iter.go
@@ -26,6 +26,7 @@ func forEachStreamSectionPointer(
 	streamIDs []int64,
 	f func(pointers.SectionPointer),
 ) error {
+	// TODO(ivkalita): remove
 	targetTenant, err := user.ExtractOrgID(ctx)
 	if err != nil {
 		return fmt.Errorf("extracting org ID: %w", err)
@@ -210,6 +211,7 @@ func forEachMatchedPointerSectionKey(
 	matchColumnValue string,
 	f func(key SectionKey),
 ) error {
+	// TODO(ivkalita): move to scan_pointers? and remove from here
 	targetTenant, err := user.ExtractOrgID(ctx)
 	if err != nil {
 		return fmt.Errorf("extracting org ID: %w", err)
diff --git a/pkg/dataobj/metastore/metastore.go b/pkg/dataobj/metastore/metastore.go
index b31be9aeb9475..44960be55f809 100644
--- a/pkg/dataobj/metastore/metastore.go
+++ b/pkg/dataobj/metastore/metastore.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"time"
 
+	"github.com/apache/arrow-go/v18/arrow"
 	"github.com/prometheus/prometheus/model/labels"
 )
 
@@ -11,9 +12,19 @@ type Metastore interface {
 	// Sections returns a list of SectionDescriptors, including metadata (stream IDs, start & end times, bytes), for the given matchers & predicates between [start,end]
 	Sections(ctx context.Context, start, end time.Time, matchers []*labels.Matcher, predicates []*labels.Matcher) ([]*DataobjSectionDescriptor, error)
 
+	GetIndexes(ctx context.Context, start, end time.Time) ([]string, error)
+
+	ScanPointers(ctx context.Context, indexPath string, start, end time.Time, matchers []*labels.Matcher) (ArrowRecordBatchReader, error)
+	CollectScanPointersResult(ctx context.Context, reader ArrowRecordBatchReader) ([]*DataobjSectionDescriptor, error)
+
 	// Labels returns all possible labels from matching streams between [start,end]
 	Labels(ctx context.Context, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) // Used to get possible labels for a given stream
 
 	// Values returns all possible values for the given label matchers between [start,end]
 	Values(ctx context.Context, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) // Used to get all values for a given set of label matchers
 }
+
+type ArrowRecordBatchReader interface {
+	Read(ctx context.Context) (arrow.RecordBatch, error)
+	Close()
+}
diff --git a/pkg/dataobj/metastore/metrics.go b/pkg/dataobj/metastore/metrics.go
index 31a8d29d3bfa2..5cc99a183ad6a 100644
--- a/pkg/dataobj/metastore/metrics.go
+++ b/pkg/dataobj/metastore/metrics.go
@@ -108,7 +108,7 @@ func (p *tocMetrics) observeMetastoreProcessing(recordTimestamp time.Time) {
 	}
 }
 
-type objectMetastoreMetrics struct {
+type ObjectMetastoreMetrics struct {
 	indexObjectsTotal         prometheus.Histogram
 	streamFilterTotalDuration prometheus.Histogram
 	streamFilterSections      prometheus.Histogram
@@
-122,8 +122,8 @@ type objectMetastoreMetrics struct { resolvedSectionsRatio prometheus.Histogram } -func newObjectMetastoreMetrics() *objectMetastoreMetrics { - metrics := &objectMetastoreMetrics{ +func NewObjectMetastoreMetrics(reg prometheus.Registerer) *ObjectMetastoreMetrics { + metrics := &ObjectMetastoreMetrics{ indexObjectsTotal: prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "loki_metastore_index_objects_total", Help: "Total number of objects to be searched for a Metastore query", @@ -214,19 +214,19 @@ func newObjectMetastoreMetrics() *objectMetastoreMetrics { }), } - return metrics -} + if reg != nil { + reg.MustRegister(metrics.indexObjectsTotal) + reg.MustRegister(metrics.streamFilterTotalDuration) + reg.MustRegister(metrics.streamFilterSections) + reg.MustRegister(metrics.streamFilterStreamsReadDuration) + reg.MustRegister(metrics.streamFilterPointersReadDuration) + reg.MustRegister(metrics.estimateSectionsTotalDuration) + reg.MustRegister(metrics.estimateSectionsPointerReadDuration) + reg.MustRegister(metrics.estimateSectionsSections) + reg.MustRegister(metrics.resolvedSectionsTotalDuration) + reg.MustRegister(metrics.resolvedSectionsTotal) + reg.MustRegister(metrics.resolvedSectionsRatio) + } -func (p *objectMetastoreMetrics) register(reg prometheus.Registerer) { - reg.MustRegister(p.indexObjectsTotal) - reg.MustRegister(p.streamFilterTotalDuration) - reg.MustRegister(p.streamFilterSections) - reg.MustRegister(p.streamFilterStreamsReadDuration) - reg.MustRegister(p.streamFilterPointersReadDuration) - reg.MustRegister(p.estimateSectionsTotalDuration) - reg.MustRegister(p.estimateSectionsPointerReadDuration) - reg.MustRegister(p.estimateSectionsSections) - reg.MustRegister(p.resolvedSectionsTotalDuration) - reg.MustRegister(p.resolvedSectionsTotal) - reg.MustRegister(p.resolvedSectionsRatio) + return metrics } diff --git a/pkg/dataobj/metastore/object.go b/pkg/dataobj/metastore/object.go index 4d8662f7a9e0a..2a2da507bfacd 100644 --- a/pkg/dataobj/metastore/object.go +++ b/pkg/dataobj/metastore/object.go @@ -15,6 +15,7 @@ import ( "time" "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/arrow/scalar" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -28,9 +29,9 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj" "github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy" "github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers" - "github.com/grafana/loki/v3/pkg/dataobj/sections/logs" "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" "github.com/grafana/loki/v3/pkg/dataobj/sections/streams" + "github.com/grafana/loki/v3/pkg/storage/bucket" utillog "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/xcap" ) @@ -43,7 +44,7 @@ type ObjectMetastore struct { bucket objstore.Bucket parallelism int logger log.Logger - metrics *objectMetastoreMetrics + metrics *ObjectMetastoreMetrics } type SectionKey struct { @@ -109,15 +110,12 @@ func iterTableOfContentsPaths(start, end time.Time) iter.Seq2[string, multitenan } } -func NewObjectMetastore(bucket objstore.Bucket, logger log.Logger, reg prometheus.Registerer) *ObjectMetastore { +func NewObjectMetastore(b objstore.Bucket, logger log.Logger, metrics *ObjectMetastoreMetrics) *ObjectMetastore { store := &ObjectMetastore{ - bucket: bucket, + bucket: bucket.NewXCapBucket(b), parallelism: 64, logger: logger, - metrics: newObjectMetastoreMetrics(), - } - if reg != nil { - store.metrics.register(reg) + metrics: metrics, } 
return store } @@ -167,29 +165,11 @@ func (m *ObjectMetastore) Sections(ctx context.Context, start, end time.Time, ma sectionsTimer := prometheus.NewTimer(m.metrics.resolvedSectionsTotalDuration) - // Get all metastore paths for the time range - var tablePaths []string - for path := range iterTableOfContentsPaths(start, end) { - tablePaths = append(tablePaths, path) - } - - // Return early if no toc files are found - if len(tablePaths) == 0 { - m.metrics.indexObjectsTotal.Observe(0) - m.metrics.resolvedSectionsTotal.Observe(0) - level.Debug(utillog.WithContext(ctx, m.logger)).Log("msg", "no sections resolved", "reason", "no toc paths") - return nil, nil - } - - // List index objects from all tables concurrently - indexPaths, err := m.listObjectsFromTables(ctx, tablePaths, start, end) + tablePaths, indexPaths, err := m.getIndexes(ctx, start, end) if err != nil { return nil, err } - m.metrics.indexObjectsTotal.Observe(float64(len(indexPaths))) - region.Record(xcap.StatMetastoreIndexObjects.Observe(int64(len(indexPaths)))) - // Return early if no index files are found if len(indexPaths) == 0 { m.metrics.resolvedSectionsTotal.Observe(0) @@ -462,36 +442,6 @@ func (m *ObjectMetastore) listStreamsFromObjects(ctx context.Context, paths []st return streamsSlice, nil } -func (m *ObjectMetastore) listStreamIDsFromLogObjects(ctx context.Context, objectPaths []string, predicate streams.RowPredicate) ([][]int64, []int, error) { - streamIDs := make([][]int64, len(objectPaths)) - sections := make([]int, len(objectPaths)) - - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(m.parallelism) - - for idx, objectPath := range objectPaths { - g.Go(func() error { - object, err := dataobj.FromBucket(ctx, m.bucket, objectPath) - if err != nil { - return fmt.Errorf("getting object from bucket: %w", err) - } - - sections[idx] = object.Sections().Count(logs.CheckSection) - streamIDs[idx] = make([]int64, 0, 8) - - return forEachStream(ctx, object, predicate, func(stream streams.Stream) { - streamIDs[idx] = append(streamIDs[idx], stream.ID) - }) - }) - } - - if err := g.Wait(); err != nil { - return nil, nil, err - } - - return streamIDs, sections, nil -} - // getSectionsForStreams reads the section data from matching streams and aggregates them into section descriptors. // This is an exact lookup and includes metadata from the streams in each section: the stream IDs, the min-max timestamps, the number of bytes & number of lines. 
func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObjects []*dataobj.Object, streamPredicate streams.RowPredicate, start, end time.Time) ([]*DataobjSectionDescriptor, error) { @@ -723,3 +673,174 @@ func dedupeAndSort(objects [][]string) []string { sort.Strings(paths) return paths } + +func (m *ObjectMetastore) GetIndexes(ctx context.Context, start, end time.Time) ([]string, error) { + _, indexPaths, err := m.getIndexes(ctx, start, end) + return indexPaths, err +} + +func (m *ObjectMetastore) getIndexes(ctx context.Context, start, end time.Time) ([]string, []string, error) { + ctx, region := xcap.StartRegion(ctx, "ObjectMetastore.GetIndexes") + defer region.End() + + // Get all metastore paths for the time range + var tablePaths []string + for path := range iterTableOfContentsPaths(start, end) { + tablePaths = append(tablePaths, path) + } + + // Return early if no toc files are found + if len(tablePaths) == 0 { + m.metrics.indexObjectsTotal.Observe(0) + m.metrics.resolvedSectionsTotal.Observe(0) + level.Debug(utillog.WithContext(ctx, m.logger)).Log("msg", "no sections resolved", "reason", "no toc paths") + return nil, nil, nil + } + + // List index objects from all tables concurrently + indexPaths, err := m.listObjectsFromTables(ctx, tablePaths, start, end) + if err != nil { + return tablePaths, nil, err + } + + m.metrics.indexObjectsTotal.Observe(float64(len(indexPaths))) + region.Record(xcap.StatMetastoreIndexObjects.Observe(int64(len(indexPaths)))) + + return tablePaths, indexPaths, nil +} + +func (m *ObjectMetastore) ScanPointers(ctx context.Context, indexPath string, start, end time.Time, matchers []*labels.Matcher) (ArrowRecordBatchReader, error) { + idxObj, err := dataobj.FromBucket(ctx, m.bucket, indexPath) + if err != nil { + return nil, fmt.Errorf("prepare obj %s: %w", indexPath, err) + } + + return &scanPointers{ + obj: idxObj, + // TODO(ivkalita): this conversion happens for every index file, can be improved + streamPredicate: streamPredicateFromMatchers(start, end, matchers...), + sStart: scalar.NewTimestampScalar(arrow.Timestamp(start.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns), + sEnd: scalar.NewTimestampScalar(arrow.Timestamp(end.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns), + }, nil + +} + +func (m *ObjectMetastore) CollectScanPointersResult(ctx context.Context, reader ArrowRecordBatchReader) ([]*DataobjSectionDescriptor, error) { + objectSectionDescriptors := make(map[SectionKey]*DataobjSectionDescriptor) + for { + rec, err := reader.Read(ctx) + if err != nil && !errors.Is(err, io.EOF) { + level.Warn(m.logger).Log( + "msg", "error during execution", + "err", err, + ) + return nil, err + } + + if rec != nil && rec.NumRows() > 0 { + if err := addSectionDescriptors(rec, objectSectionDescriptors); err != nil { + return nil, err + } + } + + if errors.Is(err, io.EOF) { + break + } + } + + descriptors := make([]*DataobjSectionDescriptor, 0, len(objectSectionDescriptors)) + for _, s := range objectSectionDescriptors { + descriptors = append(descriptors, s) + } + + return descriptors, nil +} + +func addSectionDescriptors(rec arrow.RecordBatch, result map[SectionKey]*DataobjSectionDescriptor) error { + numRows := int(rec.NumRows()) + buf := make([]pointers.SectionPointer, numRows) + schema := rec.Schema() + for fIdx := range schema.Fields() { + field := schema.Field(fIdx) + col := rec.Column(fIdx) + switch field.Name { + case "path.path.utf8": + values := col.(*array.String) + for rIdx := range numRows { + if col.IsNull(rIdx) { + continue + } + 
buf[rIdx].Path = values.Value(rIdx) + } + case "section.int64": + values := col.(*array.Int64) + for rIdx := range numRows { + if col.IsNull(rIdx) { + continue + } + buf[rIdx].Section = values.Value(rIdx) + } + case "stream_id.int64": + values := col.(*array.Int64) + for rIdx := range numRows { + if col.IsNull(rIdx) { + continue + } + buf[rIdx].StreamID = values.Value(rIdx) + } + case "stream_id_ref.int64": + values := col.(*array.Int64) + for rIdx := range numRows { + if col.IsNull(rIdx) { + continue + } + buf[rIdx].StreamIDRef = values.Value(rIdx) + } + case "min_timestamp.timestamp": + values := col.(*array.Timestamp) + for rIdx := range numRows { + if col.IsNull(rIdx) { + continue + } + buf[rIdx].StartTs = time.Unix(0, int64(values.Value(rIdx))) + } + case "max_timestamp.timestamp": + values := col.(*array.Timestamp) + for rIdx := range numRows { + if col.IsNull(rIdx) { + continue + } + buf[rIdx].EndTs = time.Unix(0, int64(values.Value(rIdx))) + } + case "row_count.int64": + values := col.(*array.Int64) + for rIdx := range numRows { + if col.IsNull(rIdx) { + continue + } + buf[rIdx].LineCount = values.Value(rIdx) + } + case "uncompressed_size.int64": + values := col.(*array.Int64) + for rIdx := range numRows { + if col.IsNull(rIdx) { + continue + } + buf[rIdx].UncompressedSize = values.Value(rIdx) + } + default: + continue + } + } + + for _, ptr := range buf { + key := SectionKey{ObjectPath: ptr.Path, SectionIdx: ptr.Section} + existing, ok := result[key] + if !ok { + result[key] = NewSectionDescriptor(ptr) + continue + } + existing.Merge(ptr) + } + return nil +} diff --git a/pkg/dataobj/metastore/object_bench_test.go b/pkg/dataobj/metastore/object_bench_test.go index 1923865976a08..79c43836e56de 100644 --- a/pkg/dataobj/metastore/object_bench_test.go +++ b/pkg/dataobj/metastore/object_bench_test.go @@ -47,7 +47,7 @@ func benchmarkReadSections(b *testing.B, bm readSectionsBenchmarkParams) { bucket := objstore.NewInMemBucket() objUploader := uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, log.NewNopLogger()) - require.NoError(b, objUploader.RegisterMetrics(prometheus.NewPedanticRegistry())) + require.NoError(b, objUploader.RegisterMetrics(prometheus.DefaultRegisterer)) metastoreTocWriter := NewTableOfContentsWriter(bucket, log.NewNopLogger()) @@ -117,7 +117,7 @@ func benchmarkReadSections(b *testing.B, bm readSectionsBenchmarkParams) { } // Create the metastore instance - mstore := NewObjectMetastore(bucket, log.NewNopLogger(), nil) + mstore := NewObjectMetastore(bucket, log.NewNopLogger(), NewObjectMetastoreMetrics(prometheus.DefaultRegisterer)) // Prepare benchmark parameters benchCtx := user.InjectOrgID(ctx, tenantID) @@ -220,7 +220,7 @@ func BenchmarkSectionsForPredicateMatchers(b *testing.B) { bucket := objstore.NewInMemBucket() objUploader := uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, log.NewNopLogger()) - require.NoError(b, objUploader.RegisterMetrics(prometheus.NewPedanticRegistry())) + require.NoError(b, objUploader.RegisterMetrics(prometheus.DefaultRegisterer)) path, err := objUploader.Upload(context.Background(), obj) require.NoError(b, err) @@ -229,7 +229,7 @@ func BenchmarkSectionsForPredicateMatchers(b *testing.B) { err = metastoreTocWriter.WriteEntry(context.Background(), path, timeRanges) require.NoError(b, err) - mstore := NewObjectMetastore(bucket, log.NewNopLogger(), prometheus.NewPedanticRegistry()) + mstore := NewObjectMetastore(bucket, log.NewNopLogger(), NewObjectMetastoreMetrics(prometheus.DefaultRegisterer)) matchers := []*labels.Matcher{ 
 		labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
diff --git a/pkg/dataobj/metastore/object_test.go b/pkg/dataobj/metastore/object_test.go
index 00ba99ee7ce82..3f53af90d1ef1 100644
--- a/pkg/dataobj/metastore/object_test.go
+++ b/pkg/dataobj/metastore/object_test.go
@@ -275,7 +275,7 @@ func TestSectionsForStreamMatchers(t *testing.T) {
 	err = metastoreTocWriter.WriteEntry(context.Background(), path, timeRanges)
 	require.NoError(t, err)
 
-	mstore := NewObjectMetastore(bucket, log.NewNopLogger(), prometheus.NewPedanticRegistry())
+	mstore := NewObjectMetastore(bucket, log.NewNopLogger(), NewObjectMetastoreMetrics(prometheus.DefaultRegisterer))
 
 	tests := []struct {
 		name        string
@@ -414,7 +414,7 @@ func TestSectionsForPredicateMatchers(t *testing.T) {
 	err = metastoreTocWriter.WriteEntry(context.Background(), path, timeRanges)
 	require.NoError(t, err)
 
-	mstore := NewObjectMetastore(bucket, log.NewNopLogger(), prometheus.NewPedanticRegistry())
+	mstore := NewObjectMetastore(bucket, log.NewNopLogger(), NewObjectMetastoreMetrics(prometheus.DefaultRegisterer))
 
 	tests := []struct {
 		name        string
@@ -489,7 +489,7 @@ func queryMetastore(t *testing.T, tenant string, mfunc func(context.Context, tim
 		builder.addStreamAndFlush(tenant, stream)
 	}
 
-	mstore := NewObjectMetastore(builder.bucket, log.NewNopLogger(), nil)
+	mstore := NewObjectMetastore(builder.bucket, log.NewNopLogger(), NewObjectMetastoreMetrics(prometheus.DefaultRegisterer))
 	defer func() {
 		require.NoError(t, mstore.bucket.Close())
 	}()
diff --git a/pkg/dataobj/metastore/scan_pointers.go b/pkg/dataobj/metastore/scan_pointers.go
new file mode 100644
index 0000000000000..9d8d64f8d2a27
--- /dev/null
+++ b/pkg/dataobj/metastore/scan_pointers.go
@@ -0,0 +1,212 @@
+package metastore
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+
+	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/apache/arrow-go/v18/arrow/memory"
+	"github.com/apache/arrow-go/v18/arrow/scalar"
+	"github.com/grafana/dskit/user"
+	"github.com/grafana/loki/v3/pkg/dataobj"
+	"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
+	"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
+	"github.com/grafana/loki/v3/pkg/xcap"
+)
+
+type scanPointers struct {
+	region          *xcap.Region
+	obj             *dataobj.Object
+	streamPredicate streams.RowPredicate
+
+	sStart *scalar.Timestamp
+	sEnd   *scalar.Timestamp
+
+	initialized        bool
+	matchingStreamIDs  []scalar.Scalar
+	pointersSections   []*dataobj.Section
+	pointersSectionIdx int
+	pointersReader     *pointers.Reader
+	cols               []*pointers.Column
+}
+
+func (s *scanPointers) Read(ctx context.Context) (arrow.RecordBatch, error) {
+	if err := s.init(ctx); err != nil {
+		return nil, err
+	}
+
+	ctx = xcap.ContextWithRegion(ctx, s.region)
+	for {
+		rec, err := s.pointersReader.Read(ctx, 128)
+		if err != nil && !errors.Is(err, io.EOF) {
+			return nil, err
+		} else if (rec == nil || rec.NumRows() == 0) && errors.Is(err, io.EOF) {
+			// this section is exhausted; advance to the next one and retry the read
+			err := s.prepareForNextSection(ctx)
+			if err != nil {
+				return nil, err
+			}
+			continue
+		}
+
+		return rec, nil
+	}
+}
+
+func (s *scanPointers) init(ctx context.Context) error {
+	if s.initialized {
+		return nil
+	}
+
+	if s.streamPredicate == nil {
+		return io.EOF
+	}
+
+	targetTenant, err := user.ExtractOrgID(ctx)
+	if err != nil {
+		return fmt.Errorf("extracting org ID: %w", err)
+	}
+
+	for _, section := range s.obj.Sections().Filter(pointers.CheckSection) {
+		if section.Tenant != targetTenant {
+			continue
+		}
+
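+		// keep only the pointers sections that belong to the query tenant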
+		s.pointersSections = append(s.pointersSections, section)
+	}
+
+	if len(s.pointersSections) == 0 {
+		return io.EOF
+	}
+
+	// find stream ids that satisfy the predicate and start/end
+	s.matchingStreamIDs, err = s.findMatchingStreamIDs(ctx)
+	if err != nil {
+		return fmt.Errorf("creating matching stream ids: %w", err)
+	}
+
+	if s.matchingStreamIDs == nil {
+		return io.EOF
+	}
+
+	s.pointersSectionIdx = -1
+	err = s.prepareForNextSection(ctx)
+	if err != nil {
+		return err
+	}
+
+	s.initialized = true
+
+	return nil
+}
+
+func (s *scanPointers) findMatchingStreamIDs(ctx context.Context) ([]scalar.Scalar, error) {
+	// TODO(ivkalita): implement using a columnar reader; the main complexity is in predicate mapping
+	var matchingStreamIDs []scalar.Scalar
+	err := forEachStream(ctx, s.obj, s.streamPredicate, func(stream streams.Stream) {
+		matchingStreamIDs = append(matchingStreamIDs, scalar.NewInt64Scalar(stream.ID))
+	})
+	if err != nil {
+		return nil, fmt.Errorf("iterating streams: %w", err)
+	}
+
+	return matchingStreamIDs, nil
+}
+
+func (s *scanPointers) prepareForNextSection(ctx context.Context) error {
+	for {
+		skip, err := s.prepareForNextSectionOrSkip(ctx)
+		if err != nil {
+			return err
+		}
+		if skip {
+			continue
+		}
+		return nil
+	}
+}
+
+func (s *scanPointers) prepareForNextSectionOrSkip(ctx context.Context) (bool, error) {
+	s.pointersSectionIdx++
+	if s.pointersSectionIdx == len(s.pointersSections) {
+		// no more pointers sections to read
+		return false, io.EOF
+	}
+
+	if s.pointersReader != nil {
+		_ = s.pointersReader.Close()
+	}
+
+	sec, err := pointers.Open(ctx, s.pointersSections[s.pointersSectionIdx])
+	if err != nil {
+		return false, fmt.Errorf("opening section: %w", err)
+	}
+
+	cols, err := findPointersColumnsByTypes(
+		sec.Columns(),
+		pointers.ColumnTypePath,
+		pointers.ColumnTypeSection,
+		pointers.ColumnTypeStreamID,
+		pointers.ColumnTypeStreamIDRef,
+		pointers.ColumnTypeMinTimestamp,
+		pointers.ColumnTypeMaxTimestamp,
+		pointers.ColumnTypeRowCount,
+		pointers.ColumnTypeUncompressedSize,
+	)
+	if err != nil {
+		return false, fmt.Errorf("finding pointers columns: %w", err)
+	}
+
+	var (
+		colStreamID     *pointers.Column
+		colMinTimestamp *pointers.Column
+		colMaxTimestamp *pointers.Column
+	)
+
+	for _, c := range cols {
+		if c.Type == pointers.ColumnTypeStreamID {
+			colStreamID = c
+		}
+		if c.Type == pointers.ColumnTypeMinTimestamp {
+			colMinTimestamp = c
+		}
+		if c.Type == pointers.ColumnTypeMaxTimestamp {
+			colMaxTimestamp = c
+		}
+		if colStreamID != nil && colMinTimestamp != nil && colMaxTimestamp != nil {
+			break
+		}
+	}
+
+	if colStreamID == nil || colMinTimestamp == nil || colMaxTimestamp == nil {
+		// the section has no rows with stream-based indices and can be ignored completely
+		return true, nil
+	}
+
+	s.cols = cols
+
+	s.pointersReader = pointers.NewReader(pointers.ReaderOptions{
+		Columns: s.cols,
+		Predicates: []pointers.Predicate{
+			pointers.WhereTimeRangeOverlapsWith(colMinTimestamp, colMaxTimestamp, s.sStart, s.sEnd),
+			pointers.InPredicate{
+				Column: colStreamID,
+				Values: s.matchingStreamIDs,
+			},
+		},
+		Allocator: memory.DefaultAllocator,
+	})
+
+	return false, nil
+}
+
+func (s *scanPointers) Close() {
+	if s.pointersReader != nil {
+		_ = s.pointersReader.Close()
+	}
+}
+
+var _ ArrowRecordBatchReader = (*scanPointers)(nil)
diff --git a/pkg/engine/basic_engine.go b/pkg/engine/basic_engine.go
index 2ba6bc4bccea2..bb7d00f11e6f0 100644
--- a/pkg/engine/basic_engine.go
+++ b/pkg/engine/basic_engine.go
@@ -35,16 +35,7 @@ var ErrNotSupported = errors.New("feature not supported in new query engine")
 // NewBasic creates a new instance of the basic query engine that implements the
 // [logql.Engine] interface. The basic engine executes plans sequentially with
 // no local or distributed parallelism.
-func NewBasic(cfg ExecutorConfig, metastoreCfg metastore.Config, bucket objstore.Bucket, limits logql.Limits, reg prometheus.Registerer, logger log.Logger) *Basic {
-	var ms metastore.Metastore
-	if bucket != nil {
-		indexBucket := bucket
-		if metastoreCfg.IndexStoragePrefix != "" {
-			indexBucket = objstore.NewPrefixedBucket(bucket, metastoreCfg.IndexStoragePrefix)
-		}
-		ms = metastore.NewObjectMetastore(indexBucket, logger, reg)
-	}
-
+func NewBasic(cfg ExecutorConfig, ms metastore.Metastore, bucket objstore.Bucket, limits logql.Limits, reg prometheus.Registerer, logger log.Logger) *Basic {
 	if cfg.BatchSize <= 0 {
 		panic(fmt.Sprintf("invalid batch size for query engine. must be greater than 0, got %d", cfg.BatchSize))
 	}
@@ -204,7 +195,7 @@ func (e *Basic) Execute(ctx context.Context, params logql.Params) (logqlmodel.Re
 		Bucket: e.bucket,
 	}
 
-	pipeline := executor.Run(ctx, cfg, physicalPlan, logger)
+	pipeline := executor.Run(ctx, cfg, physicalPlan, logger, e.metastore)
 	defer pipeline.Close()
 
 	var builder ResultBuilder
diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go
index b40b4f1ea2e13..4f62211b6dd21 100644
--- a/pkg/engine/engine.go
+++ b/pkg/engine/engine.go
@@ -14,7 +14,6 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/user"
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/thanos-io/objstore"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/codes"
@@ -29,7 +28,6 @@ import (
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
 	"github.com/grafana/loki/v3/pkg/logqlmodel"
 	"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"
-	"github.com/grafana/loki/v3/pkg/storage/bucket"
 	"github.com/grafana/loki/v3/pkg/util/httpreq"
 	util_log "github.com/grafana/loki/v3/pkg/util/log"
 	"github.com/grafana/loki/v3/pkg/util/rangeio"
@@ -58,12 +56,16 @@ type ExecutorConfig struct {
 
 	// RangeConfig determines how to optimize range reads in the V2 engine.
 	RangeConfig rangeio.Config `yaml:"range_reads" category:"experimental" doc:"description=Configures how to read byte ranges from object storage when using the V2 engine."`
+
+	// AheadOfTimeCatalogLookupsEnabled enables ahead-of-time catalog lookups.
+	AheadOfTimeCatalogLookupsEnabled bool `yaml:"ahead_of_time_catalog_lookups_enabled" category:"experimental"`
 }
 
 func (cfg *ExecutorConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.IntVar(&cfg.BatchSize, prefix+"batch-size", 100, "Experimental: Batch size of the next generation query engine.")
 	f.IntVar(&cfg.MergePrefetchCount, prefix+"merge-prefetch-count", 0, "Experimental: The number of inputs that are prefetched simultaneously by any Merge node. A value of 0 means that only the currently processed input is prefetched, 1 means that only the next input is prefetched, and so on. A negative value means that all inputs are prefetched in parallel.")
 	cfg.RangeConfig.RegisterFlags(prefix+"range-reads.", f)
+	f.BoolVar(&cfg.AheadOfTimeCatalogLookupsEnabled, prefix+"ahead-of-time-catalog-lookups-enabled", false, "Experimental: Enable ahead-of-time catalog lookups.")
 }
 
 // Params holds parameters for constructing a new [Engine].
@@ -71,12 +73,12 @@ type Params struct {
 	Logger     log.Logger            // Logger for optional log messages.
 	Registerer prometheus.Registerer // Registerer for optional metrics.
 
-	Config          ExecutorConfig   // Config for the Engine.
-	MetastoreConfig metastore.Config // Config for the Metastore.
+	Config ExecutorConfig // Config for the Engine.
+
+	Scheduler *Scheduler   // Scheduler to manage the execution of tasks.
+	Limits    logql.Limits // Limits to apply to engine queries.
 
-	Scheduler *Scheduler      // Scheduler to manage the execution of tasks.
-	Bucket    objstore.Bucket // Bucket to read stored data from.
-	Limits    logql.Limits    // Limits to apply to engine queries.
+	Metastore metastore.Metastore
 }
 
 // validate validates p and applies defaults.
@@ -102,11 +104,12 @@ type Engine struct {
 	metrics     *metrics
 	rangeConfig rangeio.Config
 
-	scheduler *Scheduler      // Scheduler to manage the execution of tasks.
-	bucket    objstore.Bucket // Bucket to read stored data from.
-	limits    logql.Limits    // Limits to apply to engine queries.
+	scheduler *Scheduler   // Scheduler to manage the execution of tasks.
+	limits    logql.Limits // Limits to apply to engine queries.
 
-	metastore metastore.Metastore
+	metastore                        metastore.Metastore
+	metastorePlanner                 physical.MetastorePlanner
+	aheadOfTimeCatalogLookupsEnabled bool
 }
 
 // New creates a new Engine.
@@ -120,18 +123,13 @@ func New(params Params) (*Engine, error) {
 		metrics:     newMetrics(params.Registerer),
 		rangeConfig: params.Config.RangeConfig,
 
-		scheduler: params.Scheduler,
-		bucket:    bucket.NewXCapBucket(params.Bucket),
-		limits:    params.Limits,
-	}
+		scheduler:                        params.Scheduler,
+		limits:                           params.Limits,
+		aheadOfTimeCatalogLookupsEnabled: params.Config.AheadOfTimeCatalogLookupsEnabled,
 
-	if e.bucket != nil {
-		indexBucket := e.bucket
-		if params.MetastoreConfig.IndexStoragePrefix != "" {
-			indexBucket = objstore.NewPrefixedBucket(e.bucket, params.MetastoreConfig.IndexStoragePrefix)
-		}
-		e.metastore = metastore.NewObjectMetastore(indexBucket, e.logger, params.Registerer)
+		metastore:        params.Metastore,
+		metastorePlanner: physical.NewMetastorePlanner(params.Metastore),
 	}
 
 	return e, nil
 }
@@ -305,9 +312,12 @@ func (e *Engine) buildPhysicalPlan(ctx context.Context, logger log.Logger, param
 	region := xcap.RegionFromContext(ctx)
 	timer := prometheus.NewTimer(e.metrics.physicalPlanning)
 
-	// TODO(rfratto): To improve the performance of the physical planner, we
-	// may want to parallelize metastore lookups across scheduled tasks as well.
-	catalog := physical.NewMetastoreCatalog(ctx, e.metastore)
+	catalog, err := e.prepareCatalog(ctx, params, logicalPlan)
+	if err != nil {
+		level.Warn(logger).Log("msg", "failed to prepare catalog", "err", err)
+		region.RecordError(err)
+		return nil, 0, ErrPlanningFailed
+	}
 
 	// TODO(rfratto): It feels strange that we need to pass the start/end time
Isn't it already represented by the logical @@ -341,6 +351,64 @@ func (e *Engine) buildPhysicalPlan(ctx context.Context, logger log.Logger, param return physicalPlan, duration, nil } +func (e *Engine) prepareCatalog(ctx context.Context, params logql.Params, logicalPlan *logical.Plan) (physical.Catalog, error) { + start := time.Now() + + if !e.aheadOfTimeCatalogLookupsEnabled { + return physical.NewMetastoreCatalog(ctx, e.metastore), nil + } + + unresolved := physical.NewUnresolvedCatalog() + collectorPlanner := physical.NewPlanner(physical.NewContext(params.Start(), params.End()), unresolved) + if _, err := collectorPlanner.Build(logicalPlan); err != nil { + return nil, fmt.Errorf("collecting catalog requests: %w", err) + } + + resolved, err := unresolved.Resolve(func(req physical.CatalogRequest) (physical.CatalogResponse, error) { + return e.queryMetastore(ctx, req) + }) + if err != nil { + return nil, fmt.Errorf("resolving catalog: %w", err) + } + + level.Info(e.logger).Log("msg", "finished catalog lookups", "duration", time.Since(start).String(), "requests", unresolved.RequestsCount()) + return resolved, nil +} + +func (e *Engine) queryMetastore(ctx context.Context, req physical.CatalogRequest) (physical.CatalogResponse, error) { + physicalPlan, err := e.metastorePlanner.Plan(ctx, req) + if err != nil { + return physical.CatalogResponse{}, fmt.Errorf("planning metastore request: %w", err) + } + + wf, _, err := e.buildWorkflow(ctx, e.logger, physicalPlan) + if err != nil { + return physical.CatalogResponse{}, fmt.Errorf("building workflow: %w", err) + } + + pipeline, err := wf.Run(ctx) + if err != nil { + return physical.CatalogResponse{}, fmt.Errorf("running workflow: %w", err) + } + + // TODO(ivkalita): switch on request type? + + descriptors, err := e.metastore.CollectScanPointersResult(ctx, executor.TranslateEOF(pipeline)) + if err != nil { + return physical.CatalogResponse{}, fmt.Errorf("reading results: %w", err) + } + + filteredDescriptors, err := physical.FilterDescriptorsForShard(req.Shard, descriptors) + if err != nil { + return physical.CatalogResponse{}, err + } + + return physical.CatalogResponse{ + Kind: physical.CatalogRequestKindResolveShardDescriptorsWithShard, + Descriptors: filteredDescriptors, + }, nil +} + // buildWorkflow builds a workflow from the given physical plan. 
 func (e *Engine) buildWorkflow(ctx context.Context, logger log.Logger, physicalPlan *physical.Plan) (*workflow.Workflow, time.Duration, error) {
 	tenantID, err := user.ExtractOrgID(ctx)
diff --git a/pkg/engine/internal/executor/executor.go b/pkg/engine/internal/executor/executor.go
index 467cf5abd9fc2..c4ca6e2456348 100644
--- a/pkg/engine/internal/executor/executor.go
+++ b/pkg/engine/internal/executor/executor.go
@@ -13,6 +13,7 @@ import (
 	"go.opentelemetry.io/otel/attribute"
 
 	"github.com/grafana/loki/v3/pkg/dataobj"
+	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
 	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
 	"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
 	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
@@ -33,7 +34,7 @@ type Config struct {
 	GetExternalInputs func(ctx context.Context, node physical.Node) []Pipeline
 }
 
-func Run(ctx context.Context, cfg Config, plan *physical.Plan, logger log.Logger) Pipeline {
+func Run(ctx context.Context, cfg Config, plan *physical.Plan, logger log.Logger, metastore metastore.Metastore) Pipeline {
 	c := &Context{
 		plan:      plan,
 		batchSize: cfg.BatchSize,
@@ -42,6 +43,7 @@ func Run(ctx context.Context, cfg Config, plan *physical.Plan, logger log.Logger
 		logger:            logger,
 		evaluator:         newExpressionEvaluator(),
 		getExternalInputs: cfg.GetExternalInputs,
+		metastore:         metastore,
 	}
 	if plan == nil {
 		return errorPipeline(ctx, errors.New("plan is nil"))
@@ -66,6 +68,8 @@ type Context struct {
 	getExternalInputs func(ctx context.Context, node physical.Node) []Pipeline
 
 	mergePrefetchCount int
+
+	metastore metastore.Metastore
 }
 
 func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline {
@@ -95,6 +99,8 @@ func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline {
 			return newObservedPipeline(c.executeDataObjScan(ctx, n, nodeRegion))
 		}, inputs)
 
+	case *physical.PointersScan:
+		return newObservedPipeline(c.executePointersScan(ctx, n, nodeRegion))
 	case *physical.TopK:
 		return newObservedPipeline(c.executeTopK(ctx, n, inputs, nodeRegion))
 	case *physical.Limit:
@@ -208,6 +214,28 @@ func (c *Context) executeDataObjScan(ctx context.Context, node *physical.DataObj
 	return pipeline
 }
 
+func (c *Context) executePointersScan(ctx context.Context, node *physical.PointersScan, region *xcap.Region) Pipeline {
+	matchers, err := physical.ExpressionToMatchers(node.Selector, false)
+	if err != nil {
+		return errorPipeline(ctx, fmt.Errorf("expression to matchers: %w", err))
+	}
+
+	return newLazyPipeline(func(ctx context.Context, inputs []Pipeline) Pipeline {
+		pipeline, err := newScanPointersPipeline(ctx, scanPointersOptions{
+			metastore: c.metastore,
+			location:  string(node.Location),
+			start:     node.Start,
+			end:       node.End,
+			matchers:  matchers,
+			region:    region,
+		})
+		if err != nil {
+			return errorPipeline(ctx, err)
+		}
+		return pipeline
+	}, nil)
+}
+
 func (c *Context) executeTopK(ctx context.Context, topK *physical.TopK, inputs []Pipeline, region *xcap.Region) Pipeline {
 	if len(inputs) == 0 {
 		return emptyPipeline()
@@ -336,7 +364,7 @@ func (c *Context) executeScanSet(ctx context.Context, set *physical.ScanSet, _ *
 	// ScanSet typically gets partitioned by the scheduler into multiple scan
 	// nodes.
 	//
-	// However, for locally testing unpartitioned pipelines, we still supprt
+	// However, for locally testing unpartitioned pipelines, we still support
 	// running a ScanSet. In this case, we internally execute it as a
 	// Merge on top of multiple sequential scans.
ctx, mergeRegion := xcap.StartRegion(ctx, physical.NodeTypeMerge.String()) @@ -356,6 +384,12 @@ func (c *Context) executeScanSet(ctx context.Context, set *physical.ScanSet, _ * targets = append(targets, newLazyPipeline(func(_ context.Context, _ []Pipeline) Pipeline { return newObservedPipeline(c.executeDataObjScan(nodeCtx, partition, partitionRegion)) }, nil)) + case physical.ScanTypePointers: + partition := target.PointersScan + + nodeCtx, partitionRegion := startRegionForNode(ctx, partition) + + targets = append(targets, c.executePointersScan(nodeCtx, partition, partitionRegion)) default: return errorPipeline(ctx, fmt.Errorf("unrecognized ScanSet target %s", target.Type)) } @@ -390,6 +424,11 @@ func startRegionForNode(ctx context.Context, n physical.Node) (context.Context, attribute.Int("num_projections", len(n.Projections)), ) + case *physical.PointersScan: + attributes = append(attributes, + attribute.String("location", string(n.Location)), + ) + case *physical.TopK: attributes = append(attributes, attribute.Int("k", n.K), diff --git a/pkg/engine/internal/executor/executor_test.go b/pkg/engine/internal/executor/executor_test.go index 91243a758e894..278b37cd12e40 100644 --- a/pkg/engine/internal/executor/executor_test.go +++ b/pkg/engine/internal/executor/executor_test.go @@ -12,14 +12,14 @@ import ( func TestExecutor(t *testing.T) { t.Run("pipeline fails if plan is nil", func(t *testing.T) { ctx := t.Context() - pipeline := Run(ctx, Config{}, nil, log.NewNopLogger()) + pipeline := Run(ctx, Config{}, nil, log.NewNopLogger(), nil) _, err := pipeline.Read(ctx) require.ErrorContains(t, err, "failed to execute pipeline: plan is nil") }) t.Run("pipeline fails if plan has no root node", func(t *testing.T) { ctx := t.Context() - pipeline := Run(ctx, Config{}, &physical.Plan{}, log.NewNopLogger()) + pipeline := Run(ctx, Config{}, &physical.Plan{}, log.NewNopLogger(), nil) _, err := pipeline.Read(ctx) require.ErrorContains(t, err, "failed to execute pipeline: plan has no root node") }) diff --git a/pkg/engine/internal/executor/metastore.go b/pkg/engine/internal/executor/metastore.go new file mode 100644 index 0000000000000..20b1c7f9f741c --- /dev/null +++ b/pkg/engine/internal/executor/metastore.go @@ -0,0 +1,49 @@ +package executor + +import ( + "context" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/grafana/loki/v3/pkg/dataobj/metastore" + "github.com/grafana/loki/v3/pkg/xcap" + "github.com/prometheus/prometheus/model/labels" +) + +type metastorePipeline struct { + reader metastore.ArrowRecordBatchReader + region *xcap.Region +} + +func (m *metastorePipeline) Read(ctx context.Context) (arrow.RecordBatch, error) { + rec, err := m.reader.Read(xcap.ContextWithRegion(ctx, m.region)) + // metastore reader returns io.EOF that we translate to executor.EOF + return rec, translateEOF(err, true) +} + +func (m *metastorePipeline) Close() { + if m.region != nil { + m.region.End() + } + m.reader.Close() +} + +var _ Pipeline = (*metastorePipeline)(nil) + +type scanPointersOptions struct { + metastore metastore.Metastore + region *xcap.Region + + location string + start, end time.Time + matchers []*labels.Matcher +} + +func newScanPointersPipeline(ctx context.Context, opts scanPointersOptions) (*metastorePipeline, error) { + reader, err := opts.metastore.ScanPointers(ctx, opts.location, opts.start, opts.end, opts.matchers) + if err != nil { + return nil, translateEOF(err, true) + } + + return &metastorePipeline{reader, opts.region}, nil +} diff --git 
a/pkg/engine/internal/executor/translate_errors.go b/pkg/engine/internal/executor/translate_errors.go
new file mode 100644
index 0000000000000..f7002b96d8432
--- /dev/null
+++ b/pkg/engine/internal/executor/translate_errors.go
@@ -0,0 +1,44 @@
+package executor
+
+import (
+	"context"
+	"errors"
+	"io"
+
+	"github.com/apache/arrow-go/v18/arrow"
+)
+
+// TranslateEOF wraps pipeline so that the executor's internal EOF sentinel is surfaced to callers as io.EOF.
+func TranslateEOF(pipeline Pipeline) Pipeline {
+	return translateEOFPipeline{pipeline, false}
+}
+
+type translateEOFPipeline struct {
+	pipeline   Pipeline
+	toInternal bool
+}
+
+func (p translateEOFPipeline) Close() {
+	p.pipeline.Close()
+}
+
+func (p translateEOFPipeline) Read(ctx context.Context) (arrow.RecordBatch, error) {
+	rec, err := p.pipeline.Read(ctx)
+	return rec, translateEOF(err, p.toInternal)
+}
+
+func translateEOF(err error, toInternal bool) error {
+	if toInternal {
+		// io.EOF to executor.EOF
+		if errors.Is(err, io.EOF) {
+			err = EOF
+		}
+	} else {
+		// executor.EOF to io.EOF
+		if errors.Is(err, EOF) {
+			err = io.EOF
+		}
+	}
+
+	return err
+}
diff --git a/pkg/engine/internal/planner/physical/catalog.go b/pkg/engine/internal/planner/physical/catalog.go
index 17d1ba341e410..ea07e58ac9716 100644
--- a/pkg/engine/internal/planner/physical/catalog.go
+++ b/pkg/engine/internal/planner/physical/catalog.go
@@ -4,23 +4,12 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/prometheus/prometheus/model/labels"
 
 	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
-	"github.com/grafana/loki/v3/pkg/engine/internal/types"
-)
-
-var (
-	binOpToMatchTypeMapping = map[types.BinaryOp]labels.MatchType{
-		types.BinaryOpEq:         labels.MatchEqual,
-		types.BinaryOpNeq:        labels.MatchNotEqual,
-		types.BinaryOpMatchRe:    labels.MatchRegexp,
-		types.BinaryOpNotMatchRe: labels.MatchNotRegexp,
-	}
-
-	noShard = ShardInfo{Shard: 0, Of: 1}
 )
 
 type ShardInfo struct {
@@ -32,7 +21,7 @@ func (s ShardInfo) String() string {
 	return fmt.Sprintf("%d_of_%d", s.Shard, s.Of)
 }
 
-// Start and End are inclusive.
+// TimeRange describes a time range. Start and End are both inclusive.
 type TimeRange struct {
 	Start time.Time
 	End   time.Time
@@ -80,12 +69,11 @@ type FilteredShardDescriptor struct {
 // providing this information (e.g. pg_catalog, ...) whereas in Loki there
 // is the Metastore.
 type Catalog interface {
-	// ResolveShardDescriptors returns a list of
+	// ResolveShardDescriptorsWithShard returns a list of
 	// FilteredShardDescriptor objects, which each include:
 	// a data object path, a list of stream IDs for
 	// each data object path, a list of sections for
 	// each data object path, and a time range.
-	ResolveShardDescriptors(Expression, time.Time, time.Time) ([]FilteredShardDescriptor, error)
 	ResolveShardDescriptorsWithShard(Expression, []Expression, ShardInfo, time.Time, time.Time) ([]FilteredShardDescriptor, error)
 }
 
@@ -103,14 +91,6 @@ func NewMetastoreCatalog(ctx context.Context, ms metastore.Metastore) *Metastore
 	}
 }
 
-// ResolveShardDescriptors resolves an array of FilteredShardDescriptor
-// objects based on a given [Expression]. The expression is required
-// to be a (tree of) [BinaryExpression] with a [ColumnExpression]
-// on the left and a [LiteralExpression] on the right.
-func (c *MetastoreCatalog) ResolveShardDescriptors(selector Expression, from, through time.Time) ([]FilteredShardDescriptor, error) {
-	return c.ResolveShardDescriptorsWithShard(selector, nil, noShard, from, through)
-}
-
 func (c *MetastoreCatalog) ResolveShardDescriptorsWithShard(selector Expression, predicates []Expression, shard ShardInfo, from, through time.Time) ([]FilteredShardDescriptor, error) {
 	if c.metastore == nil {
 		return nil, errors.New("no metastore to resolve objects")
@@ -125,14 +105,14 @@ func (c *MetastoreCatalog) resolveShardDescriptorsWithIndex(selector Expression,
 		return nil, errors.New("no metastore to resolve objects")
 	}
 
-	matchers, err := expressionToMatchers(selector, false)
+	matchers, err := ExpressionToMatchers(selector, false)
 	if err != nil {
 		return nil, fmt.Errorf("failed to convert selector expression into matchers: %w", err)
 	}
 
 	predicateMatchers := make([]*labels.Matcher, 0, len(predicates))
 	for _, predicate := range predicates {
-		matchers, err := expressionToMatchers(predicate, true)
+		matchers, err := ExpressionToMatchers(predicate, true)
 		if err != nil {
 			// Not all predicates are supported by the metastore, so some will be skipped
 			continue
@@ -145,110 +125,193 @@ func (c *MetastoreCatalog) resolveShardDescriptorsWithIndex(selector Expression,
 		return nil, fmt.Errorf("failed to resolve data object sections: %w", err)
 	}
 
-	return filterDescriptorsForShard(shard, sectionDescriptors)
+	return FilterDescriptorsForShard(shard, sectionDescriptors)
 }
 
-// filterDescriptorsForShard filters the section descriptors for a given shard.
-// It returns the locations, streams, and sections for the shard.
-// TODO: Improve filtering: this method could be improved because it doesn't resolve the stream IDs to sections, even though this information is available. Instead, it resolves streamIDs to the whole object.
-func filterDescriptorsForShard(shard ShardInfo, sectionDescriptors []*metastore.DataobjSectionDescriptor) ([]FilteredShardDescriptor, error) {
-	filteredDescriptors := make([]FilteredShardDescriptor, 0, len(sectionDescriptors))
+// CatalogRequestKind describes what metadata the planner needs.
+type CatalogRequestKind uint8
 
-	for _, desc := range sectionDescriptors {
-		filteredDescriptor := FilteredShardDescriptor{}
-		filteredDescriptor.Location = DataObjLocation(desc.ObjectPath)
+const (
+	// CatalogRequestKindResolveShardDescriptorsWithShard requests filtered section descriptors for a shard.
+	CatalogRequestKindResolveShardDescriptorsWithShard CatalogRequestKind = iota
+)
 
-		if int(desc.SectionIdx)%int(shard.Of) == int(shard.Shard) {
-			filteredDescriptor.Streams = desc.StreamIDs
-			filteredDescriptor.Sections = []int{int(desc.SectionIdx)}
-			tr, err := newTimeRange(desc.Start, desc.End)
-			if err != nil {
-				return nil, err
-			}
-			filteredDescriptor.TimeRange = tr
-			filteredDescriptors = append(filteredDescriptors, filteredDescriptor)
-		}
+// CatalogRequest captures a catalog lookup that must be resolved to finish physical planning.
+type CatalogRequest struct {
+	Kind CatalogRequestKind
+
+	Selector   Expression
+	Predicates []Expression
+
+	Shard ShardInfo
+
+	From    time.Time
+	Through time.Time
+}
+
+// CatalogResponse holds the metadata resolved for a CatalogRequest.
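+//
+// A response is produced for each collected request during the resolve pass.
+// A minimal sketch of the wiring (the lookup helper here is illustrative; in
+// this PR the engine performs the lookup via Engine.queryMetastore):
+//
+//	resolved, err := unresolved.Resolve(func(req CatalogRequest) (CatalogResponse, error) {
+//		descriptors, err := lookupSections(ctx, req) // hypothetical metastore lookup
+//		if err != nil {
+//			return CatalogResponse{}, err
+//		}
+//		return CatalogResponse{
+//			Kind:        CatalogRequestKindResolveShardDescriptorsWithShard,
+//			Descriptors: descriptors,
+//		}, nil
+//	})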
+type CatalogResponse struct { + Kind CatalogRequestKind + + Descriptors []FilteredShardDescriptor +} + +type catalogRequestKey struct { + kind CatalogRequestKind + selector string + predicates string + from int64 + through int64 + shard uint32 + shardOf uint32 +} + +func newCatalogRequestKey(req CatalogRequest) catalogRequestKey { + return catalogRequestKey{ + kind: req.Kind, + selector: expressionSignature(req.Selector), + predicates: expressionsSignature(req.Predicates), + from: req.From.UnixNano(), + through: req.Through.UnixNano(), + shard: req.Shard.Shard, + shardOf: req.Shard.Of, } +} - return filteredDescriptors, nil +func expressionsSignature(exprs []Expression) string { + if len(exprs) == 0 { + return "" + } + parts := make([]string, len(exprs)) + for i, expr := range exprs { + parts[i] = expressionSignature(expr) + } + return strings.Join(parts, ";") } -// expressionToMatchers converts a selector expression to a list of matchers. -// The selector expression is required to be a (tree of) [BinaryExpression] -// with a [ColumnExpression] on the left and a [LiteralExpression] on the right. -// It optionally supports ambiguous column references. Non-ambiguous column references are label matchers. -func expressionToMatchers(selector Expression, allowAmbiguousColumnRefs bool) ([]*labels.Matcher, error) { - if selector == nil { - return nil, nil +func expressionSignature(expr Expression) string { + if expr == nil { + return "" } + return expr.String() +} - switch expr := selector.(type) { - case *BinaryExpr: - switch expr.Op { - case types.BinaryOpAnd: - lhs, err := expressionToMatchers(expr.Left, allowAmbiguousColumnRefs) - if err != nil { - return nil, err - } - rhs, err := expressionToMatchers(expr.Right, allowAmbiguousColumnRefs) - if err != nil { - return nil, err - } - return append(lhs, rhs...), nil - case types.BinaryOpEq, types.BinaryOpNeq, types.BinaryOpMatchRe, types.BinaryOpNotMatchRe: - op, err := convertBinaryOp(expr.Op) - if err != nil { - return nil, err - } - name, err := convertColumnRef(expr.Left, allowAmbiguousColumnRefs) - if err != nil { - return nil, err - } - value, err := convertLiteralToString(expr.Right) - if err != nil { - return nil, err - } - lhs, err := labels.NewMatcher(op, name, value) - if err != nil { - return nil, err - } - return []*labels.Matcher{lhs}, nil - default: - return nil, fmt.Errorf("invalid binary expression in stream selector expression: %v", expr.Op.String()) - } - default: - return nil, fmt.Errorf("invalid expression type in stream selector expression: %T", expr) +type UnresolvedCatalog struct { + requests map[catalogRequestKey]CatalogRequest +} + +func NewUnresolvedCatalog() UnresolvedCatalog { + return UnresolvedCatalog{ + requests: make(map[catalogRequestKey]CatalogRequest), } } -func convertLiteralToString(expr Expression) (string, error) { - l, ok := expr.(*LiteralExpr) - if !ok { - return "", fmt.Errorf("expected literal expression, got %T", expr) +func (c UnresolvedCatalog) ResolveShardDescriptorsWithShard(selector Expression, predicates []Expression, shard ShardInfo, from, through time.Time) ([]FilteredShardDescriptor, error) { + req := CatalogRequest{ + Kind: CatalogRequestKindResolveShardDescriptorsWithShard, + Selector: cloneExpressions([]Expression{selector})[0], + Predicates: cloneExpressions(predicates), + Shard: shard, + From: from, + Through: through, } - if l.ValueType() != types.Loki.String { - return "", fmt.Errorf("literal type is not a string, got %v", l.ValueType()) + + key := newCatalogRequestKey(req) + if _, ok 
:= c.requests[key]; !ok { + c.requests[key] = req } - return l.Value().(string), nil + return nil, nil } -func convertColumnRef(expr Expression, allowAmbiguousColumnRefs bool) (string, error) { - ref, ok := expr.(*ColumnExpr) - if !ok { - return "", fmt.Errorf("expected column expression, got %T", expr) +func (c UnresolvedCatalog) RequestsCount() int { + return len(c.requests) +} + +func (c UnresolvedCatalog) Resolve(resolve func(CatalogRequest) (CatalogResponse, error)) (ResolvedCatalog, error) { + resolved := ResolvedCatalog{ + responses: make(map[catalogRequestKey]CatalogResponse), } - if !allowAmbiguousColumnRefs && ref.Ref.Type != types.ColumnTypeLabel { - return "", fmt.Errorf("column type is not a label, got %v", ref.Ref.Type) + for key, req := range c.requests { + resp, err := resolve(req) + if err != nil { + return resolved, fmt.Errorf("failed to resolve catalog response: %w", err) + } + resolved.responses[key] = resp } - return ref.Ref.Column, nil + + return resolved, nil +} + +type ResolvedCatalog struct { + responses map[catalogRequestKey]CatalogResponse } -func convertBinaryOp(t types.BinaryOp) (labels.MatchType, error) { - ty, ok := binOpToMatchTypeMapping[t] +func (c ResolvedCatalog) ResolveShardDescriptorsWithShard(selector Expression, predicates []Expression, shard ShardInfo, from, through time.Time) ([]FilteredShardDescriptor, error) { + req := CatalogRequest{ + Kind: CatalogRequestKindResolveShardDescriptorsWithShard, + Selector: selector, + Predicates: predicates, + Shard: shard, + From: from, + Through: through, + } + reqKey := newCatalogRequestKey(req) + resp, ok := c.responses[reqKey] if !ok { - return -1, fmt.Errorf("invalid binary operator for matcher: %v", t) + return nil, fmt.Errorf("catalog response missing for request %+v", reqKey) + } + + switch resp.Kind { + case CatalogRequestKindResolveShardDescriptorsWithShard: + return resp.Descriptors, nil + default: + return nil, fmt.Errorf("unsupported response kind %d", resp.Kind) + } +} + +// FilterDescriptorsForShard filters the section descriptors for a given shard. +// It returns the locations, streams, and sections for the shard. +// TODO: Improve filtering: this method could be improved because it doesn't resolve the stream IDs to sections, even though this information is available. Instead, it resolves streamIDs to the whole object. 
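+//
+// A section belongs to the shard when SectionIdx % shard.Of == shard.Shard;
+// for example, with ShardInfo{Shard: 1, Of: 3}, only sections 1, 4, 7, ... are kept.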
+func FilterDescriptorsForShard(shard ShardInfo, sectionDescriptors []*metastore.DataobjSectionDescriptor) ([]FilteredShardDescriptor, error) {
+	filteredDescriptors := make([]FilteredShardDescriptor, 0, len(sectionDescriptors))
+
+	for _, desc := range sectionDescriptors {
+		filteredDescriptor := FilteredShardDescriptor{}
+		filteredDescriptor.Location = DataObjLocation(desc.ObjectPath)
+
+		if int(desc.SectionIdx)%int(shard.Of) == int(shard.Shard) {
+			filteredDescriptor.Streams = desc.StreamIDs
+			filteredDescriptor.Sections = []int{int(desc.SectionIdx)}
+			tr, err := newTimeRange(desc.Start, desc.End)
+			if err != nil {
+				return nil, err
+			}
+			filteredDescriptor.TimeRange = tr
+			filteredDescriptors = append(filteredDescriptors, filteredDescriptor)
+		}
 	}
-	return ty, nil
+
+	return filteredDescriptors, nil
 }
 
 var _ Catalog = (*MetastoreCatalog)(nil)
+var _ Catalog = UnresolvedCatalog{}
+var _ Catalog = ResolvedCatalog{}
diff --git a/pkg/engine/internal/planner/physical/catalog_planner.go b/pkg/engine/internal/planner/physical/catalog_planner.go
new file mode 100644
index 0000000000000..09404f30966b5
--- /dev/null
+++ b/pkg/engine/internal/planner/physical/catalog_planner.go
@@ -0,0 +1,78 @@
+package physical
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
+	"github.com/grafana/loki/v3/pkg/engine/internal/util/dag"
+	"github.com/oklog/ulid/v2"
+)
+
+type MetastorePlanner struct {
+	metastore metastore.Metastore
+}
+
+func NewMetastorePlanner(metastore metastore.Metastore) MetastorePlanner {
+	return MetastorePlanner{
+		metastore: metastore,
+	}
+}
+
+func (p MetastorePlanner) Plan(ctx context.Context, req CatalogRequest) (*Plan, error) {
+	plan := &Plan{
+		graph: dag.Graph[Node]{},
+	}
+
+	indexPaths, err := p.metastore.GetIndexes(ctx, req.From, req.Through)
+	if err != nil {
+		return nil, fmt.Errorf("metastore plan failed: %w", err)
+	}
+
+	parallelize := &Parallelize{
+		NodeID: ulid.Make(),
+	}
+	plan.graph.Add(parallelize)
+
+	// TODO(ivkalita): I'm adding a nested parallelize node for the workflow builder to actually split the
+	// ScanSet into multiple tasks. Obviously this is a hack; we need to support it properly.
+	noop := &Parallelize{
+		NodeID: ulid.Make(),
+	}
+	plan.graph.Add(noop)
+
+	scanSet := &ScanSet{
+		NodeID: ulid.Make(),
+	}
+	plan.graph.Add(scanSet)
+
+	for _, indexPath := range indexPaths {
+		scanSet.Targets = append(scanSet.Targets, &ScanTarget{
+			Type: ScanTypePointers,
+			PointersScan: &PointersScan{
+				NodeID:   ulid.Make(),
+				Location: DataObjLocation(indexPath),
+
+				Selector: req.Selector,
+
+				Start: req.From,
+				End:   req.Through,
+
+				MaxTimeRange: TimeRange{
+					Start: req.From,
+					End:   req.Through,
+				},
+			},
+		})
+	}
+
+	if err := plan.graph.AddEdge(dag.Edge[Node]{Parent: parallelize, Child: noop}); err != nil {
+		return nil, err
+	}
+
+	if err := plan.graph.AddEdge(dag.Edge[Node]{Parent: noop, Child: scanSet}); err != nil {
+		return nil, err
+	}
+
+	return plan, nil
+}
diff --git a/pkg/engine/internal/planner/physical/catalog_test.go b/pkg/engine/internal/planner/physical/catalog_test.go
index 9aabcc9bae2fb..7a3cc52306c8c 100644
--- a/pkg/engine/internal/planner/physical/catalog_test.go
+++ b/pkg/engine/internal/planner/physical/catalog_test.go
@@ -1,6 +1,8 @@
 package physical
 
 import (
+	"context"
+	"fmt"
 	"testing"
 	"time"
 
@@ -11,162 +13,6 @@ import (
 	"github.com/grafana/loki/v3/pkg/engine/internal/types"
 )
 
-func TestCatalog_ConvertLiteral(t *testing.T) {
-	tests := []struct {
-		expr    Expression
-		want    string
-		wantErr bool
-	}{
-		{
-			expr: NewLiteral("foo"),
-			want: "foo",
-		},
-		{
-			expr:    NewLiteral(false),
-			wantErr: true,
-		},
-		{
-			expr:    NewLiteral(int64(123)),
-			wantErr: true,
-		},
-		{
-			expr:    NewLiteral(types.Timestamp(time.Now().UnixNano())),
-			wantErr: true,
-		},
-		{
-			expr:    NewLiteral(types.Duration(time.Hour.Nanoseconds())),
-			wantErr: true,
-		},
-		{
-			expr:    newColumnExpr("foo", types.ColumnTypeLabel),
-			wantErr: true,
-		},
-		{
-			expr: &BinaryExpr{
-				Left:  newColumnExpr("foo", types.ColumnTypeLabel),
-				Right: NewLiteral("foo"),
-				Op:    types.BinaryOpEq,
-			},
-			wantErr: true,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.expr.String(), func(t *testing.T) {
-			got, err := convertLiteralToString(tt.expr)
-			if tt.wantErr {
-				require.Error(t, err)
-				t.Log(err)
-			} else {
-				require.NoError(t, err)
-				require.Equal(t, tt.want, got)
-			}
-		})
-	}
-}
-
-func TestCatalog_ConvertColumnRef(t *testing.T) {
-	tests := []struct {
-		expr    Expression
-		want    string
-		wantErr bool
-	}{
-		{
-			expr: newColumnExpr("foo", types.ColumnTypeLabel),
-			want: "foo",
-		},
-		{
-			expr:    newColumnExpr("foo", types.ColumnTypeAmbiguous),
-			wantErr: true,
-		},
-		{
-			expr:    newColumnExpr("foo", types.ColumnTypeBuiltin),
-			wantErr: true,
-		},
-		{
-			expr:    NewLiteral(false),
-			wantErr: true,
-		},
-		{
-			expr: &BinaryExpr{
-				Left:  newColumnExpr("foo", types.ColumnTypeLabel),
-				Right: NewLiteral("foo"),
-				Op:    types.BinaryOpEq,
-			},
-			wantErr: true,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.expr.String(), func(t *testing.T) {
-			got, err := convertColumnRef(tt.expr, false)
-			if tt.wantErr {
-				require.Error(t, err)
-				t.Log(err)
-			} else {
-				require.NoError(t, err)
-				require.Equal(t, tt.want, got)
-			}
-		})
-	}
-}
-
-func TestCatalog_ExpressionToMatchers(t *testing.T) {
-	tests := []struct {
-		expr    Expression
-		want    []*labels.Matcher
-		wantErr bool
-	}{
-		{
-			expr:    newColumnExpr("foo", types.ColumnTypeLabel),
-			wantErr: true,
-		},
-		{
-			expr:    NewLiteral("foo"),
-			wantErr: true,
-		},
-		{
-			expr: &BinaryExpr{
-				Left:  newColumnExpr("foo", types.ColumnTypeLabel),
-				Right: NewLiteral("bar"),
-				Op:    types.BinaryOpEq,
-			},
-			want: []*labels.Matcher{
-				labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
-			},
-		},
-		{
-			expr: &BinaryExpr{
-				Left: &BinaryExpr{
-					Left:  newColumnExpr("foo", types.ColumnTypeLabel),
-					Right: NewLiteral("bar"),
-					Op:    types.BinaryOpEq,
-				},
-				Right: &BinaryExpr{
-					Left:  newColumnExpr("bar", types.ColumnTypeLabel),
-					Right: NewLiteral("baz"),
-					Op:    types.BinaryOpNeq,
-				},
-				Op: types.BinaryOpAnd,
-			},
-			want: []*labels.Matcher{
-				labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
-				labels.MustNewMatcher(labels.MatchNotEqual, "bar", "baz"),
-			},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.expr.String(), func(t *testing.T) {
-			got, err := expressionToMatchers(tt.expr, false)
-			if tt.wantErr {
-				require.Error(t, err)
-				t.Log(err)
-			} else {
-				require.NoError(t, err)
-				require.ElementsMatch(t, tt.want, got)
-			}
-		})
-	}
-}
-
 func TestCatalog_TimeRangeValidate(t *testing.T) {
 	tests := []struct {
 		name string
@@ -280,7 +126,7 @@ func TestCatalog_FilterDescriptorsForShard(t *testing.T) {
 	desc3.ObjectPath = "baz"
 	desc3.SectionIdx = 3
 	sectionDescriptors := []*metastore.DataobjSectionDescriptor{&desc1, &desc2, &desc3}
-	res, err := filterDescriptorsForShard(shard, sectionDescriptors)
+	res, err := FilterDescriptorsForShard(shard, sectionDescriptors)
 	require.NoError(t, err)
 	tr1, err := newTimeRange(start1, end1)
 	require.NoError(t, err)
@@ -292,5 +138,244 @@ func TestCatalog_FilterDescriptorsForShard(t *testing.T) {
 		}
 		require.ElementsMatch(t, res, expected)
 	})
+}
+
+func TestUnresolvedCatalog_RequestCollection(t *testing.T) {
+	t.Run("Collects unique requests", func(t *testing.T) {
+		catalog := NewUnresolvedCatalog()
+		selector := newTestLabelMatcher("app", "test")
+		predicates := []Expression{newTestLabelMatcher("env", "prod")}
+		shard := ShardInfo{Shard: 1, Of: 2}
+		from := time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC)
+		through := time.Date(2025, time.January, 1, 12, 0, 0, 0, time.UTC)
+
+		// First call should add the request
+		_, err := catalog.ResolveShardDescriptorsWithShard(selector, predicates, shard, from, through)
+		require.NoError(t, err)
+		require.Equal(t, 1, catalog.RequestsCount())
+
+		// Second call with same parameters should not add another request
+		_, err = catalog.ResolveShardDescriptorsWithShard(selector, predicates, shard, from, through)
+		require.NoError(t, err)
+		require.Equal(t, 1, catalog.RequestsCount())
+	})
+
+	t.Run("Collects different requests", func(t *testing.T) {
+		catalog := NewUnresolvedCatalog()
+		selector := newTestLabelMatcher("app", "test")
+		predicates := []Expression{newTestLabelMatcher("env", "prod")}
+		shard1 := ShardInfo{Shard: 1, Of: 2}
+		shard2 := ShardInfo{Shard: 2, Of: 2}
+		from := time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC)
+		through := time.Date(2025, time.January, 1, 12, 0, 0, 0, time.UTC)
+
+		// First request with shard 1
+		_, err := catalog.ResolveShardDescriptorsWithShard(selector, predicates, shard1, from, through)
+		require.NoError(t, err)
+		require.Equal(t, 1, catalog.RequestsCount())
+
+		// Second request with shard 2 should add another request
+		_, err = catalog.ResolveShardDescriptorsWithShard(selector, predicates, shard2, from, through)
+		require.NoError(t, err)
+		require.Equal(t, 2, catalog.RequestsCount())
+	})
+}
+
+func TestUnresolvedCatalog_Resolve(t *testing.T) {
+	t.Run("Successfully resolves all requests", func(t *testing.T) {
+		catalog := NewUnresolvedCatalog()
+		selector := newTestLabelMatcher("app", "test")
+		predicates := []Expression{newTestLabelMatcher("env", "prod")}
+		shard := ShardInfo{Shard: 1, Of: 2}
+		from := time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC)
+		through := time.Date(2025, time.January, 1, 12, 0, 0, 0, time.UTC)
+
+		_, err := catalog.ResolveShardDescriptorsWithShard(selector, predicates, shard, from, through)
+		require.NoError(t, err)
+
+		tr, err := newTimeRange(from, through)
+		require.NoError(t, err)
+
+		// Resolve with a function that returns mock descriptors
+		resolved, err := catalog.Resolve(func(_ CatalogRequest) (CatalogResponse, error) {
+			return CatalogResponse{
+				Kind: CatalogRequestKindResolveShardDescriptorsWithShard,
+				Descriptors: []FilteredShardDescriptor{
+					{Location: "test-location", Streams: []int64{1}, Sections: []int{1}, TimeRange: tr},
+				},
+			}, nil
+		})
+		require.NoError(t, err)
+
+		// Verify we can retrieve the resolved descriptors
+		descriptors, err := resolved.ResolveShardDescriptorsWithShard(selector, predicates, shard, from, through)
+		require.NoError(t, err)
+		require.Len(t, descriptors, 1)
+		require.Equal(t, DataObjLocation("test-location"), descriptors[0].Location)
+	})
+
+	t.Run("Returns error when resolve function fails", func(t *testing.T) {
+		catalog := NewUnresolvedCatalog()
+		selector := newTestLabelMatcher("app", "test")
+		shard := ShardInfo{Shard: 1, Of: 2}
+		from := time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC)
+		through := time.Date(2025, time.January, 1, 12, 0, 0, 0, time.UTC)
+
+		_, err := catalog.ResolveShardDescriptorsWithShard(selector, nil, shard, from, through)
+		require.NoError(t, err)
+
+		// Resolve with a function that returns an error
+		_, err = catalog.Resolve(func(_ CatalogRequest) (CatalogResponse, error) {
+			return CatalogResponse{}, fmt.Errorf("mock error")
+		})
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "failed to resolve catalog response")
+	})
+}
+
+func TestResolvedCatalog_ResolveShardDescriptorsWithShard(t *testing.T) {
+	t.Run("Returns descriptors for resolved request", func(t *testing.T) {
+		catalog := NewUnresolvedCatalog()
+		selector := newTestLabelMatcher("app", "test")
+		predicates := []Expression{newTestLabelMatcher("env", "prod")}
+		shard := ShardInfo{Shard: 1, Of: 2}
+		from := time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC)
+		through := time.Date(2025, time.January, 1, 12, 0, 0, 0, time.UTC)
+
+		_, err := catalog.ResolveShardDescriptorsWithShard(selector, predicates, shard, from, through)
+		require.NoError(t, err)
+
+		tr, err := newTimeRange(from, through)
+		require.NoError(t, err)
+
+		resolved, err := catalog.Resolve(func(_ CatalogRequest) (CatalogResponse, error) {
+			return CatalogResponse{
+				Kind: CatalogRequestKindResolveShardDescriptorsWithShard,
+				Descriptors: []FilteredShardDescriptor{
+					{Location: "test-location", Streams: []int64{1, 2, 3}, Sections: []int{1, 2}, TimeRange: tr},
+				},
+			}, nil
+		})
+		require.NoError(t, err)
+
+		descriptors, err := resolved.ResolveShardDescriptorsWithShard(selector, predicates, shard, from, through)
+		require.NoError(t, err)
+		require.Len(t, descriptors, 1)
+		require.Equal(t, DataObjLocation("test-location"), descriptors[0].Location)
+		require.Equal(t, []int64{1, 2, 3}, descriptors[0].Streams)
+		require.Equal(t, []int{1, 2}, descriptors[0].Sections)
+	})
+
+	t.Run("Returns error for missing request", func(t *testing.T) {
+		catalog := NewUnresolvedCatalog()
+		selector := newTestLabelMatcher("app", "test")
+		shard := ShardInfo{Shard: 1, Of: 2}
+		from := time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC)
+		through := time.Date(2025, time.January, 1, 12, 0, 0, 0, time.UTC)
+
+		resolved, err := catalog.Resolve(func(_ CatalogRequest) (CatalogResponse, error) {
+			return CatalogResponse{}, nil
+		})
+		require.NoError(t, err)
+
+		// Try to resolve a request that was never added
+		_, err = resolved.ResolveShardDescriptorsWithShard(selector, nil, shard, from, through)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "catalog response missing for request")
+	})
+}
+
+func TestMetastoreCatalog_ResolveShardDescriptorsWithShard(t *testing.T) {
+	t.Run("Successfully resolves descriptors", func(t *testing.T) {
+		ctx := context.Background()
+		now := time.Now()
+		start := now.Add(time.Second * -10)
+		end := now.Add(time.Second * -5)
+
+		desc1 := &metastore.DataobjSectionDescriptor{
+			SectionKey: metastore.SectionKey{
+				ObjectPath: "test-path",
+				SectionIdx: 1,
+			},
+			StreamIDs: []int64{1, 2},
+			RowCount:  10,
+			Size:      100,
+			Start:     start,
+			End:       end,
+		}
+
+		mockMetastore := &mockMetastore{
+			sections: []*metastore.DataobjSectionDescriptor{desc1},
+		}
+
+		catalog := NewMetastoreCatalog(ctx, mockMetastore)
+		selector := newTestLabelMatcher("app", "test")
+		shard := ShardInfo{Shard: 1, Of: 2}
+
+		descriptors, err := catalog.ResolveShardDescriptorsWithShard(selector, nil, shard, start, end)
+		require.NoError(t, err)
+		require.Len(t, descriptors, 1)
+		require.Equal(t, DataObjLocation("test-path"), descriptors[0].Location)
+		require.Equal(t, []int64{1, 2}, descriptors[0].Streams)
+	})
+
+	t.Run("Returns error when metastore is nil", func(t *testing.T) {
+		ctx := context.Background()
+		catalog := NewMetastoreCatalog(ctx, nil)
+		selector := newTestLabelMatcher("app", "test")
+		shard := ShardInfo{Shard: 1, Of: 2}
+		now := time.Now()
+
+		_, err := catalog.ResolveShardDescriptorsWithShard(selector, nil, shard, now, now)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "no metastore to resolve objects")
+	})
+
+	t.Run("Returns error when metastore.Sections fails", func(t *testing.T) {
+		ctx := context.Background()
+		mockMetastore := &mockMetastore{
+			err: fmt.Errorf("metastore error"),
+		}
+
+		catalog := NewMetastoreCatalog(ctx, mockMetastore)
+		selector := newTestLabelMatcher("app", "test")
+		shard := ShardInfo{Shard: 1, Of: 2}
+		now := time.Now()
+
+		_, err := catalog.ResolveShardDescriptorsWithShard(selector, nil, shard, now, now)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "failed to resolve data object sections")
+	})
+}
+
+// Test helper functions
+
+// newTestLabelMatcher creates a BinaryExpr that represents a label matcher (label = "value")
+func newTestLabelMatcher(label, value string) Expression {
+	return &BinaryExpr{
+		Op:    types.BinaryOpEq,
+		Left:  &ColumnExpr{Ref: types.ColumnRef{Column: label, Type: types.ColumnTypeLabel}},
+		Right: NewLiteral(value),
+	}
+}
+
+// mockMetastore is a mock implementation of the metastore.Metastore interface for testing
+type mockMetastore struct {
+	sections []*metastore.DataobjSectionDescriptor
+	err      error
+}
+
+func (m *mockMetastore) Sections(_ context.Context, _, _ time.Time, _ []*labels.Matcher, _ []*labels.Matcher) ([]*metastore.DataobjSectionDescriptor, error) {
+	if m.err != nil {
+		return nil, m.err
+	}
+	return m.sections, nil
+}
+
+func (m *mockMetastore) Labels(_ context.Context, _, _ time.Time, _ ...*labels.Matcher) ([]string, error) {
+	return nil, nil
+}
+func (m *mockMetastore) Values(_ context.Context, _, _ time.Time, _ ...*labels.Matcher) ([]string, error) {
+	return nil, nil
 }
diff --git a/pkg/engine/internal/planner/physical/dataobjscan.go b/pkg/engine/internal/planner/physical/dataobjscan.go
index e8db84dfb83a3..72636d6a3b7fb 100644
--- a/pkg/engine/internal/planner/physical/dataobjscan.go
+++ b/pkg/engine/internal/planner/physical/dataobjscan.go
@@ -6,7 +6,7 @@ import (
 	"github.com/oklog/ulid/v2"
 )
 
-// DataObjLocation is a string that uniquely indentifies a data object location in
+// DataObjLocation is a string that uniquely identifies a data object location in
 // object storage.
 type DataObjLocation string
 
diff --git a/pkg/engine/internal/planner/physical/expressions.go b/pkg/engine/internal/planner/physical/expressions.go
index bfcd27bafd4aa..89605d6cda2ca 100644
--- a/pkg/engine/internal/planner/physical/expressions.go
+++ b/pkg/engine/internal/planner/physical/expressions.go
@@ -4,6 +4,8 @@ import (
 	"fmt"
 	"strings"
 
+	"github.com/prometheus/prometheus/model/labels"
+
 	"github.com/grafana/loki/v3/pkg/engine/internal/types"
 )
 
@@ -253,3 +255,90 @@ func (e *VariadicExpr) String() string {
 func (*VariadicExpr) Type() ExpressionType {
 	return ExprTypeVariadic
 }
+
+// ExpressionToMatchers converts a selector expression to a list of matchers.
+// The selector expression is required to be a (tree of) [BinaryExpression]
+// with a [ColumnExpression] on the left and a [LiteralExpression] on the right.
+// Ambiguous column references are accepted only when allowAmbiguousColumnRefs is set; otherwise columns must be labels.
+func ExpressionToMatchers(selector Expression, allowAmbiguousColumnRefs bool) ([]*labels.Matcher, error) {
+	if selector == nil {
+		return nil, nil
+	}
+
+	switch expr := selector.(type) {
+	case *BinaryExpr:
+		switch expr.Op {
+		case types.BinaryOpAnd:
+			lhs, err := ExpressionToMatchers(expr.Left, allowAmbiguousColumnRefs)
+			if err != nil {
+				return nil, err
+			}
+			rhs, err := ExpressionToMatchers(expr.Right, allowAmbiguousColumnRefs)
+			if err != nil {
+				return nil, err
+			}
+			return append(lhs, rhs...), nil
+		case types.BinaryOpEq, types.BinaryOpNeq, types.BinaryOpMatchRe, types.BinaryOpNotMatchRe:
+			op, err := convertBinaryOp(expr.Op)
+			if err != nil {
+				return nil, err
+			}
+			name, err := convertColumnRef(expr.Left, allowAmbiguousColumnRefs)
+			if err != nil {
+				return nil, err
+			}
+			value, err := convertLiteralToString(expr.Right)
+			if err != nil {
+				return nil, err
+			}
+			lhs, err := labels.NewMatcher(op, name, value)
+			if err != nil {
+				return nil, err
+			}
+			return []*labels.Matcher{lhs}, nil
+		default:
+			return nil, fmt.Errorf("invalid binary expression in stream selector expression: %v", expr.Op.String())
+		}
+	default:
+		return nil, fmt.Errorf("invalid expression type in stream selector expression: %T", expr)
+	}
+}
+
+func convertLiteralToString(expr Expression) (string, error) {
+	l, ok := expr.(*LiteralExpr)
+	if !ok {
+		return "", fmt.Errorf("expected literal expression, got %T", expr)
+	}
+	if l.ValueType() != types.Loki.String {
+		return "", fmt.Errorf("literal type is not a string, got %v", l.ValueType())
+	}
+	return l.Value().(string), nil
+}
+
+func convertColumnRef(expr Expression, allowAmbiguousColumnRefs bool) (string, error) {
+	ref, ok := expr.(*ColumnExpr)
+	if !ok {
+		return "", fmt.Errorf("expected column expression, got %T", expr)
+	}
+	if !allowAmbiguousColumnRefs && ref.Ref.Type != types.ColumnTypeLabel {
+		return "", fmt.Errorf("column type is not a label, got %v", ref.Ref.Type)
+	}
+	return ref.Ref.Column, nil
+}
+
+var (
+	binOpToMatchTypeMapping = map[types.BinaryOp]labels.MatchType{
+		types.BinaryOpEq:         labels.MatchEqual,
+		types.BinaryOpNeq:        labels.MatchNotEqual,
+		types.BinaryOpMatchRe:    labels.MatchRegexp,
+		types.BinaryOpNotMatchRe: labels.MatchNotRegexp,
+	}
+)
+
+func convertBinaryOp(t types.BinaryOp) (labels.MatchType, error) {
+	ty, ok := binOpToMatchTypeMapping[t]
+	if !ok {
+		return -1, fmt.Errorf("invalid binary operator for matcher: %v", t)
+	}
+	return ty, nil
+}
diff --git a/pkg/engine/internal/planner/physical/expressions_test.go b/pkg/engine/internal/planner/physical/expressions_test.go
index 952237b683e2d..13a7f86f7dd31 100644
--- a/pkg/engine/internal/planner/physical/expressions_test.go
+++ b/pkg/engine/internal/planner/physical/expressions_test.go
@@ -2,7 +2,9 @@ package physical
 
 import (
 	"testing"
+	"time"
 
+	"github.com/prometheus/prometheus/model/labels"
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/loki/v3/pkg/engine/internal/types"
@@ -108,3 +110,159 @@ func TestLiteralExpr(t *testing.T) {
 		require.Equal(t, types.Loki.String, literal.ValueType())
 	})
 }
+
+func TestExpressionToMatchers(t *testing.T) {
+	tests := []struct {
+		expr    Expression
+		want    []*labels.Matcher
+		wantErr bool
+	}{
+		{
+			expr:    newColumnExpr("foo", types.ColumnTypeLabel),
+			wantErr: true,
+		},
+		{
+			expr:    NewLiteral("foo"),
+			wantErr: true,
+		},
+		{
+			expr: &BinaryExpr{
+				Left:  newColumnExpr("foo", types.ColumnTypeLabel),
+				Right: NewLiteral("bar"),
+				Op:    types.BinaryOpEq,
+			},
+			want: []*labels.Matcher{
+				labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
+			},
+		},
+		{
+			expr: &BinaryExpr{
+				Left: &BinaryExpr{
+					Left:  newColumnExpr("foo", types.ColumnTypeLabel),
+					Right: NewLiteral("bar"),
+					Op:    types.BinaryOpEq,
+				},
+				Right: &BinaryExpr{
+					Left:  newColumnExpr("bar", types.ColumnTypeLabel),
+					Right: NewLiteral("baz"),
+					Op:    types.BinaryOpNeq,
+				},
+				Op: types.BinaryOpAnd,
+			},
+			want: []*labels.Matcher{
+				labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
+				labels.MustNewMatcher(labels.MatchNotEqual, "bar", "baz"),
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.expr.String(), func(t *testing.T) {
+			got, err := ExpressionToMatchers(tt.expr, false)
+			if tt.wantErr {
+				require.Error(t, err)
+				t.Log(err)
+			} else {
+				require.NoError(t, err)
+				require.ElementsMatch(t, tt.want, got)
+			}
+		})
+	}
+}
+
+func TestConvertLiteral(t *testing.T) {
+	tests := []struct {
+		expr    Expression
+		want    string
+		wantErr bool
+	}{
+		{
+			expr: NewLiteral("foo"),
+			want: "foo",
+		},
+		{
+			expr:    NewLiteral(false),
+			wantErr: true,
+		},
+		{
+			expr:    NewLiteral(int64(123)),
+			wantErr: true,
+		},
+		{
+			expr:    NewLiteral(types.Timestamp(time.Now().UnixNano())),
+			wantErr: true,
+		},
+		{
+			expr:    NewLiteral(types.Duration(time.Hour.Nanoseconds())),
+			wantErr: true,
+		},
+		{
+			expr:    newColumnExpr("foo", types.ColumnTypeLabel),
+			wantErr: true,
+		},
+		{
+			expr: &BinaryExpr{
+				Left:  newColumnExpr("foo", types.ColumnTypeLabel),
+				Right: NewLiteral("foo"),
+				Op:    types.BinaryOpEq,
+			},
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.expr.String(), func(t *testing.T) {
+			got, err := convertLiteralToString(tt.expr)
+			if tt.wantErr {
+				require.Error(t, err)
+				t.Log(err)
+			} else {
+				require.NoError(t, err)
+				require.Equal(t, tt.want, got)
+			}
+		})
+	}
+}
+
+func TestConvertColumnRef(t *testing.T) {
+	tests := []struct {
+		expr    Expression
+		want    string
+		wantErr bool
+	}{
+		{
+			expr: newColumnExpr("foo", types.ColumnTypeLabel),
+			want: "foo",
+		},
+		{
+			expr:    newColumnExpr("foo", types.ColumnTypeAmbiguous),
+			wantErr: true,
+		},
+		{
+			expr:    newColumnExpr("foo", types.ColumnTypeBuiltin),
+			wantErr: true,
+		},
+		{
+			expr:    NewLiteral(false),
+			wantErr: true,
+		},
+		{
+			expr: &BinaryExpr{
+				Left:  newColumnExpr("foo", types.ColumnTypeLabel),
+				Right: NewLiteral("foo"),
+				Op:    types.BinaryOpEq,
+			},
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.expr.String(), func(t *testing.T) {
+			got, err := convertColumnRef(tt.expr, false)
+			if tt.wantErr {
+				require.Error(t, err)
+				t.Log(err)
+			} else {
+				require.NoError(t, err)
+				require.Equal(t, tt.want, got)
+			}
+		})
+	}
+}
diff --git a/pkg/engine/internal/planner/physical/plan.go b/pkg/engine/internal/planner/physical/plan.go
index 4682fb797bd2c..b28b971b49b7f 100644
--- a/pkg/engine/internal/planner/physical/plan.go
+++ b/pkg/engine/internal/planner/physical/plan.go
@@ -26,6 +26,7 @@ const (
 	NodeTypeParallelize
 	NodeTypeScanSet
 	NodeTypeJoin
+	NodeTypePointersScan
 )
 
 func (t NodeType) String() string {
@@ -58,6 +59,8 @@ func (t NodeType) String() string {
 		return "ScanSet"
 	case NodeTypeJoin:
 		return "Join"
+	case NodeTypePointersScan:
+		return "PointersScan"
 	default:
 		return "Undefined"
 	}
@@ -106,6 +109,7 @@ var _ Node = (*TopK)(nil)
 var _ Node = (*Parallelize)(nil)
 var _ Node = (*ScanSet)(nil)
 var _ Node = (*Join)(nil)
+var _ Node = (*PointersScan)(nil)
 
 func (*DataObjScan) isNode() {}
 func (*Projection) isNode()  {}
@@ -118,6 +122,7 @@ func (*TopK) isNode()        {}
 func (*Parallelize) isNode() {}
 func (*ScanSet) isNode()     {}
 func (*Join) isNode()        {}
+func (*PointersScan) isNode() {}
 
 // Plan represents a physical execution plan as a directed acyclic graph (DAG).
 // It maintains the relationships between nodes, tracking parent-child connections
@@ -182,10 +187,20 @@ func (p *Plan) CalculateMaxTimeRange() TimeRange {
 			return fmt.Errorf("stop after RangeAggregation")
 		case *ScanSet:
 			for _, t := range s.Targets {
-				timeRange = timeRange.Merge(t.DataObject.MaxTimeRange)
+				switch t.Type {
+				case ScanTypeDataObject:
+					timeRange = timeRange.Merge(t.DataObject.MaxTimeRange)
+				case ScanTypePointers:
+					timeRange = timeRange.Merge(t.PointersScan.MaxTimeRange)
+				default:
+					panic("unhandled default case")
+				}
+
 			}
 		case *DataObjScan:
 			timeRange = timeRange.Merge(s.MaxTimeRange)
+		case *PointersScan:
+			timeRange = timeRange.Merge(s.MaxTimeRange)
 		}
 
 		return nil
diff --git a/pkg/engine/internal/planner/physical/planner_test.go b/pkg/engine/internal/planner/physical/planner_test.go
index 1a90059dbfe2f..ecb15ee997763 100644
--- a/pkg/engine/internal/planner/physical/planner_test.go
+++ b/pkg/engine/internal/planner/physical/planner_test.go
@@ -17,14 +17,9 @@ type catalog struct {
 	sectionDescriptors []*metastore.DataobjSectionDescriptor
 }
 
-// ResolveShardDescriptors implements Catalog.
-func (c *catalog) ResolveShardDescriptors(e Expression, from, through time.Time) ([]FilteredShardDescriptor, error) {
-	return c.ResolveShardDescriptorsWithShard(e, nil, noShard, from, through)
-}
-
 // ResolveDataObjForShard implements Catalog.
 func (c *catalog) ResolveShardDescriptorsWithShard(_ Expression, _ []Expression, shard ShardInfo, _, _ time.Time) ([]FilteredShardDescriptor, error) {
-	return filterDescriptorsForShard(shard, c.sectionDescriptors)
+	return FilterDescriptorsForShard(shard, c.sectionDescriptors)
 }
 
 var _ Catalog = (*catalog)(nil)
diff --git a/pkg/engine/internal/planner/physical/pointersscan.go b/pkg/engine/internal/planner/physical/pointersscan.go
new file mode 100644
index 0000000000000..b6a6891d5688a
--- /dev/null
+++ b/pkg/engine/internal/planner/physical/pointersscan.go
@@ -0,0 +1,36 @@
+package physical
+
+import (
+	"time"
+
+	"github.com/oklog/ulid/v2"
+)
+
+type PointersScan struct {
+	NodeID ulid.ULID
+
+	Location DataObjLocation
+
+	Selector Expression
+
+	MaxTimeRange TimeRange
+	Start        time.Time
+	End          time.Time
+}
+
+func (s *PointersScan) ID() ulid.ULID { return s.NodeID }
+
+func (s *PointersScan) Clone() Node {
+	return &PointersScan{
+		NodeID:       ulid.Make(),
+		Location:     s.Location,
+		Selector:     s.Selector.Clone(),
+		MaxTimeRange: s.MaxTimeRange,
+		Start:        s.Start,
+		End:          s.End,
+	}
+}
+
+func (s *PointersScan) Type() NodeType {
+	return NodeTypePointersScan
+}
diff --git a/pkg/engine/internal/planner/physical/scanset.go b/pkg/engine/internal/planner/physical/scanset.go
index 6c55c38675b7a..133eec2516ddc 100644
--- a/pkg/engine/internal/planner/physical/scanset.go
+++ b/pkg/engine/internal/planner/physical/scanset.go
@@ -14,6 +14,9 @@ type ScanTarget struct {
 	// DataObj is non-nil if Type is [ScanTypeDataObject]. Despite DataObjScan
 	// implementing [Node], the value is not inserted into the graph as a node.
 	DataObject *DataObjScan
+
+	// PointersScan is non-nil if Type is [ScanTypePointers].
+	PointersScan *PointersScan
 }
 
 // Clone returns a copy of the scan target.
@@ -31,6 +34,7 @@ type ScanType int
 const (
 	ScanTypeInvalid ScanType = iota
 	ScanTypeDataObject
+	ScanTypePointers
 )
 
 // String returns a string representation of the scan type.
@@ -40,6 +44,8 @@ func (ty ScanType) String() string {
 		return "ScanTypeInvalid"
 	case ScanTypeDataObject:
 		return "ScanTypeDataObject"
+	case ScanTypePointers:
+		return "ScanTypePointers"
 	default:
 		return fmt.Sprintf("ScanType(%d)", ty)
 	}
@@ -108,6 +114,12 @@ func (s *ScanSet) Shards() iter.Seq[Node] {
 					return
 				}
 
+			case ScanTypePointers:
+				node := target.PointersScan.Clone().(*PointersScan)
+				node.NodeID = target.PointersScan.NodeID
+				if !yield(node) {
+					return
+				}
 			default:
 				panic(fmt.Sprintf("invalid scan type %s", target.Type))
 			}
diff --git a/pkg/engine/internal/worker/thread.go b/pkg/engine/internal/worker/thread.go
index dcba20f7dfa85..4d128a20f71e6 100644
--- a/pkg/engine/internal/worker/thread.go
+++ b/pkg/engine/internal/worker/thread.go
@@ -9,6 +9,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/user"
+	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
 	"github.com/oklog/ulid/v2"
 	"github.com/thanos-io/objstore"
 
@@ -40,6 +41,7 @@ type thread struct {
 	BatchSize int64
 	Bucket    objstore.Bucket
 	Logger    log.Logger
+	Metastore metastore.Metastore
 
 	Ready chan<- readyRequest
 }
@@ -162,7 +164,7 @@ func (t *thread) runJob(ctx context.Context, job *threadJob) {
 	ctx, capture := xcap.NewCapture(ctx, nil)
 	defer capture.End()
 
-	pipeline := executor.Run(ctx, cfg, job.Task.Fragment, logger)
+	pipeline := executor.Run(ctx, cfg, job.Task.Fragment, logger, t.Metastore)
 
 	// If the root pipeline can be interested in some specific contributing time range
 	// then subscribe to changes.
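With the change above, every worker thread carries a `Metastore` handle and forwards it into `executor.Run`, so pointer-scan fragments can be resolved on the worker itself. A minimal sketch of the wiring, using only constructors introduced in this diff; the filesystem bucket and the `main` wrapper are illustrative only:

```go
package main

import (
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/objstore/providers/filesystem"

	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
)

func main() {
	logger := log.NewNopLogger()

	// Illustrative bucket; in Loki this is the (optionally prefixed) index bucket.
	bkt, err := filesystem.NewBucket("/tmp/loki-index")
	if err != nil {
		panic(err)
	}

	// One shared metrics instance per registerer, matching the guard added in
	// modules.go later in this diff.
	metrics := metastore.NewObjectMetastoreMetrics(prometheus.DefaultRegisterer)
	ms := metastore.NewObjectMetastore(bkt, logger, metrics)

	// The handle is then plumbed worker.Config -> thread -> executor.Run,
	// as the following files show.
	_ = ms
}
```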
diff --git a/pkg/engine/internal/worker/worker.go b/pkg/engine/internal/worker/worker.go
index dccec403f743d..643851e0fa444 100644
--- a/pkg/engine/internal/worker/worker.go
+++ b/pkg/engine/internal/worker/worker.go
@@ -21,6 +21,7 @@ import (
 	"go.opentelemetry.io/otel/propagation"
 	"golang.org/x/sync/errgroup"
 
+	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
 	"github.com/grafana/loki/v3/pkg/engine/internal/scheduler/wire"
 	"github.com/grafana/loki/v3/pkg/engine/internal/workflow"
 )
@@ -66,6 +67,8 @@ type Config struct {
 	// Absolute path of the endpoint where the frame handler is registered.
 	// Used for connecting to scheduler and other workers.
 	Endpoint string
+
+	Metastore metastore.Metastore
 }
 
 // readyRequest is a message sent from a thread to notify the worker that it's
@@ -170,6 +173,7 @@ func (w *Worker) run(ctx context.Context) error {
 			BatchSize: w.config.BatchSize,
 			Logger:    log.With(w.logger, "thread", i),
 			Bucket:    w.config.Bucket,
+			Metastore: w.config.Metastore,
 			Ready:     w.readyCh,
 		}
 
diff --git a/pkg/engine/worker.go b/pkg/engine/worker.go
index 3ceb6ce393f5f..41986751a77b6 100644
--- a/pkg/engine/worker.go
+++ b/pkg/engine/worker.go
@@ -9,6 +9,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/gorilla/mux"
 	"github.com/grafana/dskit/services"
+	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
 	"github.com/pkg/errors"
 	"github.com/thanos-io/objstore"
 
@@ -32,8 +33,9 @@ func (cfg *WorkerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
 
 // WorkerParams holds parameters for constructing a new [Worker].
 type WorkerParams struct {
-	Logger log.Logger      // Logger for optional log messages.
-	Bucket objstore.Bucket // Bucket to read stored data from.
+	Logger    log.Logger      // Logger for optional log messages.
+	Bucket    objstore.Bucket // Bucket to read stored data from.
+	Metastore metastore.Metastore
 
 	Config   WorkerConfig   // Configuration for the worker.
 	Executor ExecutorConfig // Configuration for task execution.
@@ -111,8 +113,9 @@ func NewWorker(params WorkerParams) (*Worker, error) {
 	}
 
 	inner, err := worker.New(worker.Config{
-		Logger: params.Logger,
-		Bucket: params.Bucket,
+		Logger:    params.Logger,
+		Bucket:    params.Bucket,
+		Metastore: params.Metastore,
 
 		Dialer:   dialer,
 		Listener: listener,
diff --git a/pkg/logql/bench/store_dataobj_v2_engine.go b/pkg/logql/bench/store_dataobj_v2_engine.go
index a293b340c2ab5..9a3f261fe70d4 100644
--- a/pkg/logql/bench/store_dataobj_v2_engine.go
+++ b/pkg/logql/bench/store_dataobj_v2_engine.go
@@ -17,7 +17,9 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/server"
 	"github.com/grafana/dskit/services"
+	"github.com/grafana/loki/v3/pkg/storage/bucket"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/objstore/providers/filesystem"
 	"golang.org/x/net/http2"
 	"golang.org/x/net/http2/h2c"
@@ -47,9 +49,10 @@ type DataObjV2EngineStore struct {
 func NewDataObjV2EngineStore(dir string, tenantID string) (*DataObjV2EngineStore, error) {
 	storageDir := filepath.Join(dir, storageDir)
 	return dataobjV2StoreWithOpts(storageDir, tenantID, engine.ExecutorConfig{
-		BatchSize:          512,
-		RangeConfig:        rangeio.DefaultConfig,
-		MergePrefetchCount: 8,
+		BatchSize:                        512,
+		RangeConfig:                      rangeio.DefaultConfig,
+		MergePrefetchCount:               8,
+		AheadOfTimeCatalogLookupsEnabled: true,
 	}, metastore.Config{
 		IndexStoragePrefix: "index/v0",
 	})
@@ -91,6 +94,12 @@ func dataobjV2StoreWithOpts(dataDir string, tenantID string, cfg engine.Executor
 		schedLookupAddr string
 	)
 
+	var metastoreBucket objstore.Bucket = bucketClient
+	if metastoreCfg.IndexStoragePrefix != "" {
+		metastoreBucket = bucket.NewPrefixedBucketClient(bucketClient, metastoreCfg.IndexStoragePrefix)
+	}
+	metastoreMetrics := metastore.NewObjectMetastoreMetrics(prometheus.DefaultRegisterer)
+
 	if *remoteTransport {
 		schedSrv, schedSvc, err = newServerService("scheduler", logger)
 		if err != nil {
@@ -123,10 +132,16 @@ func dataobjV2StoreWithOpts(dataDir string, tenantID string, cfg engine.Executor
 		schedLookupAddr = schedAdvertiseAddr.String()
 	}
 
+	workerLogger := log.With(logger, "component", "worker")
 	worker, err := engine.NewWorker(engine.WorkerParams{
-		Logger:        log.With(logger, "component", "worker"),
-		AdvertiseAddr: workerAdvertiseAddr,
-		Bucket:        bucketClient,
+		Logger:        workerLogger,
+		AdvertiseAddr: workerAdvertiseAddr,
+		Bucket:        bucketClient,
+		Metastore: metastore.NewObjectMetastore(
+			metastoreBucket,
+			workerLogger,
+			metastoreMetrics,
+		),
 		LocalScheduler: sched,
 		Config: engine.WorkerConfig{
 			SchedulerLookupAddress: schedLookupAddr,
@@ -156,14 +171,18 @@ func dataobjV2StoreWithOpts(dataDir string, tenantID string, cfg engine.Executor
 		worker.RegisterWorkerServer(workerSrv.HTTP)
 	}
 
+	engineLogger := log.With(logger, "component", "engine")
 	newEngine, err := engine.New(engine.Params{
-		Logger:          logger,
-		Registerer:      prometheus.NewRegistry(),
-		Config:          cfg,
-		MetastoreConfig: metastoreCfg,
-		Scheduler:       sched,
-		Bucket:          bucketClient,
-		Limits:          logql.NoLimits,
+		Logger:     engineLogger,
+		Registerer: prometheus.NewRegistry(),
+		Config:     cfg,
+		Metastore: metastore.NewObjectMetastore(
+			metastoreBucket,
+			engineLogger,
+			metastoreMetrics,
+		),
+		Scheduler: sched,
+		Limits:    logql.NoLimits,
 	})
 	if err != nil {
 		return nil, fmt.Errorf("creating engine: %w", err)
diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go
index b293c05b34cf6..5c0a637fb4648 100644
--- a/pkg/loki/loki.go
+++ b/pkg/loki/loki.go
@@ -38,6 +38,7 @@ import (
 	dataobjconfig "github.com/grafana/loki/v3/pkg/dataobj/config"
"github.com/grafana/loki/v3/pkg/dataobj/consumer" dataobjindex "github.com/grafana/loki/v3/pkg/dataobj/index" + "github.com/grafana/loki/v3/pkg/dataobj/metastore" "github.com/grafana/loki/v3/pkg/distributor" "github.com/grafana/loki/v3/pkg/engine" "github.com/grafana/loki/v3/pkg/indexgateway" @@ -465,6 +466,8 @@ type Loki struct { Metrics *server.Metrics UsageTracker push.UsageTracker + + MetastoreMetrics *metastore.ObjectMetastoreMetrics } // New makes a new Loki. diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index bffbae29b629d..0a0a6e26a55ef 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -50,6 +50,7 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/consumer" "github.com/grafana/loki/v3/pkg/dataobj/explorer" dataobjindex "github.com/grafana/loki/v3/pkg/dataobj/index" + "github.com/grafana/loki/v3/pkg/dataobj/metastore" "github.com/grafana/loki/v3/pkg/distributor" engine_v2 "github.com/grafana/loki/v3/pkg/engine" "github.com/grafana/loki/v3/pkg/indexgateway" @@ -603,7 +604,7 @@ func (t *Loki) initQuerier() (services.Service, error) { } } - t.querierAPI = querier.NewQuerierAPI(t.Cfg.Querier, t.Cfg.QueryEngine, t.Cfg.DataObj.Metastore, t.Querier, t.Overrides, store, prometheus.DefaultRegisterer, logger) + t.querierAPI = querier.NewQuerierAPI(t.Cfg.Querier, t.Cfg.QueryEngine, t.getMetastore(store, "dataobj-querier"), t.Querier, t.Overrides, store, prometheus.DefaultRegisterer, logger) indexStatsHTTPMiddleware := querier.WrapQuerySpanAndTimeout("query.IndexStats", t.Overrides) indexShardsHTTPMiddleware := querier.WrapQuerySpanAndTimeout("query.IndexShards", t.Overrides) @@ -1412,26 +1413,44 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { }), nil } +func (t *Loki) getMetastore(store objstore.Bucket, component string) metastore.Metastore { + logger := log.With(util_log.Logger, "component", component) + store = bucket.NewXCapBucket(store) + + if t.Cfg.DataObj.Metastore.IndexStoragePrefix != "" { + store = objstore.NewPrefixedBucket(store, t.Cfg.DataObj.Metastore.IndexStoragePrefix) + } + + if t.MetastoreMetrics == nil { + // there could be multiple metastore clients from different components but the metrics instance should be the + // same to avoid reregistration of metrics on the same Prometheus registerer. + t.MetastoreMetrics = metastore.NewObjectMetastoreMetrics(prometheus.DefaultRegisterer) + } + + return metastore.NewObjectMetastore(store, logger, t.MetastoreMetrics) +} + func (t *Loki) initV2QueryEngine() (services.Service, error) { if !t.Cfg.QueryEngine.Enable { return nil, nil } - store, err := t.getDataObjBucket("query-engine") + componentName := "query-engine" + + store, err := t.getDataObjBucket(componentName) if err != nil { return nil, err } - logger := log.With(util_log.Logger, "component", "query-engine") + logger := log.With(util_log.Logger, "component", componentName) engine, err := engine_v2.New(engine_v2.Params{ Logger: logger, Registerer: prometheus.DefaultRegisterer, - Config: t.Cfg.QueryEngine.Executor, - MetastoreConfig: t.Cfg.DataObj.Metastore, + Config: t.Cfg.QueryEngine.Executor, + Metastore: t.getMetastore(store, componentName), Scheduler: t.queryEngineV2Scheduler, - Bucket: store, Limits: t.Overrides, }) if err != nil { @@ -1508,6 +1527,8 @@ func (t *Loki) initV2QueryEngineWorker() (services.Service, error) { return nil, nil } + componentName := "query-engine-worker" + // Determine the advertise address. Results in nil if not running // distributed execution. 
 	listenPort := uint16(t.Cfg.Server.HTTPListenPort)
@@ -1516,14 +1537,15 @@ func (t *Loki) initV2QueryEngineWorker() (services.Service, error) {
 		return nil, err
 	}
 
-	store, err := t.getDataObjBucket("query-engine-worker")
+	store, err := t.getDataObjBucket(componentName)
 	if err != nil {
 		return nil, err
 	}
 
 	worker, err := engine_v2.NewWorker(engine_v2.WorkerParams{
-		Logger: log.With(util_log.Logger, "component", "query-engine-worker"),
-		Bucket: store,
+		Logger:    log.With(util_log.Logger, "component", componentName),
+		Bucket:    store,
+		Metastore: t.getMetastore(store, componentName),
 
 		Config:   t.Cfg.QueryEngine.Worker,
 		Executor: t.Cfg.QueryEngine.Executor,
diff --git a/pkg/querier/http.go b/pkg/querier/http.go
index eecdfb991ba2e..e30177e992b72 100644
--- a/pkg/querier/http.go
+++ b/pkg/querier/http.go
@@ -63,7 +63,7 @@ type QuerierAPI struct {
 }
 
 // NewQuerierAPI returns an instance of the QuerierAPI.
-func NewQuerierAPI(v1Cfg Config, v2Cfg engine.Config, mCfg metastore.Config, querier Querier, limits querier_limits.Limits, store objstore.Bucket, reg prometheus.Registerer, logger log.Logger) *QuerierAPI {
+func NewQuerierAPI(v1Cfg Config, v2Cfg engine.Config, ms metastore.Metastore, querier Querier, limits querier_limits.Limits, store objstore.Bucket, reg prometheus.Registerer, logger log.Logger) *QuerierAPI {
 	q := &QuerierAPI{
 		cfgV1:   v1Cfg,
 		cfgV2:   v2Cfg,
@@ -74,7 +74,7 @@ func NewQuerierAPI(v1Cfg Config, v2Cfg engine.Config, mCfg metastore.Config, que
 	}
 
 	if v2Cfg.Enable {
-		q.engineV2 = engine.NewBasic(v2Cfg.Executor, mCfg, store, limits, reg, logger)
+		q.engineV2 = engine.NewBasic(v2Cfg.Executor, ms, store, limits, reg, logger)
 	}
 
 	return q
diff --git a/pkg/querier/http_test.go b/pkg/querier/http_test.go
index 1615997109a6f..f0306997e934c 100644
--- a/pkg/querier/http_test.go
+++ b/pkg/querier/http_test.go
@@ -32,7 +32,7 @@ func TestInstantQueryHandler(t *testing.T) {
 	require.NoError(t, err)
 
 	t.Run("log selector expression not allowed for instant queries", func(t *testing.T) {
-		api := NewQuerierAPI(mockQuerierConfig(), engine.Config{}, metastore.Config{}, nil, limits, nil, nil, log.NewNopLogger())
+		api := NewQuerierAPI(mockQuerierConfig(), engine.Config{}, nil, nil, limits, nil, nil, log.NewNopLogger())
 
 		ctx := user.InjectOrgID(context.Background(), "user")
 		req, err := http.NewRequestWithContext(ctx, "GET", `/api/v1/query`, nil)
@@ -420,7 +420,7 @@ func setupAPI(t *testing.T, querier *querierMock, enableMetricAggregation bool)
 	limits, err := validation.NewOverrides(defaultLimits, nil)
 	require.NoError(t, err)
 
-	api := NewQuerierAPI(mockQuerierConfig(), engine.Config{}, metastore.Config{}, querier, limits, nil, nil, log.NewNopLogger())
+	api := NewQuerierAPI(mockQuerierConfig(), engine.Config{}, nil, querier, limits, nil, nil, log.NewNopLogger())
 
 	return api
 }
diff --git a/tools/querycomparator/execute.go b/tools/querycomparator/execute.go
index c68213f4c30d7..5366642c2592d 100644
--- a/tools/querycomparator/execute.go
+++ b/tools/querycomparator/execute.go
@@ -115,12 +115,21 @@ func checkResult(result logqlmodel.Result) error {
 
 // doLocalQueryWithV2Engine executes a query using the V2 engine
 func doLocalQueryWithV2Engine(params logql.LiteralParams) (logqlmodel.Result, error) {
+	metastore := metastore.NewObjectMetastore(
+		bucket.NewPrefixedBucketClient(MustDataobjBucket(), "index/v0"),
+		glog.NewLogfmtLogger(os.Stderr),
+		metastore.NewObjectMetastoreMetrics(prometheus.DefaultRegisterer),
+	)
+
 	ctx := user.InjectOrgID(context.Background(), orgID)
-	qe := engine.NewBasic(engine.ExecutorConfig{
-		BatchSize: 512,
-	}, metastore.Config{
-		IndexStoragePrefix: "index/v0",
-	}, MustDataobjBucket(), logql.NoLimits, prometheus.DefaultRegisterer, glog.NewLogfmtLogger(os.Stderr))
+	qe := engine.NewBasic(
+		engine.ExecutorConfig{BatchSize: 512},
+		metastore,
+		MustDataobjBucket(),
+		logql.NoLimits,
+		prometheus.DefaultRegisterer,
+		glog.NewLogfmtLogger(os.Stderr),
+	)
 
 	query := qe.Query(params)
 	return query.Exec(ctx)
 }
diff --git a/tools/querytee/goldfish/config.go b/tools/querytee/goldfish/config.go
index 1ddc9825a0291..de5793f1d0ebe 100644
--- a/tools/querytee/goldfish/config.go
+++ b/tools/querytee/goldfish/config.go
@@ -60,7 +60,7 @@ type Config struct {
 	// 1. Data before ComparisonStartDate -> split goes to the preferred backend only
 	// 2. Data between ComparisonStartDate and (now - ComparisonMinAge) -> splits go to all backends and are compared with goldfish
 	// 3. Data after (now - ComparisonMinAge) -> split goes to the preferred backend only
-	ComparisonStartDate flagext.Time  `yaml:"storage_start_date" category:"experimental"`
+	ComparisonStartDate flagext.Time `yaml:"storage_start_date" category:"experimental"`
 }
 
 // SamplingConfig defines how queries are sampled
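Taken together, these changes implement the experimental ahead-of-time catalog lookup flow: planning first records its catalog requests against an unresolved catalog, the requests are answered in a single batch (in production via the metastore, e.g. through the new `ScanPointers`/`CollectScanPointersResult` methods), and planning then re-runs against the resolved catalog. A condensed in-package sketch of that flow, restricted to names exercised by the tests above; the `aheadOfTimeResolve` helper itself is hypothetical:

```go
package physical

import "time"

// aheadOfTimeResolve sketches the two-phase catalog flow exercised by
// TestUnresolvedCatalog_Resolve: collect requests first, answer them in one
// batch, then re-run the lookups against the resolved catalog.
func aheadOfTimeResolve(selector Expression, shard ShardInfo, from, through time.Time,
	answer func(CatalogRequest) (CatalogResponse, error),
) ([]FilteredShardDescriptor, error) {
	unresolved := NewUnresolvedCatalog()

	// Phase 1: the call only records the request; the returned descriptors
	// are not usable yet, so the result is discarded.
	if _, err := unresolved.ResolveShardDescriptorsWithShard(selector, nil, shard, from, through); err != nil {
		return nil, err
	}

	// Phase 2: answer every recorded request in one batch, e.g. by scanning
	// section pointers through the metastore.
	resolved, err := unresolved.Resolve(answer)
	if err != nil {
		return nil, err
	}

	// Phase 3: the same lookup now returns the pre-fetched descriptors.
	return resolved.ResolveShardDescriptorsWithShard(selector, nil, shard, from, through)
}
```

The indirection lets the expensive metastore lookups be batched once per plan instead of being issued one at a time from individual planner callsites, which is what the `-query-engine.ahead-of-time-catalog-lookups-enabled` flag gates.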