Skip to content

Commit

Permalink
[CALCITE-6846] Support basic dphyp join reorder algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
silundong committed Feb 26, 2025
1 parent 5a1b15c commit 6aa88b2
Show file tree
Hide file tree
Showing 9 changed files with 1,291 additions and 0 deletions.
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/calcite/rel/rules/CoreRules.java
Original file line number Diff line number Diff line change
Expand Up @@ -816,4 +816,15 @@ private CoreRules() {}
WINDOW_REDUCE_EXPRESSIONS =
ReduceExpressionsRule.WindowReduceExpressionsRule.WindowReduceExpressionsRuleConfig
.DEFAULT.toRule();

/** Rule that flattens a tree of {@link LogicalJoin}s
* into a single {@link HyperGraph} with N inputs. */
public static final JoinToHyperGraphRule JOIN_TO_HYPER_GRAPH =
JoinToHyperGraphRule.Config.DEFAULT.toRule();

/** Rule that re-orders a {@link Join} tree using dphyp algorithm.
*
* @see #JOIN_TO_HYPER_GRAPH */
public static final DphypJoinReorderRule HYPER_GRAPH_OPTIMIZE =
DphypJoinReorderRule.Config.DEFAULT.toRule();
}
181 changes: 181 additions & 0 deletions core/src/main/java/org/apache/calcite/rel/rules/DpHyp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.rel.rules;

import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder;

import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.HashMap;
import java.util.List;

/**
* The core process of dphyp enumeration algorithm.
*/
public class DpHyp {

private HyperGraph hyperGraph;

private HashMap<Long, RelNode> dpTable;

private RelBuilder builder;

private RelMetadataQuery mq;

public DpHyp(HyperGraph hyperGraph, RelBuilder builder, RelMetadataQuery relMetadataQuery) {
this.hyperGraph = hyperGraph;
this.dpTable = new HashMap<>();
this.builder = builder;
this.mq = relMetadataQuery;
}

public void startEnumerateJoin() {
int size = hyperGraph.getInputs().size();
for (int i = 0; i < size; i++) {
long singleNode = LongBitmap.newBitmap(i);
dpTable.put(singleNode, hyperGraph.getInput(i));
hyperGraph.initEdgeBitMap(singleNode);
}

// start enumerating from the second to last
for (int i = size - 2; i >= 0; i--) {
long csg = LongBitmap.newBitmap(i);
long forbidden = csg - 1;
emitCsg(csg);
enumerateCsgRec(csg, forbidden);
}
}

private void emitCsg(long csg) {
long forbidden = csg | LongBitmap.getBvBitmap(csg);
long neighbors = hyperGraph.getNeighborBitmap(csg, forbidden);

LongBitmap.ReverseIterator reverseIterator = new LongBitmap.ReverseIterator(neighbors);
for (long cmp : reverseIterator) {
List<HyperEdge> edges = hyperGraph.connectCsgCmp(csg, cmp);
if (!edges.isEmpty()) {
emitCsgCmp(csg, cmp, edges);
}
// forbidden the nodes that smaller than current cmp when extend cmp, e.g.
// neighbors = {t1, t2}, t1 and t2 are connected.
// when extented t2, we will get (t1, t2)
// when extented t1, we will get (t1, t2) repeated
long newForbidden =
(cmp | LongBitmap.getBvBitmap(cmp)) & neighbors;
newForbidden = newForbidden | forbidden;
enumerateCmpRec(csg, cmp, newForbidden);
}
}

private void enumerateCsgRec(long csg, long forbidden) {
long neighbors = hyperGraph.getNeighborBitmap(csg, forbidden);
LongBitmap.SubsetIterator subsetIterator = new LongBitmap.SubsetIterator(neighbors);
for (long subNeighbor : subsetIterator) {
hyperGraph.updateEdgesForUnion(csg, subNeighbor);
long newCsg = csg | subNeighbor;
if (dpTable.containsKey(newCsg)) {
emitCsg(newCsg);
}
}
long newForbidden = forbidden | neighbors;
subsetIterator.reset();
for (long subNeighbor : subsetIterator) {
long newCsg = csg | subNeighbor;
enumerateCsgRec(newCsg, newForbidden);
}
}

private void enumerateCmpRec(long csg, long cmp, long forbidden) {
long neighbors = hyperGraph.getNeighborBitmap(cmp, forbidden);
LongBitmap.SubsetIterator subsetIterator = new LongBitmap.SubsetIterator(neighbors);
for (long subNeighbor : subsetIterator) {
long newCmp = cmp | subNeighbor;
hyperGraph.updateEdgesForUnion(cmp, subNeighbor);
if (dpTable.containsKey(newCmp)) {
List<HyperEdge> edges = hyperGraph.connectCsgCmp(csg, newCmp);
if (!edges.isEmpty()) {
emitCsgCmp(csg, newCmp, edges);
}
}
}
long newForbidden = forbidden | neighbors;
subsetIterator.reset();
for (long subNeighbor : subsetIterator) {
long newCmp = cmp | subNeighbor;
enumerateCmpRec(csg, newCmp, newForbidden);
}
}

private void emitCsgCmp(long csg, long cmp, List<HyperEdge> edges) {
RelNode child1 = dpTable.get(csg);
RelNode child2 = dpTable.get(cmp);
if (child1 == null || child2 == null) {
throw new IllegalArgumentException(
"csg and cmp were not enumerated in the previous dp process");
}

JoinRelType joinType = hyperGraph.extractJoinType(edges);
if (joinType == null) {
return;
}
RexNode joinCond1 = hyperGraph.extractJoinCond(child1, child2, edges);
RelNode newPlan1 = builder
.push(child1)
.push(child2)
.join(joinType, joinCond1)
.build();

// swap left and right
RexNode joinCond2 = hyperGraph.extractJoinCond(child2, child1, edges);
RelNode newPlan2 = builder
.push(child2)
.push(child1)
.join(joinType, joinCond2)
.build();
RelNode winPlan = chooseBetterPlan(newPlan1, newPlan2);

RelNode oriPlan = dpTable.get(csg | cmp);
if (oriPlan != null) {
winPlan = chooseBetterPlan(winPlan, oriPlan);
}
dpTable.put(csg | cmp, winPlan);
}

public @Nullable RelNode getBestPlan() {
int size = hyperGraph.getInputs().size();
long wholeGraph = LongBitmap.newBitmapBetween(0, size);
return dpTable.get(wholeGraph);
}

private RelNode chooseBetterPlan(RelNode plan1, RelNode plan2) {
RelOptCost cost1 = mq.getCumulativeCost(plan1);
RelOptCost cost2 = mq.getCumulativeCost(plan2);
if (cost1 != null && cost2 != null) {
return cost1.isLt(cost2) ? plan1 : plan2;
} else if (cost1 != null) {
return plan1;
} else {
return plan2;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.rel.rules;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder;

import org.immutables.value.Value;

import java.util.ArrayList;
import java.util.List;

/** Rule that re-orders a {@link Join} tree using dphyp algorithm.
*
* @see CoreRules#HYPER_GRAPH_OPTIMIZE */
@Value.Enclosing
public class DphypJoinReorderRule
extends RelRule<DphypJoinReorderRule.Config>
implements TransformationRule {

protected DphypJoinReorderRule(Config config) {
super(config);
}

@Override public void onMatch(RelOptRuleCall call) {
HyperGraph hyperGraph = call.rel(0);
RelBuilder relBuilder = call.builder();
// make all field name unique and convert the
// HyperEdge condition from RexInputRef to RexInputFieldName
hyperGraph.convertHyperEdgeCond();

// enumerate by Dphyp
DpHyp dpHyp = new DpHyp(hyperGraph, relBuilder, call.getMetadataQuery());
dpHyp.startEnumerateJoin();
RelNode orderedJoin = dpHyp.getBestPlan();
if (orderedJoin == null) {
return;
}

// permute field to origin order
List<String> oriNames = hyperGraph.getRowType().getFieldNames();
List<String> newNames = orderedJoin.getRowType().getFieldNames();
List<RexNode> projects = new ArrayList<>();
RexBuilder rexBuilder = hyperGraph.getCluster().getRexBuilder();
for (String oriName : oriNames) {
projects.add(rexBuilder.makeInputRef(orderedJoin, newNames.indexOf(oriName)));
}

RelNode result = call.builder()
.push(orderedJoin)
.project(projects)
.build();
call.transformTo(result);
}

/** Rule configuration. */
@Value.Immutable
public interface Config extends RelRule.Config {
Config DEFAULT = ImmutableDphypJoinReorderRule.Config.of()
.withOperandSupplier(b1 ->
b1.operand(HyperGraph.class).anyInputs());

@Override default DphypJoinReorderRule toRule() {
return new DphypJoinReorderRule(this);
}
}
}
82 changes: 82 additions & 0 deletions core/src/main/java/org/apache/calcite/rel/rules/HyperEdge.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.rel.rules;

import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rex.RexNode;

/**
* Edge in HyperGraph, that represents a join predicate.
*/
public class HyperEdge {

private long leftNodeBits;

private long rightNodeBits;

private JoinRelType joinType;

private RexNode condition;

public HyperEdge(long leftNodeBits, long rightNodeBits, JoinRelType joinType, RexNode condition) {
this.leftNodeBits = leftNodeBits;
this.rightNodeBits = rightNodeBits;
this.joinType = joinType;
this.condition = condition;
}

public long getNodeBitmap() {
return leftNodeBits | rightNodeBits;
}

public long getLeftNodeBitmap() {
return leftNodeBits;
}

public long getRightNodeBitmap() {
return rightNodeBits;
}

// hyperedge (u, v) is simple if |u| = |v| = 1
public boolean isSimple() {
boolean leftSimple = (leftNodeBits & (leftNodeBits - 1)) == 0;
boolean rightSimple = (rightNodeBits & (rightNodeBits - 1)) == 0;
return leftSimple && rightSimple;
}

public JoinRelType getJoinType() {
return joinType;
}

public RexNode getCondition() {
return condition;
}

@Override public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(LongBitmap.printBitmap(leftNodeBits))
.append("——").append(joinType).append("——")
.append(LongBitmap.printBitmap(rightNodeBits));
return sb.toString();
}

// before starting dphyp, replace RexInputRef to RexInputFieldName
public void replaceCondition(RexNode fieldNameCond) {
this.condition = fieldNameCond;
}

}
Loading

0 comments on commit 6aa88b2

Please sign in to comment.