
Commit 9f0fbf5

[lake/iceberg] Iceberg read implementation supports filter push down (apache#1715)
1 parent a8cef41 commit 9f0fbf5

14 files changed, +853 −155 lines

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java

Lines changed: 182 additions & 0 deletions
@@ -0,0 +1,182 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.fluss.lake.iceberg.source;

import org.apache.fluss.row.InternalRow;
import org.apache.fluss.types.BigIntType;
import org.apache.fluss.types.BinaryType;
import org.apache.fluss.types.BooleanType;
import org.apache.fluss.types.BytesType;
import org.apache.fluss.types.CharType;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DateType;
import org.apache.fluss.types.DecimalType;
import org.apache.fluss.types.DoubleType;
import org.apache.fluss.types.FloatType;
import org.apache.fluss.types.IntType;
import org.apache.fluss.types.LocalZonedTimestampType;
import org.apache.fluss.types.RowType;
import org.apache.fluss.types.SmallIntType;
import org.apache.fluss.types.StringType;
import org.apache.fluss.types.TimeType;
import org.apache.fluss.types.TimestampType;
import org.apache.fluss.types.TinyIntType;
import org.apache.fluss.utils.DateTimeUtils;

import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Map;

/** Wrap Fluss {@link InternalRow} as Iceberg {@link Record}. */
public class FlussRowAsIcebergRecord implements Record {

    protected InternalRow internalRow;
    protected final Types.StructType structType;
    protected final RowType flussRowType;
    private final FlussRowToIcebergFieldConverter[] fieldConverters;

    public FlussRowAsIcebergRecord(Types.StructType structType, RowType flussRowType) {
        this.structType = structType;
        this.flussRowType = flussRowType;
        fieldConverters = new FlussRowToIcebergFieldConverter[flussRowType.getFieldCount()];
        for (int pos = 0; pos < flussRowType.getFieldCount(); pos++) {
            DataType flussType = flussRowType.getTypeAt(pos);
            fieldConverters[pos] = createTypeConverter(flussType, pos);
        }
    }

    public FlussRowAsIcebergRecord(
            Types.StructType structType, RowType flussRowType, InternalRow internalRow) {
        this(structType, flussRowType);
        this.internalRow = internalRow;
    }

    @Override
    public Types.StructType struct() {
        return structType;
    }

    @Override
    public Object getField(String name) {
        return get(structType.fields().indexOf(structType.field(name)));
    }

    @Override
    public void setField(String name, Object value) {
        throw new UnsupportedOperationException("method setField is not supported.");
    }

    @Override
    public Object get(int pos) {
        // handle normal columns
        if (internalRow.isNullAt(pos)) {
            return null;
        }
        return fieldConverters[pos].convert(internalRow);
    }

    @Override
    public Record copy() {
        throw new UnsupportedOperationException("method copy is not supported.");
    }

    @Override
    public Record copy(Map<String, Object> overwriteValues) {
        throw new UnsupportedOperationException("method copy is not supported.");
    }

    @Override
    public int size() {
        return structType.fields().size();
    }

    @Override
    public <T> T get(int pos, Class<T> javaClass) {
        Object value = get(pos);
        if (value == null || javaClass.isInstance(value)) {
            return javaClass.cast(value);
        } else {
            throw new IllegalStateException(
                    "Not an instance of " + javaClass.getName() + ": " + value);
        }
    }

    @Override
    public <T> void set(int pos, T value) {
        throw new UnsupportedOperationException("method set is not supported.");
    }

    private interface FlussRowToIcebergFieldConverter {
        Object convert(InternalRow value);
    }

    private FlussRowToIcebergFieldConverter createTypeConverter(DataType flussType, int pos) {
        if (flussType instanceof BooleanType) {
            return row -> row.getBoolean(pos);
        } else if (flussType instanceof TinyIntType) {
            return row -> (int) row.getByte(pos);
        } else if (flussType instanceof SmallIntType) {
            return row -> (int) row.getShort(pos);
        } else if (flussType instanceof IntType) {
            return row -> row.getInt(pos);
        } else if (flussType instanceof BigIntType) {
            return row -> row.getLong(pos);
        } else if (flussType instanceof FloatType) {
            return row -> row.getFloat(pos);
        } else if (flussType instanceof DoubleType) {
            return row -> row.getDouble(pos);
        } else if (flussType instanceof StringType) {
            return row -> row.getString(pos).toString();
        } else if (flussType instanceof CharType) {
            CharType charType = (CharType) flussType;
            return row -> row.getChar(pos, charType.getLength()).toString();
        } else if (flussType instanceof BytesType || flussType instanceof BinaryType) {
            return row -> ByteBuffer.wrap(row.getBytes(pos));
        } else if (flussType instanceof DecimalType) {
            DecimalType decimalType = (DecimalType) flussType;
            return row ->
                    row.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale())
                            .toBigDecimal();
        } else if (flussType instanceof LocalZonedTimestampType) {
            LocalZonedTimestampType ltzType = (LocalZonedTimestampType) flussType;
            return row ->
                    toIcebergTimestampLtz(
                            row.getTimestampLtz(pos, ltzType.getPrecision()).toInstant());
        } else if (flussType instanceof TimestampType) {
            TimestampType tsType = (TimestampType) flussType;
            return row -> row.getTimestampNtz(pos, tsType.getPrecision()).toLocalDateTime();
        } else if (flussType instanceof DateType) {
            return row -> DateTimeUtils.toLocalDate(row.getInt(pos));
        } else if (flussType instanceof TimeType) {
            return row -> DateTimeUtils.toLocalTime(row.getInt(pos));
        } else {
            throw new UnsupportedOperationException(
                    "Unsupported data type conversion for Fluss type: "
                            + flussType.getClass().getSimpleName());
        }
    }

    private OffsetDateTime toIcebergTimestampLtz(Instant instant) {
        return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
    }
}
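
The design choice worth noting above: type dispatch happens once per schema rather than once per field access. The constructor walks each Fluss DataType a single time and caches a converter lambda per column position, so get(pos) reduces to a null check plus an array lookup. The widenings are forced by Iceberg's type system, which has no byte or short types (hence TinyIntType and SmallIntType both map to Java int) and whose generic records represent TIMESTAMP WITH LOCAL TIME ZONE as an OffsetDateTime pinned to UTC. A minimal standalone sketch of the same build-once dispatch pattern, using plain Java types instead of the Fluss API (FieldConverter, the schema, and the row layout here are illustrative, not from the commit):

import java.util.List;
import java.util.function.Function;

public class ConverterArraySketch {

    // One converter per column position, built once from the schema.
    interface FieldConverter extends Function<Object[], Object> {}

    static FieldConverter createConverter(Class<?> type, int pos) {
        if (type == Short.class) {
            // Widen small integers, mirroring TinyInt/SmallInt -> Iceberg int.
            return row -> ((Short) row[pos]).intValue();
        } else if (type == String.class) {
            return row -> (String) row[pos];
        }
        throw new UnsupportedOperationException("Unsupported type: " + type.getSimpleName());
    }

    public static void main(String[] args) {
        List<Class<?>> schema = List.of(Short.class, String.class);
        FieldConverter[] converters = new FieldConverter[schema.size()];
        for (int pos = 0; pos < schema.size(); pos++) {
            converters[pos] = createConverter(schema.get(pos), pos);
        }
        Object[] row = {(short) 7, "hello"};
        System.out.println(converters[0].apply(row)); // 7, now an Integer
        System.out.println(converters[1].apply(row)); // hello
    }
}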

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSource.java

Lines changed: 31 additions & 4 deletions
@@ -19,6 +19,7 @@
 package org.apache.fluss.lake.iceberg.source;

 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.utils.FlussToIcebergPredicateConverter;
 import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
 import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
 import org.apache.fluss.lake.source.LakeSource;
@@ -27,14 +28,18 @@
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.predicate.Predicate;

+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;

 import javax.annotation.Nullable;

 import java.io.IOException;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;

 import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;

@@ -44,6 +49,7 @@ public class IcebergLakeSource implements LakeSource<IcebergSplit> {
     private final Configuration icebergConfig;
     private final TablePath tablePath;
     private @Nullable int[][] project;
+    private @Nullable Expression filter;

     public IcebergLakeSource(Configuration icebergConfig, TablePath tablePath) {
         this.icebergConfig = icebergConfig;
@@ -62,13 +68,29 @@ public void withLimit(int limit) {

     @Override
     public FilterPushDownResult withFilters(List<Predicate> predicates) {
-        // TODO: Support filter push down. #1676
-        return FilterPushDownResult.of(Collections.emptyList(), predicates);
+        List<Predicate> unConsumedPredicates = new ArrayList<>();
+        List<Predicate> consumedPredicates = new ArrayList<>();
+        List<Expression> converted = new ArrayList<>();
+        Schema schema = getSchema(tablePath);
+        for (Predicate predicate : predicates) {
+            Optional<Expression> optPredicate =
+                    FlussToIcebergPredicateConverter.convert(schema, predicate);
+            if (optPredicate.isPresent()) {
+                consumedPredicates.add(predicate);
+                converted.add(optPredicate.get());
+            } else {
+                unConsumedPredicates.add(predicate);
+            }
+        }
+        if (!converted.isEmpty()) {
+            filter = converted.stream().reduce(Expressions::and).orElse(null);
+        }
+        return FilterPushDownResult.of(consumedPredicates, unConsumedPredicates);
     }

     @Override
     public Planner<IcebergSplit> createPlanner(PlannerContext context) throws IOException {
-        return new IcebergSplitPlanner(icebergConfig, tablePath, context.snapshotId());
+        return new IcebergSplitPlanner(icebergConfig, tablePath, context.snapshotId(), filter);
     }

     @Override
@@ -82,4 +104,9 @@ public RecordReader createRecordReader(ReaderContext<IcebergSplit> context) thro
     public SimpleVersionedSerializer<IcebergSplit> getSplitSerializer() {
         return new IcebergSplitSerializer();
     }
+
+    private Schema getSchema(TablePath tablePath) {
+        Catalog catalog = IcebergCatalogUtils.createIcebergCatalog(icebergConfig);
+        return catalog.loadTable(toIceberg(tablePath)).schema();
+    }
 }
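
The rewritten withFilters partitions the incoming Fluss predicates into those FlussToIcebergPredicateConverter can translate (consumed, evaluated by Iceberg at scan time) and the rest (unconsumed, left for Fluss to evaluate), then folds all translated expressions into a single conjunction with Expressions::and. A small illustration of that reduction using Iceberg's expressions API directly (the column names are made up for the example):

import java.util.Arrays;
import java.util.List;

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class AndReductionSketch {
    public static void main(String[] args) {
        // Two already-converted predicates, e.g. id = 42 AND amount > 100.
        List<Expression> converted =
                Arrays.asList(
                        Expressions.equal("id", 42L),
                        Expressions.greaterThan("amount", 100L));
        // Same fold as withFilters; an empty list would leave the filter null.
        Expression combined = converted.stream().reduce(Expressions::and).orElse(null);
        System.out.println(combined); // prints the combined AND expression
    }
}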

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReader.java

Lines changed: 7 additions & 1 deletion
@@ -45,7 +45,13 @@
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;

-/** Iceberg record reader. */
+/**
+ * Iceberg record reader. The filter is applied during the plan phase of IcebergSplitPlanner, so the
+ * RecordReader does not need to apply the filter again.
+ *
+ * <p>Refer to {@link org.apache.iceberg.data.GenericReader#open(FileScanTask)} and {@link
+ * org.apache.iceberg.Scan#ignoreResiduals()} for details.
+ */
 public class IcebergRecordReader implements RecordReader {
     protected IcebergRecordAsFlussRecordIterator iterator;
     protected @Nullable int[][] project;
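
The new Javadoc points at a real subtlety: a plan-time filter prunes whole files using file-level stats, but individual rows in a surviving file can still fail the predicate. Iceberg carries that leftover per task as FileScanTask#residual(), and because this commit drops ignoreResiduals() from the scan, Iceberg's GenericReader evaluates the residual while reading, which is why this reader need not filter again. A hedged sketch of how code could check whether a task still needs row-level filtering (illustrative, not from the commit):

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.expressions.Expression;

public class ResidualCheckSketch {
    // True when the task's residual expression still requires per-row evaluation;
    // residual() reports alwaysTrue() when the scan used ignoreResiduals().
    static boolean needsRowFilter(FileScanTask task) {
        Expression residual = task.residual();
        return residual.op() != Expression.Operation.TRUE;
    }
}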

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java

Lines changed: 13 additions & 7 deletions
@@ -27,9 +27,13 @@
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;

+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -47,11 +51,14 @@ public class IcebergSplitPlanner implements Planner<IcebergSplit> {
     private final Configuration icebergConfig;
     private final TablePath tablePath;
     private final long snapshotId;
+    private final @Nullable Expression filter;

-    public IcebergSplitPlanner(Configuration icebergConfig, TablePath tablePath, long snapshotId) {
+    public IcebergSplitPlanner(
+            Configuration icebergConfig, TablePath tablePath, long snapshotId, Expression filter) {
         this.icebergConfig = icebergConfig;
         this.tablePath = tablePath;
         this.snapshotId = snapshotId;
+        this.filter = filter;
     }

     @Override
@@ -61,12 +68,11 @@ public List<IcebergSplit> plan() throws IOException {
         Table table = catalog.loadTable(toIceberg(tablePath));
         Function<FileScanTask, List<String>> partitionExtract = createPartitionExtractor(table);
         Function<FileScanTask, Integer> bucketExtractor = createBucketExtractor(table);
-        try (CloseableIterable<FileScanTask> tasks =
-                table.newScan()
-                        .useSnapshot(snapshotId)
-                        .includeColumnStats()
-                        .ignoreResiduals()
-                        .planFiles()) {
+        TableScan tableScan = table.newScan().useSnapshot(snapshotId).includeColumnStats();
+        if (filter != null) {
+            tableScan = tableScan.filter(filter);
+        }
+        try (CloseableIterable<FileScanTask> tasks = tableScan.planFiles()) {
             tasks.forEach(
                     task ->
                             splits.add(
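
With the new constructor parameter, the conjunction built in IcebergLakeSource.withFilters reaches the TableScan before planFiles() runs, so pruning happens against per-file min/max stats during planning. A hedged usage sketch of driving the planner under a range filter (the order_id column and the helper method are hypothetical; IcebergSplit is assumed to be in the same package):

import java.io.IOException;
import java.util.List;

import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.TablePath;

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class PlannerUsageSketch {
    // Hypothetical driver, not part of the commit: plan splits under a
    // conjunctive filter so files whose stats exclude the range are skipped.
    static List<IcebergSplit> planFiltered(
            Configuration icebergConfig, TablePath tablePath, long snapshotId)
            throws IOException {
        Expression filter =
                Expressions.and(
                        Expressions.greaterThanOrEqual("order_id", 100L),
                        Expressions.lessThan("order_id", 200L));
        return new IcebergSplitPlanner(icebergConfig, tablePath, snapshotId, filter).plan();
    }
}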

0 commit comments

Comments
 (0)