diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 82eb203faafba..c8261aca8de3f 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -339,6 +339,10 @@ query_engine: # CLI flag: -query-engine.range-reads.min-range-size [min_range_size: | default = 1048576] + # Experimental: Enable ahead of time catalog lookups. + # CLI flag: -query-engine.ahead-of-time-catalog-lookups-enabled + [ahead_of_time_catalog_lookups_enabled: | default = false] + # Experimental: Number of worker threads to spawn. Each worker thread runs one # task at a time. 0 means to use GOMAXPROCS value. # CLI flag: -query-engine.worker-threads diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index b40b4f1ea2e13..087aee98ba380 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -58,12 +58,16 @@ type ExecutorConfig struct { // RangeConfig determines how to optimize range reads in the V2 engine. RangeConfig rangeio.Config `yaml:"range_reads" category:"experimental" doc:"description=Configures how to read byte ranges from object storage when using the V2 engine."` + + // AheadOfTimeCatalogLookupsEnabled enables ahead of time catalog lookups + AheadOfTimeCatalogLookupsEnabled bool `yaml:"ahead_of_time_catalog_lookups_enabled" category:"experimental"` } func (cfg *ExecutorConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.IntVar(&cfg.BatchSize, prefix+"batch-size", 100, "Experimental: Batch size of the next generation query engine.") f.IntVar(&cfg.MergePrefetchCount, prefix+"merge-prefetch-count", 0, "Experimental: The number of inputs that are prefetched simultaneously by any Merge node. A value of 0 means that only the currently processed input is prefetched, 1 means that only the next input is prefetched, and so on. A negative value means that all inputs are be prefetched in parallel.") cfg.RangeConfig.RegisterFlags(prefix+"range-reads.", f) + f.BoolVar(&cfg.AheadOfTimeCatalogLookupsEnabled, prefix+"ahead-of-time-catalog-lookups-enabled", false, "Experimental: Enable ahead of time catalog lookups.") } // Params holds parameters for constructing a new [Engine]. @@ -106,7 +110,8 @@ type Engine struct { bucket objstore.Bucket // Bucket to read stored data from. limits logql.Limits // Limits to apply to engine queries. - metastore metastore.Metastore + metastore metastore.Metastore + aheadOfTimeCatalogLookupsEnabled bool } // New creates a new Engine. @@ -120,9 +125,10 @@ func New(params Params) (*Engine, error) { metrics: newMetrics(params.Registerer), rangeConfig: params.Config.RangeConfig, - scheduler: params.Scheduler, - bucket: bucket.NewXCapBucket(params.Bucket), - limits: params.Limits, + scheduler: params.Scheduler, + bucket: bucket.NewXCapBucket(params.Bucket), + limits: params.Limits, + aheadOfTimeCatalogLookupsEnabled: params.Config.AheadOfTimeCatalogLookupsEnabled, } if e.bucket != nil { @@ -305,9 +311,12 @@ func (e *Engine) buildPhysicalPlan(ctx context.Context, logger log.Logger, param region := xcap.RegionFromContext(ctx) timer := prometheus.NewTimer(e.metrics.physicalPlanning) - // TODO(rfratto): To improve the performance of the physical planner, we - // may want to parallelize metastore lookups across scheduled tasks as well. - catalog := physical.NewMetastoreCatalog(ctx, e.metastore) + catalog, err := e.prepareCatalog(ctx, params, logicalPlan) + if err != nil { + level.Warn(logger).Log("msg", "failed to prepare catalog", "err", err) + region.RecordError(err) + return nil, 0, ErrPlanningFailed + } // TODO(rfratto): It feels strange that we need to past the start/end time // to the physical planner. Isn't it already represented by the logical @@ -341,6 +350,43 @@ func (e *Engine) buildPhysicalPlan(ctx context.Context, logger log.Logger, param return physicalPlan, duration, nil } +func (e *Engine) prepareCatalog(ctx context.Context, params logql.Params, logicalPlan *logical.Plan) (physical.Catalog, error) { + start := time.Now() + metastoreCatalog := physical.NewMetastoreCatalog(ctx, e.metastore) + + if !e.aheadOfTimeCatalogLookupsEnabled { + return metastoreCatalog, nil + } + + unresolved := physical.NewUnresolvedCatalog() + collectorPlanner := physical.NewPlanner(physical.NewContext(params.Start(), params.End()), unresolved) + if _, err := collectorPlanner.Build(logicalPlan); err != nil { + return nil, fmt.Errorf("collecting catalog requests: %w", err) + } + + resolved, err := unresolved.Resolve(func(req physical.CatalogRequest) (physical.CatalogResponse, error) { + switch req.Kind { + case physical.CatalogRequestKindResolveShardDescriptorsWithShard: + descriptors, err := metastoreCatalog.ResolveShardDescriptorsWithShard(req.Selector, req.Predicates, req.Shard, req.From, req.Through) + if err != nil { + return physical.CatalogResponse{}, err + } + return physical.CatalogResponse{ + Kind: physical.CatalogRequestKindResolveShardDescriptorsWithShard, + Descriptors: descriptors, + }, nil + default: + return physical.CatalogResponse{}, fmt.Errorf("unsupported catalog request kind: %v", req.Kind) + } + }) + if err != nil { + return nil, fmt.Errorf("resolving catalog: %w", err) + } + + level.Info(e.logger).Log("msg", "finished catalog lookups", "duration", time.Since(start).String(), "requests", unresolved.RequestsCount()) + return resolved, nil +} + // buildWorkflow builds a workflow from the given physical plan. func (e *Engine) buildWorkflow(ctx context.Context, logger log.Logger, physicalPlan *physical.Plan) (*workflow.Workflow, time.Duration, error) { tenantID, err := user.ExtractOrgID(ctx) diff --git a/pkg/engine/internal/planner/physical/catalog.go b/pkg/engine/internal/planner/physical/catalog.go index 17d1ba341e410..ea07e58ac9716 100644 --- a/pkg/engine/internal/planner/physical/catalog.go +++ b/pkg/engine/internal/planner/physical/catalog.go @@ -4,23 +4,12 @@ import ( "context" "errors" "fmt" + "strings" "time" "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/v3/pkg/dataobj/metastore" - "github.com/grafana/loki/v3/pkg/engine/internal/types" -) - -var ( - binOpToMatchTypeMapping = map[types.BinaryOp]labels.MatchType{ - types.BinaryOpEq: labels.MatchEqual, - types.BinaryOpNeq: labels.MatchNotEqual, - types.BinaryOpMatchRe: labels.MatchRegexp, - types.BinaryOpNotMatchRe: labels.MatchNotRegexp, - } - - noShard = ShardInfo{Shard: 0, Of: 1} ) type ShardInfo struct { @@ -32,7 +21,7 @@ func (s ShardInfo) String() string { return fmt.Sprintf("%d_of_%d", s.Shard, s.Of) } -// Start and End are inclusive. +// TimeRange contains Start and End that are inclusive. type TimeRange struct { Start time.Time End time.Time @@ -80,12 +69,11 @@ type FilteredShardDescriptor struct { // providing this information (e.g. pg_catalog, ...) whereas in Loki there // is the Metastore. type Catalog interface { - // ResolveShardDescriptors returns a list of + // ResolveShardDescriptorsWithShard returns a list of // FilteredShardDescriptor objects, which each include: // a data object path, a list of stream IDs for // each data object path, a list of sections for // each data object path, and a time range. - ResolveShardDescriptors(Expression, time.Time, time.Time) ([]FilteredShardDescriptor, error) ResolveShardDescriptorsWithShard(Expression, []Expression, ShardInfo, time.Time, time.Time) ([]FilteredShardDescriptor, error) } @@ -103,14 +91,6 @@ func NewMetastoreCatalog(ctx context.Context, ms metastore.Metastore) *Metastore } } -// ResolveShardDescriptors resolves an array of FilteredShardDescriptor -// objects based on a given [Expression]. The expression is required -// to be a (tree of) [BinaryExpression] with a [ColumnExpression] -// on the left and a [LiteralExpression] on the right. -func (c *MetastoreCatalog) ResolveShardDescriptors(selector Expression, from, through time.Time) ([]FilteredShardDescriptor, error) { - return c.ResolveShardDescriptorsWithShard(selector, nil, noShard, from, through) -} - func (c *MetastoreCatalog) ResolveShardDescriptorsWithShard(selector Expression, predicates []Expression, shard ShardInfo, from, through time.Time) ([]FilteredShardDescriptor, error) { if c.metastore == nil { return nil, errors.New("no metastore to resolve objects") @@ -125,14 +105,14 @@ func (c *MetastoreCatalog) resolveShardDescriptorsWithIndex(selector Expression, return nil, errors.New("no metastore to resolve objects") } - matchers, err := expressionToMatchers(selector, false) + matchers, err := ExpressionToMatchers(selector, false) if err != nil { return nil, fmt.Errorf("failed to convert selector expression into matchers: %w", err) } predicateMatchers := make([]*labels.Matcher, 0, len(predicates)) for _, predicate := range predicates { - matchers, err := expressionToMatchers(predicate, true) + matchers, err := ExpressionToMatchers(predicate, true) if err != nil { // Not all predicates are supported by the metastore, so some will be skipped continue @@ -145,110 +125,175 @@ func (c *MetastoreCatalog) resolveShardDescriptorsWithIndex(selector Expression, return nil, fmt.Errorf("failed to resolve data object sections: %w", err) } - return filterDescriptorsForShard(shard, sectionDescriptors) + return FilterDescriptorsForShard(shard, sectionDescriptors) } -// filterDescriptorsForShard filters the section descriptors for a given shard. -// It returns the locations, streams, and sections for the shard. -// TODO: Improve filtering: this method could be improved because it doesn't resolve the stream IDs to sections, even though this information is available. Instead, it resolves streamIDs to the whole object. -func filterDescriptorsForShard(shard ShardInfo, sectionDescriptors []*metastore.DataobjSectionDescriptor) ([]FilteredShardDescriptor, error) { - filteredDescriptors := make([]FilteredShardDescriptor, 0, len(sectionDescriptors)) +// CatalogRequestKind describes what metadata the planner needs. +type CatalogRequestKind uint8 - for _, desc := range sectionDescriptors { - filteredDescriptor := FilteredShardDescriptor{} - filteredDescriptor.Location = DataObjLocation(desc.ObjectPath) +const ( + // CatalogRequestKindResolveShardDescriptorsWithShard requests section descriptors + CatalogRequestKindResolveShardDescriptorsWithShard CatalogRequestKind = iota +) - if int(desc.SectionIdx)%int(shard.Of) == int(shard.Shard) { - filteredDescriptor.Streams = desc.StreamIDs - filteredDescriptor.Sections = []int{int(desc.SectionIdx)} - tr, err := newTimeRange(desc.Start, desc.End) - if err != nil { - return nil, err - } - filteredDescriptor.TimeRange = tr - filteredDescriptors = append(filteredDescriptors, filteredDescriptor) - } +// CatalogRequest captures catalog request required to finish physical planning. +type CatalogRequest struct { + Kind CatalogRequestKind + + Selector Expression + Predicates []Expression + + Shard ShardInfo + + From time.Time + Through time.Time +} + +// CatalogResponse contains catalog response. +type CatalogResponse struct { + Kind CatalogRequestKind + + Descriptors []FilteredShardDescriptor +} + +type catalogRequestKey struct { + kind CatalogRequestKind + selector string + predicates string + from int64 + through int64 + shard uint32 + shardOf uint32 +} + +func newCatalogRequestKey(req CatalogRequest) catalogRequestKey { + return catalogRequestKey{ + kind: req.Kind, + selector: expressionSignature(req.Selector), + predicates: expressionsSignature(req.Predicates), + from: req.From.UnixNano(), + through: req.Through.UnixNano(), + shard: req.Shard.Shard, + shardOf: req.Shard.Of, } +} - return filteredDescriptors, nil +func expressionsSignature(exprs []Expression) string { + if len(exprs) == 0 { + return "" + } + parts := make([]string, len(exprs)) + for i, expr := range exprs { + parts[i] = expressionSignature(expr) + } + return strings.Join(parts, ";") } -// expressionToMatchers converts a selector expression to a list of matchers. -// The selector expression is required to be a (tree of) [BinaryExpression] -// with a [ColumnExpression] on the left and a [LiteralExpression] on the right. -// It optionally supports ambiguous column references. Non-ambiguous column references are label matchers. -func expressionToMatchers(selector Expression, allowAmbiguousColumnRefs bool) ([]*labels.Matcher, error) { - if selector == nil { - return nil, nil +func expressionSignature(expr Expression) string { + if expr == nil { + return "" } + return expr.String() +} - switch expr := selector.(type) { - case *BinaryExpr: - switch expr.Op { - case types.BinaryOpAnd: - lhs, err := expressionToMatchers(expr.Left, allowAmbiguousColumnRefs) - if err != nil { - return nil, err - } - rhs, err := expressionToMatchers(expr.Right, allowAmbiguousColumnRefs) - if err != nil { - return nil, err - } - return append(lhs, rhs...), nil - case types.BinaryOpEq, types.BinaryOpNeq, types.BinaryOpMatchRe, types.BinaryOpNotMatchRe: - op, err := convertBinaryOp(expr.Op) - if err != nil { - return nil, err - } - name, err := convertColumnRef(expr.Left, allowAmbiguousColumnRefs) - if err != nil { - return nil, err - } - value, err := convertLiteralToString(expr.Right) - if err != nil { - return nil, err - } - lhs, err := labels.NewMatcher(op, name, value) - if err != nil { - return nil, err - } - return []*labels.Matcher{lhs}, nil - default: - return nil, fmt.Errorf("invalid binary expression in stream selector expression: %v", expr.Op.String()) - } - default: - return nil, fmt.Errorf("invalid expression type in stream selector expression: %T", expr) +type UnresolvedCatalog struct { + requests map[catalogRequestKey]CatalogRequest +} + +func NewUnresolvedCatalog() UnresolvedCatalog { + return UnresolvedCatalog{ + requests: make(map[catalogRequestKey]CatalogRequest), } } -func convertLiteralToString(expr Expression) (string, error) { - l, ok := expr.(*LiteralExpr) - if !ok { - return "", fmt.Errorf("expected literal expression, got %T", expr) +func (c UnresolvedCatalog) ResolveShardDescriptorsWithShard(selector Expression, predicates []Expression, shard ShardInfo, from, through time.Time) ([]FilteredShardDescriptor, error) { + req := CatalogRequest{ + Kind: CatalogRequestKindResolveShardDescriptorsWithShard, + Selector: cloneExpressions([]Expression{selector})[0], + Predicates: cloneExpressions(predicates), + Shard: shard, + From: from, + Through: through, } - if l.ValueType() != types.Loki.String { - return "", fmt.Errorf("literal type is not a string, got %v", l.ValueType()) + + key := newCatalogRequestKey(req) + if _, ok := c.requests[key]; !ok { + c.requests[key] = req } - return l.Value().(string), nil + return nil, nil } -func convertColumnRef(expr Expression, allowAmbiguousColumnRefs bool) (string, error) { - ref, ok := expr.(*ColumnExpr) - if !ok { - return "", fmt.Errorf("expected column expression, got %T", expr) +func (c UnresolvedCatalog) RequestsCount() int { + return len(c.requests) +} + +func (c UnresolvedCatalog) Resolve(resolve func(CatalogRequest) (CatalogResponse, error)) (ResolvedCatalog, error) { + resolved := ResolvedCatalog{ + responses: make(map[catalogRequestKey]CatalogResponse), } - if !allowAmbiguousColumnRefs && ref.Ref.Type != types.ColumnTypeLabel { - return "", fmt.Errorf("column type is not a label, got %v", ref.Ref.Type) + for key, req := range c.requests { + resp, err := resolve(req) + if err != nil { + return resolved, fmt.Errorf("failed to resolve catalog response: %w", err) + } + resolved.responses[key] = resp } - return ref.Ref.Column, nil + + return resolved, nil +} + +type ResolvedCatalog struct { + responses map[catalogRequestKey]CatalogResponse } -func convertBinaryOp(t types.BinaryOp) (labels.MatchType, error) { - ty, ok := binOpToMatchTypeMapping[t] +func (c ResolvedCatalog) ResolveShardDescriptorsWithShard(selector Expression, predicates []Expression, shard ShardInfo, from, through time.Time) ([]FilteredShardDescriptor, error) { + req := CatalogRequest{ + Kind: CatalogRequestKindResolveShardDescriptorsWithShard, + Selector: selector, + Predicates: predicates, + Shard: shard, + From: from, + Through: through, + } + reqKey := newCatalogRequestKey(req) + resp, ok := c.responses[reqKey] if !ok { - return -1, fmt.Errorf("invalid binary operator for matcher: %v", t) + return nil, fmt.Errorf("catalog response missing for request %+v", reqKey) + } + + switch resp.Kind { + case CatalogRequestKindResolveShardDescriptorsWithShard: + return resp.Descriptors, nil + default: + return nil, fmt.Errorf("unsupported response kind %d", resp.Kind) + } +} + +// FilterDescriptorsForShard filters the section descriptors for a given shard. +// It returns the locations, streams, and sections for the shard. +// TODO: Improve filtering: this method could be improved because it doesn't resolve the stream IDs to sections, even though this information is available. Instead, it resolves streamIDs to the whole object. +func FilterDescriptorsForShard(shard ShardInfo, sectionDescriptors []*metastore.DataobjSectionDescriptor) ([]FilteredShardDescriptor, error) { + filteredDescriptors := make([]FilteredShardDescriptor, 0, len(sectionDescriptors)) + + for _, desc := range sectionDescriptors { + filteredDescriptor := FilteredShardDescriptor{} + filteredDescriptor.Location = DataObjLocation(desc.ObjectPath) + + if int(desc.SectionIdx)%int(shard.Of) == int(shard.Shard) { + filteredDescriptor.Streams = desc.StreamIDs + filteredDescriptor.Sections = []int{int(desc.SectionIdx)} + tr, err := newTimeRange(desc.Start, desc.End) + if err != nil { + return nil, err + } + filteredDescriptor.TimeRange = tr + filteredDescriptors = append(filteredDescriptors, filteredDescriptor) + } } - return ty, nil + + return filteredDescriptors, nil } var _ Catalog = (*MetastoreCatalog)(nil) +var _ Catalog = UnresolvedCatalog{} +var _ Catalog = ResolvedCatalog{} diff --git a/pkg/engine/internal/planner/physical/catalog_test.go b/pkg/engine/internal/planner/physical/catalog_test.go index 9aabcc9bae2fb..7a3cc52306c8c 100644 --- a/pkg/engine/internal/planner/physical/catalog_test.go +++ b/pkg/engine/internal/planner/physical/catalog_test.go @@ -1,6 +1,8 @@ package physical import ( + "context" + "fmt" "testing" "time" @@ -11,162 +13,6 @@ import ( "github.com/grafana/loki/v3/pkg/engine/internal/types" ) -func TestCatalog_ConvertLiteral(t *testing.T) { - tests := []struct { - expr Expression - want string - wantErr bool - }{ - { - expr: NewLiteral("foo"), - want: "foo", - }, - { - expr: NewLiteral(false), - wantErr: true, - }, - { - expr: NewLiteral(int64(123)), - wantErr: true, - }, - { - expr: NewLiteral(types.Timestamp(time.Now().UnixNano())), - wantErr: true, - }, - { - expr: NewLiteral(types.Duration(time.Hour.Nanoseconds())), - wantErr: true, - }, - { - expr: newColumnExpr("foo", types.ColumnTypeLabel), - wantErr: true, - }, - { - expr: &BinaryExpr{ - Left: newColumnExpr("foo", types.ColumnTypeLabel), - Right: NewLiteral("foo"), - Op: types.BinaryOpEq, - }, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.expr.String(), func(t *testing.T) { - got, err := convertLiteralToString(tt.expr) - if tt.wantErr { - require.Error(t, err) - t.Log(err) - } else { - require.NoError(t, err) - require.Equal(t, tt.want, got) - } - }) - } -} - -func TestCatalog_ConvertColumnRef(t *testing.T) { - tests := []struct { - expr Expression - want string - wantErr bool - }{ - { - expr: newColumnExpr("foo", types.ColumnTypeLabel), - want: "foo", - }, - { - expr: newColumnExpr("foo", types.ColumnTypeAmbiguous), - wantErr: true, - }, - { - expr: newColumnExpr("foo", types.ColumnTypeBuiltin), - wantErr: true, - }, - { - expr: NewLiteral(false), - wantErr: true, - }, - { - expr: &BinaryExpr{ - Left: newColumnExpr("foo", types.ColumnTypeLabel), - Right: NewLiteral("foo"), - Op: types.BinaryOpEq, - }, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.expr.String(), func(t *testing.T) { - got, err := convertColumnRef(tt.expr, false) - if tt.wantErr { - require.Error(t, err) - t.Log(err) - } else { - require.NoError(t, err) - require.Equal(t, tt.want, got) - } - }) - } -} - -func TestCatalog_ExpressionToMatchers(t *testing.T) { - tests := []struct { - expr Expression - want []*labels.Matcher - wantErr bool - }{ - { - expr: newColumnExpr("foo", types.ColumnTypeLabel), - wantErr: true, - }, - { - expr: NewLiteral("foo"), - wantErr: true, - }, - { - expr: &BinaryExpr{ - Left: newColumnExpr("foo", types.ColumnTypeLabel), - Right: NewLiteral("bar"), - Op: types.BinaryOpEq, - }, - want: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), - }, - }, - { - expr: &BinaryExpr{ - Left: &BinaryExpr{ - Left: newColumnExpr("foo", types.ColumnTypeLabel), - Right: NewLiteral("bar"), - Op: types.BinaryOpEq, - }, - Right: &BinaryExpr{ - Left: newColumnExpr("bar", types.ColumnTypeLabel), - Right: NewLiteral("baz"), - Op: types.BinaryOpNeq, - }, - Op: types.BinaryOpAnd, - }, - want: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), - labels.MustNewMatcher(labels.MatchNotEqual, "bar", "baz"), - }, - }, - } - for _, tt := range tests { - t.Run(tt.expr.String(), func(t *testing.T) { - got, err := expressionToMatchers(tt.expr, false) - if tt.wantErr { - require.Error(t, err) - t.Log(err) - } else { - require.NoError(t, err) - require.ElementsMatch(t, tt.want, got) - } - }) - } -} - func TestCatalog_TimeRangeValidate(t *testing.T) { tests := []struct { name string @@ -280,7 +126,7 @@ func TestCatalog_FilterDescriptorsForShard(t *testing.T) { desc3.ObjectPath = "baz" desc3.SectionIdx = 3 sectionDescriptors := []*metastore.DataobjSectionDescriptor{&desc1, &desc2, &desc3} - res, err := filterDescriptorsForShard(shard, sectionDescriptors) + res, err := FilterDescriptorsForShard(shard, sectionDescriptors) require.NoError(t, err) tr1, err := newTimeRange(start1, end1) require.NoError(t, err) @@ -292,5 +138,244 @@ func TestCatalog_FilterDescriptorsForShard(t *testing.T) { } require.ElementsMatch(t, res, expected) }) +} + +func TestUnresolvedCatalog_RequestCollection(t *testing.T) { + t.Run("Collects unique requests", func(t *testing.T) { + catalog := NewUnresolvedCatalog() + selector := newTestLabelMatcher("app", "test") + predicates := []Expression{newTestLabelMatcher("env", "prod")} + shard := ShardInfo{Shard: 1, Of: 2} + from := time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC) + through := time.Date(2025, time.January, 1, 12, 0, 0, 0, time.UTC) + + // First call should add the request + _, err := catalog.ResolveShardDescriptorsWithShard(selector, predicates, shard, from, through) + require.NoError(t, err) + require.Equal(t, 1, catalog.RequestsCount()) + + // Second call with same parameters should not add another request + _, err = catalog.ResolveShardDescriptorsWithShard(selector, predicates, shard, from, through) + require.NoError(t, err) + require.Equal(t, 1, catalog.RequestsCount()) + }) + + t.Run("Collects different requests", func(t *testing.T) { + catalog := NewUnresolvedCatalog() + selector := newTestLabelMatcher("app", "test") + predicates := []Expression{newTestLabelMatcher("env", "prod")} + shard1 := ShardInfo{Shard: 1, Of: 2} + shard2 := ShardInfo{Shard: 2, Of: 2} + from := time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC) + through := time.Date(2025, time.January, 1, 12, 0, 0, 0, time.UTC) + + // First request with shard 1 + _, err := catalog.ResolveShardDescriptorsWithShard(selector, predicates, shard1, from, through) + require.NoError(t, err) + require.Equal(t, 1, catalog.RequestsCount()) + + // Second request with shard 2 should add another request + _, err = catalog.ResolveShardDescriptorsWithShard(selector, predicates, shard2, from, through) + require.NoError(t, err) + require.Equal(t, 2, catalog.RequestsCount()) + }) +} + +func TestUnresolvedCatalog_Resolve(t *testing.T) { + t.Run("Successfully resolves all requests", func(t *testing.T) { + catalog := NewUnresolvedCatalog() + selector := newTestLabelMatcher("app", "test") + predicates := []Expression{newTestLabelMatcher("env", "prod")} + shard := ShardInfo{Shard: 1, Of: 2} + from := time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC) + through := time.Date(2025, time.January, 1, 12, 0, 0, 0, time.UTC) + + _, err := catalog.ResolveShardDescriptorsWithShard(selector, predicates, shard, from, through) + require.NoError(t, err) + + tr, err := newTimeRange(from, through) + require.NoError(t, err) + + // Resolve with a function that returns mock descriptors + resolved, err := catalog.Resolve(func(_ CatalogRequest) (CatalogResponse, error) { + return CatalogResponse{ + Kind: CatalogRequestKindResolveShardDescriptorsWithShard, + Descriptors: []FilteredShardDescriptor{ + {Location: "test-location", Streams: []int64{1}, Sections: []int{1}, TimeRange: tr}, + }, + }, nil + }) + require.NoError(t, err) + + // Verify we can retrieve the resolved descriptors + descriptors, err := resolved.ResolveShardDescriptorsWithShard(selector, predicates, shard, from, through) + require.NoError(t, err) + require.Len(t, descriptors, 1) + require.Equal(t, DataObjLocation("test-location"), descriptors[0].Location) + }) + + t.Run("Returns error when resolve function fails", func(t *testing.T) { + catalog := NewUnresolvedCatalog() + selector := newTestLabelMatcher("app", "test") + shard := ShardInfo{Shard: 1, Of: 2} + from := time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC) + through := time.Date(2025, time.January, 1, 12, 0, 0, 0, time.UTC) + + _, err := catalog.ResolveShardDescriptorsWithShard(selector, nil, shard, from, through) + require.NoError(t, err) + + // Resolve with a function that returns an error + _, err = catalog.Resolve(func(_ CatalogRequest) (CatalogResponse, error) { + return CatalogResponse{}, fmt.Errorf("mock error") + }) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to resolve catalog response") + }) +} + +func TestResolvedCatalog_ResolveShardDescriptorsWithShard(t *testing.T) { + t.Run("Returns descriptors for resolved request", func(t *testing.T) { + catalog := NewUnresolvedCatalog() + selector := newTestLabelMatcher("app", "test") + predicates := []Expression{newTestLabelMatcher("env", "prod")} + shard := ShardInfo{Shard: 1, Of: 2} + from := time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC) + through := time.Date(2025, time.January, 1, 12, 0, 0, 0, time.UTC) + + _, err := catalog.ResolveShardDescriptorsWithShard(selector, predicates, shard, from, through) + require.NoError(t, err) + + tr, err := newTimeRange(from, through) + require.NoError(t, err) + + resolved, err := catalog.Resolve(func(_ CatalogRequest) (CatalogResponse, error) { + return CatalogResponse{ + Kind: CatalogRequestKindResolveShardDescriptorsWithShard, + Descriptors: []FilteredShardDescriptor{ + {Location: "test-location", Streams: []int64{1, 2, 3}, Sections: []int{1, 2}, TimeRange: tr}, + }, + }, nil + }) + require.NoError(t, err) + + descriptors, err := resolved.ResolveShardDescriptorsWithShard(selector, predicates, shard, from, through) + require.NoError(t, err) + require.Len(t, descriptors, 1) + require.Equal(t, DataObjLocation("test-location"), descriptors[0].Location) + require.Equal(t, []int64{1, 2, 3}, descriptors[0].Streams) + require.Equal(t, []int{1, 2}, descriptors[0].Sections) + }) + + t.Run("Returns error for missing request", func(t *testing.T) { + catalog := NewUnresolvedCatalog() + selector := newTestLabelMatcher("app", "test") + shard := ShardInfo{Shard: 1, Of: 2} + from := time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC) + through := time.Date(2025, time.January, 1, 12, 0, 0, 0, time.UTC) + + resolved, err := catalog.Resolve(func(_ CatalogRequest) (CatalogResponse, error) { + return CatalogResponse{}, nil + }) + require.NoError(t, err) + + // Try to resolve a request that was never added + _, err = resolved.ResolveShardDescriptorsWithShard(selector, nil, shard, from, through) + require.Error(t, err) + require.Contains(t, err.Error(), "catalog response missing for request") + }) +} + +func TestMetastoreCatalog_ResolveShardDescriptorsWithShard(t *testing.T) { + t.Run("Successfully resolves descriptors", func(t *testing.T) { + ctx := context.Background() + now := time.Now() + start := now.Add(time.Second * -10) + end := now.Add(time.Second * -5) + + desc1 := &metastore.DataobjSectionDescriptor{ + SectionKey: metastore.SectionKey{ + ObjectPath: "test-path", + SectionIdx: 1, + }, + StreamIDs: []int64{1, 2}, + RowCount: 10, + Size: 100, + Start: start, + End: end, + } + + mockMetastore := &mockMetastore{ + sections: []*metastore.DataobjSectionDescriptor{desc1}, + } + + catalog := NewMetastoreCatalog(ctx, mockMetastore) + selector := newTestLabelMatcher("app", "test") + shard := ShardInfo{Shard: 1, Of: 2} + + descriptors, err := catalog.ResolveShardDescriptorsWithShard(selector, nil, shard, start, end) + require.NoError(t, err) + require.Len(t, descriptors, 1) + require.Equal(t, DataObjLocation("test-path"), descriptors[0].Location) + require.Equal(t, []int64{1, 2}, descriptors[0].Streams) + }) + + t.Run("Returns error when metastore is nil", func(t *testing.T) { + ctx := context.Background() + catalog := NewMetastoreCatalog(ctx, nil) + selector := newTestLabelMatcher("app", "test") + shard := ShardInfo{Shard: 1, Of: 2} + now := time.Now() + + _, err := catalog.ResolveShardDescriptorsWithShard(selector, nil, shard, now, now) + require.Error(t, err) + require.Contains(t, err.Error(), "no metastore to resolve objects") + }) + + t.Run("Returns error when metastore.Sections fails", func(t *testing.T) { + ctx := context.Background() + mockMetastore := &mockMetastore{ + err: fmt.Errorf("metastore error"), + } + + catalog := NewMetastoreCatalog(ctx, mockMetastore) + selector := newTestLabelMatcher("app", "test") + shard := ShardInfo{Shard: 1, Of: 2} + now := time.Now() + + _, err := catalog.ResolveShardDescriptorsWithShard(selector, nil, shard, now, now) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to resolve data object sections") + }) +} + +// Test helper functions + +// newTestLabelMatcher creates a BinaryExpr that represents a label matcher (label = "value") +func newTestLabelMatcher(label, value string) Expression { + return &BinaryExpr{ + Op: types.BinaryOpEq, + Left: &ColumnExpr{Ref: types.ColumnRef{Column: label, Type: types.ColumnTypeLabel}}, + Right: NewLiteral(value), + } +} + +// mockMetastore is a mock implementation of the metastore.Metastore interface for testing +type mockMetastore struct { + sections []*metastore.DataobjSectionDescriptor + err error +} + +func (m *mockMetastore) Sections(_ context.Context, _, _ time.Time, _ []*labels.Matcher, _ []*labels.Matcher) ([]*metastore.DataobjSectionDescriptor, error) { + if m.err != nil { + return nil, m.err + } + return m.sections, nil +} + +func (m *mockMetastore) Labels(_ context.Context, _, _ time.Time, _ ...*labels.Matcher) ([]string, error) { + return nil, nil +} +func (m *mockMetastore) Values(_ context.Context, _, _ time.Time, _ ...*labels.Matcher) ([]string, error) { + return nil, nil } diff --git a/pkg/engine/internal/planner/physical/dataobjscan.go b/pkg/engine/internal/planner/physical/dataobjscan.go index e8db84dfb83a3..72636d6a3b7fb 100644 --- a/pkg/engine/internal/planner/physical/dataobjscan.go +++ b/pkg/engine/internal/planner/physical/dataobjscan.go @@ -6,7 +6,7 @@ import ( "github.com/oklog/ulid/v2" ) -// DataObjLocation is a string that uniquely indentifies a data object location in +// DataObjLocation is a string that uniquely identifies a data object location in // object storage. type DataObjLocation string diff --git a/pkg/engine/internal/planner/physical/expressions.go b/pkg/engine/internal/planner/physical/expressions.go index bfcd27bafd4aa..89605d6cda2ca 100644 --- a/pkg/engine/internal/planner/physical/expressions.go +++ b/pkg/engine/internal/planner/physical/expressions.go @@ -4,6 +4,8 @@ import ( "fmt" "strings" + "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/v3/pkg/engine/internal/types" ) @@ -253,3 +255,90 @@ func (e *VariadicExpr) String() string { func (*VariadicExpr) Type() ExpressionType { return ExprTypeVariadic } + +// ExpressionToMatchers converts a selector expression to a list of matchers. +// The selector expression is required to be a (tree of) [BinaryExpression] +// with a [ColumnExpression] on the left and a [LiteralExpression] on the right. +// It optionally supports ambiguous column references. Non-ambiguous column references are label matchers. +func ExpressionToMatchers(selector Expression, allowAmbiguousColumnRefs bool) ([]*labels.Matcher, error) { + if selector == nil { + return nil, nil + } + + switch expr := selector.(type) { + case *BinaryExpr: + switch expr.Op { + case types.BinaryOpAnd: + lhs, err := ExpressionToMatchers(expr.Left, allowAmbiguousColumnRefs) + if err != nil { + return nil, err + } + rhs, err := ExpressionToMatchers(expr.Right, allowAmbiguousColumnRefs) + if err != nil { + return nil, err + } + return append(lhs, rhs...), nil + case types.BinaryOpEq, types.BinaryOpNeq, types.BinaryOpMatchRe, types.BinaryOpNotMatchRe: + op, err := convertBinaryOp(expr.Op) + if err != nil { + return nil, err + } + name, err := convertColumnRef(expr.Left, allowAmbiguousColumnRefs) + if err != nil { + return nil, err + } + value, err := convertLiteralToString(expr.Right) + if err != nil { + return nil, err + } + lhs, err := labels.NewMatcher(op, name, value) + if err != nil { + return nil, err + } + return []*labels.Matcher{lhs}, nil + default: + return nil, fmt.Errorf("invalid binary expression in stream selector expression: %v", expr.Op.String()) + } + default: + return nil, fmt.Errorf("invalid expression type in stream selector expression: %T", expr) + } +} + +func convertLiteralToString(expr Expression) (string, error) { + l, ok := expr.(*LiteralExpr) + if !ok { + return "", fmt.Errorf("expected literal expression, got %T", expr) + } + if l.ValueType() != types.Loki.String { + return "", fmt.Errorf("literal type is not a string, got %v", l.ValueType()) + } + return l.Value().(string), nil +} + +func convertColumnRef(expr Expression, allowAmbiguousColumnRefs bool) (string, error) { + ref, ok := expr.(*ColumnExpr) + if !ok { + return "", fmt.Errorf("expected column expression, got %T", expr) + } + if !allowAmbiguousColumnRefs && ref.Ref.Type != types.ColumnTypeLabel { + return "", fmt.Errorf("column type is not a label, got %v", ref.Ref.Type) + } + return ref.Ref.Column, nil +} + +var ( + binOpToMatchTypeMapping = map[types.BinaryOp]labels.MatchType{ + types.BinaryOpEq: labels.MatchEqual, + types.BinaryOpNeq: labels.MatchNotEqual, + types.BinaryOpMatchRe: labels.MatchRegexp, + types.BinaryOpNotMatchRe: labels.MatchNotRegexp, + } +) + +func convertBinaryOp(t types.BinaryOp) (labels.MatchType, error) { + ty, ok := binOpToMatchTypeMapping[t] + if !ok { + return -1, fmt.Errorf("invalid binary operator for matcher: %v", t) + } + return ty, nil +} diff --git a/pkg/engine/internal/planner/physical/expressions_test.go b/pkg/engine/internal/planner/physical/expressions_test.go index 952237b683e2d..13a7f86f7dd31 100644 --- a/pkg/engine/internal/planner/physical/expressions_test.go +++ b/pkg/engine/internal/planner/physical/expressions_test.go @@ -2,7 +2,9 @@ package physical import ( "testing" + "time" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/engine/internal/types" @@ -108,3 +110,159 @@ func TestLiteralExpr(t *testing.T) { require.Equal(t, types.Loki.String, literal.ValueType()) }) } + +func TestExpressionToMatchers(t *testing.T) { + tests := []struct { + expr Expression + want []*labels.Matcher + wantErr bool + }{ + { + expr: newColumnExpr("foo", types.ColumnTypeLabel), + wantErr: true, + }, + { + expr: NewLiteral("foo"), + wantErr: true, + }, + { + expr: &BinaryExpr{ + Left: newColumnExpr("foo", types.ColumnTypeLabel), + Right: NewLiteral("bar"), + Op: types.BinaryOpEq, + }, + want: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + { + expr: &BinaryExpr{ + Left: &BinaryExpr{ + Left: newColumnExpr("foo", types.ColumnTypeLabel), + Right: NewLiteral("bar"), + Op: types.BinaryOpEq, + }, + Right: &BinaryExpr{ + Left: newColumnExpr("bar", types.ColumnTypeLabel), + Right: NewLiteral("baz"), + Op: types.BinaryOpNeq, + }, + Op: types.BinaryOpAnd, + }, + want: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + labels.MustNewMatcher(labels.MatchNotEqual, "bar", "baz"), + }, + }, + } + for _, tt := range tests { + t.Run(tt.expr.String(), func(t *testing.T) { + got, err := ExpressionToMatchers(tt.expr, false) + if tt.wantErr { + require.Error(t, err) + t.Log(err) + } else { + require.NoError(t, err) + require.ElementsMatch(t, tt.want, got) + } + }) + } +} + +func TestConvertLiteral(t *testing.T) { + tests := []struct { + expr Expression + want string + wantErr bool + }{ + { + expr: NewLiteral("foo"), + want: "foo", + }, + { + expr: NewLiteral(false), + wantErr: true, + }, + { + expr: NewLiteral(int64(123)), + wantErr: true, + }, + { + expr: NewLiteral(types.Timestamp(time.Now().UnixNano())), + wantErr: true, + }, + { + expr: NewLiteral(types.Duration(time.Hour.Nanoseconds())), + wantErr: true, + }, + { + expr: newColumnExpr("foo", types.ColumnTypeLabel), + wantErr: true, + }, + { + expr: &BinaryExpr{ + Left: newColumnExpr("foo", types.ColumnTypeLabel), + Right: NewLiteral("foo"), + Op: types.BinaryOpEq, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.expr.String(), func(t *testing.T) { + got, err := convertLiteralToString(tt.expr) + if tt.wantErr { + require.Error(t, err) + t.Log(err) + } else { + require.NoError(t, err) + require.Equal(t, tt.want, got) + } + }) + } +} + +func TestConvertColumnRef(t *testing.T) { + tests := []struct { + expr Expression + want string + wantErr bool + }{ + { + expr: newColumnExpr("foo", types.ColumnTypeLabel), + want: "foo", + }, + { + expr: newColumnExpr("foo", types.ColumnTypeAmbiguous), + wantErr: true, + }, + { + expr: newColumnExpr("foo", types.ColumnTypeBuiltin), + wantErr: true, + }, + { + expr: NewLiteral(false), + wantErr: true, + }, + { + expr: &BinaryExpr{ + Left: newColumnExpr("foo", types.ColumnTypeLabel), + Right: NewLiteral("foo"), + Op: types.BinaryOpEq, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.expr.String(), func(t *testing.T) { + got, err := convertColumnRef(tt.expr, false) + if tt.wantErr { + require.Error(t, err) + t.Log(err) + } else { + require.NoError(t, err) + require.Equal(t, tt.want, got) + } + }) + } +} diff --git a/pkg/engine/internal/planner/physical/planner_test.go b/pkg/engine/internal/planner/physical/planner_test.go index 1a90059dbfe2f..ecb15ee997763 100644 --- a/pkg/engine/internal/planner/physical/planner_test.go +++ b/pkg/engine/internal/planner/physical/planner_test.go @@ -17,14 +17,9 @@ type catalog struct { sectionDescriptors []*metastore.DataobjSectionDescriptor } -// ResolveShardDescriptors implements Catalog. -func (c *catalog) ResolveShardDescriptors(e Expression, from, through time.Time) ([]FilteredShardDescriptor, error) { - return c.ResolveShardDescriptorsWithShard(e, nil, noShard, from, through) -} - // ResolveDataObjForShard implements Catalog. func (c *catalog) ResolveShardDescriptorsWithShard(_ Expression, _ []Expression, shard ShardInfo, _, _ time.Time) ([]FilteredShardDescriptor, error) { - return filterDescriptorsForShard(shard, c.sectionDescriptors) + return FilterDescriptorsForShard(shard, c.sectionDescriptors) } var _ Catalog = (*catalog)(nil) diff --git a/pkg/logql/bench/store_dataobj_v2_engine.go b/pkg/logql/bench/store_dataobj_v2_engine.go index a293b340c2ab5..d3fae549b991e 100644 --- a/pkg/logql/bench/store_dataobj_v2_engine.go +++ b/pkg/logql/bench/store_dataobj_v2_engine.go @@ -47,9 +47,10 @@ type DataObjV2EngineStore struct { func NewDataObjV2EngineStore(dir string, tenantID string) (*DataObjV2EngineStore, error) { storageDir := filepath.Join(dir, storageDir) return dataobjV2StoreWithOpts(storageDir, tenantID, engine.ExecutorConfig{ - BatchSize: 512, - RangeConfig: rangeio.DefaultConfig, - MergePrefetchCount: 8, + BatchSize: 512, + RangeConfig: rangeio.DefaultConfig, + MergePrefetchCount: 8, + AheadOfTimeCatalogLookupsEnabled: true, }, metastore.Config{ IndexStoragePrefix: "index/v0", }) diff --git a/tools/querytee/goldfish/config.go b/tools/querytee/goldfish/config.go index 1ddc9825a0291..de5793f1d0ebe 100644 --- a/tools/querytee/goldfish/config.go +++ b/tools/querytee/goldfish/config.go @@ -60,7 +60,7 @@ type Config struct { // 1. Data before ComparisonStartDate -> split goes to the preferred backend only // 2. Data between ComparisonStartDate and (now - ComparisonMinAge) -> splits go to all backends and are compared with goldfish // 3. Data after (now - ComparisonMinAge) -> split goes to the preferred backend only - ComparisonStartDate flagext.Time `yaml:"storage_start_date" category:"experimental"` + ComparisonStartDate flagext.Time `yaml:"storage_start_date" category:"experimental"` } // SamplingConfig defines how queries are sampled