Skip to content

Commit 76d037e

Browse files
committed
[common] Introduce Predicate to do filter and partition push down.
1 parent c1927fc commit 76d037e

File tree

7 files changed

+99
-108
lines changed

7 files changed

+99
-108
lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import com.alibaba.fluss.flink.source.split.SourceSplitSerializer;
3131
import com.alibaba.fluss.flink.source.state.FlussSourceEnumeratorStateSerializer;
3232
import com.alibaba.fluss.flink.source.state.SourceEnumeratorState;
33-
import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual;
3433
import com.alibaba.fluss.metadata.TablePath;
3534
import com.alibaba.fluss.predicate.Predicate;
3635
import com.alibaba.fluss.types.RowType;
@@ -49,10 +48,6 @@
4948

5049
import javax.annotation.Nullable;
5150

52-
import java.util.List;
53-
54-
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
55-
5651
/** Flink source for Fluss. */
5752
public class FlinkSource<OUT>
5853
implements Source<OUT, SourceSplitBase, SourceEnumeratorState>, ResultTypeQueryable {
@@ -69,7 +64,6 @@ public class FlinkSource<OUT>
6964
private final boolean streaming;
7065
private final FlussDeserializationSchema<OUT> deserializationSchema;
7166

72-
private final List<FieldEqual> partitionFilters;
7367
private Predicate predicate;
7468

7569
public FlinkSource(
@@ -83,7 +77,6 @@ public FlinkSource(
8377
long scanPartitionDiscoveryIntervalMs,
8478
FlussDeserializationSchema<OUT> deserializationSchema,
8579
boolean streaming,
86-
List<FieldEqual> partitionFilters,
8780
Predicate predicate) {
8881
this.flussConf = flussConf;
8982
this.tablePath = tablePath;
@@ -95,9 +88,7 @@ public FlinkSource(
9588
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
9689
this.deserializationSchema = deserializationSchema;
9790
this.streaming = streaming;
98-
this.partitionFilters = checkNotNull(partitionFilters);
9991
this.predicate = predicate;
100-
10192
}
10293

10394
@Override
@@ -117,7 +108,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> createEnumerator(
117108
offsetsInitializer,
118109
scanPartitionDiscoveryIntervalMs,
119110
streaming,
120-
partitionFilters,predicate);
111+
predicate);
121112
}
122113

123114
@Override
@@ -135,7 +126,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoreEnumerator
135126
offsetsInitializer,
136127
scanPartitionDiscoveryIntervalMs,
137128
streaming,
138-
partitionFilters,predicate);
129+
predicate);
139130
}
140131

141132
@Override

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,6 @@
8383
import java.util.Optional;
8484
import java.util.stream.Collectors;
8585

86-
import static com.alibaba.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
87-
import static com.alibaba.fluss.flink.utils.PushdownUtils.extractFieldEquals;
8886
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
8987

9088
/** Flink table source to scan Fluss data. */
@@ -371,7 +369,6 @@ public DynamicTableSource copy() {
371369
source.projectedFields = projectedFields;
372370
source.singleRowFilter = singleRowFilter;
373371
source.modificationScanType = modificationScanType;
374-
source.partitionFilters = partitionFilters;
375372
return source;
376373
}
377374

@@ -414,7 +411,7 @@ && hasPrimaryKey()
414411
primaryKeyTypes,
415412
acceptedFilters,
416413
remainingFilters,
417-
ValueConversion.FLINK_INTERNAL_VALUE);
414+
PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE);
418415
int[] keyRowProjection = getKeyRowProjection();
419416
HashSet<Integer> visitedPkFields = new HashSet<>();
420417
GenericRowData lookupRow = new GenericRowData(primaryKeyIndexes.length);

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlussSource.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626

2727
import javax.annotation.Nullable;
2828

29-
import java.util.Collections;
30-
3129
/**
3230
* A Flink DataStream source implementation for reading data from Fluss tables.
3331
*
@@ -82,7 +80,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
8280
scanPartitionDiscoveryIntervalMs,
8381
deserializationSchema,
8482
streaming,
85-
Collections.emptyList());
83+
null);
8684
}
8785

8886
/**

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/PredicateConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.alibaba.fluss.connector.flink.source;
19+
package com.alibaba.fluss.flink.source;
2020

21-
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
2221
import com.alibaba.fluss.connector.flink.utils.TypeUtils;
22+
import com.alibaba.fluss.flink.utils.FlinkConversions;
2323
import com.alibaba.fluss.predicate.Predicate;
2424
import com.alibaba.fluss.predicate.PredicateBuilder;
2525
import com.alibaba.fluss.row.BinaryString;

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 21 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import com.alibaba.fluss.flink.source.split.LogSplit;
3636
import com.alibaba.fluss.flink.source.split.SourceSplitBase;
3737
import com.alibaba.fluss.flink.source.state.SourceEnumeratorState;
38-
import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual;
3938
import com.alibaba.fluss.metadata.PartitionInfo;
4039
import com.alibaba.fluss.metadata.TableBucket;
4140
import com.alibaba.fluss.metadata.TableInfo;
@@ -44,7 +43,6 @@
4443
import com.alibaba.fluss.row.BinaryString;
4544
import com.alibaba.fluss.row.GenericRow;
4645
import com.alibaba.fluss.row.InternalRow;
47-
import com.alibaba.fluss.types.DataField;
4846
import com.alibaba.fluss.utils.ExceptionUtils;
4947

5048
import org.apache.flink.annotation.VisibleForTesting;
@@ -168,8 +166,7 @@ public FlinkSourceEnumerator(
168166
Map<Long, String> assignedPartitions,
169167
OffsetsInitializer startingOffsetsInitializer,
170168
long scanPartitionDiscoveryIntervalMs,
171-
boolean streaming,
172-
List<FieldEqual> partitionFilters) {
169+
boolean streaming) {
173170
this(
174171
tablePath,
175172
flussConf,
@@ -207,7 +204,6 @@ public FlinkSourceEnumerator(
207204
this.assignedPartitions = new HashMap<>(assignedPartitions);
208205
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
209206
this.streaming = streaming;
210-
this.partitionFilters = checkNotNull(partitionFilters);
211207
this.stoppingOffsetsInitializer =
212208
streaming ? new NoStoppingOffsetsInitializer() : OffsetsInitializer.latest();
213209
this.predicate = predicate;
@@ -296,24 +292,8 @@ private Set<PartitionInfo> listPartitions() {
296292

297293
try {
298294
List<PartitionInfo> partitionInfos = flussAdmin.listPartitionInfos(tablePath).get();
299-
if (predicate == null) {
300-
return new HashSet<>(partitionInfos);
301-
} else {
302-
Set<PartitionInfo> filteredPartitionInfos =
303-
partitionInfos.stream()
304-
.filter(
305-
partitionInfo ->
306-
predicate.test(
307-
convertPartitionInfoToInternalRow(
308-
partitionInfo)))
309-
.collect(Collectors.toSet());
310-
LOG.info(
311-
"Filtered partitions {} for table {} with predicate: {}",
312-
filteredPartitionInfos,
313-
tablePath,
314-
predicate);
315-
return filteredPartitionInfos;
316-
}
295+
partitionInfos = applyPartitionFilter(partitionInfos);
296+
return new HashSet<>(partitionInfos);
317297
} catch (Exception e) {
318298
throw new FlinkRuntimeException(
319299
String.format("Failed to list partitions for %s", tablePath),
@@ -323,30 +303,24 @@ private Set<PartitionInfo> listPartitions() {
323303

324304
/** Apply partition filter. */
325305
private List<PartitionInfo> applyPartitionFilter(List<PartitionInfo> partitionInfos) {
326-
if (!partitionFilters.isEmpty()) {
327-
return partitionInfos.stream()
328-
.filter(
329-
partitionInfo -> {
330-
Map<String, String> specMap =
331-
partitionInfo.getPartitionSpec().getSpecMap();
332-
// use getFields() instead of getFieldNames() to
333-
// avoid collection construction
334-
List<DataField> fields = tableInfo.getRowType().getFields();
335-
for (FieldEqual filter : partitionFilters) {
336-
String fieldName = fields.get(filter.fieldIndex).getName();
337-
String partitionValue = specMap.get(fieldName);
338-
if (partitionValue == null
339-
|| !filter.equalValue
340-
.toString()
341-
.equals(partitionValue)) {
342-
return false;
343-
}
344-
}
345-
return true;
346-
})
347-
.collect(Collectors.toList());
348-
}
349-
return partitionInfos;
306+
if (predicate == null) {
307+
return partitionInfos;
308+
} else {
309+
List<PartitionInfo> filteredPartitionInfos =
310+
partitionInfos.stream()
311+
.filter(
312+
partitionInfo ->
313+
predicate.test(
314+
convertPartitionInfoToInternalRow(
315+
partitionInfo)))
316+
.collect(Collectors.toList());
317+
LOG.info(
318+
"Filtered partitions {} for table {} with predicate: {}",
319+
filteredPartitionInfos,
320+
tablePath,
321+
predicate);
322+
return filteredPartitionInfos;
323+
}
350324
}
351325

352326
private InternalRow convertPartitionInfoToInternalRow(PartitionInfo partitionInfo) {

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/FlinkTableSourceITCase.java

Lines changed: 64 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -707,39 +707,6 @@ void testReadTimestampGreaterThanMaxTimestamp() throws Exception {
707707
}
708708
}
709709

710-
@Test
711-
void testStreamingReadPartitionPushDown() throws Exception {
712-
713-
tEnv.executeSql(
714-
"create table partitioned_table"
715-
+ " (a int not null, b varchar, c string, primary key (a, c) NOT ENFORCED) partitioned by (c) "
716-
+ "with ('table.auto-partition.enabled' = 'true', 'table.auto-partition.time-unit' = 'year')");
717-
TablePath tablePath = TablePath.of(DEFAULT_DB, "partitioned_table");
718-
719-
// write data into partitions and wait snapshot is done
720-
Map<Long, String> partitionNameById =
721-
waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath);
722-
List<String> expectedRowValues =
723-
writeRowsToPartition(tablePath, partitionNameById.values()).stream()
724-
.filter(s -> s.contains("2025"))
725-
.collect(Collectors.toList());
726-
waitUtilAllBucketFinishSnapshot(admin, tablePath, partitionNameById.values());
727-
728-
org.apache.flink.util.CloseableIterator<Row> rowIter =
729-
tEnv.executeSql("select * from partitioned_table where c in( '2000','2001','2025')")
730-
.collect();
731-
732-
assertResultsIgnoreOrder(rowIter, expectedRowValues, false);
733-
734-
// then create some new partitions, and write rows to the new partitions
735-
List<String> newPartitions = Arrays.asList("2000", "2001");
736-
FlinkTestBase.createPartitions(
737-
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath, newPartitions);
738-
// write data to the new partitions
739-
expectedRowValues = writeRowsToPartition(tablePath, newPartitions);
740-
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
741-
}
742-
743710
// -------------------------------------------------------------------------------------
744711
// Fluss lookup source tests
745712
// -------------------------------------------------------------------------------------
@@ -1183,6 +1150,70 @@ private List<String> writeRowsToTwoPartition(TablePath tablePath, Collection<Str
11831150
return expectedRowValues;
11841151
}
11851152

1153+
@Test
1154+
void testStreamingReadPartitionPushDownWithInExpr() throws Exception {
1155+
1156+
// tEnv.executeSql(
1157+
// "create table partitioned_table"
1158+
// + " (a int not null, b varchar, c string, primary key (a, c) NOT
1159+
// ENFORCED) partitioned by (c) "
1160+
// + "with ('table.auto-partition.enabled' = 'true',
1161+
// 'table.auto-partition.time-unit' = 'year')");
1162+
// TablePath tablePath = TablePath.of(DEFAULT_DB, "partitioned_table");
1163+
//
1164+
// // write data into partitions and wait snapshot is done
1165+
// Map<Long, String> partitionNameById =
1166+
// waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(),
1167+
// tablePath);
1168+
// List<String> expectedRowValues =
1169+
// writeRowsToPartition(tablePath, partitionNameById.values()).stream()
1170+
// .filter(s -> s.contains("2025"))
1171+
// .collect(Collectors.toList());
1172+
// waitUtilAllBucketFinishSnapshot(admin, tablePath, partitionNameById.values());
1173+
//
1174+
// org.apache.flink.util.CloseableIterator<Row> rowIter =
1175+
// tEnv.executeSql("select * from partitioned_table where c in(
1176+
// '2000','2001','2025')")
1177+
// .collect();
1178+
//
1179+
// assertResultsIgnoreOrder(rowIter, expectedRowValues, false);
1180+
//
1181+
// // then create some new partitions, and write rows to the new partitions
1182+
// List<String> newPartitions = Arrays.asList("2000", "2001");
1183+
// FlinkTestBase.createPartitions(
1184+
// FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath, newPartitions);
1185+
// // write data to the new partitions
1186+
// expectedRowValues = writeRowsToPartition(tablePath, newPartitions);
1187+
// assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
1188+
1189+
tEnv.executeSql(
1190+
"create table partitioned_table_in"
1191+
+ " (a int not null, b varchar, c string, primary key (a, c) NOT ENFORCED) partitioned by (c) ");
1192+
TablePath tablePath = TablePath.of(DEFAULT_DB, "partitioned_table_in");
1193+
tEnv.executeSql("alter table partitioned_table_in add partition (c=2025)");
1194+
tEnv.executeSql("alter table partitioned_table_in add partition (c=2026)");
1195+
tEnv.executeSql("alter table partitioned_table_in add partition (c=2027)");
1196+
1197+
List<String> expectedRowValues =
1198+
writeRowsToPartition(conn, tablePath, Arrays.asList("2025", "2026", "2027"))
1199+
.stream()
1200+
.filter(s -> s.contains("2025") || s.contains("2026"))
1201+
.collect(Collectors.toList());
1202+
waitUtilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026", "2027"));
1203+
1204+
String plan =
1205+
tEnv.explainSql("select * from partitioned_table_in where c in ('2025','2026')");
1206+
assertThat(plan)
1207+
.contains(
1208+
"TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_in, filter=[OR(=(c, _UTF-16LE'2025'), =(c, _UTF-16LE'2026'))]]], fields=[a, b, c])");
1209+
1210+
org.apache.flink.util.CloseableIterator<Row> rowIter =
1211+
tEnv.executeSql("select * from partitioned_table_in where c in ('2025','2026')")
1212+
.collect();
1213+
1214+
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
1215+
}
1216+
11861217
private enum Caching {
11871218
ENABLE_CACHE,
11881219
DISABLE_CACHE

0 commit comments

Comments
 (0)