Skip to content

Commit 6da7f39

Browse files
committed
[core] Refactor evolved and non-evolved filters in file store scan
1 parent 2a70b56 commit 6da7f39

File tree

6 files changed

+102
-64
lines changed

6 files changed

+102
-64
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,13 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
4444

4545
private final boolean fileIndexReadEnabled;
4646

47-
private Predicate filter;
47+
private Predicate inputFilter;
4848

49-
// just cache.
50-
private final Map<Long, Predicate> dataFilterMapping = new ConcurrentHashMap<>();
49+
// cache not evolved filter by schema id
50+
private final Map<Long, Predicate> notEvolvedFilterMapping = new ConcurrentHashMap<>();
51+
52+
// cache evolved filter by schema id
53+
private final Map<Long, Predicate> evolvedFilterMapping = new ConcurrentHashMap<>();
5154

5255
public AppendOnlyFileStoreScan(
5356
ManifestsReader manifestsReader,
@@ -72,17 +75,23 @@ public AppendOnlyFileStoreScan(
7275
}
7376

7477
public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
75-
this.filter = predicate;
78+
this.inputFilter = predicate;
7679
this.bucketSelectConverter.convert(predicate).ifPresent(this::withTotalAwareBucketFilter);
7780
return this;
7881
}
7982

8083
/** Note: Keep this thread-safe. */
8184
@Override
8285
protected boolean filterByStats(ManifestEntry entry) {
83-
Predicate safeFilter =
84-
simpleStatsEvolutions.toEvolutionSafeStatsFilter(entry.file().schemaId(), filter);
85-
if (safeFilter == null) {
86+
Predicate notEvolvedFilter =
87+
notEvolvedFilterMapping.computeIfAbsent(
88+
entry.file().schemaId(),
89+
id ->
90+
// keepNewFieldFilter to handle add field
91+
// for example, add field 'c', 'c > 3': old files can be filtered
92+
simpleStatsEvolutions.filterUnsafeFilter(
93+
entry.file().schemaId(), inputFilter, true));
94+
if (notEvolvedFilter == null) {
8695
return true;
8796
}
8897

@@ -93,12 +102,23 @@ protected boolean filterByStats(ManifestEntry entry) {
93102
entry.file().rowCount(),
94103
entry.file().valueStatsCols());
95104

96-
return safeFilter.test(
105+
// filter by min max
106+
boolean result =
107+
notEvolvedFilter.test(
97108
entry.file().rowCount(),
98109
stats.minValues(),
99110
stats.maxValues(),
100-
stats.nullCounts())
101-
&& (!fileIndexReadEnabled || testFileIndex(entry.file().embeddedIndex(), entry));
111+
stats.nullCounts());
112+
113+
if (!result) {
114+
return false;
115+
}
116+
117+
if (!fileIndexReadEnabled) {
118+
return true;
119+
}
120+
121+
return testFileIndex(entry.file().embeddedIndex(), entry);
102122
}
103123

104124
private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry entry) {
@@ -109,11 +129,11 @@ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry
109129
RowType dataRowType = scanTableSchema(entry.file().schemaId()).logicalRowType();
110130

111131
Predicate dataPredicate =
112-
dataFilterMapping.computeIfAbsent(
132+
evolvedFilterMapping.computeIfAbsent(
113133
entry.file().schemaId(),
114134
id ->
115-
simpleStatsEvolutions.toEvolutionSafeStatsFilter(
116-
entry.file().schemaId(), filter));
135+
simpleStatsEvolutions.tryDevolveFilter(
136+
entry.file().schemaId(), inputFilter));
117137

118138
try (FileIndexPredicate predicate =
119139
new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {

paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,14 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
6161

6262
private Predicate keyFilter;
6363
private Predicate valueFilter;
64-
private final Map<Long, Predicate> schemaId2DataFilter = new ConcurrentHashMap<>();
6564
private boolean valueFilterForceEnabled = false;
6665

66+
// cache not evolved filter by schema id
67+
private final Map<Long, Predicate> notEvolvedFilterMapping = new ConcurrentHashMap<>();
68+
69+
// cache evolved filter by schema id
70+
private final Map<Long, Predicate> evolvedFilterMapping = new ConcurrentHashMap<>();
71+
6772
public KeyValueFileStoreScan(
6873
ManifestsReader manifestsReader,
6974
BucketSelectConverter bucketSelectConverter,
@@ -125,8 +130,7 @@ protected boolean filterByStats(ManifestEntry entry) {
125130
}
126131

127132
Predicate safeKeyFilter =
128-
fieldKeyStatsConverters.toEvolutionSafeStatsFilter(
129-
entry.file().schemaId(), keyFilter);
133+
fieldKeyStatsConverters.tryDevolveFilter(entry.file().schemaId(), keyFilter);
130134
if (safeKeyFilter == null) {
131135
return true;
132136
}
@@ -157,10 +161,10 @@ private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestE
157161
try (FileIndexPredicate predicate =
158162
new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
159163
Predicate dataPredicate =
160-
schemaId2DataFilter.computeIfAbsent(
164+
evolvedFilterMapping.computeIfAbsent(
161165
entry.file().schemaId(),
162166
id ->
163-
fieldValueStatsConverters.toEvolutionSafeStatsFilter(
167+
fieldValueStatsConverters.tryDevolveFilter(
164168
entry.file().schemaId(), valueFilter));
165169
return predicate.evaluate(dataPredicate).remain();
166170
} catch (IOException e) {
@@ -229,10 +233,15 @@ private boolean filterByValueFilter(ManifestEntry entry) {
229233
return ((FilteredManifestEntry) entry).selected();
230234
}
231235

232-
Predicate safeValueFilter =
233-
fieldValueStatsConverters.toEvolutionSafeStatsFilter(
234-
entry.file().schemaId(), valueFilter);
235-
if (safeValueFilter == null) {
236+
Predicate notEvolvedFilter =
237+
notEvolvedFilterMapping.computeIfAbsent(
238+
entry.file().schemaId(),
239+
id ->
240+
// keepNewFieldFilter to handle add field
241+
// for example, add field 'c', 'c > 3': old files can be filtered
242+
fieldValueStatsConverters.filterUnsafeFilter(
243+
entry.file().schemaId(), valueFilter, true));
244+
if (notEvolvedFilter == null) {
236245
return true;
237246
}
238247

@@ -241,7 +250,7 @@ private boolean filterByValueFilter(ManifestEntry entry) {
241250
fieldValueStatsConverters
242251
.getOrCreate(file.schemaId())
243252
.evolution(file.valueStats(), file.rowCount(), file.valueStatsCols());
244-
return safeValueFilter.test(
253+
return notEvolvedFilter.test(
245254
file.rowCount(),
246255
result.minValues(),
247256
result.maxValues(),

paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java

Lines changed: 16 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import javax.annotation.Nullable;
4343

4444
import java.util.ArrayList;
45+
import java.util.Collections;
4546
import java.util.HashMap;
4647
import java.util.LinkedHashMap;
4748
import java.util.List;
@@ -131,18 +132,17 @@ public CastFieldGetter[] getCastMapping() {
131132
* @param tableFields the table fields
132133
* @param dataFields the underlying data fields
133134
* @param filters the filters
134-
* @param forData true if devolve the filters for filtering data file, otherwise, for filtering
135-
* manifest entry
135+
* @param keepNewFieldFilter true if keep new field filter, the new field filter needs to be
136+
* properly handled
136137
* @return the data filters
137138
*/
138-
@Nullable
139139
public static List<Predicate> devolveFilters(
140140
List<DataField> tableFields,
141141
List<DataField> dataFields,
142142
List<Predicate> filters,
143-
boolean forData) {
143+
boolean keepNewFieldFilter) {
144144
if (filters == null) {
145-
return null;
145+
return Collections.emptyList();
146146
}
147147

148148
Map<String, DataField> nameToTableFields =
@@ -159,36 +159,19 @@ public static List<Predicate> devolveFilters(
159159
String.format("Find no field %s", predicate.fieldName()));
160160
DataField dataField = idToDataFields.get(tableField.id());
161161
if (dataField == null) {
162-
// For example, add field b and filter b, the filter is safe for old file
163-
// meta without field b because the index mapping array can handle null
164-
return forData ? Optional.empty() : Optional.of(predicate);
162+
return keepNewFieldFilter ? Optional.of(predicate) : Optional.empty();
165163
}
166164

167-
Optional<List<Object>> castedLiterals =
168-
CastExecutors.castLiteralsWithEvolution(
169-
predicate.literals(), predicate.type(), dataField.type());
170-
171-
// unsafe
172-
if (!castedLiterals.isPresent()) {
173-
return Optional.empty();
174-
}
175-
176-
if (forData) {
177-
// For data, the filter will be pushdown to data file, so must use the index
178-
// and literal type of data file
179-
return Optional.of(
180-
new LeafPredicate(
181-
predicate.function(),
182-
dataField.type(),
183-
indexOf(dataField, idToDataFields),
184-
dataField.name(),
185-
castedLiterals.get()));
186-
} else {
187-
// For meta, the index mapping array will map the index the cast the
188-
// literals, so just return self
189-
// In other words, return it if it's safe
190-
return Optional.of(predicate);
191-
}
165+
return CastExecutors.castLiteralsWithEvolution(
166+
predicate.literals(), predicate.type(), dataField.type())
167+
.map(
168+
literals ->
169+
new LeafPredicate(
170+
predicate.function(),
171+
dataField.type(),
172+
indexOf(dataField, idToDataFields),
173+
dataField.name(),
174+
literals));
192175
};
193176

194177
for (Predicate predicate : filters) {

paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,21 @@
2121
import org.apache.paimon.predicate.Predicate;
2222
import org.apache.paimon.predicate.PredicateBuilder;
2323
import org.apache.paimon.schema.IndexCastMapping;
24-
import org.apache.paimon.schema.SchemaEvolutionUtil;
2524
import org.apache.paimon.types.DataField;
2625
import org.apache.paimon.types.RowType;
2726

2827
import javax.annotation.Nullable;
2928

29+
import java.util.ArrayList;
3030
import java.util.List;
31-
import java.util.Objects;
3231
import java.util.concurrent.ConcurrentHashMap;
3332
import java.util.concurrent.ConcurrentMap;
3433
import java.util.concurrent.atomic.AtomicReference;
3534
import java.util.function.Function;
3635

36+
import static java.util.Collections.singletonList;
3737
import static org.apache.paimon.schema.SchemaEvolutionUtil.createIndexCastMapping;
38+
import static org.apache.paimon.schema.SchemaEvolutionUtil.devolveFilters;
3839

3940
/** Converters to create col stats array serializer. */
4041
public class SimpleStatsEvolutions {
@@ -82,7 +83,7 @@ public SimpleStatsEvolution getOrCreate(long dataSchemaId) {
8283
* filter or null if can't.
8384
*/
8485
@Nullable
85-
public Predicate toEvolutionSafeStatsFilter(long dataSchemaId, @Nullable Predicate filter) {
86+
public Predicate tryDevolveFilter(long dataSchemaId, @Nullable Predicate filter) {
8687
if (filter == null || dataSchemaId == tableSchemaId) {
8788
return filter;
8889
}
@@ -91,13 +92,38 @@ public Predicate toEvolutionSafeStatsFilter(long dataSchemaId, @Nullable Predica
9192
// compute engine to perform p2.
9293
List<Predicate> filters = PredicateBuilder.splitAnd(filter);
9394
List<Predicate> devolved =
94-
Objects.requireNonNull(
95-
SchemaEvolutionUtil.devolveFilters(
96-
tableDataFields, schemaFields.apply(dataSchemaId), filters, false));
95+
devolveFilters(tableDataFields, schemaFields.apply(dataSchemaId), filters, false);
9796

9897
return devolved.isEmpty() ? null : PredicateBuilder.and(devolved);
9998
}
10099

100+
/**
101+
* Filter unsafe filter, for example, filter is 'a > 9', old type is String, new type is Int, if
102+
* records are 9, 10 and 11, the evolved filter is not safe.
103+
*/
104+
@Nullable
105+
public Predicate filterUnsafeFilter(
106+
long dataSchemaId, @Nullable Predicate filter, boolean keepNewFieldFilter) {
107+
if (filter == null || dataSchemaId == tableSchemaId) {
108+
return filter;
109+
}
110+
111+
List<Predicate> filters = PredicateBuilder.splitAnd(filter);
112+
List<DataField> oldSchema = schemaFields.apply(dataSchemaId);
113+
List<Predicate> result = new ArrayList<>();
114+
for (Predicate predicate : filters) {
115+
if (!devolveFilters(
116+
tableDataFields,
117+
oldSchema,
118+
singletonList(predicate),
119+
keepNewFieldFilter)
120+
.isEmpty()) {
121+
result.add(predicate);
122+
}
123+
}
124+
return result.isEmpty() ? null : PredicateBuilder.and(result);
125+
}
126+
101127
public List<DataField> tableDataFields() {
102128
return tableDataFields;
103129
}

paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ private List<Predicate> readFilters(
309309
tableSchema.id() == dataSchema.id()
310310
? filters
311311
: SchemaEvolutionUtil.devolveFilters(
312-
tableSchema.fields(), dataSchema.fields(), filters, true);
312+
tableSchema.fields(), dataSchema.fields(), filters, false);
313313

314314
// Skip pushing down partition filters to reader.
315315
return excludePredicateWithFields(

paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void testDevolveDataFilters() {
9191
IsNull.INSTANCE, DataTypes.INT(), 7, "a", Collections.emptyList()));
9292

9393
List<Predicate> filters =
94-
SchemaEvolutionUtil.devolveFilters(tableFields2, dataFields, predicates, true);
94+
SchemaEvolutionUtil.devolveFilters(tableFields2, dataFields, predicates, false);
9595
assertThat(filters).isNotNull();
9696
assertThat(filters.size()).isEqualTo(1);
9797

0 commit comments

Comments
 (0)