diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java index a8654e27291c06..567124a73425fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids.processor.post; import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.processor.post.setRuntimeFilter.SetRuntimeFilterGenerator; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TRuntimeFilterMode; @@ -71,6 +72,7 @@ public List getProcessors() { builder.add(new PushTopnToAgg()); } builder.add(new TopNScanOpt()); + builder.add(new SetRuntimeFilterGenerator()); builder.add(new FragmentProcessor()); if (!cascadesContext.getConnectContext().getSessionVariable().getRuntimeFilterMode() .toUpperCase().equals(TRuntimeFilterMode.OFF.name())) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/setRuntimeFilter/PushDownContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/setRuntimeFilter/PushDownContext.java new file mode 100644 index 00000000000000..828c38f53e5b5e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/setRuntimeFilter/PushDownContext.java @@ -0,0 +1,41 @@ +package org.apache.doris.nereids.processor.post.setRuntimeFilter; + +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation; + +import com.alibaba.google.common.collect.ImmutableList; + +import java.util.List; + +public class PushDownContext { + private SRFContext sRFContext; + private PhysicalSetOperation node; + private Expression source; + private Expression target; + + public PushDownContext( + SRFContext srfContext, + PhysicalSetOperation node, + Expression source, Expression target) { + this.sRFContext = srfContext; + this.node = node; + this.source = source; + this.target = target; + } + + public PhysicalSetOperation getNode() { + return node; + } + + public Expression getSource() { + return source; + } + + public Expression getTarget() { + return target; + } + + public SRFContext getSRFContext() { + return sRFContext; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/setRuntimeFilter/SRFContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/setRuntimeFilter/SRFContext.java new file mode 100644 index 00000000000000..d41a66e34cb93a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/setRuntimeFilter/SRFContext.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.processor.post.setRuntimeFilter; + +import org.apache.doris.common.IdGenerator; +import org.apache.doris.planner.RuntimeFilterId; + +import java.util.ArrayList; +import java.util.List; + +public class SRFContext { + private final IdGenerator generator = RuntimeFilterId.createGenerator(); + private final List srfs = new ArrayList<>(); + + public void addSrf(SetRuntimeFilter srf) { + srfs.add(srf); + } + + public RuntimeFilterId nextId() { + return generator.getNextId(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/setRuntimeFilter/SRFPushDownVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/setRuntimeFilter/SRFPushDownVisitor.java new file mode 100644 index 00000000000000..ff9fee8fd58ea3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/setRuntimeFilter/SRFPushDownVisitor.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.processor.post.setRuntimeFilter; + +import org.apache.doris.nereids.processor.post.setRuntimeFilter.PushDownContext; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; + +public class SRFPushDownVisitor extends PlanVisitor { + @Override + public Boolean visit(Plan plan, PushDownContext ctx) { + boolean pushed = false; + for (Plan child : plan.children()) { + pushed |= child.accept(this, ctx); + } + return pushed; + } + + @Override + public Boolean visitPhysicalCatalogRelation(PhysicalCatalogRelation relation, PushDownContext ctx) { + if (relation.getOutputSet().contains(ctx.getTarget())) { + SetRuntimeFilter srf = new SetRuntimeFilter( + ctx.getSRFContext().nextId(), + ctx.getNode(), ctx.getSource(), + relation, ctx.getTarget()); + ctx.getSRFContext().addSrf(srf); + relation.addSetRuntimeFilter(srf); + ctx.getNode().addSetRuntimeFilter(srf); + return true; + } + return false; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/setRuntimeFilter/SetRuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/setRuntimeFilter/SetRuntimeFilter.java new file mode 100644 index 00000000000000..da9a3e45afacc3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/setRuntimeFilter/SetRuntimeFilter.java @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.processor.post.setRuntimeFilter; + +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.plans.algebra.SetOperation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation; +import org.apache.doris.planner.RuntimeFilterId; + +public class SetRuntimeFilter { + private RuntimeFilterId id; + private SetOperation node; + private Expression source; + private PhysicalCatalogRelation relation; + private Expression target; + + public SetRuntimeFilter(RuntimeFilterId id, SetOperation node, Expression source, + PhysicalCatalogRelation relation, Expression target) { + this.node = node; + this.source = source; + this.relation = relation; + this.target = target; + this.id = id; + } + + public RuntimeFilterId getId() { + return id; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("SRF").append(id.asInt()).append("(").append(source).append("->").append(target).append(")"); + return sb.toString(); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/setRuntimeFilter/SetRuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/setRuntimeFilter/SetRuntimeFilterGenerator.java new file mode 100644 index 00000000000000..b6f375341283fa --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/setRuntimeFilter/SetRuntimeFilterGenerator.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.processor.post.setRuntimeFilter; + +import org.apache.doris.common.IdGenerator; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.processor.post.PlanPostProcessor; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect; +import org.apache.doris.planner.RuntimeFilterId; + +import java.util.List; +import java.util.stream.Collectors; + +public class SetRuntimeFilterGenerator extends PlanPostProcessor { + private SRFContext srfContext = new SRFContext(); + @Override + public Plan visitPhysicalIntersect(PhysicalIntersect intersect, CascadesContext context) { + Plan src = intersect.child(1); + int slotIdx = chooseSourceSlots(intersect); + for (int childId = 1; childId < intersect.children().size(); childId++) { + Plan child = intersect.children().get(childId); + PushDownContext pushDownContext = new PushDownContext( + srfContext, + intersect, + intersect.child(0).getOutput().get(slotIdx), + intersect.child(childId).getOutput().get(slotIdx)); + SRFPushDownVisitor visitor = new SRFPushDownVisitor(); + child.accept(visitor, pushDownContext); + } + return visitPhysicalSetOperation(intersect, context); + } + + private int chooseSourceSlots(PhysicalIntersect intersect) { + // TODO: choose best slots by ndv + return 0; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java index 2146b9fe5de04a..b7d02e3fb39752 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids.trees.plans.physical; import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.processor.post.setRuntimeFilter.SetRuntimeFilter; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.plans.AbstractPlan; @@ -41,6 +42,7 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements PhysicalPlan, Explainable { protected final PhysicalProperties physicalProperties; private final List appliedRuntimeFilters = Lists.newArrayList(); + protected final List setRuntimeFilters = Lists.newArrayList(); public AbstractPhysicalPlan(PlanType type, LogicalProperties logicalProperties, Plan... children) { this(type, Optional.empty(), logicalProperties, children); @@ -85,6 +87,10 @@ public void addAppliedRuntimeFilter(RuntimeFilter filter) { appliedRuntimeFilters.add(filter); } + public void addSetRuntimeFilter(SetRuntimeFilter filter) { + setRuntimeFilters.add(filter); + } + public void removeAppliedRuntimeFilter(RuntimeFilter filter) { appliedRuntimeFilters.remove(filter); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalIntersect.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalIntersect.java index 3194939a9e6481..049db3e996cdec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalIntersect.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalIntersect.java @@ -80,7 +80,9 @@ public String toString() { "qualifier", qualifier, "outputs", outputs, "regularChildrenOutputs", regularChildrenOutputs, - "stats", statistics); + "stats", statistics, + "SRFs", setRuntimeFilters + ); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java index 698a189aa265ba..b02a48ad44f298 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java @@ -118,9 +118,10 @@ public List getBaseOutputs() { @Override public String toString() { - StringBuilder builder = new StringBuilder(); + StringBuilder jrfBuilder = new StringBuilder(); if (!getAppliedRuntimeFilters().isEmpty()) { - getAppliedRuntimeFilters().forEach(rf -> builder.append(" RF").append(rf.getId().asInt())); + getAppliedRuntimeFilters().forEach( + jrf -> jrfBuilder.append(" RF").append(jrf.getId().asInt())); } String index = ""; if (selectedIndexId != getTable().getBaseIndexId()) { @@ -133,7 +134,8 @@ public String toString() { } return Utils.toSqlString("PhysicalOlapScan[" + table.getName() + index + partitions + "]" + getGroupIdWithPrefix(), - "stats", statistics, "RFs", builder + "stats", statistics, "JRFs", jrfBuilder, + "SRFs", setRuntimeFilters ); }