Cost based planning #311
Conversation
I believe the upstream work I am taking on will help or modify how this is approached: apache/datafusion#19973
As in this should be provided as a ColumnStatistic by single-node df
yes, completely agree
ProjectionExec: task_count=Some(1) output_rows=184 cost_class=XS accumulated_cost=11592 output_bytes=4784
SortPreservingMergeExec: task_count=Some(1) output_rows=184 cost_class=L accumulated_cost=8372 output_bytes=6440
[NetworkBoundary] Coalesce: task_count=Some(1) output_rows=184
SortExec: task_count=Some(4) output_rows=184 cost_class=XL accumulated_cost=18915 output_bytes=6440
haha cool to see scale up on compute😄
Yes! The compute cost calculation is very rough right now; I want to improve it before moving this out of draft. Ideally, all the stats used for determining how many bytes will flow through the graph will be given by DF upstream.
/// Given a list of children, each with a different compute cost, and a restriction on the maximum
/// number of tasks in which they are allowed to run, it assigns task counts to them so that the following
/// conditions are met:
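The conditions themselves are elided in the quote above, but a minimal sketch of what such an assignment could look like follows. All names (`assign_task_counts`, `max_tasks`) and the proportional strategy are assumptions for illustration, not the PR's actual implementation:

```rust
// Hypothetical sketch: distribute a task budget across children in
// proportion to their compute cost, capped per child, at least one each.
pub fn assign_task_counts(costs: &[u64], max_tasks: &[usize], budget: usize) -> Vec<usize> {
    // Avoid dividing by zero when all costs are unknown/zero.
    let total: u64 = costs.iter().sum::<u64>().max(1);
    costs
        .iter()
        .zip(max_tasks)
        .map(|(&cost, &cap)| {
            // Proportional share of the budget, truncated toward zero.
            let share = (cost as u128 * budget as u128 / total as u128) as usize;
            share.clamp(1, cap)
        })
        .collect()
}

fn main() {
    // A child with 3x the cost gets ~3x the tasks, subject to its cap.
    let counts = assign_task_counts(&[300, 100, 100], &[4, 4, 4], 10);
    println!("{counts:?}"); // [4, 2, 2]
}
```

Note the truncation means the assigned total can land under (or, with caps, over) the budget, which is presumably why the trim/expand adjustment loops discussed below exist.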
oh man, I see what you are saying. This is really tricky
It is...
// Adjust total subtasks to match budget (or get as close as possible)
let mut total_subtasks: usize = child_subtask_counts.iter().sum();

// Trim if over budget: reduce from children with the highest subtask count
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Build a heap, then pop the top, decrement, and push it back? It would reduce the time complexity, but I don't know if it's worth the legibility cost.
I do remember some unions being quite large though 🤔
🙈 this does look like a heap-based problem, although I bet the code can get pretty complicated if we go down that path.
Unless a heap approach clearly demonstrates better benchmarks, I think we should keep this simple.
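For reference, the suggested trim could be fairly compact. This is a sketch under assumed names (`trim_to_budget`, and the invariant that every child keeps at least one subtask), not the PR's code:

```rust
use std::collections::BinaryHeap;

// Heap-based trim sketch: repeatedly take the child with the highest
// subtask count, remove one subtask, and push it back, until the total
// fits the budget or nothing can be reduced further.
pub fn trim_to_budget(counts: &mut [usize], budget: usize) {
    let mut total: usize = counts.iter().sum();
    // Max-heap of (current subtask count, child index).
    let mut heap: BinaryHeap<(usize, usize)> =
        counts.iter().copied().zip(0..).collect();
    while total > budget {
        let Some((count, idx)) = heap.pop() else { break };
        if count <= 1 {
            break; // assumed invariant: every child keeps at least one subtask
        }
        counts[idx] = count - 1;
        total -= 1;
        heap.push((count - 1, idx));
    }
}

fn main() {
    let mut counts = [5, 2, 4];
    trim_to_budget(&mut counts, 8);
    println!("{counts:?}"); // [3, 2, 3]
}
```

Each trim step is O(log n) instead of a linear scan for the max, which only matters for the very wide unions mentioned above.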
}

// Expand if under budget: add to children with the highest cost that can expand
while total_subtasks < task_count_budget {
Another heap could be used here, I believe:
pop the top (highest cost), check if it can expand, increment, and push it back on.
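A sketch of that suggestion, again with assumed names (`expand_to_budget`, per-child `caps`) rather than the PR's actual code:

```rust
use std::collections::BinaryHeap;

// Heap-based expansion sketch: keyed by compute cost, give extra subtasks
// to the most expensive children that still have room under their cap.
pub fn expand_to_budget(counts: &mut [usize], costs: &[u64], caps: &[usize], budget: usize) {
    let mut total: usize = counts.iter().sum();
    // Max-heap of (compute cost, child index).
    let mut heap: BinaryHeap<(u64, usize)> =
        costs.iter().copied().zip(0..).collect();
    while total < budget {
        let Some((cost, idx)) = heap.pop() else { break };
        if counts[idx] < caps[idx] {
            counts[idx] += 1;
            total += 1;
            heap.push((cost, idx)); // still expandable, keep it in play
        }
        // children already at their cap simply drop out of the heap
    }
}

fn main() {
    let mut counts = [1, 1, 1];
    expand_to_budget(&mut counts, &[300, 100, 200], &[2, 3, 3], 6);
    println!("{counts:?}"); // [2, 1, 3]
}
```

One consequence of this greedy order: the highest-cost child absorbs budget until it hits its cap before the next one gets any, which may or may not be the desired behavior.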
😵💫
I added some tests that check the difference between the estimated number of rows and the actual number of rows here. Conclusion: even if the …
Reworks the distributed planner so that it integrates with DataFusion's upstream statistics system, in favor of having users provide a custom TaskEstimator.
There are some key changes in this PR that rework how this project assigns tasks and stages to a plan:
Remove TaskEstimator
Users are no longer free to specify how many tasks should be used for a distributed query; instead, each node exposes statistics via the ExecutionPlan::partition_statistics() method. Based on these statistics, Distributed DataFusion calculates how many tasks are appropriate.
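To make the idea concrete, here is a simplified sketch of deriving a task count from a node's size estimate. `Precision` and `Statistics` below are pared-down stand-ins for DataFusion's types, and `BYTES_PER_TASK` is an assumed tuning knob, not a constant from this project:

```rust
// Simplified stand-ins for DataFusion's statistics types.
#[derive(Clone, Copy)]
enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

struct Statistics {
    total_byte_size: Precision,
}

// Assumed knob: how many bytes one task should handle.
const BYTES_PER_TASK: usize = 64 * 1024 * 1024; // 64 MiB

/// Derive a task count from a node's estimated output size, falling back
/// to a single task when no estimate is available.
fn tasks_for(stats: &Statistics, max_tasks: usize) -> usize {
    let bytes = match stats.total_byte_size {
        Precision::Exact(b) | Precision::Inexact(b) => b,
        Precision::Absent => return 1, // sane default when stats are missing
    };
    bytes.div_ceil(BYTES_PER_TASK).clamp(1, max_tasks)
}

fn main() {
    let stats = Statistics { total_byte_size: Precision::Inexact(200 * 1024 * 1024) };
    println!("{}", tasks_for(&stats, 8)); // 200 MiB / 64 MiB, rounded up -> 4
}
```

The `Absent` fallback is one example of the "sane defaults" mentioned below for bridging gaps in upstream statistics.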
Rely on upstream statistics system
This PR heavily consumes the Statistics provided by the different nodes in order to estimate how much data is going to flow through them. There are still some gaps in upstream statistics that are bridged in this project with some sane defaults.
Compute cost assignment
One of the biggest additions in this PR is the compute complexity estimation for the different nodes. Each node has a cost attached that is estimated based on how compute heavy it is.
The cost is measured in "bytes processed": an estimate of how many bytes the node is expected to process, given the node itself and the estimated rows and bytes that will flow through it, as reported by upstream's ExecutionPlan::partition_statistics. The computational complexity of the different operators is taken into account with the following enum:
Results
TODO: choose better defaults in order to improve performance