Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: some code clean after all logical op are migrated #56426

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ go_library(
"//pkg/planner/util/domainmisc",
"//pkg/planner/util/fixcontrol",
"//pkg/planner/util/optimizetrace",
"//pkg/planner/util/optimizetrace/logicaltrace",
"//pkg/planner/util/tablesampler",
"//pkg/planner/util/utilfuncp",
"//pkg/privilege",
Expand Down
6 changes: 1 addition & 5 deletions pkg/planner/core/core_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ import (

func init() {
// For code refactor init.
utilfuncp.FindBestTask = findBestTask
utilfuncp.PruneByItems = pruneByItems
utilfuncp.CanPushToCopImpl = canPushToCopImpl
utilfuncp.PushDownTopNForBaseLogicalPlan = pushDownTopNForBaseLogicalPlan
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove these assignments?

utilfuncp.FindBestTask4BaseLogicalPlan = findBestTask
utilfuncp.FindBestTask4LogicalCTE = findBestTask4LogicalCTE
utilfuncp.FindBestTask4LogicalShow = findBestTask4LogicalShow
utilfuncp.FindBestTask4LogicalCTETable = findBestTask4LogicalCTETable
Expand Down Expand Up @@ -68,7 +65,6 @@ func init() {
utilfuncp.ApplyPredicateSimplification = applyPredicateSimplification
utilfuncp.DeriveStats4LogicalIndexScan = deriveStats4LogicalIndexScan
utilfuncp.DeriveStats4LogicalTableScan = deriveStats4LogicalTableScan
utilfuncp.PushDownTopNForBaseLogicalPlan = pushDownTopNForBaseLogicalPlan

// For mv index init.
cardinality.GetTblInfoForUsedStatsByPhysicalID = getTblInfoForUsedStatsByPhysicalID
Expand Down
86 changes: 2 additions & 84 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/aggregation"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/cardinality"
Expand Down Expand Up @@ -2426,87 +2425,6 @@ func exhaustPhysicalPlans4LogicalWindow(lp base.LogicalPlan, prop *property.Phys
return windows, true, nil
}

func canPushToCopImpl(lp base.LogicalPlan, storeTp kv.StoreType, considerDual bool) bool {
p := lp.GetBaseLogicalPlan().(*logicalop.BaseLogicalPlan)
ret := true
for _, ch := range p.Children() {
switch c := ch.(type) {
case *logicalop.DataSource:
validDs := false
indexMergeIsIntersection := false
for _, path := range c.PossibleAccessPaths {
if path.StoreType == storeTp {
validDs = true
}
if len(path.PartialIndexPaths) > 0 && path.IndexMergeIsIntersection {
indexMergeIsIntersection = true
}
}
ret = ret && validDs

_, isTopN := p.Self().(*logicalop.LogicalTopN)
_, isLimit := p.Self().(*logicalop.LogicalLimit)
if (isTopN || isLimit) && indexMergeIsIntersection {
return false // TopN and Limit cannot be pushed down to the intersection type IndexMerge
}

if c.TableInfo.TableCacheStatusType != model.TableCacheStatusDisable {
// Don't push to cop for cached table, it brings more harm than good:
// 1. Those tables are small enough, push to cop can't utilize several TiKV to accelerate computation.
// 2. Cached table use UnionScan to read the cache data, and push to cop is not supported when an UnionScan exists.
// Once aggregation is pushed to cop, the cache data can't be use any more.
return false
}
case *logicalop.LogicalUnionAll:
if storeTp != kv.TiFlash {
return false
}
ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, true)
case *logicalop.LogicalSort:
if storeTp != kv.TiFlash {
return false
}
ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, true)
case *logicalop.LogicalProjection:
if storeTp != kv.TiFlash {
return false
}
ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, considerDual)
case *logicalop.LogicalExpand:
// Expand itself only contains simple col ref and literal projection. (always ok, check its child)
if storeTp != kv.TiFlash {
return false
}
ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, considerDual)
case *logicalop.LogicalTableDual:
return storeTp == kv.TiFlash && considerDual
case *logicalop.LogicalAggregation, *logicalop.LogicalSelection, *logicalop.LogicalJoin, *logicalop.LogicalWindow:
if storeTp != kv.TiFlash {
return false
}
ret = ret && c.CanPushToCop(storeTp)
// These operators can be partially push down to TiFlash, so we don't raise warning for them.
case *logicalop.LogicalLimit, *logicalop.LogicalTopN:
return false
case *logicalop.LogicalSequence:
return storeTp == kv.TiFlash
case *logicalop.LogicalCTE:
if storeTp != kv.TiFlash {
return false
}
if c.Cte.RecursivePartLogicalPlan != nil || !c.Cte.SeedPartLogicalPlan.CanPushToCop(storeTp) {
return false
}
return true
default:
p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced(
"MPP mode may be blocked because operator `" + c.TP() + "` is not supported now.")
return false
}
}
return ret
}

func getEnforcedStreamAggs(la *logicalop.LogicalAggregation, prop *property.PhysicalProperty) []base.PhysicalPlan {
if prop.IsFlashProp() {
return nil
Expand Down Expand Up @@ -2972,7 +2890,7 @@ func exhaustPhysicalPlans4LogicalUnionAll(lp base.LogicalPlan, prop *property.Ph
if prop.TaskTp == property.MppTaskType && prop.MPPPartitionTp != property.AnyType {
return nil, true, nil
}
canUseMpp := p.SCtx().GetSessionVars().IsMPPAllowed() && canPushToCopImpl(&p.BaseLogicalPlan, kv.TiFlash, true)
canUseMpp := p.SCtx().GetSessionVars().IsMPPAllowed() && logicalop.CanPushToCopImpl(&p.BaseLogicalPlan, kv.TiFlash, true)
chReqProps := make([]*property.PhysicalProperty, 0, p.ChildLen())
for range p.Children() {
if canUseMpp && prop.TaskTp == property.MppTaskType {
Expand Down Expand Up @@ -3040,7 +2958,7 @@ func exhaustPhysicalPlans4LogicalSort(lp base.LogicalPlan, prop *property.Physic
return ret, true, nil
}
} else if prop.TaskTp == property.MppTaskType && prop.RejectSort {
if canPushToCopImpl(&ls.BaseLogicalPlan, kv.TiFlash, true) {
if logicalop.CanPushToCopImpl(&ls.BaseLogicalPlan, kv.TiFlash, true) {
ps := getNominalSortSimple(ls, prop)
return []base.PhysicalPlan{ps}, true, nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/planner/core/operator/logicalop/base_logical_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (p *BaseLogicalPlan) PruneColumns(parentUsedCols []*expression.Column, opt
// FindBestTask implements LogicalPlan.<3rd> interface.
func (p *BaseLogicalPlan) FindBestTask(prop *property.PhysicalProperty, planCounter *base.PlanCounterTp,
opt *optimizetrace.PhysicalOptimizeOp) (bestTask base.Task, cntPlan int64, err error) {
return utilfuncp.FindBestTask(p, prop, planCounter, opt)
return utilfuncp.FindBestTask4BaseLogicalPlan(p, prop, planCounter, opt)
}

// BuildKeyInfo implements LogicalPlan.<4th> interface.
Expand All @@ -156,7 +156,7 @@ func (p *BaseLogicalPlan) BuildKeyInfo(_ *expression.Schema, _ []*expression.Sch

// PushDownTopN implements the LogicalPlan.<5th> interface.
func (p *BaseLogicalPlan) PushDownTopN(topNLogicalPlan base.LogicalPlan, opt *optimizetrace.LogicalOptimizeOp) base.LogicalPlan {
return utilfuncp.PushDownTopNForBaseLogicalPlan(p, topNLogicalPlan, opt)
return pushDownTopNForBaseLogicalPlan(p, topNLogicalPlan, opt)
}

// DeriveTopN implements the LogicalPlan.<6th> interface.
Expand Down Expand Up @@ -309,7 +309,7 @@ func (p *BaseLogicalPlan) RollBackTaskMap(ts uint64) {
// For TiFlash, it will check whether the operator is supported, but note that the check
// might be inaccurate.
func (p *BaseLogicalPlan) CanPushToCop(storeTp kv.StoreType) bool {
return utilfuncp.CanPushToCopImpl(p, storeTp, false)
return CanPushToCopImpl(p, storeTp, false)
}

// ExtractFD implements LogicalPlan.<22nd> interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/operator/logicalop/logical_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (la *LogicalAggregation) PruneColumns(parentUsedCols []*expression.Column,
selfUsedCols = expression.ExtractColumnsFromExpressions(selfUsedCols, aggrFunc.Args, nil)

var cols []*expression.Column
aggrFunc.OrderByItems, cols = utilfuncp.PruneByItems(la, aggrFunc.OrderByItems, opt)
aggrFunc.OrderByItems, cols = pruneByItems(la, aggrFunc.OrderByItems, opt)
selfUsedCols = append(selfUsedCols, cols...)
}
if len(la.AggFuncs) == 0 || (!allFirstRow && allRemainFirstRow) {
Expand Down
136 changes: 136 additions & 0 deletions pkg/planner/core/operator/logicalop/logical_plans_misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ package logicalop

import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/constraint"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace/logicaltrace"
)

var (
Expand Down Expand Up @@ -98,3 +103,134 @@ func addSelection(p base.LogicalPlan, child base.LogicalPlan, conditions []expre
p.Children()[chIdx] = selection
AppendAddSelectionTraceStep(p, child, selection, opt)
}

// pushDownTopNForBaseLogicalPlan can be moved when LogicalTopN has been moved to logicalop.
func pushDownTopNForBaseLogicalPlan(lp base.LogicalPlan, topNLogicalPlan base.LogicalPlan,
opt *optimizetrace.LogicalOptimizeOp) base.LogicalPlan {
s := lp.GetBaseLogicalPlan().(*BaseLogicalPlan)
var topN *LogicalTopN
if topNLogicalPlan != nil {
topN = topNLogicalPlan.(*LogicalTopN)
}
p := s.Self()
for i, child := range p.Children() {
p.Children()[i] = child.PushDownTopN(nil, opt)
}
if topN != nil {
return topN.AttachChild(p, opt)
}
return p
}

func pruneByItems(p base.LogicalPlan, old []*util.ByItems, opt *optimizetrace.LogicalOptimizeOp) (byItems []*util.ByItems,
parentUsedCols []*expression.Column) {
prunedByItems := make([]*util.ByItems, 0)
byItems = make([]*util.ByItems, 0, len(old))
seen := make(map[string]struct{}, len(old))
for _, byItem := range old {
pruned := true
hash := string(byItem.Expr.HashCode())
_, hashMatch := seen[hash]
seen[hash] = struct{}{}
cols := expression.ExtractColumns(byItem.Expr)
if !hashMatch {
if len(cols) == 0 {
if !expression.IsRuntimeConstExpr(byItem.Expr) {
pruned = false
byItems = append(byItems, byItem)
}
} else if byItem.Expr.GetType(p.SCtx().GetExprCtx().GetEvalCtx()).GetType() != mysql.TypeNull {
pruned = false
parentUsedCols = append(parentUsedCols, cols...)
byItems = append(byItems, byItem)
}
}
if pruned {
prunedByItems = append(prunedByItems, byItem)
}
}
logicaltrace.AppendByItemsPruneTraceStep(p, prunedByItems, opt)
return
}

// CanPushToCopImpl checks whether the logical plan can be pushed to coprocessor.
func CanPushToCopImpl(lp base.LogicalPlan, storeTp kv.StoreType, considerDual bool) bool {
p := lp.GetBaseLogicalPlan().(*BaseLogicalPlan)
ret := true
for _, ch := range p.Children() {
switch c := ch.(type) {
case *DataSource:
validDs := false
indexMergeIsIntersection := false
for _, path := range c.PossibleAccessPaths {
if path.StoreType == storeTp {
validDs = true
}
if len(path.PartialIndexPaths) > 0 && path.IndexMergeIsIntersection {
indexMergeIsIntersection = true
}
}
ret = ret && validDs

_, isTopN := p.Self().(*LogicalTopN)
_, isLimit := p.Self().(*LogicalLimit)
if (isTopN || isLimit) && indexMergeIsIntersection {
return false // TopN and Limit cannot be pushed down to the intersection type IndexMerge
}

if c.TableInfo.TableCacheStatusType != model.TableCacheStatusDisable {
// Don't push to cop for cached table, it brings more harm than good:
// 1. Those tables are small enough, push to cop can't utilize several TiKV to accelerate computation.
// 2. Cached table use UnionScan to read the cache data, and push to cop is not supported when an UnionScan exists.
// Once aggregation is pushed to cop, the cache data can't be use any more.
return false
}
case *LogicalUnionAll:
if storeTp != kv.TiFlash {
return false
}
ret = ret && CanPushToCopImpl(&c.BaseLogicalPlan, storeTp, true)
case *LogicalSort:
if storeTp != kv.TiFlash {
return false
}
ret = ret && CanPushToCopImpl(&c.BaseLogicalPlan, storeTp, true)
case *LogicalProjection:
if storeTp != kv.TiFlash {
return false
}
ret = ret && CanPushToCopImpl(&c.BaseLogicalPlan, storeTp, considerDual)
case *LogicalExpand:
// Expand itself only contains simple col ref and literal projection. (always ok, check its child)
if storeTp != kv.TiFlash {
return false
}
ret = ret && CanPushToCopImpl(&c.BaseLogicalPlan, storeTp, considerDual)
case *LogicalTableDual:
return storeTp == kv.TiFlash && considerDual
case *LogicalAggregation, *LogicalSelection, *LogicalJoin, *LogicalWindow:
if storeTp != kv.TiFlash {
return false
}
ret = ret && c.CanPushToCop(storeTp)
// These operators can be partially push down to TiFlash, so we don't raise warning for them.
case *LogicalLimit, *LogicalTopN:
return false
case *LogicalSequence:
return storeTp == kv.TiFlash
case *LogicalCTE:
if storeTp != kv.TiFlash {
return false
}
if c.Cte.RecursivePartLogicalPlan != nil || !c.Cte.SeedPartLogicalPlan.CanPushToCop(storeTp) {
return false
}
return true
default:
p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced(
"MPP mode may be blocked because operator `" + c.TP() + "` is not supported now.")
return false
}
}
return ret
}
2 changes: 1 addition & 1 deletion pkg/planner/core/operator/logicalop/logical_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (ls *LogicalSort) ReplaceExprColumns(replace map[string]*expression.Column)
// we do prune them. Note that we can't prune the expressions contain non-deterministic functions, such as rand().
func (ls *LogicalSort) PruneColumns(parentUsedCols []*expression.Column, opt *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, error) {
var cols []*expression.Column
ls.ByItems, cols = utilfuncp.PruneByItems(ls, ls.ByItems, opt)
ls.ByItems, cols = pruneByItems(ls, ls.ByItems, opt)
parentUsedCols = append(parentUsedCols, cols...)
var err error
ls.Children()[0], err = ls.Children()[0].PruneColumns(parentUsedCols, opt)
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/operator/logicalop/logical_top_n.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (lt *LogicalTopN) ReplaceExprColumns(replace map[string]*expression.Column)
func (lt *LogicalTopN) PruneColumns(parentUsedCols []*expression.Column, opt *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, error) {
child := lt.Children()[0]
var cols []*expression.Column
lt.ByItems, cols = utilfuncp.PruneByItems(lt, lt.ByItems, opt)
lt.ByItems, cols = pruneByItems(lt, lt.ByItems, opt)
parentUsedCols = append(parentUsedCols, cols...)
var err error
lt.Children()[0], err = child.PruneColumns(parentUsedCols, opt)
Expand Down
35 changes: 0 additions & 35 deletions pkg/planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,9 @@ import (
"context"
"slices"

"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace/logicaltrace"
"github.com/pingcap/tidb/pkg/util/intest"
)

Expand Down Expand Up @@ -64,37 +60,6 @@ func noZeroColumnLayOut(p base.LogicalPlan) bool {
return true
}

func pruneByItems(p base.LogicalPlan, old []*util.ByItems, opt *optimizetrace.LogicalOptimizeOp) (byItems []*util.ByItems,
parentUsedCols []*expression.Column) {
prunedByItems := make([]*util.ByItems, 0)
byItems = make([]*util.ByItems, 0, len(old))
seen := make(map[string]struct{}, len(old))
for _, byItem := range old {
pruned := true
hash := string(byItem.Expr.HashCode())
_, hashMatch := seen[hash]
seen[hash] = struct{}{}
cols := expression.ExtractColumns(byItem.Expr)
if !hashMatch {
if len(cols) == 0 {
if !expression.IsRuntimeConstExpr(byItem.Expr) {
pruned = false
byItems = append(byItems, byItem)
}
} else if byItem.Expr.GetType(p.SCtx().GetExprCtx().GetEvalCtx()).GetType() != mysql.TypeNull {
pruned = false
parentUsedCols = append(parentUsedCols, cols...)
byItems = append(byItems, byItem)
}
}
if pruned {
prunedByItems = append(prunedByItems, byItem)
}
}
logicaltrace.AppendByItemsPruneTraceStep(p, prunedByItems, opt)
return
}

// Name implements base.LogicalOptRule.<1st> interface.
func (*ColumnPruner) Name() string {
return "column_prune"
Expand Down
Loading