4 changes: 4 additions & 0 deletions docs/sources/shared/configuration.md
@@ -339,6 +339,10 @@ query_engine:
# CLI flag: -query-engine.range-reads.min-range-size
[min_range_size: <int> | default = 1048576]

# Experimental: Enable ahead of time catalog lookups.
# CLI flag: -query-engine.ahead-of-time-catalog-lookups-enabled
[ahead_of_time_catalog_lookups_enabled: <boolean> | default = false]

# Experimental: Number of worker threads to spawn. Each worker thread runs one
# task at a time. 0 means to use GOMAXPROCS value.
# CLI flag: -query-engine.worker-threads
60 changes: 53 additions & 7 deletions pkg/engine/engine.go
@@ -58,12 +58,16 @@ type ExecutorConfig struct {

// RangeConfig determines how to optimize range reads in the V2 engine.
RangeConfig rangeio.Config `yaml:"range_reads" category:"experimental" doc:"description=Configures how to read byte ranges from object storage when using the V2 engine."`

// AheadOfTimeCatalogLookupsEnabled enables ahead-of-time catalog lookups.
AheadOfTimeCatalogLookupsEnabled bool `yaml:"ahead_of_time_catalog_lookups_enabled" category:"experimental"`
}

func (cfg *ExecutorConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.BatchSize, prefix+"batch-size", 100, "Experimental: Batch size of the next generation query engine.")
f.IntVar(&cfg.MergePrefetchCount, prefix+"merge-prefetch-count", 0, "Experimental: The number of inputs that are prefetched simultaneously by any Merge node. A value of 0 means that only the currently processed input is prefetched, 1 means that only the next input is prefetched, and so on. A negative value means that all inputs are prefetched in parallel.")
cfg.RangeConfig.RegisterFlags(prefix+"range-reads.", f)
f.BoolVar(&cfg.AheadOfTimeCatalogLookupsEnabled, prefix+"ahead-of-time-catalog-lookups-enabled", false, "Experimental: Enable ahead of time catalog lookups.")
}

// Params holds parameters for constructing a new [Engine].
@@ -106,7 +110,8 @@ type Engine struct {
bucket objstore.Bucket // Bucket to read stored data from.
limits logql.Limits // Limits to apply to engine queries.

metastore metastore.Metastore
metastore metastore.Metastore
aheadOfTimeCatalogLookupsEnabled bool
}

// New creates a new Engine.
@@ -120,9 +125,10 @@ func New(params Params) (*Engine, error) {
metrics: newMetrics(params.Registerer),
rangeConfig: params.Config.RangeConfig,

scheduler: params.Scheduler,
bucket: bucket.NewXCapBucket(params.Bucket),
limits: params.Limits,
scheduler: params.Scheduler,
bucket: bucket.NewXCapBucket(params.Bucket),
limits: params.Limits,
aheadOfTimeCatalogLookupsEnabled: params.Config.AheadOfTimeCatalogLookupsEnabled,
}

if e.bucket != nil {
@@ -305,9 +311,12 @@ func (e *Engine) buildPhysicalPlan(ctx context.Context, logger log.Logger, param
region := xcap.RegionFromContext(ctx)
timer := prometheus.NewTimer(e.metrics.physicalPlanning)

// TODO(rfratto): To improve the performance of the physical planner, we
// may want to parallelize metastore lookups across scheduled tasks as well.
catalog := physical.NewMetastoreCatalog(ctx, e.metastore)
catalog, err := e.prepareCatalog(ctx, params, logicalPlan)
if err != nil {
level.Warn(logger).Log("msg", "failed to prepare catalog", "err", err)
region.RecordError(err)
return nil, 0, ErrPlanningFailed
}

// TODO(rfratto): It feels strange that we need to pass the start/end time
// to the physical planner. Isn't it already represented by the logical
@@ -341,6 +350,43 @@
return physicalPlan, duration, nil
}

func (e *Engine) prepareCatalog(ctx context.Context, params logql.Params, logicalPlan *logical.Plan) (physical.Catalog, error) {
start := time.Now()
metastoreCatalog := physical.NewMetastoreCatalog(ctx, e.metastore)

if !e.aheadOfTimeCatalogLookupsEnabled {
return metastoreCatalog, nil
}

unresolved := physical.NewUnresolvedCatalog()
collectorPlanner := physical.NewPlanner(physical.NewContext(params.Start(), params.End()), unresolved)
Contributor:
It feels a bit strange to me that we do physical planning twice, once to gather fake requests and once with the real data. Did you do it this way to maintain a separation of concerns, or for some other reason?

Physical planning is reasonably cheap without the metastore calls now, but that might change in the future if we build different physical plans depending on the output of the catalog requests, or even issue new requests based off the old ones. Does it make sense to instead implement a catalog which resolves requests by distributing metaqueries directly, instead of doing this two-step process?

That said, if this is just a stepping stone to the full implementation, I'm happy to approve it to keep things moving!
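
Roughly what I have in mind, as a self-contained sketch with made-up stand-in types (not the real physical.Catalog API from this PR): a catalog that distributes the metaquery at resolution time, so the planner only needs a single pass.

package main

import (
	"context"
	"fmt"
)

// Stand-in types, not the real physical catalog interface; names are illustrative only.
type ShardDescriptor struct{ Path string }

type Catalog interface {
	ResolveShardDescriptorsWithShard(selector string, shard uint32, from, through int64) ([]ShardDescriptor, error)
}

// distributingCatalog resolves each lookup by dispatching a metaquery
// immediately (e.g. as a scheduled task) instead of collecting requests first.
type distributingCatalog struct {
	ctx context.Context
	// runMetaquery is a placeholder for however the engine would distribute
	// and await the metaquery.
	runMetaquery func(ctx context.Context, selector string, shard uint32, from, through int64) ([]ShardDescriptor, error)
}

func (c *distributingCatalog) ResolveShardDescriptorsWithShard(selector string, shard uint32, from, through int64) ([]ShardDescriptor, error) {
	return c.runMetaquery(c.ctx, selector, shard, from, through)
}

func main() {
	var cat Catalog = &distributingCatalog{
		ctx: context.Background(),
		runMetaquery: func(_ context.Context, selector string, shard uint32, from, through int64) ([]ShardDescriptor, error) {
			// Fake result; a real implementation would fan the query out.
			return []ShardDescriptor{{Path: fmt.Sprintf("%s/shard-%d/%d-%d", selector, shard, from, through)}}, nil
		},
	}
	descs, err := cat.ResolveShardDescriptorsWithShard(`{app="foo"}`, 0, 0, 100)
	fmt.Println(descs, err)
}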

Contributor Author:

I wanted to give the engine full control of the (meta)query execution:

  • to streamline the query execution pipeline (the engine triggers all the queries, not the planner implicitly),
  • and to unblock certain optimizations. The requests we collect are essentially metaqueries we need to execute, so if we know them beforehand we can optimize their execution (deduplicate, run in parallel, etc.).

At the moment I think there is no difference in how we collect / execute metaqueries (given there's at most 1 catalog request per physical plan, right?), so I can simplify the approach to delegate the work to a new catalog.

> if we build different physical plans depending on the output of the catalog requests, or even issue new requests based off the old ones

Yes! The current approach doesn't work in that case, so I was thinking about a loop like:

for catalogReqs := plan.Unresolved(); len(catalogReqs) > 0; catalogReqs = plan.Unresolved() {
  catalogResps := e.queryCatalog(catalogReqs)
  plan.Resolve(catalogResps)
}

I'm still slightly hesitant about executing metaqueries one by one in an order that depends on the physical planner walk, just because we won't have enough context to optimize those queries (parallelize / decouple).


Wdyt about all of the above?

I'm considering simplifying this part for now (applying the "implement a catalog which resolves requests by distributing metaqueries directly" suggestion) and just keeping a comment that we need to revisit it once we have multiple metaqueries per physical plan.
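
For illustration, a self-contained sketch (stand-in types, not the real catalog request/response types from this PR) of what deduplicating and running the collected metaqueries in parallel could look like, using errgroup to bound the fan-out:

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// Stand-ins for the real catalog request/response types; comparable so they
// can be used as map keys for deduplication.
type catalogRequest struct {
	Selector      string
	Shard         uint32
	From, Through int64
}

type catalogResponse struct{ Descriptors []string }

// resolveAll deduplicates the collected requests and resolves them
// concurrently, bounding the number of in-flight metastore lookups.
func resolveAll(
	ctx context.Context,
	reqs []catalogRequest,
	resolve func(context.Context, catalogRequest) (catalogResponse, error),
) (map[catalogRequest]catalogResponse, error) {
	unique := make(map[catalogRequest]struct{}, len(reqs))
	for _, r := range reqs {
		unique[r] = struct{}{}
	}

	var (
		mu  sync.Mutex
		out = make(map[catalogRequest]catalogResponse, len(unique))
	)
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(4) // bound concurrent metastore lookups

	for r := range unique {
		r := r // capture for pre-1.22 Go
		g.Go(func() error {
			resp, err := resolve(ctx, r)
			if err != nil {
				return err
			}
			mu.Lock()
			out[r] = resp
			mu.Unlock()
			return nil
		})
	}
	return out, g.Wait()
}

func main() {
	reqs := []catalogRequest{
		{Selector: `{app="foo"}`, From: 0, Through: 100},
		{Selector: `{app="foo"}`, From: 0, Through: 100}, // duplicate: resolved once
		{Selector: `{app="bar"}`, From: 0, Through: 100},
	}
	res, err := resolveAll(context.Background(), reqs, func(_ context.Context, r catalogRequest) (catalogResponse, error) {
		return catalogResponse{Descriptors: []string{r.Selector}}, nil
	})
	fmt.Println(len(res), err) // 2 unique requests resolved
}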

if _, err := collectorPlanner.Build(logicalPlan); err != nil {
return nil, fmt.Errorf("collecting catalog requests: %w", err)
}

resolved, err := unresolved.Resolve(func(req physical.CatalogRequest) (physical.CatalogResponse, error) {
switch req.Kind {
case physical.CatalogRequestKindResolveShardDescriptorsWithShard:
descriptors, err := metastoreCatalog.ResolveShardDescriptorsWithShard(req.Selector, req.Predicates, req.Shard, req.From, req.Through)
if err != nil {
return physical.CatalogResponse{}, err
}
return physical.CatalogResponse{
Kind: physical.CatalogRequestKindResolveShardDescriptorsWithShard,
Descriptors: descriptors,
}, nil
default:
return physical.CatalogResponse{}, fmt.Errorf("unsupported catalog request kind: %v", req.Kind)
}
})
if err != nil {
return nil, fmt.Errorf("resolving catalog: %w", err)
}

level.Info(e.logger).Log("msg", "finished catalog lookups", "duration", time.Since(start).String(), "requests", unresolved.RequestsCount())
return resolved, nil
}

// buildWorkflow builds a workflow from the given physical plan.
func (e *Engine) buildWorkflow(ctx context.Context, logger log.Logger, physicalPlan *physical.Plan) (*workflow.Workflow, time.Duration, error) {
tenantID, err := user.ExtractOrgID(ctx)