feat(engine): ahead of time catalog lookups #20189
Conversation
As a preparation for distributed metastore lookups I introduce ahead-of-time catalog lookups: the metastore is queried explicitly before the physical plan is built. The workflow is:
- build a physical plan to collect the catalog requests required
- resolve the catalog requests to responses (at the moment I use the metastore catalog to do that)
- save the responses to a new ResolvedCatalog
- pass the resolved catalog to the physical planner to build the real physical plan
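To make the two-pass flow concrete, here is a minimal, self-contained Go sketch of the idea. Every name in it (`catalogRequest`, `unresolvedCatalog`, `resolvedCatalog`, `lookupSections`) is a hypothetical stand-in for illustration, not the engine's actual API; in the PR the corresponding pieces are `physical.NewUnresolvedCatalog`, the new `ResolvedCatalog`, and the planner calls quoted below.

```go
package main

import "fmt"

// Hypothetical stand-ins for this PR's types (illustrative only, not the
// actual engine API): an unresolved catalog records the metastore requests
// a plan would need, and a resolved catalog serves their responses.
type catalogRequest struct{ matchers string }

type catalogResponse struct{ sections []string }

type unresolvedCatalog struct{ requests []catalogRequest }

type resolvedCatalog struct{ responses map[catalogRequest]catalogResponse }

// lookupSections stands in for the planner asking its catalog for data.
// Against the unresolved catalog it only records the request.
func (u *unresolvedCatalog) lookupSections(req catalogRequest) []string {
	u.requests = append(u.requests, req)
	return nil // no data yet; this pass only collects requests
}

// Against the resolved catalog the same call returns real responses.
func (r *resolvedCatalog) lookupSections(req catalogRequest) []string {
	return r.responses[req].sections
}

func main() {
	req := catalogRequest{matchers: `{app="foo"}`}

	// Pass 1: "plan" against the unresolved catalog to collect requests.
	unresolved := &unresolvedCatalog{}
	unresolved.lookupSections(req)

	// Resolve every collected request (here, a fake metastore lookup).
	resolved := &resolvedCatalog{responses: map[catalogRequest]catalogResponse{}}
	for _, r := range unresolved.requests {
		resolved.responses[r] = catalogResponse{sections: []string{"obj-1/section-0"}}
	}

	// Pass 2: plan for real, now served from the resolved catalog.
	fmt.Println("sections:", resolved.lookupSections(req))
}
```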
```go
// FilterDescriptorsForShard filters the section descriptors for a given shard.
// It returns the locations, streams, and sections for the shard.
// TODO: Improve filtering: this method could be improved because it doesn't
// resolve the stream IDs to sections, even though this information is
// available. Instead, it resolves stream IDs to the whole object.
func FilterDescriptorsForShard(shard ShardInfo, sectionDescriptors []*metastore.DataobjSectionDescriptor) ([]FilteredShardDescriptor, error) {
```
I don't think we use the ShardInfo anymore with the new scheduler. It might be time to remove it. We can do it after this is merged though.
```go
unresolved := physical.NewUnresolvedCatalog()
collectorPlanner := physical.NewPlanner(physical.NewContext(params.Start(), params.End()), unresolved)
```
It feels a bit strange to me that we do physical planning twice, once to gather fake requests and once with the real data. Did you do it this way to maintain a separation of concerns, or for some other reason?
Physical planning is reasonably cheap without the metastore calls now, but that might change in the future if we build different physical plans depending on the output of the catalog requests, or even issue new requests based off the old ones. Does it make sense to instead implement a catalog which will resolve requests via distributing metaqueries directly, rather than doing this two-step process?
That said, if this is just a stepping stone to the full implementation, I'm happy to approve it to keep things moving!
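For comparison, here is a rough sketch of that alternative under assumed names (`distributedCatalog`, `metaQuerier`, `querySections` are all hypothetical, not the engine's actual interfaces): the planner's catalog resolves each request on demand by issuing the metaquery directly, so planning stays a single pass.

```go
package main

import (
	"context"
	"fmt"
)

// Hypothetical sketch of the suggested alternative: instead of a separate
// collect-then-resolve step, the planner's catalog resolves each request
// on demand by issuing the metaquery directly.
type sectionRef string

type metaQuerier interface {
	querySections(ctx context.Context, matchers string) ([]sectionRef, error)
}

type distributedCatalog struct{ mq metaQuerier }

// resolveSections would be called by the planner while it builds the plan;
// the metaquery happens right here rather than in a second planning pass.
func (c *distributedCatalog) resolveSections(ctx context.Context, matchers string) ([]sectionRef, error) {
	return c.mq.querySections(ctx, matchers)
}

// fakeQuerier stands in for the engine's distributed metaquery execution.
type fakeQuerier struct{}

func (fakeQuerier) querySections(_ context.Context, _ string) ([]sectionRef, error) {
	return []sectionRef{"obj-1/section-0"}, nil
}

func main() {
	cat := &distributedCatalog{mq: fakeQuerier{}}
	sections, err := cat.resolveSections(context.Background(), `{app="foo"}`)
	fmt.Println(sections, err)
}
```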
I wanted to give the engine full control of the (meta)query execution:
- to streamline the query execution pipeline (the engine triggers all the queries, not the planner implicitly)
- and to unblock certain optimizations. The requests we collect are essentially metaqueries we need to execute, so if we know them beforehand we can optimize their execution (deduplicate / run in parallel / etc.; see the sketch below).
At the moment I think there is no difference in how we collect / execute metaqueries (given there's at most 1 catalog request per physical plan, right?), so I can simplify the approach and delegate the work to a new catalog.
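The sketch mentioned above, with hypothetical `catalogRequest` / `resolveOne` helpers: once all metaqueries are known up front, duplicates can be dropped and the remaining requests resolved concurrently rather than one by one during the planner walk.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical types: once every catalog request (metaquery) is known
// before execution, duplicates can be dropped and the rest resolved in
// parallel instead of one by one during the planner walk.
type catalogRequest struct{ matchers string }

type catalogResponse struct{ sections int }

// resolveOne stands in for a single metastore / metaquery lookup.
func resolveOne(req catalogRequest) catalogResponse {
	return catalogResponse{sections: 1}
}

func resolveAll(reqs []catalogRequest) map[catalogRequest]catalogResponse {
	// Deduplicate: identical requests are resolved only once.
	unique := make(map[catalogRequest]struct{}, len(reqs))
	for _, r := range reqs {
		unique[r] = struct{}{}
	}

	// Resolve the unique requests in parallel.
	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		resp = make(map[catalogRequest]catalogResponse, len(unique))
	)
	for r := range unique {
		wg.Add(1)
		go func(r catalogRequest) {
			defer wg.Done()
			res := resolveOne(r)
			mu.Lock()
			resp[r] = res
			mu.Unlock()
		}(r)
	}
	wg.Wait()
	return resp
}

func main() {
	reqs := []catalogRequest{
		{matchers: `{app="foo"}`},
		{matchers: `{app="foo"}`}, // duplicate, resolved only once
		{matchers: `{app="bar"}`},
	}
	fmt.Println("resolved:", len(resolveAll(reqs))) // resolved: 2
}
```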
> if we build different physical plans depending on the output of the catalog requests, or even issue new requests based off the old ones
Yes! The current approach doesn't work in this case, so I was thinking about a loop like:
```go
for catalogReqs := plan.Unresolved(); len(catalogReqs) > 0; catalogReqs = plan.Unresolved() {
	catalogResps := e.queryCatalog(catalogReqs)
	plan.Resolve(catalogResps)
}
```

I'm still slightly hesitant about executing metaqueries one by one in an order that depends on the physical planner walk, just because we won't have enough context to optimize these queries (parallelize / decouple).
Wdyt about all of the above?
I'm considering simplifying this part for now (applying the "implement a catalog which will resolve requests via distributing metaqueries directly" suggestion) and just leaving a comment that we need to revisit it once we have multiple metaqueries per physical plan.
As a preparation for distributed metastore lookups I introduce ahead-of-time catalog lookups: the metastore is queried explicitly before the physical plan is built, following the workflow described above.
Even though the metastore catalog is still used, this approach allows us to later reuse the engine's capability to run distributed queries to resolve catalog requests (by replacing this resolve func). That change will be introduced in a different PR (WIP example).
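A minimal sketch of the "replace this resolve func" idea, again with hypothetical names (`resolveFunc`, `resolveWithMetastore`, `buildResolvedCatalog`): the ahead-of-time step depends only on a function that turns collected catalog requests into responses, so the metastore-backed resolver could later be swapped for one that fans the requests out as distributed metaqueries without touching the rest of the flow.

```go
package main

import (
	"context"
	"fmt"
)

// Hypothetical names: the ahead-of-time resolution step only depends on a
// resolveFunc, so swapping the metastore-backed implementation for one that
// runs distributed metaqueries does not change the surrounding flow.
type catalogRequest struct{ matchers string }

type catalogResponse struct{ sections int }

type resolveFunc func(ctx context.Context, reqs []catalogRequest) ([]catalogResponse, error)

// resolveWithMetastore stands in for today's metastore catalog lookups.
func resolveWithMetastore(_ context.Context, reqs []catalogRequest) ([]catalogResponse, error) {
	out := make([]catalogResponse, len(reqs))
	for i := range reqs {
		out[i] = catalogResponse{sections: 1} // pretend the metastore found one section
	}
	return out, nil
}

// buildResolvedCatalog is the part that stays the same whichever resolver
// is plugged in.
func buildResolvedCatalog(ctx context.Context, reqs []catalogRequest, resolve resolveFunc) (map[string]catalogResponse, error) {
	resps, err := resolve(ctx, reqs)
	if err != nil {
		return nil, err
	}
	resolved := make(map[string]catalogResponse, len(reqs))
	for i, req := range reqs {
		resolved[req.matchers] = resps[i]
	}
	return resolved, nil
}

func main() {
	reqs := []catalogRequest{{matchers: `{app="foo"}`}}
	// Today: metastore-backed resolution. Later: pass a resolveFunc that
	// executes the requests as distributed metaqueries instead.
	resolved, err := buildResolvedCatalog(context.Background(), reqs, resolveWithMetastore)
	fmt.Println(resolved, err)
}
```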
Checklist
- Reviewed the `CONTRIBUTING.md` guide (required)
- `feat` PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
- Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md`
- If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. Example PR