
Commit b720bd8

DynamicPartitionDataSingleWriter needs sort before write
1 parent 80a8c50 commit b720bd8
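
Background, for readers outside the commit: Spark's DynamicPartitionDataSingleWriter keeps at most one output file open per task and rolls to a new file whenever the incoming row's partition values change, so it assumes its input arrives clustered by the partition columns. Before this change, HiveWrite declared no required ordering, so a dynamic-partition insert could feed the writer interleaved partition values; revisiting a partition it had already closed can re-create the file written earlier and drop rows. A simplified, self-contained sketch of that single-writer pattern (illustrative names only, not Spark's actual code):

object SingleWriterSketch extends App {
  // rows are (dt partition value, row payload) pairs
  def write(rows: Seq[(String, String)]): Unit = {
    var current: Option[String] = None
    rows.foreach { case (dt, row) =>
      if (!current.contains(dt)) {
        current.foreach(p => println(s"close file for dt=$p"))
        // Re-opening a partition that was already closed is where unsorted
        // input goes wrong: the fresh file can shadow the earlier one.
        println(s"open file for dt=$dt")
        current = Some(dt)
      }
      println(s"  write $row")
    }
    current.foreach(p => println(s"close file for dt=$p"))
  }

  write(Seq("1111" -> "('1', 1)", "2222" -> "('2', 2)", "1111" -> "('3', 4)")) // unsorted: dt=1111 opened twice
  write(Seq("1111" -> "('1', 1)", "1111" -> "('3', 4)", "2222" -> "('2', 2)")) // sorted: each partition opened once
}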

2 files changed: 62 additions & 2 deletions


extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala

Lines changed: 13 additions & 2 deletions
@@ -32,7 +32,9 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, Write}
+import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
+import org.apache.spark.sql.connector.expressions.{Expressions, SortDirection, SortOrder}
+import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, RequiresDistributionAndOrdering}
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, WriteJobDescription}
 import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -50,7 +52,8 @@ case class HiveWrite(
     info: LogicalWriteInfo,
     hiveTableCatalog: HiveTableCatalog,
     forceOverwrite: Boolean,
-    dynamicPartition: Map[String, Option[String]]) extends Write with Logging {
+    dynamicPartition: Map[String, Option[String]])
+  extends RequiresDistributionAndOrdering with Logging {
 
   private val options = info.options()
 
@@ -73,6 +76,14 @@ case class HiveWrite(
 
   override def description(): String = "Kyuubi-Hive-Connector"
 
+  override def requiredDistribution(): Distribution = Distributions.unspecified()
+
+  override def requiredOrdering(): Array[SortOrder] = {
+    partColumns.map { col =>
+      Expressions.sort(Expressions.column(col.name), SortDirection.ASCENDING)
+    }.toArray
+  }
+
   override def toBatch: BatchWrite = {
     val tmpLocation = HiveWriteHelper.getExternalTmpPath(externalCatalog, hadoopConf, tableLocation)
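
Design note on the hunk above: Distributions.unspecified() imposes no particular clustering, so Spark is not forced to add a shuffle; requiredOrdering() alone makes Spark's DataSource V2 write planning sort each task's rows by the partition columns (ascending) before the writers consume them. One way to check the effect, as a sketch that assumes the table names used by the test below (exact plan shape varies by Spark version):

// Hedged verification snippet: after this change, the physical plan should
// contain a Sort on the dynamic partition column `dt` below the write node.
spark.sql(
  """EXPLAIN FORMATTED
    |INSERT OVERWRITE TABLE hive.default.test_part_table PARTITION (dt)
    |SELECT word, num, dt FROM hive.default.test_part_table_tmp
    |""".stripMargin).show(truncate = false)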

extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala

Lines changed: 49 additions & 0 deletions
@@ -117,6 +117,55 @@ class HiveQuerySuite extends KyuubiHiveTest {
     }
   }
 
+  test("[KYUUBI #7335] DynamicPartitionDataSingleWriter needs sort before write") {
+    withSparkSession(Map(
+      "hive.exec.dynamic.partition.mode" -> "nonstrict",
+      "spark.sql.shuffle.partitions" -> "1")) { spark =>
+      val table = "hive.default.test_part_table"
+      val tempTable = "hive.default.test_part_table_tmp"
+      withTable(table, tempTable) {
+        spark.sql(
+          s"""
+             | CREATE TABLE $table (
+             | word STRING,
+             | num BIGINT
+             | ) PARTITIONED BY (dt STRING)
+             | STORED AS ORC
+             |""".stripMargin)
+        spark.sql(
+          s"""
+             | CREATE TABLE $tempTable (
+             | word STRING,
+             | num BIGINT,
+             | dt STRING
+             | ) STORED AS ORC
+             |""".stripMargin)
+        spark.sql(
+          s"""
+             | INSERT INTO $tempTable VALUES
+             | ('1', 1, '1111'),
+             | ('2', 2, '2222'),
+             | ('3', 4, '1111')
+             |""".stripMargin)
+
+        spark.sql(
+          s"""
+             | INSERT OVERWRITE TABLE $table PARTITION (dt)
+             | SELECT word, num, dt
+             | FROM $tempTable
+             | ORDER BY word
+             |""".stripMargin).collect()
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $table"),
+          Seq(
+            Row("1", 1L, "1111"),
+            Row("2", 2L, "2222"),
+            Row("3", 4L, "1111")))
+      }
+    }
+  }
+
   test("[KYUUBI #4525] Partitioning predicates should take effect to filter data") {
     withSparkSession(Map("hive.exec.dynamic.partition.mode" -> "nonstrict")) { spark =>
       val table = "hive.default.employee"