Skip to content

Commit 53db9c5

Browse files
authored
[Feature][Connector-V2] Support `` for the SQL To Paimon converter (#10206)
1 parent b9e5054 commit 53db9c5

File tree

2 files changed

+32
-8
lines changed

2 files changed

+32
-8
lines changed

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@
7474
import java.util.Optional;
7575
import java.util.regex.Matcher;
7676
import java.util.regex.Pattern;
77-
import java.util.stream.IntStream;
7877

7978
public class SqlToPaimonPredicateConverter {
8079

@@ -123,12 +122,11 @@ public static int[] convertSqlSelectToPaimonProjectionIndex(
123122
}
124123
}
125124

126-
String[] columnNamesArray = columnNames.toArray(new String[0]);
127125
projectionIndex =
128-
IntStream.range(0, columnNamesArray.length)
129-
.map(
130-
i -> {
131-
String fieldName = columnNamesArray[i];
126+
columnNames.stream()
127+
.mapToInt(
128+
columnName -> {
129+
String fieldName = columnName.replace("`", "");
132130
int index = Arrays.asList(fieldNames).indexOf(fieldName);
133131
if (index == -1) {
134132
throw new IllegalArgumentException(
@@ -360,7 +358,7 @@ private static Object convertValueByPaimonDataType(
360358
RowType rowType, String columnName, Object jsqlParserDataTypeValue) {
361359
Optional<DataField> theFiled =
362360
rowType.getFields().stream()
363-
.filter(field -> field.name().equalsIgnoreCase(columnName))
361+
.filter(field -> field.name().equalsIgnoreCase(columnName.replace("`", "")))
364362
.findFirst();
365363
String strValue = jsqlParserDataTypeValue.toString();
366364
if (theFiled.isPresent()) {
@@ -424,7 +422,7 @@ private static Object getJSQLParserDataTypeValue(Expression expression) {
424422
}
425423

426424
private static int getColumnIndex(PredicateBuilder builder, Column column) {
427-
int index = builder.indexOf(column.getColumnName());
425+
int index = builder.indexOf(column.getColumnName().replace("`", ""));
428426
if (index == -1) {
429427
throw new IllegalArgumentException(
430428
String.format("The column named [%s] is not exists", column.getColumnName()));

seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,4 +318,30 @@ public void testParseDynamicOptions() {
318318
dynamicOptions.get("incremental-between-timestamp"));
319319
assertEquals("my-tag", dynamicOptions.get("scan.tag-name"));
320320
}
321+
322+
@Test
323+
public void testPiamonQuoteIdentifier() {
324+
String query =
325+
"SELECT `decimal_col`, `int_col`, `char_col`, `timestamp_col`, `boolean_col`, time_col FROM table WHERE int_col > 3 OR `double_col` < 6.6 ";
326+
327+
PlainSelect plainSelect = convertToPlainSelect(query);
328+
assertNotNull(plainSelect);
329+
330+
int[] fieldIndex =
331+
SqlToPaimonPredicateConverter.convertSqlSelectToPaimonProjectionIndex(
332+
rowType.getFieldNames().toArray(new String[0]), plainSelect);
333+
assertNotNull(fieldIndex);
334+
assertEquals(6, fieldIndex.length);
335+
assertEquals(4, fieldIndex[0]);
336+
assertEquals(7, fieldIndex[1]);
337+
assertEquals(0, fieldIndex[2]);
338+
assertEquals(12, fieldIndex[3]);
339+
assertEquals(2, fieldIndex[4]);
340+
assertEquals(13, fieldIndex[5]);
341+
342+
Predicate predicate =
343+
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
344+
rowType, plainSelect);
345+
assertNotNull(predicate);
346+
}
321347
}

0 commit comments

Comments
 (0)