Skip to content

Commit 153eb90

Browse files
committed
[core] Validate merge function creation in SchemaValidation
1 parent 7a7373b commit 153eb90

File tree

2 files changed

+11
-94
lines changed

2 files changed

+11
-94
lines changed

paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java

Lines changed: 6 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.paimon.format.FileFormat;
2727
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
2828
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
29-
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory;
3029
import org.apache.paimon.options.ConfigOption;
3130
import org.apache.paimon.options.Options;
3231
import org.apache.paimon.table.BucketMode;
@@ -44,16 +43,12 @@
4443
import org.apache.paimon.utils.Preconditions;
4544
import org.apache.paimon.utils.StringUtils;
4645

47-
import java.util.ArrayList;
4846
import java.util.Arrays;
49-
import java.util.Collection;
5047
import java.util.Collections;
5148
import java.util.HashMap;
52-
import java.util.HashSet;
5349
import java.util.List;
5450
import java.util.Map;
5551
import java.util.Optional;
56-
import java.util.Set;
5752
import java.util.stream.Collectors;
5853

5954
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
@@ -77,7 +72,7 @@
7772
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
7873
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
7974
import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
80-
import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
75+
import static org.apache.paimon.table.PrimaryKeyTableUtils.createMergeFunctionFactory;
8176
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
8277
import static org.apache.paimon.table.SpecialFields.SYSTEM_FIELD_NAMES;
8378
import static org.apache.paimon.types.DataTypeRoot.ARRAY;
@@ -122,7 +117,7 @@ public static void validateTableSchema(TableSchema schema) {
122117

123118
validateSequenceField(schema, options);
124119

125-
validateSequenceGroup(schema, options);
120+
validateMergeFunction(schema);
126121

127122
ChangelogProducer changelogProducer = options.changelogProducer();
128123
if (schema.primaryKeys().isEmpty() && changelogProducer != ChangelogProducer.NONE) {
@@ -449,90 +444,12 @@ private static void validateFieldsPrefix(TableSchema schema, CoreOptions options
449444
});
450445
}
451446

452-
private static void validateSequenceGroup(TableSchema schema, CoreOptions options) {
453-
Map<String, Set<String>> fields2Group = new HashMap<>();
454-
Set<Integer> sequenceGroupFieldIndexs = new HashSet<>();
455-
List<String> fieldNames = schema.fieldNames();
456-
for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
457-
String k = entry.getKey();
458-
String v = entry.getValue();
459-
if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
460-
Arrays.stream(v.split(FIELDS_SEPARATOR))
461-
.map(fieldName -> requireField(fieldName, fieldNames))
462-
.forEach(sequenceGroupFieldIndexs::add);
463-
String[] sequenceFieldNames =
464-
k.substring(
465-
FIELDS_PREFIX.length() + 1,
466-
k.length() - SEQUENCE_GROUP.length() - 1)
467-
.split(FIELDS_SEPARATOR);
468-
469-
for (String field : v.split(FIELDS_SEPARATOR)) {
470-
if (!fieldNames.contains(field)) {
471-
throw new IllegalArgumentException(
472-
String.format("Field %s can not be found in table schema.", field));
473-
}
474-
475-
List<String> sequenceFieldsList = new ArrayList<>();
476-
for (String sequenceFieldName : sequenceFieldNames) {
477-
if (!fieldNames.contains(sequenceFieldName)) {
478-
throw new IllegalArgumentException(
479-
String.format(
480-
"The sequence field group: %s can not be found in table schema.",
481-
sequenceFieldName));
482-
}
483-
sequenceFieldsList.add(sequenceFieldName);
484-
}
485-
486-
if (fields2Group.containsKey(field)) {
487-
List<List<String>> sequenceGroups = new ArrayList<>();
488-
sequenceGroups.add(new ArrayList<>(fields2Group.get(field)));
489-
sequenceGroups.add(sequenceFieldsList);
490-
491-
throw new IllegalArgumentException(
492-
String.format(
493-
"Field %s is defined repeatedly by multiple groups: %s.",
494-
field, sequenceGroups));
495-
}
496-
497-
Set<String> group = fields2Group.computeIfAbsent(field, p -> new HashSet<>());
498-
group.addAll(sequenceFieldsList);
499-
}
500-
501-
// add self
502-
Arrays.stream(sequenceFieldNames)
503-
.mapToInt(fieldName -> requireField(fieldName, fieldNames))
504-
.forEach(sequenceGroupFieldIndexs::add);
505-
}
506-
}
507-
508-
if (options.mergeEngine() == MergeEngine.PARTIAL_UPDATE) {
509-
for (String fieldName : fieldNames) {
510-
String aggFunc = options.fieldAggFunc(fieldName);
511-
String aggFuncName = aggFunc == null ? options.fieldsDefaultFunc() : aggFunc;
512-
if (schema.primaryKeys().contains(fieldName)) {
513-
continue;
514-
}
515-
if (aggFuncName != null) {
516-
// last_non_null_value doesn't require sequence group
517-
checkArgument(
518-
aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)
519-
|| sequenceGroupFieldIndexs.contains(
520-
fieldNames.indexOf(fieldName)),
521-
"Must use sequence group for aggregation functions but not found for field %s.",
522-
fieldName);
523-
}
524-
}
447+
private static void validateMergeFunction(TableSchema schema) {
448+
if (schema.primaryKeys().isEmpty()) {
449+
return;
525450
}
526451

527-
Set<String> illegalGroup =
528-
fields2Group.values().stream()
529-
.flatMap(Collection::stream)
530-
.filter(g -> options.fieldAggFunc(g) != null)
531-
.collect(Collectors.toSet());
532-
if (!illegalGroup.isEmpty()) {
533-
throw new IllegalArgumentException(
534-
"Should not defined aggregation function on sequence group: " + illegalGroup);
535-
}
452+
createMergeFunctionFactory(schema);
536453
}
537454

538455
private static void validateForDeletionVectors(CoreOptions options) {

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ public void testInvalidSequenceGroup() {
304304
+ "'merge-engine'='partial-update', "
305305
+ "'fields.g_1.sequence-group'='a1,b', "
306306
+ "'fields.g_2.sequence-group'='c,d');"))
307-
.hasRootCauseMessage("Field a1 can not be found in table schema.");
307+
.hasRootCauseMessage("Field a1 can not be found in table schema");
308308

309309
Assertions.assertThatThrownBy(
310310
() ->
@@ -315,8 +315,8 @@ public void testInvalidSequenceGroup() {
315315
+ "'merge-engine'='partial-update', "
316316
+ "'fields.g_1.sequence-group'='a,b', "
317317
+ "'fields.g_2.sequence-group'='a,d');"))
318-
.hasRootCauseMessage(
319-
"Field a is defined repeatedly by multiple groups: [[g_1], [g_2]].");
318+
.rootCause()
319+
.hasMessageContaining("Field a is defined repeatedly by multiple groups");
320320

321321
Assertions.assertThatThrownBy(
322322
() ->
@@ -327,8 +327,8 @@ public void testInvalidSequenceGroup() {
327327
+ "'merge-engine'='partial-update', "
328328
+ "'fields.g_1.sequence-group'='a,b', "
329329
+ "'fields.g_2,g_3.sequence-group'='a,d');"))
330-
.hasRootCauseMessage(
331-
"Field a is defined repeatedly by multiple groups: [[g_1], [g_2, g_3]].");
330+
.rootCause()
331+
.hasMessageContaining("Field a is defined repeatedly by multiple groups");
332332
}
333333

334334
@Test

0 commit comments

Comments
 (0)