
Spark Hive connector supports dynamic partition pruning #6943


Description


Code of Conduct

Search before asking

  • I have searched in the issues and found no similar issues.

Describe the bug

Kyuubi: 1.9.0
Spark: 3.4.2

Currently, when Spark is connected to the Hive Metastore through the kyuubi-hive-connector and performance is tested with TPC-DS, some SQL queries do not get dynamic partition pruning.

q33.sql

with ss as (
  select i_manufact_id, sum(ss_ext_sales_price) total_sales
  from store_sales, date_dim, customer_address, item
  where i_manufact_id in (select i_manufact_id from item where i_category in ('Home'))
    and ss_item_sk = i_item_sk
    and ss_sold_date_sk = d_date_sk
    and d_year = 1998
    and d_moy = 5
    and ss_addr_sk = ca_address_sk
    and ca_gmt_offset = -6
  group by i_manufact_id),
cs as (
  select i_manufact_id, sum(cs_ext_sales_price) total_sales
  from catalog_sales, date_dim, customer_address, item
  where i_manufact_id in (select i_manufact_id from item where i_category in ('Home'))
    and cs_item_sk = i_item_sk
    and cs_sold_date_sk = d_date_sk
    and d_year = 1998
    and d_moy = 5
    and cs_bill_addr_sk = ca_address_sk
    and ca_gmt_offset = -6
  group by i_manufact_id),
ws as (
  select i_manufact_id, sum(ws_ext_sales_price) total_sales
  from web_sales, date_dim, customer_address, item
  where i_manufact_id in (select i_manufact_id from item where i_category in ('Home'))
    and ws_item_sk = i_item_sk
    and ws_sold_date_sk = d_date_sk
    and d_year = 1998
    and d_moy = 5
    and ws_bill_addr_sk = ca_address_sk
    and ca_gmt_offset = -6
  group by i_manufact_id)
select i_manufact_id, sum(total_sales) total_sales
from (select * from ss
      union all
      select * from cs
      union all
      select * from ws) tmp1
group by i_manufact_id
order by total_sales
limit 100
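
To confirm whether dynamic partition pruning (DPP) was applied, this is the kind of check used here (a minimal sketch, assuming a running SparkSession and that the TPC-DS tables are resolvable in the current catalog; `q33` is a placeholder holding the query text above):

// Run the query and look for DPP subqueries in the physical plan.
// When DPP fires, the scan's partition filters contain "dynamicpruningexpression(...)".
val q33: String = "..."  // the q33.sql text above
val physicalPlan = spark.sql(q33).queryExecution.executedPlan.toString
println(physicalPlan)
println(s"DPP applied: ${physicalPlan.contains("dynamicpruning")}")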

If the kyuubi-hive-connector is not used, the execution plan contains dynamic partition pruning:

[screenshot: physical plan with dynamic partition pruning]

If the kyuubi-hive-connector is used, there is no dynamic partition pruning in the execution plan:

[screenshot: physical plan without dynamic partition pruning]
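
DPP is enabled by default in Spark 3.4, so it is worth ruling out configuration first. A quick sanity check (sketch, assuming a running SparkSession):

// Both default to true in Spark 3.4; DPP is skipped only if these were turned off.
spark.conf.get("spark.sql.optimizer.dynamicPartitionPruning.enabled")
spark.conf.get("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly")

The configuration dump below does not override either flag, so the difference appears to come from how the connector's scan is planned rather than from these settings.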

Affects Version(s)

1.9.0

Kyuubi Server Log Output

Kyuubi Engine Log Output

Kyuubi Server Configurations

Kyuubi Engine Configurations

spark.plugins com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin
spark.sql.catalog.cos8lcu4e_hive.hadoop.hive.metastore.token.signature thrift://xxx:7004,thrift://xxxx:7004
spark.sql.catalog.cos8lcu4e_iceberg.hadoop.hive.metastore.token.signature thrift://xxxx:7004,thrift://xxxx:7004
spark.sql.catalog.default_catalog_hive_0o29qw56.hadoop.hive.metastore.token.signature thrift://xxx:40016
spark.sql.catalog.default_catalog_iceberg_0o29qw56.hadoop.hive.metastore.token.signature thrift://xxx:40016
spark.sql.catalog.kyuubi_cos8lcu4e_hive org.apache.kyuubi.spark.connector.hive.HiveTableCatalog
spark.sql.catalog.kyuubi_cos8lcu4e_hive.hadoop.hive.metastore.token.signature thrift://xxx:7004,thrift://xxx:7004
spark.sql.catalog.kyuubi_cos8lcu4e_hive.hive.metastore.kerberos.principal hadoop/_HOST@TBDS-0O29QW56
spark.sql.catalog.kyuubi_cos8lcu4e_hive.hive.metastore.uris thrift://xxx:7004,thrift://xxxx:7004
spark.sql.catalog.kyuubi_default_catalog_hive_0o29qw56 org.apache.kyuubi.spark.connector.hive.HiveTableCatalog
spark.sql.catalog.kyuubi_default_catalog_hive_0o29qw56.hadoop.hive.metastore.token.signature thrift://xxx:40016
spark.sql.catalog.kyuubi_default_catalog_hive_0o29qw56.hive.metastore.kerberos.principal hadoop/xxxx@xxx
spark.sql.catalog.kyuubi_default_catalog_hive_0o29qw56.hive.metastore.uris thrift://xxx:xxx
spark.sql.catalog.origin_cos8lcu4e_iceberg org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.origin_cos8lcu4e_iceberg.hadoop.hive.metastore.token.signature thrift://xxxx:7004,thrift://xxx:7004
spark.sql.catalog.origin_cos8lcu4e_iceberg.type hive
spark.sql.catalog.origin_cos8lcu4e_iceberg.uri thrift://xxx:7004,thrift://xxx:7004
spark.sql.catalog.origin_default_catalog_iceberg_0o29qw56 org.apache.iceberg.spark.SparkCatalog
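
For reference, the query is run against the Kyuubi Hive connector catalog registered above, roughly like this (a sketch; the `tpcds` database name is a placeholder for wherever the TPC-DS tables live, and `q33` is the query text from above):

// Switch to the HiveTableCatalog provided by kyuubi-spark-connector-hive, then run q33.
spark.catalog.setCurrentCatalog("kyuubi_cos8lcu4e_hive")
spark.sql("USE tpcds")
spark.sql(q33).collect()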

Additional context

Dynamic partition pruning core logic (from Spark's PartitionPruning rule):

private def prune(plan: LogicalPlan): LogicalPlan = {
  plan transformUp {
    .......
        // Try to prune the left side first: the left scan must be filterable and the
        // right side must provide a partition-pruning (selective) filter.
        var filterableScan = getFilterableTableScan(l, left)
        if (filterableScan.isDefined && canPruneLeft(joinType) &&
            hasPartitionPruningFilter(right)) {
          newLeft = insertPredicate(l, newLeft, r, right, rightKeys, filterableScan.get)
        } else {
          // Otherwise try the right side.
          filterableScan = getFilterableTableScan(r, right)
          if (filterableScan.isDefined && canPruneRight(joinType) &&
              hasPartitionPruningFilter(left)) {
            newRight = insertPredicate(r, newRight, l, left, leftKeys, filterableScan.get)
          }
        }
      case _ =>
    }
    .......
    Join(newLeft, newRight, joinType, Some(condition), hint)
  }
}

def getFilterableTableScan(a: Expression, plan: LogicalPlan): Option[LogicalPlan] = {
  val srcInfo: Option[(Expression, LogicalPlan)] = findExpressionAndTrackLineageDown(a, plan)
  srcInfo.flatMap {
    // V1 file-based relation: prunable if the join key only references partition columns.
    case (resExp, l: LogicalRelation) =>
      l.relation match {
        case fs: HadoopFsRelation =>
          val partitionColumns = AttributeSet(
            l.resolve(fs.partitionSchema, fs.sparkSession.sessionState.analyzer.resolver))
          if (resExp.references.subsetOf(partitionColumns)) {
            return Some(l)
          } else {
            None
          }
        case _ => None
      }
    // Hive SerDe relation: prunable if the join key only references partition columns.
    case (resExp, l: HiveTableRelation) =>
      if (resExp.references.subsetOf(AttributeSet(l.partitionCols))) {
        return Some(l)
      } else {
        None
      }
    // DataSource V2 relation: prunable only when the Scan implements SupportsRuntimeV2Filtering
    // and the join key only references the attributes it advertises via filterAttributes().
    case (resExp, r @ DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _)) =>
      val filterAttrs = V2ExpressionUtils.resolveRefs[Attribute](scan.filterAttributes, r)
      if (resExp.references.subsetOf(AttributeSet(filterAttrs))) {
        Some(r)
      } else {
        None
      }
    case _ => None
  }
}
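
Reading the rule above: for a DataSourceV2ScanRelation, DPP only proceeds when the underlying Scan implements SupportsRuntimeV2Filtering and exposes the partition columns via filterAttributes(). If the Kyuubi Hive connector's scan does not implement this interface, getFilterableTableScan returns None and no DPP filter is ever inserted, which would match the plans above. A minimal sketch of what implementing it could look like (class and field names are hypothetical, not the connector's actual code):

import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeV2Filtering}
import org.apache.spark.sql.types.StructType

// Hypothetical scan for a partitioned Hive table; partitionCols would come from the table metadata.
class HiveScanWithRuntimeFiltering(
    dataSchema: StructType,
    partitionCols: Seq[String]) extends Scan with SupportsRuntimeV2Filtering {

  override def readSchema(): StructType = dataSchema

  // Advertise the partition columns so PartitionPruning treats this scan as filterable.
  override def filterAttributes(): Array[NamedReference] =
    partitionCols.map(c => Expressions.column(c)).toArray

  // Receive the runtime predicates (e.g. ss_sold_date_sk IN (...)) produced by the dimension side
  // and narrow the set of Hive partitions to read (pruning logic omitted in this sketch).
  override def filter(predicates: Array[Predicate]): Unit = {
    // prune the planned partitions using `predicates` before batches are created
  }
}

Spark calls filter() at runtime with the values coming from the dimension-side subquery, so a scan that implements it can drop non-matching partitions before reading any data.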


Are you willing to submit PR?

  • Yes. I would be willing to submit a PR with guidance from the Kyuubi community to fix.
  • No. I cannot submit a PR at this time.
