Skip to content
This repository was archived by the owner on Jun 14, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ package com.microsoft.hyperspace.index.rules

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
import com.microsoft.hyperspace.index.rules.PlanUtils._
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}
Expand Down Expand Up @@ -53,19 +54,16 @@ object FilterIndexRule
case ExtractFilterNode(originalPlan, filter, outputColumns, filterColumns) =>
try {
val candidateIndexes =
findCoveringIndexes(filter, outputColumns, filterColumns)
findCoveringIndexes(filter, outputColumns, filterColumns, plan)
FilterIndexRanker.rank(spark, filter, candidateIndexes) match {
case Some(index) =>
// As FilterIndexRule is not intended to support bucketed scan, we set
// useBucketUnionForAppended as false. If it's true, Hybrid Scan can cause
// unnecessary shuffle for appended data to apply BucketUnion for merging data.
val transformedPlan =
RuleUtils.transformPlanToUseIndex(
spark,
index,
originalPlan,
useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
useBucketUnionForAppended = false)
val ruleHelper = IndexPlanApplyHelper(index)
val transformedPlan = ruleHelper.transformPlanToUseIndex(
index,
originalPlan,
useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
useBucketUnionForAppended = false)

logEvent(
HyperspaceIndexUsageEvent(
AppInfo(
Expand Down Expand Up @@ -99,8 +97,9 @@ object FilterIndexRule
private def findCoveringIndexes(
filter: Filter,
outputColumns: Seq[String],
filterColumns: Seq[String]): Seq[IndexLogEntry] = {
RuleUtils.getRelation(spark, filter) match {
filterColumns: Seq[String],
plan: LogicalPlan): Seq[IndexLogEntry] = {
RuleUtils.getRelation(filter) match {
case Some(r) =>
val indexManager = Hyperspace
.getContext(spark)
Expand All @@ -111,20 +110,39 @@ object FilterIndexRule
// See https://github.com/microsoft/hyperspace/issues/65
val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE))

val resolvedOutputColumns = ResolverUtils.resolve(
spark,
outputColumns,
plan,
ResolverUtils.resolveWithChildren,
throwIfNotInSchema = false)
.map(_.map(_.normalizedName))
.getOrElse(outputColumns)
val resolvedFilterColumns = ResolverUtils.resolve(
spark,
filterColumns,
plan,
ResolverUtils.resolveWithChildren,
throwIfNotInSchema = false)
.map(_.map(_.normalizedName))
.getOrElse(filterColumns)

val candidateIndexes = allIndexes.filter { index =>
indexCoversPlan(
outputColumns,
filterColumns,
resolvedOutputColumns,
resolvedFilterColumns,
index.indexedColumns,
index.includedColumns)
}

// Get candidate via file-level metadata validation. This is performed after pruning
// by column schema, as this might be expensive when there are numerous files in the
// relation or many indexes to be checked.
RuleUtils.getCandidateIndexes(spark, candidateIndexes, r)
new BaseRuleHelper(spark).getCandidateIndexes(candidateIndexes, r)

case None => Nil // There is zero or more than one supported relations in Filter's sub-plan.
case _ =>
// There is zero or more than one supported relations in Filter's sub-plan.
Seq.empty
}
}

Expand All @@ -136,7 +154,6 @@ object FilterIndexRule
* @param filterColumns List of columns in filter predicate.
* @param indexedColumns List of indexed columns (e.g. from an index being checked)
* @param includedColumns List of included columns (e.g. from an index being checked)
* @param fileFormat FileFormat for input relation in original logical plan.
* @return 'true' if
* 1. Index fully covers output and filter columns, and
* 2. Filter predicate contains first column in index's 'indexed' columns.
Expand Down Expand Up @@ -168,15 +185,15 @@ object ExtractFilterNode {
val projectColumnNames = CleanupAliases(project)
.asInstanceOf[Project]
.projectList
.map(_.references.map(_.asInstanceOf[AttributeReference].name))
.map(i => extractNamesFromExpression(i).toKeep)
.flatMap(_.toSeq)
val filterColumnNames = condition.references.map(_.name).toSeq
val filterColumnNames = extractNamesFromExpression(condition).toKeep.toSeq

Some(project, filter, projectColumnNames, filterColumnNames)

case filter @ Filter(condition: Expression, ExtractRelation(relation))
if !RuleUtils.isIndexApplied(relation) =>
val relationColumnsName = relation.output.map(_.name)
val relationColumnsName = relation.plan.output.map(_.name)
val filterColumnNames = condition.references.map(_.name).toSeq

Some(filter, filter, relationColumnsName, filterColumnNames)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.rules

import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rules.FilterIndexRule.spark
import com.microsoft.hyperspace.util.ResolverUtils

object IndexPlanApplyHelper {
def apply(indexLogEntry: IndexLogEntry): BaseRuleHelper = {
// Detect whether the index contains nested fields.
val indexHasNestedColumns = (indexLogEntry.indexedColumns ++ indexLogEntry.includedColumns)
.exists(ResolverUtils.ResolvedColumn(_).isNested)
if (indexHasNestedColumns) {
new NestedRuleHelper(spark)
} else {
new BaseRuleHelper(spark)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,16 @@ object JoinIndexRule
getBestIndexPair(l, r, condition)
.map {
case (lIndex, rIndex) =>
val ruleHelper = new BaseRuleHelper(spark)
Copy link
Contributor Author

@andrei-ionescu andrei-ionescu Apr 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes in this file are mainly due to RuleUtils changes.

val updatedPlan =
join
.copy(
left = RuleUtils.transformPlanToUseIndex(
spark,
left = ruleHelper.transformPlanToUseIndex(
lIndex,
l,
useBucketSpec = true,
useBucketUnionForAppended = true),
right = RuleUtils.transformPlanToUseIndex(
spark,
right = ruleHelper.transformPlanToUseIndex(
rIndex,
r,
useBucketSpec = true,
Expand Down Expand Up @@ -109,11 +108,11 @@ object JoinIndexRule
private def isApplicable(l: LogicalPlan, r: LogicalPlan, condition: Expression): Boolean = {
// The given plan is eligible if it is supported and index has not been applied.
def isEligible(optRel: Option[FileBasedRelation]): Boolean = {
optRel.map(!RuleUtils.isIndexApplied(_)).getOrElse(false)
optRel.exists(!RuleUtils.isIndexApplied(_))
}

lazy val optLeftRel = RuleUtils.getRelation(spark, l)
lazy val optRightRel = RuleUtils.getRelation(spark, r)
lazy val optLeftRel = RuleUtils.getRelation(l)
lazy val optRightRel = RuleUtils.getRelation(r)

isJoinConditionSupported(condition) &&
isPlanLinear(l) && isPlanLinear(r) &&
Expand Down Expand Up @@ -284,7 +283,8 @@ object JoinIndexRule
left: LogicalPlan,
right: LogicalPlan,
joinCondition: Expression): Option[(IndexLogEntry, IndexLogEntry)] = {
val indexManager = Hyperspace.getContext(spark).indexCollectionManager
val hyperspaceContext = Hyperspace.getContext(spark)
val indexManager = hyperspaceContext.indexCollectionManager

// TODO: the following check only considers indexes in ACTIVE state for usage. Update
// the code to support indexes in transitioning states as well.
Expand All @@ -294,10 +294,10 @@ object JoinIndexRule
// TODO: we can write an extractor that applies `isApplicable` so that we don't have to
// get relations twice. Note that `getRelation` should always succeed since this has
// been already checked in `isApplicable`.
val leftRelation = RuleUtils.getRelation(spark, left).get
val rightRelation = RuleUtils.getRelation(spark, right).get
val lBaseAttrs = leftRelation.output.map(_.name)
val rBaseAttrs = rightRelation.output.map(_.name)
val leftRelation = RuleUtils.getRelation(left).get
val rightRelation = RuleUtils.getRelation(right).get
val lBaseAttrs = leftRelation.plan.output.map(_.name)
val rBaseAttrs = rightRelation.plan.output.map(_.name)

// Map of left resolved columns with their corresponding right resolved
// columns from condition.
Expand All @@ -316,14 +316,15 @@ object JoinIndexRule
val lUsable = getUsableIndexes(allIndexes, lRequiredIndexedCols, lRequiredAllCols)
val rUsable = getUsableIndexes(allIndexes, rRequiredIndexedCols, rRequiredAllCols)

val leftRel = RuleUtils.getRelation(spark, left).get
val rightRel = RuleUtils.getRelation(spark, right).get
val leftRel = RuleUtils.getRelation(left).get
val rightRel = RuleUtils.getRelation(right).get

// Get candidate via file-level metadata validation. This is performed after pruning
// by column schema, as this might be expensive when there are numerous files in the
// relation or many indexes to be checked.
val lIndexes = RuleUtils.getCandidateIndexes(spark, lUsable, leftRel)
val rIndexes = RuleUtils.getCandidateIndexes(spark, rUsable, rightRel)
val ruleHelper = new BaseRuleHelper(spark)
val lIndexes = ruleHelper.getCandidateIndexes(lUsable, leftRel)
val rIndexes = ruleHelper.getCandidateIndexes(rUsable, rightRel)

val compatibleIndexPairs = getCompatibleIndexPairs(lIndexes, rIndexes, lRMap)

Expand Down
Loading