Open
Description
I found a problem, if i set runtime.GOMAXPROCS(1), blow code has a litter problem, aggregation function can't work accurately:
The aggregation function directly enters the final state,
physicalplan.go
func Build(
ctx context.Context,
pool memory.Allocator,
tracer trace.Tracer,
s *dynparquet.Schema,
plan *logicalplan.LogicalPlan,
options ...Option,
) (*OutputPlan, error) {
......
case plan.Aggregation != nil:
ordered, err := shouldPlanOrderedAggregate(execOpts, oInfo, plan.Aggregation)
if err != nil {
// TODO(asubiotto): Log the error.
ordered = false
}
var sync PhysicalPlan
if len(prev) > 1 { // because len(prev) == 1, so can't execute it
// These aggregate operators need to be synchronized.
if ordered && len(plan.Aggregation.GroupExprs) > 0 {
sync = NewOrderedSynchronizer(pool, len(prev), plan.Aggregation.GroupExprs)
} else {
sync = Synchronize(len(prev))
}
}
seed := maphash.MakeSeed()
for i := 0; i < len(prev); i++ {
a, err := Aggregate(pool, tracer, plan.Aggregation, sync == nil, ordered, seed)
// because sync is nil, so aggregation directly entering the final state, so it cannot work correctly
if err != nil {
visitErr = err
return false
}
prev[i].SetNext(a)
prev[i] = a
if sync != nil {
a.SetNext(sync)
}
}
if sync != nil {
// Plan an aggregate operator to run an aggregation on all the
// aggregations.
a, err := Aggregate(pool, tracer, plan.Aggregation, true, ordered, seed)
if err != nil {
visitErr = err
return false
}
sync.SetNext(a)
prev = prev[0:1]
prev[0] = a
}
if ordered {
oInfo.nodeMaintainsOrdering()
}
default:
panic("Unsupported plan")
}
}