Skip to content

Commit f72de45

Browse files
committed
[core] should not push topN with file schema evolution
1 parent 53b3c2c commit f72de45

File tree

4 files changed

+86
-28
lines changed

4 files changed

+86
-28
lines changed

paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,16 @@ public List<String> fieldNames() {
143143
return fields.stream().map(DataField::name).collect(Collectors.toList());
144144
}
145145

146+
public Map<String, DataField> nameToFieldMap() {
147+
return fields.stream()
148+
.collect(Collectors.toMap(DataField::name, field -> field, (a, b) -> b));
149+
}
150+
151+
public Map<Integer, DataField> idToFieldMap() {
152+
return fields.stream()
153+
.collect(Collectors.toMap(DataField::id, field -> field, (a, b) -> b));
154+
}
155+
146156
public int highestFieldId() {
147157
return highestFieldId;
148158
}
@@ -156,14 +166,14 @@ public List<String> primaryKeys() {
156166
}
157167

158168
public List<String> trimmedPrimaryKeys() {
159-
if (primaryKeys.size() > 0) {
169+
if (!primaryKeys.isEmpty()) {
160170
List<String> adjusted =
161171
primaryKeys.stream()
162172
.filter(pk -> !partitionKeys.contains(pk))
163173
.collect(Collectors.toList());
164174

165175
Preconditions.checkState(
166-
adjusted.size() > 0,
176+
!adjusted.isEmpty(),
167177
String.format(
168178
"Primary key constraint %s should not be same with partition fields %s,"
169179
+ " this will result in only one record in a partition",
@@ -192,7 +202,7 @@ public boolean crossPartitionUpdate() {
192202
return false;
193203
}
194204

195-
return !primaryKeys.containsAll(partitionKeys);
205+
return notContainsAll(primaryKeys, partitionKeys);
196206
}
197207

198208
/** Original bucket keys, maybe empty. */
@@ -202,7 +212,7 @@ private List<String> originalBucketKeys() {
202212
return Collections.emptyList();
203213
}
204214
List<String> bucketKeys = Arrays.asList(key.split(","));
205-
if (!containsAll(fieldNames(), bucketKeys)) {
215+
if (notContainsAll(fieldNames(), bucketKeys)) {
206216
throw new RuntimeException(
207217
String.format(
208218
"Field names %s should contains all bucket keys %s.",
@@ -214,8 +224,8 @@ private List<String> originalBucketKeys() {
214224
"Bucket keys %s should not in partition keys %s.",
215225
bucketKeys, partitionKeys));
216226
}
217-
if (primaryKeys.size() > 0) {
218-
if (!containsAll(primaryKeys, bucketKeys)) {
227+
if (!primaryKeys.isEmpty()) {
228+
if (notContainsAll(primaryKeys, bucketKeys)) {
219229
throw new RuntimeException(
220230
String.format(
221231
"Primary keys %s should contains all bucket keys %s.",
@@ -225,8 +235,8 @@ private List<String> originalBucketKeys() {
225235
return bucketKeys;
226236
}
227237

228-
private boolean containsAll(List<String> all, List<String> contains) {
229-
return new HashSet<>(all).containsAll(new HashSet<>(contains));
238+
private boolean notContainsAll(List<String> all, List<String> contains) {
239+
return !new HashSet<>(all).containsAll(new HashSet<>(contains));
230240
}
231241

232242
public @Nullable String comment() {

paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.paimon.schema.TableSchema;
2828
import org.apache.paimon.utils.ListUtils;
2929

30+
import javax.annotation.Nullable;
31+
3032
import java.io.IOException;
3133
import java.util.List;
3234
import java.util.stream.Collectors;
@@ -38,7 +40,7 @@ public static FileIndexResult evaluate(
3840
FileIO fileIO,
3941
TableSchema dataSchema,
4042
List<Predicate> dataFilter,
41-
TopN topN,
43+
@Nullable TopN topN,
4244
DataFilePathFactory dataFilePathFactory,
4345
DataFileMeta file)
4446
throws IOException {

paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.paimon.format.FormatReaderFactory;
2424
import org.apache.paimon.partition.PartitionUtils;
2525
import org.apache.paimon.predicate.Predicate;
26+
import org.apache.paimon.predicate.SortValue;
2627
import org.apache.paimon.predicate.TopN;
2728
import org.apache.paimon.schema.IndexCastMapping;
2829
import org.apache.paimon.schema.SchemaEvolutionUtil;
@@ -41,6 +42,7 @@
4142
import java.util.HashSet;
4243
import java.util.List;
4344
import java.util.Map;
45+
import java.util.Objects;
4446
import java.util.function.Function;
4547

4648
import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
@@ -221,7 +223,25 @@ public FormatReaderMapping build(
221223
dataSchema,
222224
readFilters,
223225
systemFields,
224-
topN);
226+
evolutionTopN(tableSchema, dataSchema));
227+
}
228+
229+
@Nullable
230+
private TopN evolutionTopN(TableSchema tableSchema, TableSchema dataSchema) {
231+
TopN pushTopN = topN;
232+
if (pushTopN != null) {
233+
Map<String, DataField> tableFields = tableSchema.nameToFieldMap();
234+
Map<Integer, DataField> dataFields = dataSchema.idToFieldMap();
235+
for (SortValue value : pushTopN.orders()) {
236+
DataField tableField = tableFields.get(value.field().name());
237+
DataField dataField = dataFields.get(tableField.id());
238+
if (!Objects.equals(tableField, dataField)) {
239+
pushTopN = null;
240+
break;
241+
}
242+
}
243+
}
244+
return pushTopN;
225245
}
226246

227247
public FormatReaderMapping build(

paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.paimon.predicate.TopN;
4747
import org.apache.paimon.reader.RecordReader;
4848
import org.apache.paimon.schema.Schema;
49+
import org.apache.paimon.schema.SchemaChange;
4950
import org.apache.paimon.schema.SchemaManager;
5051
import org.apache.paimon.schema.SchemaUtils;
5152
import org.apache.paimon.schema.TableSchema;
@@ -831,25 +832,23 @@ public void testRangeBitmapIndexTopNFilter() throws Exception {
831832
.field("event", DataTypes.STRING())
832833
.field("price", DataTypes.INT())
833834
.build();
835+
Consumer<Options> configure =
836+
options -> {
837+
options.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
838+
options.set(WRITE_ONLY, true);
839+
options.set(
840+
FileIndexOptions.FILE_INDEX
841+
+ "."
842+
+ RangeBitmapFileIndexFactory.RANGE_BITMAP
843+
+ "."
844+
+ CoreOptions.COLUMNS,
845+
"price");
846+
options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576");
847+
options.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
848+
options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
849+
};
834850
// in unaware-bucket mode, we split files into splits all the time
835-
FileStoreTable table =
836-
createUnawareBucketFileStoreTable(
837-
rowType,
838-
options -> {
839-
options.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
840-
options.set(WRITE_ONLY, true);
841-
options.set(
842-
FileIndexOptions.FILE_INDEX
843-
+ "."
844-
+ RangeBitmapFileIndexFactory.RANGE_BITMAP
845-
+ "."
846-
+ CoreOptions.COLUMNS,
847-
"price");
848-
options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576");
849-
options.set(
850-
ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
851-
options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
852-
});
851+
FileStoreTable table = createUnawareBucketFileStoreTable(rowType, configure);
853852

854853
int bound = 300000;
855854
int rowCount = 1000000;
@@ -918,6 +917,33 @@ public void testRangeBitmapIndexTopNFilter() throws Exception {
918917
assertThat(cnt.get()).isEqualTo(rowCount);
919918
reader.close();
920919
}
920+
921+
// test should not push topN with index and evolution
922+
{
923+
table.schemaManager()
924+
.commitChanges(SchemaChange.updateColumnType("price", DataTypes.BIGINT()));
925+
rowType =
926+
RowType.builder()
927+
.field("id", DataTypes.STRING())
928+
.field("event", DataTypes.STRING())
929+
.field("price", DataTypes.BIGINT())
930+
.build();
931+
table = createUnawareBucketFileStoreTable(rowType, configure);
932+
DataField field = rowType.getField("price");
933+
SortValue sort =
934+
new SortValue(
935+
new FieldRef(field.id(), field.name(), field.type()),
936+
SortValue.SortDirection.DESCENDING,
937+
SortValue.NullOrdering.NULLS_LAST);
938+
TopN topN = new TopN(Collections.singletonList(sort), k);
939+
TableScan.Plan plan = table.newScan().plan();
940+
RecordReader<InternalRow> reader =
941+
table.newRead().withTopN(topN).createReader(plan.splits());
942+
AtomicInteger cnt = new AtomicInteger(0);
943+
reader.forEachRemaining(row -> cnt.incrementAndGet());
944+
assertThat(cnt.get()).isEqualTo(rowCount);
945+
reader.close();
946+
}
921947
}
922948

923949
@Test

0 commit comments

Comments (0)