Skip to content

Commit fb6f6d5

Browse files
authored
[paimon] Paimon implement union read interfaces (#1505)
1 parent 7f70265 commit fb6f6d5

20 files changed

+1944
-122
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import com.alibaba.fluss.config.Configuration;
2121
import com.alibaba.fluss.lake.lakestorage.LakeStorage;
22+
import com.alibaba.fluss.lake.paimon.source.PaimonLakeSource;
23+
import com.alibaba.fluss.lake.paimon.source.PaimonSplit;
2224
import com.alibaba.fluss.lake.paimon.tiering.PaimonCommittable;
2325
import com.alibaba.fluss.lake.paimon.tiering.PaimonLakeTieringFactory;
2426
import com.alibaba.fluss.lake.paimon.tiering.PaimonWriteResult;
@@ -46,7 +48,7 @@ public PaimonLakeCatalog createLakeCatalog() {
4648
}
4749

4850
@Override
49-
public LakeSource<?> createLakeSource(TablePath tablePath) {
50-
throw new UnsupportedOperationException("Not implemented");
51+
public LakeSource<PaimonSplit> createLakeSource(TablePath tablePath) {
52+
return new PaimonLakeSource(paimonConfig, tablePath);
5153
}
5254
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.alibaba.fluss.lake.paimon.source;
20+
21+
import com.alibaba.fluss.row.TimestampLtz;
22+
import com.alibaba.fluss.row.TimestampNtz;
23+
24+
import org.apache.paimon.data.BinaryString;
25+
import org.apache.paimon.data.Decimal;
26+
import org.apache.paimon.data.InternalArray;
27+
import org.apache.paimon.data.InternalMap;
28+
import org.apache.paimon.data.InternalRow;
29+
import org.apache.paimon.data.Timestamp;
30+
import org.apache.paimon.data.variant.Variant;
31+
import org.apache.paimon.types.DataType;
32+
import org.apache.paimon.types.RowKind;
33+
import org.apache.paimon.types.RowType;
34+
35+
/** Adapter class for converting Fluss row to Paimon row. */
36+
public class FlussRowAsPaimonRow implements InternalRow {
37+
38+
protected com.alibaba.fluss.row.InternalRow internalRow;
39+
protected final RowType tableRowType;
40+
41+
public FlussRowAsPaimonRow(RowType tableTowType) {
42+
this.tableRowType = tableTowType;
43+
}
44+
45+
public FlussRowAsPaimonRow(
46+
com.alibaba.fluss.row.InternalRow internalRow, RowType tableTowType) {
47+
this.internalRow = internalRow;
48+
this.tableRowType = tableTowType;
49+
}
50+
51+
@Override
52+
public int getFieldCount() {
53+
return internalRow.getFieldCount();
54+
}
55+
56+
@Override
57+
public RowKind getRowKind() {
58+
return RowKind.INSERT;
59+
}
60+
61+
@Override
62+
public void setRowKind(RowKind rowKind) {
63+
// do nothing
64+
}
65+
66+
@Override
67+
public boolean isNullAt(int pos) {
68+
return internalRow.isNullAt(pos);
69+
}
70+
71+
@Override
72+
public boolean getBoolean(int pos) {
73+
return internalRow.getBoolean(pos);
74+
}
75+
76+
@Override
77+
public byte getByte(int pos) {
78+
return internalRow.getByte(pos);
79+
}
80+
81+
@Override
82+
public short getShort(int pos) {
83+
return internalRow.getShort(pos);
84+
}
85+
86+
@Override
87+
public int getInt(int pos) {
88+
return internalRow.getInt(pos);
89+
}
90+
91+
@Override
92+
public long getLong(int pos) {
93+
return internalRow.getLong(pos);
94+
}
95+
96+
@Override
97+
public float getFloat(int pos) {
98+
return internalRow.getFloat(pos);
99+
}
100+
101+
@Override
102+
public double getDouble(int pos) {
103+
return internalRow.getDouble(pos);
104+
}
105+
106+
@Override
107+
public BinaryString getString(int pos) {
108+
return BinaryString.fromBytes(internalRow.getString(pos).toBytes());
109+
}
110+
111+
@Override
112+
public Decimal getDecimal(int pos, int precision, int scale) {
113+
com.alibaba.fluss.row.Decimal flussDecimal = internalRow.getDecimal(pos, precision, scale);
114+
if (flussDecimal.isCompact()) {
115+
return Decimal.fromUnscaledLong(flussDecimal.toUnscaledLong(), precision, scale);
116+
} else {
117+
return Decimal.fromBigDecimal(flussDecimal.toBigDecimal(), precision, scale);
118+
}
119+
}
120+
121+
@Override
122+
public Timestamp getTimestamp(int pos, int precision) {
123+
DataType paimonTimestampType = tableRowType.getTypeAt(pos);
124+
switch (paimonTimestampType.getTypeRoot()) {
125+
case TIMESTAMP_WITHOUT_TIME_ZONE:
126+
if (TimestampNtz.isCompact(precision)) {
127+
return Timestamp.fromEpochMillis(
128+
internalRow.getTimestampNtz(pos, precision).getMillisecond());
129+
} else {
130+
TimestampNtz timestampNtz = internalRow.getTimestampNtz(pos, precision);
131+
return Timestamp.fromEpochMillis(
132+
timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond());
133+
}
134+
135+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
136+
if (TimestampLtz.isCompact(precision)) {
137+
return Timestamp.fromEpochMillis(
138+
internalRow.getTimestampLtz(pos, precision).getEpochMillisecond());
139+
} else {
140+
TimestampLtz timestampLtz = internalRow.getTimestampLtz(pos, precision);
141+
return Timestamp.fromEpochMillis(
142+
timestampLtz.getEpochMillisecond(),
143+
timestampLtz.getNanoOfMillisecond());
144+
}
145+
default:
146+
throw new UnsupportedOperationException(
147+
"Unsupported data type to get timestamp: " + paimonTimestampType);
148+
}
149+
}
150+
151+
@Override
152+
public byte[] getBinary(int pos) {
153+
return internalRow.getBytes(pos);
154+
}
155+
156+
@Override
157+
public Variant getVariant(int i) {
158+
throw new UnsupportedOperationException(
159+
"getVariant is not support for Fluss record currently.");
160+
}
161+
162+
@Override
163+
public InternalArray getArray(int pos) {
164+
throw new UnsupportedOperationException(
165+
"getArray is not support for Fluss record currently.");
166+
}
167+
168+
@Override
169+
public InternalMap getMap(int pos) {
170+
throw new UnsupportedOperationException(
171+
"getMap is not support for Fluss record currently.");
172+
}
173+
174+
@Override
175+
public InternalRow getRow(int pos, int pos1) {
176+
throw new UnsupportedOperationException(
177+
"getRow is not support for Fluss record currently.");
178+
}
179+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.alibaba.fluss.lake.paimon.source;
20+
21+
import com.alibaba.fluss.config.Configuration;
22+
import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
23+
import com.alibaba.fluss.lake.source.LakeSource;
24+
import com.alibaba.fluss.lake.source.Planner;
25+
import com.alibaba.fluss.lake.source.RecordReader;
26+
import com.alibaba.fluss.metadata.TablePath;
27+
import com.alibaba.fluss.predicate.Predicate;
28+
29+
import org.apache.paimon.catalog.Catalog;
30+
import org.apache.paimon.catalog.CatalogContext;
31+
import org.apache.paimon.catalog.CatalogFactory;
32+
import org.apache.paimon.options.Options;
33+
import org.apache.paimon.table.FileStoreTable;
34+
35+
import javax.annotation.Nullable;
36+
37+
import java.io.IOException;
38+
import java.util.Collections;
39+
import java.util.List;
40+
41+
import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
42+
43+
/**
44+
* Paimon Lake format implementation of {@link com.alibaba.fluss.lake.source.LakeSource} for reading
45+
* paimon table.
46+
*/
47+
public class PaimonLakeSource implements LakeSource<PaimonSplit> {
48+
49+
private final Configuration paimonConfig;
50+
private final TablePath tablePath;
51+
52+
private @Nullable int[][] project;
53+
private @Nullable org.apache.paimon.predicate.Predicate predicate;
54+
55+
public PaimonLakeSource(Configuration paimonConfig, TablePath tablePath) {
56+
this.paimonConfig = paimonConfig;
57+
this.tablePath = tablePath;
58+
}
59+
60+
@Override
61+
public void withProject(int[][] project) {
62+
this.project = project;
63+
}
64+
65+
@Override
66+
public void withLimit(int limit) {
67+
throw new UnsupportedOperationException("Not impl.");
68+
}
69+
70+
@Override
71+
public FilterPushDownResult withFilters(List<Predicate> predicates) {
72+
return FilterPushDownResult.of(Collections.emptyList(), predicates);
73+
}
74+
75+
@Override
76+
public Planner<PaimonSplit> createPlanner(PlannerContext plannerContext) {
77+
return new PaimonSplitPlanner(
78+
paimonConfig, tablePath, predicate, plannerContext.snapshotId());
79+
}
80+
81+
@Override
82+
public RecordReader createRecordReader(ReaderContext<PaimonSplit> context) throws IOException {
83+
try (Catalog catalog = getCatalog()) {
84+
FileStoreTable fileStoreTable = getTable(catalog, tablePath);
85+
if (fileStoreTable.primaryKeys().isEmpty()) {
86+
return new PaimonRecordReader(
87+
fileStoreTable, context.lakeSplit(), project, predicate);
88+
} else {
89+
return new PaimonSortedRecordReader(
90+
fileStoreTable, context.lakeSplit(), project, predicate);
91+
}
92+
} catch (Exception e) {
93+
throw new IOException("Fail to create record reader.", e);
94+
}
95+
}
96+
97+
@Override
98+
public SimpleVersionedSerializer<PaimonSplit> getSplitSerializer() {
99+
return new PaimonSplitSerializer();
100+
}
101+
102+
private Catalog getCatalog() {
103+
return CatalogFactory.createCatalog(
104+
CatalogContext.create(Options.fromMap(paimonConfig.toMap())));
105+
}
106+
107+
private FileStoreTable getTable(Catalog catalog, TablePath tablePath) throws Exception {
108+
return (FileStoreTable) catalog.getTable(toPaimon(tablePath));
109+
}
110+
}

0 commit comments

Comments
 (0)