Skip to content

Commit 84d3fdc

Browse files
committed
[spark] Throw exception when to sort compact data evolution table
1 parent bdcd3ec commit 84d3fdc

File tree

6 files changed

+25
-3
lines changed

6 files changed

+25
-3
lines changed

paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,9 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options)
599599
checkArgument(
600600
!options.deletionVectorsEnabled(),
601601
"Data evolution config must disabled with deletion-vectors.enabled");
602+
checkArgument(
603+
!options.clusteringIncrementalEnabled(),
604+
"Data evolution config must disabled with clustering.incremental");
602605
}
603606

604607
Pair<RowType, RowType> normalAndBlobType = BlobType.splitBlob(schema.logicalRowType());

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,6 @@ protected boolean buildImpl() throws Exception {
144144
if (fileStoreTable.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) {
145145
buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming);
146146
} else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
147-
148147
if (fileStoreTable.coreOptions().dataEvolutionEnabled()) {
149148
buildForDataEvolutionTableCompact(env, fileStoreTable, isStreaming);
150149
} else if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ public SortCompactAction(
5656
Map<String, String> catalogConfig,
5757
Map<String, String> tableConf) {
5858
super(database, tableName, catalogConfig, tableConf);
59-
6059
table = table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
6160
}
6261

@@ -77,6 +76,10 @@ public void build() throws Exception {
7776
}
7877
FileStoreTable fileStoreTable = (FileStoreTable) table;
7978

79+
if (fileStoreTable.coreOptions().dataEvolutionEnabled()) {
80+
throw new UnsupportedOperationException("Data Evolution table cannot be sorted!");
81+
}
82+
8083
if (fileStoreTable.bucketMode() != BucketMode.BUCKET_UNAWARE
8184
&& fileStoreTable.bucketMode() != BucketMode.HASH_DYNAMIC) {
8285
throw new IllegalArgumentException("Sort Compact only supports bucket=-1 yet.");

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -849,7 +849,7 @@ public void testDataEvolutionTableCompact() throws Exception {
849849

850850
FileStoreTable table =
851851
prepareTable(
852-
Arrays.asList("k"),
852+
Collections.singletonList("k"),
853853
Collections.emptyList(),
854854
Collections.emptyList(),
855855
tableOptions);

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,9 @@ private void sortCompactUnAwareBucketTable(
608608
List<String> sortColumns,
609609
DataSourceV2Relation relation,
610610
@Nullable PartitionPredicate partitionPredicate) {
611+
if (table.coreOptions().dataEvolutionEnabled()) {
612+
throw new UnsupportedOperationException("Data Evolution table cannot be sorted!");
613+
}
611614
SnapshotReader snapshotReader = table.newSnapshotReader();
612615
if (partitionPredicate != null) {
613616
snapshotReader.withPartitionFilter(partitionPredicate);

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,20 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
651651
}
652652
}
653653

654+
test("Data Evolution: compact sort throw exception") {
655+
withTable("s", "t") {
656+
sql(
657+
s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
658+
sql("INSERT INTO t VALUES (1, 1, 1)")
659+
assert(
660+
intercept[Exception](
661+
sql("CALL sys.compact(table => 't', order_strategy => 'order', order_by => 'id')")
662+
.collect()).getMessage
663+
.contains("Data Evolution table cannot be sorted!"))
664+
665+
}
666+
}
667+
654668
test("Data Evolution: test global indexed column update action -- throw error") {
655669
withTable("T") {
656670
spark.sql("""

0 commit comments

Comments
 (0)