Skip to content

Commit 505c125

Browse files
authored
[Improve][Connector-V2] Add ElasticSearch type converter (#6546)
1 parent 470bb97 commit 505c125

File tree

17 files changed

+1104
-129
lines changed

17 files changed

+1104
-129
lines changed

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
3333
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
3434
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
35+
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
3536
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
37+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
3638
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
3739
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
3840

@@ -146,10 +148,8 @@ public CatalogTable getTable(TablePath tablePath)
146148
throws CatalogException, TableNotExistException {
147149
// Get the index mapping?
148150
checkNotNull(tablePath, "tablePath cannot be null");
149-
ElasticSearchDataTypeConvertor elasticSearchDataTypeConvertor =
150-
new ElasticSearchDataTypeConvertor();
151151
TableSchema.Builder builder = TableSchema.builder();
152-
Map<String, String> fieldTypeMapping =
152+
Map<String, BasicTypeDefine<EsType>> fieldTypeMapping =
153153
esRestClient.getFieldTypeMapping(tablePath.getTableName(), Collections.emptyList());
154154
buildColumnsWithErrorCheck(
155155
tablePath,
@@ -159,8 +159,9 @@ public CatalogTable getTable(TablePath tablePath)
159159
// todo: we need to add a new type TEXT or add length in STRING type
160160
return PhysicalColumn.of(
161161
nameAndType.getKey(),
162-
elasticSearchDataTypeConvertor.toSeaTunnelType(
163-
nameAndType.getKey(), nameAndType.getValue()),
162+
ElasticSearchTypeConverter.INSTANCE
163+
.convert(nameAndType.getValue())
164+
.getDataType(),
164165
(Long) null,
165166
true,
166167
null,

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java

+22-67
Original file line numberDiff line numberDiff line change
@@ -17,34 +17,24 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
1919

20+
import org.apache.seatunnel.api.table.catalog.Column;
2021
import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
21-
import org.apache.seatunnel.api.table.type.BasicType;
22-
import org.apache.seatunnel.api.table.type.LocalTimeType;
22+
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
23+
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
2324
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
24-
import org.apache.seatunnel.api.table.type.SqlType;
25+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
2526

2627
import com.google.auto.service.AutoService;
2728

2829
import java.util.Map;
2930

3031
import static com.google.common.base.Preconditions.checkNotNull;
3132

33+
/** @deprecated instead by {@link ElasticSearchTypeConverter} */
34+
@Deprecated
3235
@AutoService(DataTypeConvertor.class)
3336
public class ElasticSearchDataTypeConvertor implements DataTypeConvertor<String> {
3437

35-
public static final String STRING = "string";
36-
public static final String KEYWORD = "keyword";
37-
public static final String TEXT = "text";
38-
public static final String BOOLEAN = "boolean";
39-
public static final String BYTE = "byte";
40-
public static final String SHORT = "short";
41-
public static final String INTEGER = "integer";
42-
public static final String LONG = "long";
43-
public static final String FLOAT = "float";
44-
public static final String HALF_FLOAT = "half_float";
45-
public static final String DOUBLE = "double";
46-
public static final String DATE = "date";
47-
4838
@Override
4939
public SeaTunnelDataType<?> toSeaTunnelType(String field, String connectorDataType) {
5040
return toSeaTunnelType(field, connectorDataType, null);
@@ -54,34 +44,14 @@ public SeaTunnelDataType<?> toSeaTunnelType(String field, String connectorDataTy
5444
public SeaTunnelDataType<?> toSeaTunnelType(
5545
String field, String connectorDataType, Map<String, Object> dataTypeProperties) {
5646
checkNotNull(connectorDataType, "connectorDataType can not be null");
57-
switch (connectorDataType) {
58-
case STRING:
59-
return BasicType.STRING_TYPE;
60-
case KEYWORD:
61-
return BasicType.STRING_TYPE;
62-
case TEXT:
63-
return BasicType.STRING_TYPE;
64-
case BOOLEAN:
65-
return BasicType.BOOLEAN_TYPE;
66-
case BYTE:
67-
return BasicType.BYTE_TYPE;
68-
case SHORT:
69-
return BasicType.SHORT_TYPE;
70-
case INTEGER:
71-
return BasicType.INT_TYPE;
72-
case LONG:
73-
return BasicType.LONG_TYPE;
74-
case FLOAT:
75-
return BasicType.FLOAT_TYPE;
76-
case HALF_FLOAT:
77-
return BasicType.FLOAT_TYPE;
78-
case DOUBLE:
79-
return BasicType.DOUBLE_TYPE;
80-
case DATE:
81-
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
82-
default:
83-
return BasicType.STRING_TYPE;
84-
}
47+
BasicTypeDefine<EsType> typeDefine =
48+
BasicTypeDefine.<EsType>builder()
49+
.name(field)
50+
.columnType(connectorDataType)
51+
.dataType(connectorDataType)
52+
.build();
53+
54+
return ElasticSearchTypeConverter.INSTANCE.convert(typeDefine).getDataType();
8555
}
8656

8757
@Override
@@ -90,29 +60,14 @@ public String toConnectorType(
9060
SeaTunnelDataType<?> seaTunnelDataType,
9161
Map<String, Object> dataTypeProperties) {
9262
checkNotNull(seaTunnelDataType, "seaTunnelDataType can not be null");
93-
SqlType sqlType = seaTunnelDataType.getSqlType();
94-
switch (sqlType) {
95-
case STRING:
96-
return STRING;
97-
case BOOLEAN:
98-
return BOOLEAN;
99-
case BYTES:
100-
return BYTE;
101-
case TINYINT:
102-
return SHORT;
103-
case INT:
104-
return INTEGER;
105-
case BIGINT:
106-
return LONG;
107-
case FLOAT:
108-
return FLOAT;
109-
case DOUBLE:
110-
return DOUBLE;
111-
case TIMESTAMP:
112-
return DATE;
113-
default:
114-
return STRING;
115-
}
63+
Column column =
64+
PhysicalColumn.builder()
65+
.name(field)
66+
.dataType(seaTunnelDataType)
67+
.nullable(true)
68+
.build();
69+
BasicTypeDefine<EsType> typeDefine = ElasticSearchTypeConverter.INSTANCE.reconvert(column);
70+
return typeDefine.getColumnType();
11671
}
11772

11873
@Override

0 commit comments

Comments
 (0)