Skip to content

Commit 6c15772

Browse files
authored
[Paimon]support projection for paimon source (#6343)
1 parent a399ef4 commit 6c15772

File tree

13 files changed

+337
-61
lines changed

13 files changed

+337
-61
lines changed

docs/en/connector-v2/source/Paimon.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ source {
104104
warehouse = "/tmp/paimon"
105105
database = "full_type"
106106
table = "st_test"
107-
query = "select * from st_test where c_boolean= 'true' and c_tinyint > 116 and c_smallint = 15987 or c_decimal='2924137191386439303744.39292213'"
107+
query = "select c_boolean, c_tinyint from st_test where c_boolean= 'true' and c_tinyint > 116 and c_smallint = 15987 or c_decimal='2924137191386439303744.39292213'"
108108
}
109109
}
110110
```
@@ -161,4 +161,5 @@ source {
161161
### next version
162162

163163
- Add Paimon Source Connector
164+
- Support projection for Paimon Source
164165

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@
4545
import java.io.Closeable;
4646
import java.io.IOException;
4747
import java.util.List;
48+
import java.util.Map;
49+
import java.util.Objects;
50+
import java.util.stream.Collectors;
51+
import java.util.stream.IntStream;
4852

4953
@Slf4j
5054
public class PaimonCatalog implements Catalog, PaimonTable {
@@ -124,6 +128,16 @@ public CatalogTable getTable(TablePath tablePath)
124128
}
125129
}
126130

131+
public CatalogTable getTableWithProjection(TablePath tablePath, int[] projectionIndex)
132+
throws CatalogException, TableNotExistException {
133+
try {
134+
FileStoreTable paimonFileStoreTableTable = (FileStoreTable) getPaimonTable(tablePath);
135+
return toCatalogTable(paimonFileStoreTableTable, tablePath, projectionIndex);
136+
} catch (Exception e) {
137+
throw new TableNotExistException(this.catalogName, tablePath);
138+
}
139+
}
140+
127141
@Override
128142
public Table getPaimonTable(TablePath tablePath)
129143
throws CatalogException, TableNotExistException {
@@ -181,8 +195,26 @@ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
181195

182196
private CatalogTable toCatalogTable(
183197
FileStoreTable paimonFileStoreTableTable, TablePath tablePath) {
198+
return toCatalogTable(paimonFileStoreTableTable, tablePath, null);
199+
}
200+
201+
private CatalogTable toCatalogTable(
202+
FileStoreTable paimonFileStoreTableTable, TablePath tablePath, int[] projectionIndex) {
184203
org.apache.paimon.schema.TableSchema schema = paimonFileStoreTableTable.schema();
185204
List<DataField> dataFields = schema.fields();
205+
if (!Objects.isNull(projectionIndex)) {
206+
Map<Integer, DataField> indexMap =
207+
IntStream.range(0, dataFields.size())
208+
.boxed()
209+
.collect(Collectors.toMap(i -> i, dataFields::get));
210+
211+
dataFields =
212+
java.util.Arrays.stream(projectionIndex)
213+
.distinct()
214+
.filter(indexMap::containsKey)
215+
.mapToObj(indexMap::get)
216+
.collect(Collectors.toList());
217+
}
186218
TableSchema.Builder builder = TableSchema.builder();
187219
dataFields.forEach(
188220
dataField -> {

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,6 @@ public class PaimonConfig implements Serializable {
8686
.noDefaultValue()
8787
.withDescription("The table you intend to access");
8888

89-
public static final Option<List<String>> READ_COLUMNS =
90-
Options.key("read_columns")
91-
.listType()
92-
.noDefaultValue()
93-
.withDescription("The read columns of the flink table store");
94-
9589
@Deprecated
9690
public static final Option<String> HDFS_SITE_PATH =
9791
Options.key("hdfs_site_path")

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,20 @@
2929
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
3030
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSourceConfig;
3131
import org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter;
32+
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
3233

3334
import org.apache.paimon.predicate.Predicate;
3435
import org.apache.paimon.table.Table;
36+
import org.apache.paimon.types.RowType;
37+
38+
import net.sf.jsqlparser.statement.select.PlainSelect;
3539

3640
import java.util.Collections;
3741
import java.util.List;
42+
import java.util.Objects;
43+
44+
import static org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertSqlSelectToPaimonProjectionIndex;
45+
import static org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertToPlainSelect;
3846

3947
/** Paimon connector source class. */
4048
public class PaimonSource
@@ -52,6 +60,8 @@ public class PaimonSource
5260

5361
private Predicate predicate;
5462

63+
private int[] projectionIndex;
64+
5565
private CatalogTable catalogTable;
5666

5767
public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog) {
@@ -61,12 +71,22 @@ public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog)
6171
TablePath.of(paimonSourceConfig.getNamespace(), paimonSourceConfig.getTable());
6272
this.catalogTable = paimonCatalog.getTable(tablePath);
6373
this.paimonTable = paimonCatalog.getPaimonTable(tablePath);
64-
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
65-
// TODO: We can use this to realize the column projection feature later
74+
6675
String filterSql = readonlyConfig.get(PaimonSourceConfig.QUERY_SQL);
67-
this.predicate =
68-
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
69-
this.paimonTable.rowType(), filterSql);
76+
PlainSelect plainSelect = convertToPlainSelect(filterSql);
77+
RowType paimonRowType = this.paimonTable.rowType();
78+
String[] filedNames = paimonRowType.getFieldNames().toArray(new String[0]);
79+
if (!Objects.isNull(plainSelect)) {
80+
this.projectionIndex = convertSqlSelectToPaimonProjectionIndex(filedNames, plainSelect);
81+
if (!Objects.isNull(projectionIndex)) {
82+
this.catalogTable =
83+
paimonCatalog.getTableWithProjection(tablePath, projectionIndex);
84+
}
85+
this.predicate =
86+
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
87+
paimonRowType, plainSelect);
88+
}
89+
seaTunnelRowType = RowTypeConverter.convert(paimonRowType, projectionIndex);
7090
}
7191

7292
@Override
@@ -75,26 +95,27 @@ public String getPluginName() {
7595
}
7696

7797
@Override
78-
public Boundedness getBoundedness() {
79-
return Boundedness.BOUNDED;
98+
public List<CatalogTable> getProducedCatalogTables() {
99+
return Collections.singletonList(catalogTable);
80100
}
81101

82102
@Override
83-
public List<CatalogTable> getProducedCatalogTables() {
84-
return Collections.singletonList(catalogTable);
103+
public Boundedness getBoundedness() {
104+
return Boundedness.BOUNDED;
85105
}
86106

87107
@Override
88108
public SourceReader<SeaTunnelRow, PaimonSourceSplit> createReader(
89109
SourceReader.Context readerContext) throws Exception {
90-
91-
return new PaimonSourceReader(readerContext, paimonTable, seaTunnelRowType, predicate);
110+
return new PaimonSourceReader(
111+
readerContext, paimonTable, seaTunnelRowType, predicate, projectionIndex);
92112
}
93113

94114
@Override
95115
public SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState> createEnumerator(
96116
SourceSplitEnumerator.Context<PaimonSourceSplit> enumeratorContext) throws Exception {
97-
return new PaimonSourceSplitEnumerator(enumeratorContext, paimonTable, predicate);
117+
return new PaimonSourceSplitEnumerator(
118+
enumeratorContext, paimonTable, predicate, projectionIndex);
98119
}
99120

100121
@Override
@@ -103,6 +124,6 @@ public SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState> restoreEnumer
103124
PaimonSourceState checkpointState)
104125
throws Exception {
105126
return new PaimonSourceSplitEnumerator(
106-
enumeratorContext, paimonTable, checkpointState, predicate);
127+
enumeratorContext, paimonTable, checkpointState, predicate, projectionIndex);
107128
}
108129
}

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,19 @@ public class PaimonSourceReader implements SourceReader<SeaTunnelRow, PaimonSour
4949
private final SeaTunnelRowType seaTunnelRowType;
5050
private volatile boolean noMoreSplit;
5151
private final Predicate predicate;
52+
private int[] projection;
5253

5354
public PaimonSourceReader(
54-
Context context, Table table, SeaTunnelRowType seaTunnelRowType, Predicate predicate) {
55+
Context context,
56+
Table table,
57+
SeaTunnelRowType seaTunnelRowType,
58+
Predicate predicate,
59+
int[] projection) {
5560
this.context = context;
5661
this.table = table;
5762
this.seaTunnelRowType = seaTunnelRowType;
5863
this.predicate = predicate;
64+
this.projection = projection;
5965
}
6066

6167
@Override
@@ -76,6 +82,7 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
7682
// read logic
7783
try (final RecordReader<InternalRow> reader =
7884
table.newReadBuilder()
85+
.withProjection(projection)
7986
.withFilter(predicate)
8087
.newRead()
8188
.executeFilter()

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,23 +51,31 @@ public class PaimonSourceSplitEnumerator
5151

5252
private final Predicate predicate;
5353

54+
private int[] projection;
55+
5456
public PaimonSourceSplitEnumerator(
55-
Context<PaimonSourceSplit> context, Table table, Predicate predicate) {
57+
Context<PaimonSourceSplit> context,
58+
Table table,
59+
Predicate predicate,
60+
int[] projection) {
5661
this.context = context;
5762
this.table = table;
5863
this.assignedSplit = new HashSet<>();
5964
this.predicate = predicate;
65+
this.projection = projection;
6066
}
6167

6268
public PaimonSourceSplitEnumerator(
6369
Context<PaimonSourceSplit> context,
6470
Table table,
6571
PaimonSourceState sourceState,
66-
Predicate predicate) {
72+
Predicate predicate,
73+
int[] projection) {
6774
this.context = context;
6875
this.table = table;
6976
this.assignedSplit = sourceState.getAssignedSplits();
7077
this.predicate = predicate;
78+
this.projection = projection;
7179
}
7280

7381
@Override
@@ -154,9 +162,13 @@ private void assignSplit(int taskId) {
154162
/** Get all splits of table */
155163
private Set<PaimonSourceSplit> getTableSplits() {
156164
final Set<PaimonSourceSplit> tableSplits = new HashSet<>();
157-
// TODO Support columns projection
158165
final List<Split> splits =
159-
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
166+
table.newReadBuilder()
167+
.withProjection(projection)
168+
.withFilter(predicate)
169+
.newScan()
170+
.plan()
171+
.splits();
160172
splits.forEach(split -> tableSplits.add(new PaimonSourceSplit(split)));
161173
return tableSplits;
162174
}

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java

Lines changed: 77 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -51,48 +51,97 @@
5151
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
5252
import net.sf.jsqlparser.schema.Column;
5353
import net.sf.jsqlparser.statement.Statement;
54+
import net.sf.jsqlparser.statement.select.AllColumns;
5455
import net.sf.jsqlparser.statement.select.PlainSelect;
5556
import net.sf.jsqlparser.statement.select.Select;
5657
import net.sf.jsqlparser.statement.select.SelectBody;
58+
import net.sf.jsqlparser.statement.select.SelectExpressionItem;
59+
import net.sf.jsqlparser.statement.select.SelectItem;
5760

5861
import java.math.BigDecimal;
62+
import java.util.ArrayList;
63+
import java.util.Arrays;
64+
import java.util.List;
5965
import java.util.Objects;
6066
import java.util.Optional;
67+
import java.util.stream.IntStream;
6168

6269
public class SqlToPaimonPredicateConverter {
6370

64-
public static Predicate convertSqlWhereToPaimonPredicate(RowType rowType, String query) {
71+
public static PlainSelect convertToPlainSelect(String query) {
72+
if (StringUtils.isBlank(query)) {
73+
return null;
74+
}
75+
Statement statement = null;
6576
try {
66-
if (StringUtils.isBlank(query)) {
67-
return null;
68-
}
69-
Statement statement = CCJSqlParserUtil.parse(query);
70-
// Confirm that the SQL statement is a Select statement
71-
if (!(statement instanceof Select)) {
72-
throw new IllegalArgumentException("Only SELECT statements are supported.");
73-
}
74-
Select select = (Select) statement;
75-
SelectBody selectBody = select.getSelectBody();
76-
if (!(selectBody instanceof PlainSelect)) {
77-
throw new IllegalArgumentException("Only simple SELECT statements are supported.");
78-
}
79-
PlainSelect plainSelect = (PlainSelect) selectBody;
80-
if (plainSelect.getHaving() != null
81-
|| plainSelect.getGroupBy() != null
82-
|| plainSelect.getOrderByElements() != null
83-
|| plainSelect.getLimit() != null) {
84-
throw new IllegalArgumentException(
85-
"Only SELECT statements with WHERE clause are supported. The Having, Group By, Order By, Limit clauses are currently unsupported.");
86-
}
87-
Expression whereExpression = plainSelect.getWhere();
88-
if (Objects.isNull(whereExpression)) {
77+
statement = CCJSqlParserUtil.parse(query);
78+
} catch (JSQLParserException e) {
79+
throw new IllegalArgumentException("Error parsing SQL.", e);
80+
}
81+
// Confirm that the SQL statement is a Select statement
82+
if (!(statement instanceof Select)) {
83+
throw new IllegalArgumentException("Only SELECT statements are supported.");
84+
}
85+
Select select = (Select) statement;
86+
SelectBody selectBody = select.getSelectBody();
87+
if (!(selectBody instanceof PlainSelect)) {
88+
throw new IllegalArgumentException("Only simple SELECT statements are supported.");
89+
}
90+
PlainSelect plainSelect = (PlainSelect) selectBody;
91+
if (plainSelect.getHaving() != null
92+
|| plainSelect.getGroupBy() != null
93+
|| plainSelect.getOrderByElements() != null
94+
|| plainSelect.getLimit() != null) {
95+
throw new IllegalArgumentException(
96+
"Only SELECT statements with WHERE clause are supported. The Having, Group By, Order By, Limit clauses are currently unsupported.");
97+
}
98+
return plainSelect;
99+
}
100+
101+
public static int[] convertSqlSelectToPaimonProjectionIndex(
102+
String[] fieldNames, PlainSelect plainSelect) {
103+
int[] projectionIndex = null;
104+
List<SelectItem> selectItems = plainSelect.getSelectItems();
105+
106+
List<String> columnNames = new ArrayList<>();
107+
for (SelectItem selectItem : selectItems) {
108+
if (selectItem instanceof AllColumns) {
89109
return null;
110+
} else if (selectItem instanceof SelectExpressionItem) {
111+
SelectExpressionItem selectExpressionItem = (SelectExpressionItem) selectItem;
112+
String columnName = selectExpressionItem.getExpression().toString();
113+
columnNames.add(columnName);
114+
} else {
115+
throw new IllegalArgumentException("Error encountered parsing query fields.");
90116
}
91-
PredicateBuilder builder = new PredicateBuilder(rowType);
92-
return parseExpressionToPredicate(builder, rowType, whereExpression);
93-
} catch (JSQLParserException e) {
94-
throw new IllegalArgumentException("Error parsing SQL WHERE clause", e);
95117
}
118+
119+
String[] columnNamesArray = columnNames.toArray(new String[0]);
120+
projectionIndex =
121+
IntStream.range(0, columnNamesArray.length)
122+
.map(
123+
i -> {
124+
String fieldName = columnNamesArray[i];
125+
int index = Arrays.asList(fieldNames).indexOf(fieldName);
126+
if (index == -1) {
127+
throw new IllegalArgumentException(
128+
"column " + fieldName + " does not exist.");
129+
}
130+
return index;
131+
})
132+
.toArray();
133+
134+
return projectionIndex;
135+
}
136+
137+
public static Predicate convertSqlWhereToPaimonPredicate(
138+
RowType rowType, PlainSelect plainSelect) {
139+
Expression whereExpression = plainSelect.getWhere();
140+
if (Objects.isNull(whereExpression)) {
141+
return null;
142+
}
143+
PredicateBuilder builder = new PredicateBuilder(rowType);
144+
return parseExpressionToPredicate(builder, rowType, whereExpression);
96145
}
97146

98147
private static Predicate parseExpressionToPredicate(

0 commit comments

Comments
 (0)