Skip to content

Commit ad8b70b

Browse files
authored
[flink] Support partition pushdown (only equals) in Flink connector (#937)
1 parent e096170 commit ad8b70b

File tree

8 files changed

+325
-49
lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.alibaba.fluss.flink.source.split.SourceSplitSerializer;
3030
import com.alibaba.fluss.flink.source.state.FlussSourceEnumeratorStateSerializer;
3131
import com.alibaba.fluss.flink.source.state.SourceEnumeratorState;
32+
import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual;
3233
import com.alibaba.fluss.metadata.TablePath;
3334
import com.alibaba.fluss.types.RowType;
3435

@@ -46,6 +47,10 @@
4647

4748
import javax.annotation.Nullable;
4849

50+
import java.util.List;
51+
52+
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
53+
4954
/** Flink source for Fluss. */
5055
public class FlinkSource<OUT>
5156
implements Source<OUT, SourceSplitBase, SourceEnumeratorState>, ResultTypeQueryable {
@@ -62,6 +67,8 @@ public class FlinkSource<OUT>
6267
private final boolean streaming;
6368
private final FlussDeserializationSchema<OUT> deserializationSchema;
6469

70+
private final List<FieldEqual> partitionFilters;
71+
6572
public FlinkSource(
6673
Configuration flussConf,
6774
TablePath tablePath,
@@ -72,7 +79,8 @@ public FlinkSource(
7279
OffsetsInitializer offsetsInitializer,
7380
long scanPartitionDiscoveryIntervalMs,
7481
FlussDeserializationSchema<OUT> deserializationSchema,
75-
boolean streaming) {
82+
boolean streaming,
83+
List<FieldEqual> partitionFilters) {
7684
this.flussConf = flussConf;
7785
this.tablePath = tablePath;
7886
this.hasPrimaryKey = hasPrimaryKey;
@@ -83,6 +91,7 @@ public FlinkSource(
8391
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
8492
this.deserializationSchema = deserializationSchema;
8593
this.streaming = streaming;
94+
this.partitionFilters = checkNotNull(partitionFilters);
8695
}
8796

8897
@Override
@@ -101,7 +110,8 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> createEnumerator(
101110
splitEnumeratorContext,
102111
offsetsInitializer,
103112
scanPartitionDiscoveryIntervalMs,
104-
streaming);
113+
streaming,
114+
partitionFilters);
105115
}
106116

107117
@Override
@@ -118,7 +128,8 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoreEnumerator
118128
sourceEnumeratorState.getAssignedPartitions(),
119129
offsetsInitializer,
120130
scanPartitionDiscoveryIntervalMs,
121-
streaming);
131+
streaming,
132+
partitionFilters);
122133
}
123134

124135
@Override

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java

Lines changed: 70 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils;
2727
import com.alibaba.fluss.flink.utils.FlinkConversions;
2828
import com.alibaba.fluss.flink.utils.PushdownUtils;
29-
import com.alibaba.fluss.flink.utils.PushdownUtils.ValueConversion;
29+
import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual;
3030
import com.alibaba.fluss.metadata.MergeEngineType;
3131
import com.alibaba.fluss.metadata.TablePath;
3232
import com.alibaba.fluss.types.RowType;
@@ -74,6 +74,7 @@
7474
import java.util.List;
7575
import java.util.Map;
7676

77+
import static com.alibaba.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
7778
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
7879

7980
/** Flink table source to scan Fluss data. */
@@ -124,6 +125,8 @@ public class FlinkTableSource
124125

125126
private long limit = -1;
126127

128+
private List<FieldEqual> partitionFilters = Collections.emptyList();
129+
127130
public FlinkTableSource(
128131
TablePath tablePath,
129132
Configuration flussConfig,
@@ -263,7 +266,8 @@ public boolean isBounded() {
263266
offsetsInitializer,
264267
scanPartitionDiscoveryIntervalMs,
265268
new RowDataDeserializationSchema(),
266-
streaming);
269+
streaming,
270+
partitionFilters);
267271

268272
if (!streaming) {
269273
// return a bounded source provide to make planner happy,
@@ -357,6 +361,7 @@ public DynamicTableSource copy() {
357361
source.projectedFields = projectedFields;
358362
source.singleRowFilter = singleRowFilter;
359363
source.modificationScanType = modificationScanType;
364+
source.partitionFilters = partitionFilters;
360365
return source;
361366
}
362367

@@ -378,41 +383,57 @@ public void applyProjection(int[][] projectedFields, DataType producedDataType)
378383

379384
@Override
380385
public Result applyFilters(List<ResolvedExpression> filters) {
381-
// only apply pk equal filters when all the condition satisfied:
386+
List<ResolvedExpression> acceptedFilters = new ArrayList<>();
387+
List<ResolvedExpression> remainingFilters = new ArrayList<>();
388+
389+
// primary pushdown
382390
// (1) batch execution mode,
383391
// (2) default (full) startup mode,
384392
// (3) the table is a pk table,
385393
// (4) all filters are pk field equal expression
386-
if (streaming
387-
|| startupOptions.startupMode != FlinkConnectorOptions.ScanStartupMode.FULL
388-
|| !hasPrimaryKey()
389-
|| filters.size() != primaryKeyIndexes.length) {
390-
return Result.of(Collections.emptyList(), filters);
391-
}
392-
393-
List<ResolvedExpression> acceptedFilters = new ArrayList<>();
394-
List<ResolvedExpression> remainingFilters = new ArrayList<>();
395-
Map<Integer, LogicalType> primaryKeyTypes = getPrimaryKeyTypes();
396-
List<PushdownUtils.FieldEqual> fieldEquals =
397-
PushdownUtils.extractFieldEquals(
398-
filters,
399-
primaryKeyTypes,
400-
acceptedFilters,
401-
remainingFilters,
402-
ValueConversion.FLINK_INTERNAL_VALUE);
403-
int[] keyRowProjection = getKeyRowProjection();
404-
HashSet<Integer> visitedPkFields = new HashSet<>();
405-
GenericRowData lookupRow = new GenericRowData(primaryKeyIndexes.length);
406-
for (PushdownUtils.FieldEqual fieldEqual : fieldEquals) {
407-
lookupRow.setField(keyRowProjection[fieldEqual.fieldIndex], fieldEqual.equalValue);
408-
visitedPkFields.add(fieldEqual.fieldIndex);
409-
}
410-
// if not all primary key fields are in condition, we skip to pushdown
411-
if (!visitedPkFields.equals(primaryKeyTypes.keySet())) {
394+
if (!streaming
395+
&& startupOptions.startupMode == FlinkConnectorOptions.ScanStartupMode.FULL
396+
&& hasPrimaryKey()
397+
&& filters.size() == primaryKeyIndexes.length) {
398+
Map<Integer, LogicalType> primaryKeyTypes = getPrimaryKeyTypes();
399+
List<FieldEqual> fieldEquals =
400+
PushdownUtils.extractFieldEquals(
401+
filters,
402+
primaryKeyTypes,
403+
acceptedFilters,
404+
remainingFilters,
405+
FLINK_INTERNAL_VALUE);
406+
int[] keyRowProjection = getKeyRowProjection();
407+
HashSet<Integer> visitedPkFields = new HashSet<>();
408+
GenericRowData lookupRow = new GenericRowData(primaryKeyIndexes.length);
409+
for (FieldEqual fieldEqual : fieldEquals) {
410+
lookupRow.setField(keyRowProjection[fieldEqual.fieldIndex], fieldEqual.equalValue);
411+
visitedPkFields.add(fieldEqual.fieldIndex);
412+
}
413+
// if not all primary key fields are in condition, we skip to pushdown
414+
if (!visitedPkFields.equals(primaryKeyTypes.keySet())) {
415+
return Result.of(Collections.emptyList(), filters);
416+
}
417+
singleRowFilter = lookupRow;
418+
return Result.of(acceptedFilters, remainingFilters);
419+
} else if (isPartitioned()) {
420+
// dynamic partition pushdown
421+
Map<Integer, LogicalType> partitionKeyTypes = getPartitionKeyTypes();
422+
List<FieldEqual> fieldEquals =
423+
PushdownUtils.extractFieldEquals(
424+
filters,
425+
partitionKeyTypes,
426+
acceptedFilters,
427+
remainingFilters,
428+
FLINK_INTERNAL_VALUE);
429+
// partitions are filtered by string representations, convert the equals to string first
430+
fieldEquals = stringifyFieldEquals(fieldEquals);
431+
432+
this.partitionFilters = fieldEquals;
433+
return Result.of(acceptedFilters, remainingFilters);
434+
} else {
412435
return Result.of(Collections.emptyList(), filters);
413436
}
414-
singleRowFilter = lookupRow;
415-
return Result.of(acceptedFilters, remainingFilters);
416437
}
417438

418439
@Override
@@ -468,6 +489,24 @@ private Map<Integer, LogicalType> getPrimaryKeyTypes() {
468489
return pkTypes;
469490
}
470491

492+
private Map<Integer, LogicalType> getPartitionKeyTypes() {
493+
Map<Integer, LogicalType> partitionKeyTypes = new HashMap<>();
494+
for (int index : partitionKeyIndexes) {
495+
partitionKeyTypes.put(index, tableOutputType.getTypeAt(index));
496+
}
497+
return partitionKeyTypes;
498+
}
499+
500+
private List<FieldEqual> stringifyFieldEquals(List<FieldEqual> fieldEquals) {
501+
List<FieldEqual> serialize = new ArrayList<>();
502+
for (FieldEqual fieldEqual : fieldEquals) {
503+
// revisit this again when we support more data types for partition key
504+
serialize.add(
505+
new FieldEqual(fieldEqual.fieldIndex, (fieldEqual.equalValue).toString()));
506+
}
507+
return serialize;
508+
}
509+
471510
// projection from pk_field_index to index_in_pk
472511
private int[] getKeyRowProjection() {
473512
int[] projection = new int[tableOutputType.getFieldCount()];

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlussSource.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525

2626
import javax.annotation.Nullable;
2727

28+
import java.util.Collections;
29+
2830
/**
2931
* A Flink DataStream source implementation for reading data from Fluss tables.
3032
*
@@ -67,6 +69,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
6769
long scanPartitionDiscoveryIntervalMs,
6870
FlussDeserializationSchema<OUT> deserializationSchema,
6971
boolean streaming) {
72+
// TODO: Support partition pushDown in datastream
7073
super(
7174
flussConf,
7275
tablePath,
@@ -77,7 +80,8 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
7780
offsetsInitializer,
7881
scanPartitionDiscoveryIntervalMs,
7982
deserializationSchema,
80-
streaming);
83+
streaming,
84+
Collections.emptyList());
8185
}
8286

8387
/**

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@
3434
import com.alibaba.fluss.flink.source.split.LogSplit;
3535
import com.alibaba.fluss.flink.source.split.SourceSplitBase;
3636
import com.alibaba.fluss.flink.source.state.SourceEnumeratorState;
37+
import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual;
3738
import com.alibaba.fluss.metadata.PartitionInfo;
3839
import com.alibaba.fluss.metadata.TableBucket;
3940
import com.alibaba.fluss.metadata.TableInfo;
4041
import com.alibaba.fluss.metadata.TablePath;
42+
import com.alibaba.fluss.types.DataField;
4143
import com.alibaba.fluss.utils.ExceptionUtils;
4244

4345
import org.apache.flink.annotation.VisibleForTesting;
@@ -125,6 +127,8 @@ public class FlinkSourceEnumerator
125127

126128
private volatile boolean closed = false;
127129

130+
private final List<FieldEqual> partitionFilters;
131+
128132
public FlinkSourceEnumerator(
129133
TablePath tablePath,
130134
Configuration flussConf,
@@ -133,7 +137,8 @@ public FlinkSourceEnumerator(
133137
SplitEnumeratorContext<SourceSplitBase> context,
134138
OffsetsInitializer startingOffsetsInitializer,
135139
long scanPartitionDiscoveryIntervalMs,
136-
boolean streaming) {
140+
boolean streaming,
141+
List<FieldEqual> partitionFilters) {
137142
this(
138143
tablePath,
139144
flussConf,
@@ -144,7 +149,8 @@ public FlinkSourceEnumerator(
144149
Collections.emptyMap(),
145150
startingOffsetsInitializer,
146151
scanPartitionDiscoveryIntervalMs,
147-
streaming);
152+
streaming,
153+
partitionFilters);
148154
}
149155

150156
public FlinkSourceEnumerator(
@@ -157,7 +163,8 @@ public FlinkSourceEnumerator(
157163
Map<Long, String> assignedPartitions,
158164
OffsetsInitializer startingOffsetsInitializer,
159165
long scanPartitionDiscoveryIntervalMs,
160-
boolean streaming) {
166+
boolean streaming,
167+
List<FieldEqual> partitionFilters) {
161168
this.tablePath = checkNotNull(tablePath);
162169
this.flussConf = checkNotNull(flussConf);
163170
this.hasPrimaryKey = hasPrimaryKey;
@@ -169,6 +176,7 @@ public FlinkSourceEnumerator(
169176
this.assignedPartitions = new HashMap<>(assignedPartitions);
170177
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
171178
this.streaming = streaming;
179+
this.partitionFilters = checkNotNull(partitionFilters);
172180
this.stoppingOffsetsInitializer =
173181
streaming ? new NoStoppingOffsetsInitializer() : OffsetsInitializer.latest();
174182
}
@@ -255,6 +263,7 @@ private List<SourceSplitBase> initNonPartitionedSplits() {
255263
private Set<PartitionInfo> listPartitions() {
256264
try {
257265
List<PartitionInfo> partitionInfos = flussAdmin.listPartitionInfos(tablePath).get();
266+
partitionInfos = applyPartitionFilter(partitionInfos);
258267
return new HashSet<>(partitionInfos);
259268
} catch (Exception e) {
260269
throw new FlinkRuntimeException(
@@ -263,6 +272,34 @@ private Set<PartitionInfo> listPartitions() {
263272
}
264273
}
265274

275+
/** Apply partition filter. */
276+
private List<PartitionInfo> applyPartitionFilter(List<PartitionInfo> partitionInfos) {
277+
if (!partitionFilters.isEmpty()) {
278+
return partitionInfos.stream()
279+
.filter(
280+
partitionInfo -> {
281+
Map<String, String> specMap =
282+
partitionInfo.getPartitionSpec().getSpecMap();
283+
// use getFields() instead of getFieldNames() to
284+
// avoid collection construction
285+
List<DataField> fields = tableInfo.getRowType().getFields();
286+
for (FieldEqual filter : partitionFilters) {
287+
String fieldName = fields.get(filter.fieldIndex).getName();
288+
String partitionValue = specMap.get(fieldName);
289+
if (partitionValue == null
290+
|| !filter.equalValue
291+
.toString()
292+
.equals(partitionValue)) {
293+
return false;
294+
}
295+
}
296+
return true;
297+
})
298+
.collect(Collectors.toList());
299+
}
300+
return partitionInfos;
301+
}
302+
266303
/** Init the splits for Fluss. */
267304
private void checkPartitionChanges(Set<PartitionInfo> partitionInfos, Throwable t) {
268305
if (closed) {

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/PushdownUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161

6262
import javax.annotation.Nullable;
6363

64+
import java.io.Serializable;
6465
import java.math.BigDecimal;
6566
import java.time.Instant;
6667
import java.time.LocalDateTime;
@@ -437,7 +438,8 @@ private static ListOffsetsResult listOffsets(
437438
// ------------------------------------------------------------------------------------------
438439

439440
/** A structure represents a source field equal literal expression. */
440-
public static class FieldEqual {
441+
public static class FieldEqual implements Serializable {
442+
private static final long serialVersionUID = 1L;
441443
public final int fieldIndex;
442444
public final Object equalValue;
443445

0 commit comments

Comments (0)