Skip to content

Commit 8661097

Browse files
committed
spark overwrite功能:支持带schema的表、分区子表、包含特殊字符的表名;spark支持repartition
1 parent 9bc5f90 commit 8661097

21 files changed

Lines changed: 653 additions & 119 deletions

File tree

hologres-connector-spark-2.x/README.md

Lines changed: 38 additions & 37 deletions
Large diffs are not rendered by default.

hologres-connector-spark-2.x/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<parent>
99
<groupId>com.alibaba.hologres</groupId>
1010
<artifactId>hologres-connector-parent</artifactId>
11-
<version>1.4.0-SNAPSHOT</version>
11+
<version>1.4.1-SNAPSHOT</version>
1212
</parent>
1313

1414
<artifactId>hologres-connector-spark-2.x</artifactId>

hologres-connector-spark-2.x/src/main/scala/com/alibaba/hologres/spark2/sink/HoloWriter.scala

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,21 @@ class HoloWriter(
2626
logger.info("HoloWriter begin: " + LocalDateTime.now())
2727
val hologresConfigs: HologresConfigs = new HologresConfigs(holoOptions)
2828
var is_overwrite: Boolean = mode == SaveMode.Overwrite
29+
private var partitionInfo: (String, String) = _
30+
2931
if (is_overwrite) {
30-
hologresConfigs.tempTableForOverwrite = hologresConfigs.table + new Random().nextInt(Int.MaxValue) + "_temp"
32+
hologresConfigs.tempTableForOverwrite = JDBCUtil.generateTempTableNameForOverwrite(hologresConfigs)
3133
JDBCUtil.createTempTableForOverWrite(hologresConfigs)
3234
}
3335

3436
override def commit(messages: Array[WriterCommitMessage]): Unit = {
3537
logger.info("HoloWriter commit: " + LocalDateTime.now())
3638
if (is_overwrite) {
37-
JDBCUtil.renameTempTableForOverWrite(hologresConfigs)
39+
if (partitionInfo.eq(null)) {
40+
JDBCUtil.renameTempTableForOverWrite(hologresConfigs)
41+
} else {
42+
JDBCUtil.renameTempTableForOverWrite(hologresConfigs, partitionInfo._1, partitionInfo._2)
43+
}
3844
}
3945
}
4046

@@ -56,12 +62,15 @@ class HoloWriter(
5662
override def createWriterFactory(): DataWriterFactory[InternalRow] = {
5763
val holoClient: HoloClient = new HoloClient(hologresConfigs.holoConfig)
5864
try {
59-
var holoSchema: TableSchema = null
65+
var holoSchema = holoClient.getTableSchema(TableName.valueOf(hologresConfigs.table))
66+
if (holoSchema.isPartitionParentTable && is_overwrite) {
67+
throw new IOException("Partition parent table can not be insert overwrite now.")
68+
}
69+
partitionInfo = JDBCUtil.getChildTablePartitionInfo(hologresConfigs)
70+
6071
if (is_overwrite) {
6172
// insert overwrite first writes into a temporary table, which replaces the original table once the write succeeds.
6273
holoSchema = holoClient.getTableSchema(TableName.valueOf(hologresConfigs.tempTableForOverwrite))
63-
} else {
64-
holoSchema = holoClient.getTableSchema(TableName.valueOf(hologresConfigs.table))
6574
}
6675

6776
var holoVersion: HoloVersion = null

hologres-connector-spark-2.x/src/test/scala/com/alibaba/hologres/spark2/SparkHoloReadWriteSuite.scala

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,100 @@ class SparkHoloReadWriteSuite extends SparkHoloSuiteBase {
234234
}
235235

236236

237+
test("SaveMode = overwrite child table with schema") {
238+
val parentTable = "test.\"Table-Parent\""
239+
val partitionValue = "20240527"
240+
val table = "test.\"Table-Child_20240527\""
241+
242+
testUtils.dropTable(parentTable)
243+
testUtils.createSchema("test")
244+
testUtils.createPartitionTable(defaultCreateHoloParentTableDDL, parentTable, table, partitionValue)
245+
246+
val byteA = Array(4.toByte, 5.toByte, 6.toByte, 'q'.toByte, 'e'.toByte)
247+
val intA = Array(4, 5, 6)
248+
val doubleA = Array(2.333, 3.444, 4.555)
249+
val date = Date.valueOf("2024-05-27")
250+
251+
val data1 = Seq(
252+
Row(0L, -7L, 20, "phone1", 6.7F, Timestamp.valueOf("2021-03-29 00:00:00"), byteA, intA, doubleA, date),
253+
Row(1L, 6L, -30, "phone2", 7.8F, Timestamp.valueOf("2021-04-01 12:00:00"), byteA, intA, doubleA, date)
254+
)
255+
256+
val data2 = Seq(
257+
Row(0L, -7L, 20, "phone1", 6.7F, Timestamp.valueOf("2021-03-29 00:00:00"), byteA, intA, doubleA, date),
258+
Row(1L, -7L, 20, "phone1", 6.7F, Timestamp.valueOf("2021-03-29 00:00:00"), byteA, intA, doubleA, date),
259+
Row(2L, 6L, -30, "phone2", 7.8F, Timestamp.valueOf("2021-04-01 12:00:00"), byteA, intA, doubleA, date),
260+
Row(3L, 6L, -30, "phone2", 7.8F, Timestamp.valueOf("2021-04-01 12:00:00"), byteA, intA, doubleA, date)
261+
)
262+
263+
val newSchema = StructType(Array(
264+
StructField("pk", LongType),
265+
StructField("id", LongType),
266+
StructField("count", IntegerType),
267+
StructField("name", StringType),
268+
StructField("thick", FloatType),
269+
StructField("time", TimestampType),
270+
StructField("by", BinaryType),
271+
StructField("inta", ArrayType(IntegerType)),
272+
StructField("doublea", ArrayType(DoubleType)),
273+
StructField("dt", DateType)
274+
))
275+
276+
var df = spark.createDataFrame(
277+
spark.sparkContext.parallelize(data1),
278+
newSchema
279+
).orderBy("pk").cache()
280+
281+
df.write
282+
.format("hologres")
283+
.option(SourceProvider.USERNAME, testUtils.username)
284+
.option(SourceProvider.PASSWORD, testUtils.password)
285+
.option(SourceProvider.JDBCURL, testUtils.jdbcUrl)
286+
.option(SourceProvider.TABLE, table)
287+
.option(SourceProvider.WRITE_MODE, "insertOrUpdate")
288+
.option(SourceProvider.COPY_WRITE_MODE, "true")
289+
.option(SourceProvider.ENABLE_TARGET_SHARDS, "true")
290+
.option(SourceProvider.COPY_WRITE_DIRTY_DATA_CHECK, "true")
291+
.mode(SaveMode.Overwrite)
292+
.save()
293+
294+
df = spark.createDataFrame(
295+
spark.sparkContext.parallelize(data2),
296+
newSchema
297+
).orderBy("pk").cache()
298+
299+
df.write
300+
.format("hologres")
301+
.option(SourceProvider.USERNAME, testUtils.username)
302+
.option(SourceProvider.PASSWORD, testUtils.password)
303+
.option(SourceProvider.JDBCURL, testUtils.jdbcUrl)
304+
.option(SourceProvider.TABLE, table)
305+
.option(SourceProvider.WRITE_MODE, "insertOrUpdate")
306+
.option(SourceProvider.COPY_WRITE_MODE, "true")
307+
.option(SourceProvider.BULK_LOAD, "true")
308+
.option(SourceProvider.ENABLE_TARGET_SHARDS, "true")
309+
.option(SourceProvider.COPY_WRITE_DIRTY_DATA_CHECK, "true")
310+
.mode(SaveMode.Overwrite)
311+
.save()
312+
313+
val readDf = spark.read
314+
.format("hologres")
315+
.schema(newSchema) // specify which fields to read
316+
.option(SourceProvider.USERNAME, testUtils.username)
317+
.option(SourceProvider.PASSWORD, testUtils.password)
318+
.option(SourceProvider.JDBCURL, testUtils.jdbcUrl)
319+
.option(SourceProvider.TABLE, table)
320+
.load().orderBy("pk").cache()
321+
322+
assert(df.count() == 4)
323+
// compare read and write
324+
if (df.except(readDf).count() > 0) {
325+
df.show()
326+
readDf.show()
327+
throw new Exception("The data read is inconsistent with the data written!!!")
328+
}
329+
}
330+
237331
test("write or read not exists columns.") {
238332
val table = "table_for_holo_test_1"
239333
val data = Seq(

hologres-connector-spark-2.x/src/test/scala/com/alibaba/hologres/spark2/SparkHoloSuiteBase.scala

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,32 @@ abstract class SparkHoloSuiteBase extends QueryTest with SharedSparkSession {
7878
" json_column json," +
7979
" jsonb_column jsonb," +
8080
" rb_column roaringbitmap);"
81+
82+
val defaultCreateHoloParentTableDDL =
83+
"create table PARENT_TABLE_NAME (" +
84+
" pk bigint," +
85+
" st smallint," +
86+
" id bigint," +
87+
" count int," +
88+
" name text," +
89+
" price numeric(38, 12)," +
90+
" out_of_stock bool," +
91+
" weight double precision," +
92+
" thick float4," +
93+
" time timestamptz," +
94+
" dt date," +
95+
" by bytea," +
96+
" inta int4[]," +
97+
" longa int8[]," +
98+
" floata float4[]," +
99+
" doublea float8[]," +
100+
" boola boolean[]," +
101+
" stringa text[]," +
102+
" json_column json," +
103+
" jsonb_column jsonb," +
104+
" rb_column roaringbitmap" +
105+
// " primary key(pk, dt)" +
106+
") PARTITION BY LIST(dt);\n" +
107+
"CREATE TABLE TABLE_NAME PARTITION OF PARENT_TABLE_NAME FOR VALUES IN ('PARTITION_VALUE');"
108+
81109
}

0 commit comments

Comments
 (0)