Skip to content

Commit 2f5d361

Browse files
committed
[spark][hive] Spark & Hive support writing to postpone-bucket tables
1 parent 9422995 commit 2f5d361

File tree

4 files changed

+72
-3
lines changed

4 files changed

+72
-3
lines changed

docs/content/primary-key-table/data-distribution.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,6 @@ Postpone bucket mode is configured by `'bucket' = '-2'`.
9393
This mode aims to address the difficulty of determining a fixed number of buckets
9494
and support different buckets for different partitions.
9595

96-
Currently, only Flink supports this mode.
97-
9896
When writing records into the table,
9997
all records will first be stored in the `bucket-postpone` directory of each partition
10098
and are not available to readers.

paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.hive;
2020

2121
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.Snapshot;
2223
import org.apache.paimon.catalog.Identifier;
2324
import org.apache.paimon.data.BinaryString;
2425
import org.apache.paimon.data.Decimal;
@@ -28,8 +29,10 @@
2829
import org.apache.paimon.data.InternalRow;
2930
import org.apache.paimon.hive.mapred.PaimonOutputFormat;
3031
import org.apache.paimon.hive.objectinspector.PaimonObjectInspectorFactory;
32+
import org.apache.paimon.io.DataFileMeta;
3133
import org.apache.paimon.options.CatalogOptions;
3234
import org.apache.paimon.options.Options;
35+
import org.apache.paimon.table.FileStoreTable;
3336
import org.apache.paimon.table.Table;
3437
import org.apache.paimon.table.sink.StreamTableCommit;
3538
import org.apache.paimon.table.sink.StreamTableWrite;
@@ -1171,4 +1174,40 @@ public void testInsertMapOfRowType() throws Exception {
11711174
List<Object[]> expect = hiveShell.executeStatement("select * from " + tableName);
11721175
assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
11731176
}
1177+
1178+
@Test
1179+
public void testInsertIntoPostponeBucket() throws Exception {
1180+
Options options = new Options();
1181+
options.set(CatalogOptions.WAREHOUSE, folder.newFolder().toURI().toString());
1182+
options.set(CoreOptions.BUCKET, -2);
1183+
options.set(CoreOptions.FILE_FORMAT, "parquet");
1184+
Identifier identifier = Identifier.create(DATABASE_NAME, "postpone_bucket");
1185+
FileStoreTable table =
1186+
(FileStoreTable)
1187+
FileStoreTestUtils.createFileStoreTable(
1188+
options,
1189+
RowType.of(DataTypes.INT(), DataTypes.INT(), DataTypes.INT()),
1190+
Collections.emptyList(),
1191+
Collections.singletonList("f0"),
1192+
identifier);
1193+
String tableName =
1194+
writeData(
1195+
table,
1196+
table.location().toString(),
1197+
Collections.singletonList(GenericRow.of(1, 2, 3)));
1198+
hiveShell.execute("insert into " + tableName + " values (1,2,3),(4,5,6)");
1199+
1200+
Snapshot snapshot = table.latestSnapshot().get();
1201+
DataFileMeta file =
1202+
table.manifestFileReader()
1203+
.read(
1204+
table.manifestListReader()
1205+
.read(snapshot.deltaManifestList())
1206+
.get(0)
1207+
.fileName())
1208+
.get(0)
1209+
.file();
1210+
// default format for postpone bucket is avro
1211+
assertThat(file.fileName()).endsWith(".avro");
1212+
}
11741213
}

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,9 @@ case class PaimonSparkWriter(table: FileStoreTable) {
233233
writeWithoutBucket(data)
234234

235235
case HASH_FIXED =>
236-
if (!paimonExtensionEnabled) {
236+
if (table.bucketSpec().getNumBuckets == -2) {
237+
writeWithoutBucket(data)
238+
} else if (!paimonExtensionEnabled) {
237239
// Topology: input -> bucket-assigner -> shuffle by partition & bucket
238240
writeWithBucketProcessor(
239241
withInitBucketCol,

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818

1919
package org.apache.paimon.spark.sql
2020

21+
import org.apache.paimon.Snapshot
22+
import org.apache.paimon.io.DataFileMeta
2123
import org.apache.paimon.spark.PaimonSparkTestBase
2224

2325
import org.apache.spark.SparkConf
2426
import org.apache.spark.sql.Row
27+
import org.assertj.core.api.Assertions.assertThat
2528
import org.junit.jupiter.api.Assertions
2629

2730
import java.sql.Timestamp
@@ -38,6 +41,33 @@ class SparkWriteWithNoExtensionITCase extends SparkWriteITCase {
3841

3942
class SparkWriteITCase extends PaimonSparkTestBase {
4043

44+
test("Paimon Write : Postpone Bucket") {
45+
withTable("PostponeTable") {
46+
spark.sql("""
47+
|CREATE TABLE PostponeTable (
48+
| id INT,
49+
| v1 INT,
50+
| v2 INT
51+
|) TBLPROPERTIES (
52+
| 'bucket' = '-2',
53+
| 'primary-key' = 'id',
54+
| 'file.format' = 'parquet'
55+
|)
56+
|""".stripMargin)
57+
58+
spark.sql("INSERT INTO PostponeTable VALUES (1, 1, 1)")
59+
60+
val table = loadTable("PostponeTable")
61+
val snapshot = table.latestSnapshot.get
62+
val file = table.manifestFileReader
63+
.read(table.manifestListReader.read(snapshot.deltaManifestList).get(0).fileName)
64+
.get(0)
65+
.file
66+
// default format for postpone bucket is avro
67+
assertThat(file.fileName).endsWith(".avro")
68+
}
69+
}
70+
4171
test("Paimon Write: AllTypes") {
4272
withTable("AllTypesTable") {
4373
val createTableSQL =

0 commit comments

Comments (0)