Skip to content

Commit 5d7d7ca

Browse files
committed
fix
1 parent dbc8042 commit 5d7d7ca

File tree

8 files changed

+93
-44
lines changed

8 files changed

+93
-44
lines changed

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@
5757
import java.util.stream.Collectors;
5858

5959
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
60+
import static org.apache.paimon.CoreOptions.OrderType.HILBERT;
61+
import static org.apache.paimon.CoreOptions.OrderType.ORDER;
62+
import static org.apache.paimon.CoreOptions.OrderType.ZORDER;
6063
import static org.apache.paimon.options.ConfigOptions.key;
6164
import static org.apache.paimon.options.MemorySize.VALUE_128_MB;
6265
import static org.apache.paimon.options.MemorySize.VALUE_256_MB;
@@ -1835,6 +1838,28 @@ public InlineElement getDescription() {
18351838
+ "starting from the snapshot after this one. If found, commit will be aborted. "
18361839
+ "If the value of this option is -1, committer will not check for its first commit.");
18371840

1841+
/**
 * Comma-separated column names used for clustering (range partitioning) on write. Has no
 * default value; the legacy key {@code sink.clustering.by-columns} is accepted as a fallback.
 */
public static final ConfigOption<String> CLUSTERING_COLUMNS =
1842+
key("clustering.columns")
1843+
.stringType()
1844+
.noDefaultValue()
1845+
.withFallbackKeys("sink.clustering.by-columns")
1846+
.withDescription(
1847+
"Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. "
1848+
+ "If not set or set to an empty string, it indicates that the range partitioning feature is not enabled. "
1849+
+ "This option will be effective only for append table without primary keys and batch execution mode.");
1850+
1851+
public static final ConfigOption<String> CLUSTERING_STRATEGY =
1852+
key("clustering.strategy")
1853+
.stringType()
1854+
.defaultValue("auto")
1855+
.withFallbackKeys("sink.clustering.strategy")
1856+
.withDescription(
1857+
"Specifies the comparison algorithm used for range partitioning, including 'zorder', 'hilbert', and 'order', "
1858+
+ "corresponding to the z-order curve algorithm, hilbert curve algorithm, and basic type comparison algorithm, "
1859+
+ "respectively. When not configured, it will automatically determine the algorithm based on the number of columns "
1860+
+ "in 'sink.clustering.by-columns'. 'order' is used for 1 column, 'zorder' for less than 5 columns, "
1861+
+ "and 'hilbert' for 5 or more columns.");
1862+
18381863
private final Options options;
18391864

18401865
public CoreOptions(Map<String, String> options) {
@@ -2803,6 +2828,35 @@ public Optional<Long> commitStrictModeLastSafeSnapshot() {
28032828
return options.getOptional(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT);
28042829
}
28052830

2831+
public List<String> clusteringColumns() {
2832+
return clusteringColumns(options.get(CLUSTERING_COLUMNS));
2833+
}
2834+
2835+
public OrderType clusteringStrategy(int columnSize) {
2836+
return clusteringStrategy(options.get(CLUSTERING_STRATEGY), columnSize);
2837+
}
2838+
2839+
public static List<String> clusteringColumns(String clusteringColumns) {
2840+
if (clusteringColumns == null || clusteringColumns.isEmpty()) {
2841+
return Collections.emptyList();
2842+
}
2843+
return Arrays.asList(clusteringColumns.split(","));
2844+
}
2845+
2846+
public static OrderType clusteringStrategy(String clusteringStrategy, int columnSize) {
2847+
if (clusteringStrategy.equals(CLUSTERING_STRATEGY.defaultValue())) {
2848+
if (columnSize == 1) {
2849+
return ORDER;
2850+
} else if (columnSize < 5) {
2851+
return ZORDER;
2852+
} else {
2853+
return HILBERT;
2854+
}
2855+
} else {
2856+
return OrderType.of(clusteringStrategy);
2857+
}
2858+
}
2859+
28062860
/** Specifies the merge engine for table with primary key. */
28072861
public enum MergeEngine implements DescribedEnum {
28082862
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -404,26 +404,6 @@ public class FlinkConnectorOptions {
404404
.withDescription(
405405
"Whether trigger partition mark done when recover from state.");
406406

407-
public static final ConfigOption<String> CLUSTERING_COLUMNS =
408-
key("sink.clustering.by-columns")
409-
.stringType()
410-
.noDefaultValue()
411-
.withDescription(
412-
"Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. "
413-
+ "If not set or set to an empty string, it indicates that the range partitioning feature is not enabled. "
414-
+ "This option will be effective only for bucket unaware table without primary keys and batch execution mode.");
415-
416-
public static final ConfigOption<String> CLUSTERING_STRATEGY =
417-
key("sink.clustering.strategy")
418-
.stringType()
419-
.defaultValue("auto")
420-
.withDescription(
421-
"Specifies the comparison algorithm used for range partitioning, including 'zorder', 'hilbert', and 'order', "
422-
+ "corresponding to the z-order curve algorithm, hilbert curve algorithm, and basic type comparison algorithm, "
423-
+ "respectively. When not configured, it will automatically determine the algorithm based on the number of columns "
424-
+ "in 'sink.clustering.by-columns'. 'order' is used for 1 column, 'zorder' for less than 5 columns, "
425-
+ "and 'hilbert' for 5 or more columns.");
426-
427407
public static final ConfigOption<Boolean> CLUSTERING_SORT_IN_CLUSTER =
428408
key("sink.clustering.sort-in-cluster")
429409
.booleanType()

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.flink.sink;
2020

21+
import org.apache.paimon.CoreOptions;
2122
import org.apache.paimon.CoreOptions.OrderType;
2223
import org.apache.paimon.CoreOptions.PartitionSinkStrategy;
2324
import org.apache.paimon.annotation.Public;
@@ -54,11 +55,12 @@
5455
import java.util.List;
5556
import java.util.Map;
5657

58+
import static org.apache.paimon.CoreOptions.CLUSTERING_STRATEGY;
5759
import static org.apache.paimon.CoreOptions.OrderType.HILBERT;
5860
import static org.apache.paimon.CoreOptions.OrderType.ORDER;
5961
import static org.apache.paimon.CoreOptions.OrderType.ZORDER;
62+
import static org.apache.paimon.CoreOptions.clusteringStrategy;
6063
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
61-
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
6264
import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
6365
import static org.apache.paimon.flink.sink.FlinkSink.isStreaming;
6466
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
@@ -146,7 +148,8 @@ public FlinkSinkBuilder clusteringIfPossible(
146148
int sampleFactor) {
147149
// The clustering will be skipped if the clustering columns are empty or the execution
148150
// mode is STREAMING or the table type is illegal.
149-
if (clusteringColumns == null || clusteringColumns.isEmpty()) {
151+
List<String> columns = CoreOptions.clusteringColumns(clusteringColumns);
152+
if (columns.isEmpty()) {
150153
return this;
151154
}
152155
checkState(input != null, "The input stream should be specified earlier.");
@@ -159,7 +162,6 @@ public FlinkSinkBuilder clusteringIfPossible(
159162
}
160163
// If the clustering is not skipped, check the clustering column names and sample
161164
// factor value.
162-
List<String> columns = Arrays.asList(clusteringColumns.split(","));
163165
List<String> fieldNames = table.schema().fieldNames();
164166
checkState(
165167
new HashSet<>(fieldNames).containsAll(new HashSet<>(columns)),
@@ -174,17 +176,7 @@ public FlinkSinkBuilder clusteringIfPossible(
174176
+ MIN_CLUSTERING_SAMPLE_FACTOR
175177
+ ".");
176178
TableSortInfo.Builder sortInfoBuilder = new TableSortInfo.Builder();
177-
if (clusteringStrategy.equals(CLUSTERING_STRATEGY.defaultValue())) {
178-
if (columns.size() == 1) {
179-
sortInfoBuilder.setSortStrategy(ORDER);
180-
} else if (columns.size() < 5) {
181-
sortInfoBuilder.setSortStrategy(ZORDER);
182-
} else {
183-
sortInfoBuilder.setSortStrategy(HILBERT);
184-
}
185-
} else {
186-
sortInfoBuilder.setSortStrategy(OrderType.of(clusteringStrategy));
187-
}
179+
sortInfoBuilder.setSortStrategy(clusteringStrategy(clusteringStrategy, columns.size()));
188180
int upstreamParallelism = input.getParallelism();
189181
String sinkParallelismValue =
190182
table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@
4343
import java.util.Map;
4444

4545
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
46+
import static org.apache.paimon.CoreOptions.CLUSTERING_COLUMNS;
47+
import static org.apache.paimon.CoreOptions.CLUSTERING_STRATEGY;
4648
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
4749
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
48-
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_COLUMNS;
4950
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
5051
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SORT_IN_CLUSTER;
51-
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
5252
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
5353

5454
/** Table sink to create sink. */

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkZOrderUDF.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,10 @@ public SparkZOrderUDF(int numCols, int varTypeSize, int maxOutputSize) {
7575
this.maxOutputSize = maxOutputSize;
7676
}
7777

78-
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
79-
in.defaultReadObject();
80-
inputBuffers = ThreadLocal.withInitial(() -> new ByteBuffer[numCols]);
81-
inputHolder = ThreadLocal.withInitial(() -> new byte[numCols][]);
82-
outputBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(totalOutputBytes));
83-
}
84-
8578
private ByteBuffer inputBuffer(int position, int size) {
79+
if (inputBuffers == null) {
80+
inputBuffers = ThreadLocal.withInitial(() -> new ByteBuffer[numCols]);
81+
}
8682
ByteBuffer buffer = inputBuffers.get()[position];
8783
if (buffer == null) {
8884
buffer = ByteBuffer.allocate(size);
@@ -92,6 +88,13 @@ private ByteBuffer inputBuffer(int position, int size) {
9288
}
9389

9490
byte[] interleaveBits(Seq<byte[]> scalaBinary) {
91+
if (inputHolder == null) {
92+
inputHolder = ThreadLocal.withInitial(() -> new byte[numCols][]);
93+
}
94+
if (outputBuffer == null) {
95+
outputBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(totalOutputBytes));
96+
}
97+
9598
byte[][] columnsBinary =
9699
JavaConverters.seqAsJavaList(scalaBinary).toArray(inputHolder.get());
97100
return ZOrderByteUtils.interleaveBits(columnsBinary, totalOutputBytes, outputBuffer.get());

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public ZorderSorter(FileStoreTable table, List<String> zOrderColNames) {
3838
checkNotEmpty();
3939
}
4040

41+
@Override
4142
public Dataset<Row> sort(Dataset<Row> df) {
4243
Column zColumn = zValue(df);
4344
Dataset<Row> zValueDF = df.withColumn(Z_COLUMN, zColumn);

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE
2222
import org.apache.paimon.options.Options
2323
import org.apache.paimon.spark._
2424
import org.apache.paimon.spark.schema.SparkSystemColumns
25+
import org.apache.paimon.spark.sort.TableSorter
2526
import org.apache.paimon.table.FileStoreTable
26-
2727
import org.apache.spark.internal.Logging
2828
import org.apache.spark.sql.{DataFrame, PaimonUtils, Row, SparkSession}
2929
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -67,6 +67,11 @@ case class WriteIntoPaimonTable(
6767
}
6868
}
6969

70+
val clusteringColumns = table.coreOptions().clusteringColumns()
71+
if (!clusteringColumns.isEmpty) {
72+
data = clusteringInput(data, clusteringColumns)
73+
}
74+
7075
val (dynamicPartitionOverwriteMode, overwritePartition) = parseSaveMode()
7176
// use the extra options to rebuild the table object
7277
updateTableWithOptions(
@@ -82,6 +87,12 @@ case class WriteIntoPaimonTable(
8287
Seq.empty
8388
}
8489

90+
/**
 * Sorts the incoming data by the table's clustering columns before it is written.
 *
 * @param data the rows about to be written
 * @param clusteringColumns the non-empty list of clustering column names
 * @return the data sorted by the sorter selected for the resolved strategy
 */
private def clusteringInput(data: DataFrame, clusteringColumns: java.util.List[String]): DataFrame = {
  // The automatic strategy is chosen from the number of clustering columns, not from the
  // total number of fields in the table schema.
  val strategy = table.coreOptions().clusteringStrategy(clusteringColumns.size())
  val sorter = TableSorter.getSorter(table, strategy, clusteringColumns)
  sorter.sort(data)
}
95+
8596
private def parseSaveMode(): (Boolean, Map[String, String]) = {
8697
var dynamicPartitionOverwriteMode = false
8798
val overwritePartition = saveMode match {

paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,14 @@ public void testWriteWithDefaultValue() {
124124
"[[1,2,my_value], [2,2,my_value], [3,2,my_value], [4,2,my_value], [5,3,my_value]]");
125125
}
126126

127+
@Test
128+
public void testWriteWithClustering() {
129+
spark.sql("CREATE TABLE T (a INT, b INT) TBLPROPERTIES ('clustering.columns'='a')");
130+
spark.sql("INSERT INTO T VALUES (1, 1), (3, 3), (2, 2)").collectAsList();
131+
List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
132+
assertThat(rows.toString()).isEqualTo("[[1,1], [2,2], [3,3]]");
133+
}
134+
127135
@Test
128136
public void testWrite() {
129137
spark.sql(

0 commit comments

Comments
 (0)