Skip to content

Commit a227495

Browse files
committed
[spark] Spark write supports 'clustering.columns'
1 parent dbc8042 commit a227495

File tree

10 files changed

+118
-76
lines changed

10 files changed

+118
-76
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@
5656
<td>Integer</td>
5757
<td>Bucket number for file store.<br />It should either be equal to -1 (dynamic bucket mode), -2 (postpone bucket mode), or it must be greater than 0 (fixed bucket mode).</td>
5858
</tr>
59+
<tr>
60+
<td><h5>bucket-append-ordered</h5></td>
61+
<td style="word-wrap: break-word;">true</td>
62+
<td>Boolean</td>
63+
<td>Whether to ignore the order of the buckets when reading data from an append-only table.</td>
64+
</tr>
5965
<tr>
6066
<td><h5>bucket-function.type</h5></td>
6167
<td style="word-wrap: break-word;">default</td>
@@ -68,12 +74,6 @@
6874
<td>String</td>
6975
<td>Specify the paimon distribution policy. Data is assigned to each bucket according to the hash value of bucket-key.<br />If you specify multiple fields, delimiter is ','.<br />If not specified, the primary key will be used; if there is no primary key, the full row will be used.</td>
7076
</tr>
71-
<tr>
72-
<td><h5>bucket-append-ordered</h5></td>
73-
<td style="word-wrap: break-word;">true</td>
74-
<td>Boolean</td>
75-
<td>Whether to ignore the order of the buckets when reading data from an append-only table.</td>
76-
</tr>
7777
<tr>
7878
<td><h5>cache-page-size</h5></td>
7979
<td style="word-wrap: break-word;">64 kb</td>
@@ -140,6 +140,18 @@
140140
<td>Duration</td>
141141
<td>The maximum time of completed changelog to retain.</td>
142142
</tr>
143+
<tr>
144+
<td><h5>clustering.columns</h5></td>
145+
<td style="word-wrap: break-word;">(none)</td>
146+
<td>String</td>
147+
<td>Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. If not set or set to an empty string, it indicates that the range partitioning feature is not enabled. This option is effective only for append tables without primary keys and in batch execution mode.</td>
148+
</tr>
149+
<tr>
150+
<td><h5>clustering.strategy</h5></td>
151+
<td style="word-wrap: break-word;">"auto"</td>
152+
<td>String</td>
153+
<td>Specifies the comparison algorithm used for range partitioning, including 'zorder', 'hilbert', and 'order', corresponding to the z-order curve algorithm, hilbert curve algorithm, and basic type comparison algorithm, respectively. When not configured, it will automatically determine the algorithm based on the number of columns in 'clustering.columns'. 'order' is used for 1 column, 'zorder' for fewer than 5 columns, and 'hilbert' for 5 or more columns.</td>
154+
</tr>
143155
<tr>
144156
<td><h5>commit.callback.#.param</h5></td>
145157
<td style="word-wrap: break-word;">(none)</td>

docs/layouts/shortcodes/generated/flink_connector_configuration.html

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -230,12 +230,6 @@
230230
<td>Duration</td>
231231
<td>If no records flow in a partition of a stream for that amount of time, then that partition is considered "idle" and will not hold back the progress of watermarks in downstream operators.</td>
232232
</tr>
233-
<tr>
234-
<td><h5>sink.clustering.by-columns</h5></td>
235-
<td style="word-wrap: break-word;">(none)</td>
236-
<td>String</td>
237-
<td>Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. If not set or set to an empty string, it indicates that the range partitioning feature is not enabled. This option will be effective only for bucket unaware table without primary keys and batch execution mode.</td>
238-
</tr>
239233
<tr>
240234
<td><h5>sink.clustering.sample-factor</h5></td>
241235
<td style="word-wrap: break-word;">100</td>
@@ -248,12 +242,6 @@
248242
<td>Boolean</td>
249243
<td>Indicates whether to further sort data belonged to each sink task after range partitioning.</td>
250244
</tr>
251-
<tr>
252-
<td><h5>sink.clustering.strategy</h5></td>
253-
<td style="word-wrap: break-word;">"auto"</td>
254-
<td>String</td>
255-
<td>Specifies the comparison algorithm used for range partitioning, including 'zorder', 'hilbert', and 'order', corresponding to the z-order curve algorithm, hilbert curve algorithm, and basic type comparison algorithm, respectively. When not configured, it will automatically determine the algorithm based on the number of columns in 'sink.clustering.by-columns'. 'order' is used for 1 column, 'zorder' for less than 5 columns, and 'hilbert' for 5 or more columns.</td>
256-
</tr>
257245
<tr>
258246
<td><h5>sink.committer-cpu</h5></td>
259247
<td style="word-wrap: break-word;">1.0</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@
5757
import java.util.stream.Collectors;
5858

5959
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
60+
import static org.apache.paimon.CoreOptions.OrderType.HILBERT;
61+
import static org.apache.paimon.CoreOptions.OrderType.ORDER;
62+
import static org.apache.paimon.CoreOptions.OrderType.ZORDER;
6063
import static org.apache.paimon.options.ConfigOptions.key;
6164
import static org.apache.paimon.options.MemorySize.VALUE_128_MB;
6265
import static org.apache.paimon.options.MemorySize.VALUE_256_MB;
@@ -1835,6 +1838,28 @@ public InlineElement getDescription() {
18351838
+ "starting from the snapshot after this one. If found, commit will be aborted. "
18361839
+ "If the value of this option is -1, committer will not check for its first commit.");
18371840

1841+
public static final ConfigOption<String> CLUSTERING_COLUMNS =
1842+
key("clustering.columns")
1843+
.stringType()
1844+
.noDefaultValue()
1845+
.withFallbackKeys("sink.clustering.by-columns")
1846+
.withDescription(
1847+
"Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. "
1848+
+ "If not set or set to an empty string, it indicates that the range partitioning feature is not enabled. "
1849+
+ "This option will be effective only for append table without primary keys and batch execution mode.");
1850+
1851+
public static final ConfigOption<String> CLUSTERING_STRATEGY =
1852+
key("clustering.strategy")
1853+
.stringType()
1854+
.defaultValue("auto")
1855+
.withFallbackKeys("sink.clustering.strategy")
1856+
.withDescription(
1857+
"Specifies the comparison algorithm used for range partitioning, including 'zorder', 'hilbert', and 'order', "
1858+
+ "corresponding to the z-order curve algorithm, hilbert curve algorithm, and basic type comparison algorithm, "
1859+
+ "respectively. When not configured, it will automatically determine the algorithm based on the number of columns "
1860+
+ "in 'sink.clustering.by-columns'. 'order' is used for 1 column, 'zorder' for less than 5 columns, "
1861+
+ "and 'hilbert' for 5 or more columns.");
1862+
18381863
private final Options options;
18391864

18401865
public CoreOptions(Map<String, String> options) {
@@ -2803,6 +2828,35 @@ public Optional<Long> commitStrictModeLastSafeSnapshot() {
28032828
return options.getOptional(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT);
28042829
}
28052830

2831+
public List<String> clusteringColumns() {
2832+
return clusteringColumns(options.get(CLUSTERING_COLUMNS));
2833+
}
2834+
2835+
public OrderType clusteringStrategy(int columnSize) {
2836+
return clusteringStrategy(options.get(CLUSTERING_STRATEGY), columnSize);
2837+
}
2838+
2839+
public static List<String> clusteringColumns(String clusteringColumns) {
2840+
if (clusteringColumns == null || clusteringColumns.isEmpty()) {
2841+
return Collections.emptyList();
2842+
}
2843+
return Arrays.asList(clusteringColumns.split(","));
2844+
}
2845+
2846+
public static OrderType clusteringStrategy(String clusteringStrategy, int columnSize) {
2847+
if (clusteringStrategy.equals(CLUSTERING_STRATEGY.defaultValue())) {
2848+
if (columnSize == 1) {
2849+
return ORDER;
2850+
} else if (columnSize < 5) {
2851+
return ZORDER;
2852+
} else {
2853+
return HILBERT;
2854+
}
2855+
} else {
2856+
return OrderType.of(clusteringStrategy);
2857+
}
2858+
}
2859+
28062860
/** Specifies the merge engine for table with primary key. */
28072861
public enum MergeEngine implements DescribedEnum {
28082862
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -404,26 +404,6 @@ public class FlinkConnectorOptions {
404404
.withDescription(
405405
"Whether trigger partition mark done when recover from state.");
406406

407-
public static final ConfigOption<String> CLUSTERING_COLUMNS =
408-
key("sink.clustering.by-columns")
409-
.stringType()
410-
.noDefaultValue()
411-
.withDescription(
412-
"Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. "
413-
+ "If not set or set to an empty string, it indicates that the range partitioning feature is not enabled. "
414-
+ "This option will be effective only for bucket unaware table without primary keys and batch execution mode.");
415-
416-
public static final ConfigOption<String> CLUSTERING_STRATEGY =
417-
key("sink.clustering.strategy")
418-
.stringType()
419-
.defaultValue("auto")
420-
.withDescription(
421-
"Specifies the comparison algorithm used for range partitioning, including 'zorder', 'hilbert', and 'order', "
422-
+ "corresponding to the z-order curve algorithm, hilbert curve algorithm, and basic type comparison algorithm, "
423-
+ "respectively. When not configured, it will automatically determine the algorithm based on the number of columns "
424-
+ "in 'sink.clustering.by-columns'. 'order' is used for 1 column, 'zorder' for less than 5 columns, "
425-
+ "and 'hilbert' for 5 or more columns.");
426-
427407
public static final ConfigOption<Boolean> CLUSTERING_SORT_IN_CLUSTER =
428408
key("sink.clustering.sort-in-cluster")
429409
.booleanType()

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package org.apache.paimon.flink.sink;
2020

21-
import org.apache.paimon.CoreOptions.OrderType;
21+
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.CoreOptions.PartitionSinkStrategy;
2323
import org.apache.paimon.annotation.Public;
2424
import org.apache.paimon.data.InternalRow;
@@ -48,17 +48,13 @@
4848
import javax.annotation.Nullable;
4949

5050
import java.util.ArrayList;
51-
import java.util.Arrays;
5251
import java.util.HashMap;
5352
import java.util.HashSet;
5453
import java.util.List;
5554
import java.util.Map;
5655

57-
import static org.apache.paimon.CoreOptions.OrderType.HILBERT;
58-
import static org.apache.paimon.CoreOptions.OrderType.ORDER;
59-
import static org.apache.paimon.CoreOptions.OrderType.ZORDER;
56+
import static org.apache.paimon.CoreOptions.clusteringStrategy;
6057
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
61-
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
6258
import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
6359
import static org.apache.paimon.flink.sink.FlinkSink.isStreaming;
6460
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
@@ -146,7 +142,8 @@ public FlinkSinkBuilder clusteringIfPossible(
146142
int sampleFactor) {
147143
// The clustering will be skipped if the clustering columns are empty or the execution
148144
// mode is STREAMING or the table type is illegal.
149-
if (clusteringColumns == null || clusteringColumns.isEmpty()) {
145+
List<String> columns = CoreOptions.clusteringColumns(clusteringColumns);
146+
if (columns.isEmpty()) {
150147
return this;
151148
}
152149
checkState(input != null, "The input stream should be specified earlier.");
@@ -159,7 +156,6 @@ public FlinkSinkBuilder clusteringIfPossible(
159156
}
160157
// If the clustering is not skipped, check the clustering column names and sample
161158
// factor value.
162-
List<String> columns = Arrays.asList(clusteringColumns.split(","));
163159
List<String> fieldNames = table.schema().fieldNames();
164160
checkState(
165161
new HashSet<>(fieldNames).containsAll(new HashSet<>(columns)),
@@ -174,17 +170,7 @@ public FlinkSinkBuilder clusteringIfPossible(
174170
+ MIN_CLUSTERING_SAMPLE_FACTOR
175171
+ ".");
176172
TableSortInfo.Builder sortInfoBuilder = new TableSortInfo.Builder();
177-
if (clusteringStrategy.equals(CLUSTERING_STRATEGY.defaultValue())) {
178-
if (columns.size() == 1) {
179-
sortInfoBuilder.setSortStrategy(ORDER);
180-
} else if (columns.size() < 5) {
181-
sortInfoBuilder.setSortStrategy(ZORDER);
182-
} else {
183-
sortInfoBuilder.setSortStrategy(HILBERT);
184-
}
185-
} else {
186-
sortInfoBuilder.setSortStrategy(OrderType.of(clusteringStrategy));
187-
}
173+
sortInfoBuilder.setSortStrategy(clusteringStrategy(clusteringStrategy, columns.size()));
188174
int upstreamParallelism = input.getParallelism();
189175
String sinkParallelismValue =
190176
table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@
4343
import java.util.Map;
4444

4545
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
46+
import static org.apache.paimon.CoreOptions.CLUSTERING_COLUMNS;
47+
import static org.apache.paimon.CoreOptions.CLUSTERING_STRATEGY;
4648
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
4749
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
48-
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_COLUMNS;
4950
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
5051
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SORT_IN_CLUSTER;
51-
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
5252
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
5353

5454
/** Table sink to create sink. */

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkZOrderUDF.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@
3737
import org.apache.spark.sql.types.StringType;
3838
import org.apache.spark.sql.types.TimestampType;
3939

40-
import java.io.IOException;
41-
import java.io.ObjectInputStream;
4240
import java.io.Serializable;
4341
import java.nio.ByteBuffer;
4442

@@ -75,14 +73,10 @@ public SparkZOrderUDF(int numCols, int varTypeSize, int maxOutputSize) {
7573
this.maxOutputSize = maxOutputSize;
7674
}
7775

78-
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
79-
in.defaultReadObject();
80-
inputBuffers = ThreadLocal.withInitial(() -> new ByteBuffer[numCols]);
81-
inputHolder = ThreadLocal.withInitial(() -> new byte[numCols][]);
82-
outputBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(totalOutputBytes));
83-
}
84-
8576
private ByteBuffer inputBuffer(int position, int size) {
77+
if (inputBuffers == null) {
78+
inputBuffers = ThreadLocal.withInitial(() -> new ByteBuffer[numCols]);
79+
}
8680
ByteBuffer buffer = inputBuffers.get()[position];
8781
if (buffer == null) {
8882
buffer = ByteBuffer.allocate(size);
@@ -92,6 +86,13 @@ private ByteBuffer inputBuffer(int position, int size) {
9286
}
9387

9488
byte[] interleaveBits(Seq<byte[]> scalaBinary) {
89+
if (inputHolder == null) {
90+
inputHolder = ThreadLocal.withInitial(() -> new byte[numCols][]);
91+
}
92+
if (outputBuffer == null) {
93+
outputBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(totalOutputBytes));
94+
}
95+
9596
byte[][] columnsBinary =
9697
JavaConverters.seqAsJavaList(scalaBinary).toArray(inputHolder.get());
9798
return ZOrderByteUtils.interleaveBits(columnsBinary, totalOutputBytes, outputBuffer.get());

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public ZorderSorter(FileStoreTable table, List<String> zOrderColNames) {
3838
checkNotEmpty();
3939
}
4040

41+
@Override
4142
public Dataset<Row> sort(Dataset<Row> df) {
4243
Column zColumn = zValue(df);
4344
Dataset<Row> zValueDF = df.withColumn(Z_COLUMN, zColumn);

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.apache.paimon.manifest.FileKind
3131
import org.apache.paimon.spark.{SparkRow, SparkTableWrite, SparkTypeUtils}
3232
import org.apache.paimon.spark.catalog.functions.BucketFunction
3333
import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, ROW_KIND_COL}
34+
import org.apache.paimon.spark.sort.TableSorter
3435
import org.apache.paimon.spark.util.OptionUtils.paimonExtensionEnabled
3536
import org.apache.paimon.spark.util.SparkRowUtils
3637
import org.apache.paimon.spark.write.WriteHelper
@@ -233,15 +234,20 @@ case class PaimonSparkWriter(table: FileStoreTable) extends WriteHelper {
233234
}
234235

235236
case BUCKET_UNAWARE | POSTPONE_MODE =>
236-
if (
237-
coreOptions.partitionSinkStrategy().equals(PartitionSinkStrategy.HASH) && !tableSchema
238-
.partitionKeys()
239-
.isEmpty
240-
) {
241-
writeWithoutBucket(data.repartition(partitionCols(data): _*))
242-
} else {
243-
writeWithoutBucket(data)
237+
var input = data
238+
if (tableSchema.partitionKeys().size() > 0) {
239+
coreOptions.partitionSinkStrategy match {
240+
case PartitionSinkStrategy.HASH =>
241+
input = data.repartition(partitionCols(data): _*)
242+
}
243+
}
244+
val clusteringColumns = coreOptions.clusteringColumns()
245+
if (!clusteringColumns.isEmpty) {
246+
val strategy = coreOptions.clusteringStrategy(tableSchema.fields().size())
247+
val sorter = TableSorter.getSorter(table, strategy, clusteringColumns)
248+
input = sorter.sort(data)
244249
}
250+
writeWithoutBucket(input)
245251

246252
case HASH_FIXED =>
247253
if (paimonExtensionEnabled && BucketFunction.supportsTable(table)) {

paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.junit.jupiter.api.Test;
3636
import org.junit.jupiter.api.TestInstance;
3737
import org.junit.jupiter.api.io.TempDir;
38+
import org.junit.jupiter.params.ParameterizedTest;
39+
import org.junit.jupiter.params.provider.ValueSource;
3840

3941
import java.io.IOException;
4042
import java.util.Arrays;
@@ -124,6 +126,18 @@ public void testWriteWithDefaultValue() {
124126
"[[1,2,my_value], [2,2,my_value], [3,2,my_value], [4,2,my_value], [5,3,my_value]]");
125127
}
126128

129+
@ParameterizedTest
130+
@ValueSource(strings = {"order", "zorder", "hilbert"})
131+
public void testWriteWithClustering(String clusterStrategy) {
132+
spark.sql(
133+
"CREATE TABLE T (a INT, b INT) TBLPROPERTIES ("
134+
+ "'clustering.columns'='a,b',"
135+
+ String.format("'clustering.strategy'='%s')", clusterStrategy));
136+
spark.sql("INSERT INTO T VALUES (2, 2), (1, 1), (3, 3)").collectAsList();
137+
List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
138+
assertThat(rows.toString()).isEqualTo("[[1,1], [2,2], [3,3]]");
139+
}
140+
127141
@Test
128142
public void testWrite() {
129143
spark.sql(

0 commit comments

Comments
 (0)