Skip to content

Commit eaa15e4

Browse files
authored
[Feature][connector-elasticsearch] elasticsearch support nested type (#8462)
1 parent 5560e3d commit eaa15e4

File tree

8 files changed

+212
-3
lines changed

8 files changed

+212
-3
lines changed

Diff for: seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class ArrayType<T, E> implements SeaTunnelDataType<T> {
5656
private final Class<T> arrayClass;
5757
private final SeaTunnelDataType<E> elementType;
5858

59-
protected ArrayType(Class<T> arrayClass, SeaTunnelDataType<E> elementType) {
59+
public ArrayType(Class<T> arrayClass, SeaTunnelDataType<E> elementType) {
6060
this.arrayClass = arrayClass;
6161
this.elementType = elementType;
6262
}

Diff for: seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java

+10
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,16 @@ private int getBytesForValue(Object v) {
305305
return getBytesForArray(v, BasicType.FLOAT_TYPE);
306306
case "Double[]":
307307
return getBytesForArray(v, BasicType.DOUBLE_TYPE);
308+
case "Map[]":
309+
int sizeMaps = 0;
310+
for (Map o : (Map[]) v) {
311+
for (Map.Entry<?, ?> entry : ((Map<?, ?>) o).entrySet()) {
312+
sizeMaps +=
313+
getBytesForValue(entry.getKey())
314+
+ getBytesForValue(entry.getValue());
315+
}
316+
}
317+
return sizeMaps;
308318
case "HashMap":
309319
case "LinkedHashMap":
310320
int size = 0;

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG;
6565
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG_RANGE;
6666
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.MATCH_ONLY_TEXT;
67-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.NESTED;
6867
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.OBJECT;
6968
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.PERCOLATOR;
7069
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.POINT;
@@ -150,6 +149,12 @@ public Column convert(BasicTypeDefine<EsType> typeDefine) {
150149
});
151150
builder.dataType(rowType);
152151
break;
152+
case EsType.NESTED:
153+
builder.dataType(
154+
new ArrayType<>(
155+
Map[].class,
156+
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE)));
157+
break;
153158
case INTEGER:
154159
case TOKEN_COUNT:
155160
builder.dataType(BasicType.INT_TYPE);
@@ -207,7 +212,6 @@ public Column convert(BasicTypeDefine<EsType> typeDefine) {
207212
case COMPLETION:
208213
case STRING:
209214
case GEO_SHAPE:
210-
case NESTED:
211215
case PERCOLATOR:
212216
case POINT:
213217
case RANK_FEATURES:

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java

+30
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.HashMap;
4747
import java.util.List;
4848
import java.util.Map;
49+
import java.util.Objects;
4950

5051
import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
5152
import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE;
@@ -177,6 +178,35 @@ Object convertValue(SeaTunnelDataType<?> fieldType, String fieldValue)
177178
} else if (fieldType instanceof ArrayType) {
178179
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
179180
SeaTunnelDataType<?> elementType = arrayType.getElementType();
181+
if (elementType instanceof MapType) {
182+
MapType<?, ?> mapType = (MapType<?, ?>) elementType;
183+
List<Map> mapList = JsonUtils.toList(fieldValue, Map.class);
184+
Object arr = Array.newInstance(elementType.getTypeClass(), mapList.size());
185+
SeaTunnelDataType<?> keyType = mapType.getKeyType();
186+
SeaTunnelDataType<?> valueType = mapType.getValueType();
187+
for (int i = 0; i < mapList.size(); i++) {
188+
Map<String, String> map = mapList.get(i);
189+
Map<Object, Object> convertMap = new HashMap<>();
190+
for (Map.Entry entry : map.entrySet()) {
191+
Object convertKey =
192+
convertValue(
193+
keyType,
194+
Objects.isNull(entry.getKey())
195+
? null
196+
: String.valueOf(entry.getKey()));
197+
Object convertValue =
198+
convertValue(
199+
valueType,
200+
Objects.isNull(entry.getValue())
201+
? null
202+
: String.valueOf(entry.getValue()));
203+
convertMap.put(convertKey, convertValue);
204+
}
205+
Array.set(arr, i, convertMap);
206+
}
207+
return arr;
208+
}
209+
180210
List<String> stringList = JsonUtils.toList(fieldValue, String.class);
181211
Object arr = Array.newInstance(elementType.getTypeClass(), stringList.size());
182212
for (int i = 0; i < stringList.size(); i++) {

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java

+74
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public void startUp() throws Exception {
133133
createIndexDocs();
134134
createIndexWithFullType();
135135
createIndexForResourceNull("st_index4");
136+
createIndexWithNestType();
136137
}
137138

138139
/** create a index,and bulk some documents */
@@ -156,6 +157,31 @@ private void createIndexDocsByName(String indexName, List<String> testDataSet) {
156157
esRestClient.bulk(requestBody.toString());
157158
}
158159

160+
private void createIndexWithNestType() throws IOException, InterruptedException {
161+
String mapping =
162+
IOUtils.toString(
163+
ContainerUtil.getResourcesFile("/elasticsearch/st_index_nest_mapping.json")
164+
.toURI(),
165+
StandardCharsets.UTF_8);
166+
esRestClient.createIndex("st_index_nest", mapping);
167+
esRestClient.createIndex("st_index_nest_copy", mapping);
168+
BulkResponse response =
169+
esRestClient.bulk(
170+
"{ \"index\" : { \"_index\" : \"st_index_nest\", \"_id\" : \"1\" } }\n"
171+
+ IOUtils.toString(
172+
ContainerUtil.getResourcesFile(
173+
"/elasticsearch/st_index_nest_data.json")
174+
.toURI(),
175+
StandardCharsets.UTF_8)
176+
.replace("\n", "")
177+
+ "\n");
178+
Assertions.assertFalse(response.isErrors(), response.getResponse());
179+
// waiting index refresh
180+
Thread.sleep(INDEX_REFRESH_MILL_DELAY);
181+
Assertions.assertEquals(
182+
3, esRestClient.getIndexDocsCount("st_index_nest").get(0).getDocsCount());
183+
}
184+
159185
private void createIndexWithFullType() throws IOException, InterruptedException {
160186
String mapping =
161187
IOUtils.toString(
@@ -202,6 +228,21 @@ public void testElasticsearchWithSchema(TestContainer container)
202228
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
203229
}
204230

231+
@TestTemplate
232+
public void testElasticsearchWithNestSchema(TestContainer container)
233+
throws IOException, InterruptedException {
234+
Container.ExecResult execResult =
235+
container.executeJob("/elasticsearch/elasticsearch_source_and_sink_with_nest.conf");
236+
Assertions.assertEquals(0, execResult.getExitCode());
237+
238+
List<String> sinkData = readSinkDataWithNestSchema("st_index_nest_copy");
239+
String data =
240+
"{\"address\":[{\"zipcode\":\"10001\",\"city\":\"New York\",\"street\":\"123 Main St\"},"
241+
+ "{\"zipcode\":\"90001\",\"city\":\"Los Angeles\",\"street\":\"456 Elm St\"}],\"name\":\"John Doe\"}";
242+
243+
Assertions.assertIterableEquals(Lists.newArrayList(data), sinkData);
244+
}
245+
205246
@TestTemplate
206247
public void testElasticsSearchWithMultiSourceByFilter(TestContainer container)
207248
throws InterruptedException, IOException {
@@ -546,6 +587,13 @@ private List<String> readSinkDataWithSchema(String index) throws InterruptedExce
546587
return getDocsWithTransformTimestamp(source, index);
547588
}
548589

590+
private List<String> readSinkDataWithNestSchema(String index) throws InterruptedException {
591+
// wait for index refresh
592+
Thread.sleep(INDEX_REFRESH_MILL_DELAY);
593+
List<String> source = Lists.newArrayList("name", "address");
594+
return getDocsWithNestType(source, index);
595+
}
596+
549597
private List<String> readMultiSinkData(String index, List<String> source)
550598
throws InterruptedException {
551599
// wait for index refresh
@@ -604,6 +652,25 @@ private List<String> getDocsWithTransformTimestamp(List<String> source, String i
604652
return docs;
605653
}
606654

655+
private List<String> getDocsWithNestType(List<String> source, String index) {
656+
Map<String, Object> query = new HashMap<>();
657+
query.put("match_all", new HashMap<>());
658+
ScrollResult scrollResult = esRestClient.searchByScroll(index, source, query, "1m", 1000);
659+
scrollResult
660+
.getDocs()
661+
.forEach(
662+
x -> {
663+
x.remove("_index");
664+
x.remove("_type");
665+
x.remove("_id");
666+
});
667+
List<String> docs =
668+
scrollResult.getDocs().stream()
669+
.map(JsonUtils::toJsonString)
670+
.collect(Collectors.toList());
671+
return docs;
672+
}
673+
607674
private List<String> getDocsWithTransformDate(List<String> source, String index) {
608675
return getDocsWithTransformDate(source, index, Collections.emptyList());
609676
}
@@ -739,6 +806,13 @@ private List<String> mapTestDatasetForDSL(List<String> testDataset) {
739806
.collect(Collectors.toList());
740807
}
741808

809+
private List<String> mapTestDatasetForNest(List<String> testDataset) {
810+
return testDataset.stream()
811+
.map(JsonUtils::parseObject)
812+
.map(JsonNode::toString)
813+
.collect(Collectors.toList());
814+
}
815+
742816
/**
743817
* Use custom filtering criteria to query data
744818
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
######
19+
###### This config file is a demonstration of streaming processing in seatunnel config
20+
######
21+
22+
env {
23+
parallelism = 1
24+
job.mode = "BATCH"
25+
#checkpoint.interval = 10000
26+
}
27+
28+
source {
29+
Elasticsearch {
30+
hosts = ["https://elasticsearch:9200"]
31+
username = "elastic"
32+
password = "elasticsearch"
33+
index = "st_index_nest"
34+
source = ["address","name"]
35+
query = {"match_all": {}}
36+
tls_verify_certificate = false
37+
tls_verify_hostname = false
38+
}
39+
}
40+
41+
transform {
42+
}
43+
44+
sink {
45+
Elasticsearch {
46+
hosts = ["https://elasticsearch:9200"]
47+
username = "elastic"
48+
password = "elasticsearch"
49+
index = "st_index_nest_copy"
50+
tls_verify_certificate = false
51+
tls_verify_hostname = false
52+
}
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"name": "John Doe",
3+
"address": [
4+
{
5+
"street": "123 Main St",
6+
"city": "New York",
7+
"zipcode": "10001"
8+
},
9+
{
10+
"street": "456 Elm St",
11+
"city": "Los Angeles",
12+
"zipcode": "90001"
13+
}
14+
]
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{
2+
"mappings": {
3+
"properties": {
4+
"name": {
5+
"type": "text"
6+
},
7+
"address": {
8+
"type": "nested",
9+
"properties": {
10+
"street": {
11+
"type": "text"
12+
},
13+
"city": {
14+
"type": "keyword"
15+
},
16+
"zipcode": {
17+
"type": "keyword"
18+
}
19+
}
20+
}
21+
}
22+
}
23+
}

0 commit comments

Comments
 (0)