Skip to content

Commit 01072c9

Browse files
committed
[core] Add validation to upsert key
1 parent 9a41976 commit 01072c9

File tree

2 files changed

+21
-1
lines changed

2 files changed

+21
-1
lines changed

paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,18 @@ public class SchemaValidation {
9797
* @param schema the schema to be validated
9898
*/
9999
public static void validateTableSchema(TableSchema schema) {
100+
CoreOptions options = new CoreOptions(schema.options());
101+
100102
validateOnlyContainPrimitiveType(schema.fields(), schema.primaryKeys(), "primary key");
101103
validateOnlyContainPrimitiveType(schema.fields(), schema.partitionKeys(), "partition");
104+
validateOnlyContainPrimitiveType(schema.fields(), options.upsertKey(), "upsert key");
102105

103-
CoreOptions options = new CoreOptions(schema.options());
106+
if (!options.upsertKey().isEmpty() && !schema.primaryKeys().isEmpty()) {
107+
throw new RuntimeException(
108+
String.format(
109+
"Cannot define 'upsert-key' %s with 'primary-key' %s.",
110+
options.upsertKey(), schema.primaryKeys()));
111+
}
104112

105113
validateBucket(schema, options);
106114

@@ -251,6 +259,9 @@ private static void validateOnlyContainPrimitiveType(
251259
}
252260
for (String fieldName : fieldNames) {
253261
DataField rowField = rowFields.get(fieldName);
262+
if (rowField == null) {
263+
throw new IllegalArgumentException("Cannot find field: " + fieldName);
264+
}
254265
DataType dataType = rowField.type();
255266
if (PRIMARY_KEY_UNSUPPORTED_LOGICAL_TYPES.stream()
256267
.anyMatch(c -> c.isInstance(dataType))) {

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RewriteUpsertTableTestBase.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,15 @@ import org.apache.spark.sql.Row
2424

2525
abstract class RewriteUpsertTableTestBase extends PaimonSparkTestBase {
2626

27+
test("Rewrite Upsert Table: cannot define with primary key") {
28+
assert(intercept[Exception] {
29+
sql("""
30+
|CREATE TABLE T (k INT, a INT, b STRING)
31+
|TBLPROPERTIES ('upsert-key' = 'k', 'primary-key' = 'k')
32+
|""".stripMargin)
33+
}.getMessage.contains("Cannot define 'upsert-key' [k] with 'primary-key' [k]."))
34+
}
35+
2736
test("Rewrite Upsert Table: rewrite insert without sequence field") {
2837
sql("""
2938
|CREATE TABLE T (k INT, a INT, b STRING)

0 commit comments

Comments
 (0)