Skip to content

Commit 46fcb23

Browse files
[hotfix][connector-elasticsearch-sink] Convert index to lowercase (#8429)
1 parent 71ab848 commit 46fcb23

File tree

4 files changed

+113
-6
lines changed

4 files changed

+113
-6
lines changed

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ private ScrollResult getDocsFromScrollResponse(ObjectNode responseJson) {
388388
* @return true or false
389389
*/
390390
public boolean checkIndexExist(String index) {
391-
Request request = new Request("HEAD", "/" + index);
391+
Request request = new Request("HEAD", "/" + index.toLowerCase());
392392
try {
393393
Response response = restClient.performRequest(request);
394394
int statusCode = response.getStatusLine().getStatusCode();
@@ -400,7 +400,9 @@ public boolean checkIndexExist(String index) {
400400
}
401401

402402
public List<IndexDocsCount> getIndexDocsCount(String index) {
403-
String endpoint = String.format("/_cat/indices/%s?h=index,docsCount&format=json", index);
403+
String endpoint =
404+
String.format(
405+
"/_cat/indices/%s?h=index,docsCount&format=json", index.toLowerCase());
404406
Request request = new Request("GET", endpoint);
405407
try {
406408
Response response = restClient.performRequest(request);
@@ -458,7 +460,7 @@ public void createIndex(String indexName) {
458460
}
459461

460462
public void createIndex(String indexName, String mapping) {
461-
String endpoint = String.format("/%s", indexName);
463+
String endpoint = String.format("/%s", indexName.toLowerCase());
462464
Request request = new Request("PUT", endpoint);
463465
if (StringUtils.isNotEmpty(mapping)) {
464466
request.setJsonEntity(mapping);
@@ -484,7 +486,7 @@ public void createIndex(String indexName, String mapping) {
484486
}
485487

486488
public void dropIndex(String tableName) {
487-
String endpoint = String.format("/%s", tableName);
489+
String endpoint = String.format("/%s", tableName.toLowerCase());
488490
Request request = new Request("DELETE", endpoint);
489491
try {
490492
Response response = restClient.performRequest(request);
@@ -510,7 +512,7 @@ public void dropIndex(String tableName) {
510512
}
511513

512514
public void clearIndexData(String indexName) {
513-
String endpoint = String.format("/%s/_delete_by_query", indexName);
515+
String endpoint = String.format("/%s/_delete_by_query", indexName.toLowerCase());
514516
Request request = new Request("POST", endpoint);
515517
String jsonString = "{ \"query\": { \"match_all\": {} } }";
516518
request.setJsonEntity(jsonString);

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ public ElasticsearchSinkWriter(
7070
this.context = context;
7171
this.maxBatchSize = maxBatchSize;
7272

73-
IndexInfo indexInfo = new IndexInfo(catalogTable.getTableId().getTableName(), config);
73+
IndexInfo indexInfo =
74+
new IndexInfo(catalogTable.getTableId().getTableName().toLowerCase(), config);
7475
esRestClient = EsRestClient.createInstance(config);
7576
this.seaTunnelRowSerializer =
7677
new ElasticsearchRowSerializer(

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java

+33
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
import org.apache.commons.io.IOUtils;
4444

45+
import org.awaitility.Awaitility;
4546
import org.junit.jupiter.api.AfterEach;
4647
import org.junit.jupiter.api.Assertions;
4748
import org.junit.jupiter.api.BeforeEach;
@@ -75,6 +76,7 @@
7576
import java.util.Objects;
7677
import java.util.Optional;
7778
import java.util.Set;
79+
import java.util.concurrent.CompletableFuture;
7880
import java.util.concurrent.TimeUnit;
7981
import java.util.concurrent.locks.LockSupport;
8082
import java.util.function.Function;
@@ -352,6 +354,37 @@ public void testElasticsearchWithFullType(TestContainer container)
352354
esRestClient.getIndexDocsCount("st_index_full_type_target").get(0).getDocsCount());
353355
}
354356

357+
@TestTemplate
358+
public void testFakeSourceToElasticsearchWithUpperCaseIndex(TestContainer container) {
359+
CompletableFuture.supplyAsync(
360+
() -> {
361+
try {
362+
Container.ExecResult execResult =
363+
container.executeJob(
364+
"/elasticsearch/fakesource_to_elasticsearch_with_upper_case_index.conf");
365+
} catch (IOException e) {
366+
throw new RuntimeException(e);
367+
} catch (InterruptedException e) {
368+
throw new RuntimeException(e);
369+
}
370+
return null;
371+
});
372+
Awaitility.await()
373+
.atMost(120, TimeUnit.SECONDS)
374+
.ignoreExceptions()
375+
.pollInterval(3, TimeUnit.SECONDS)
376+
.pollDelay(10, TimeUnit.SECONDS)
377+
.untilAsserted(
378+
() -> {
379+
Assertions.assertEquals(
380+
20,
381+
esRestClient
382+
.getIndexDocsCount("st_fake_table")
383+
.get(0)
384+
.getDocsCount());
385+
});
386+
}
387+
355388
@TestTemplate
356389
public void testElasticsearchWithoutSchema(TestContainer container)
357390
throws IOException, InterruptedException {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
######
19+
###### This config file is a demonstration of streaming processing in seatunnel config
20+
######
21+
22+
env {
23+
parallelism = 1
24+
job.mode = "STREAMING"
25+
}
26+
27+
source {
28+
FakeSource {
29+
plugin_output = "fake"
30+
row.num = 20
31+
schema {
32+
table = "FakeDatabase.FAKE_TABLE"
33+
columns = [
34+
{
35+
name = id
36+
type = bigint
37+
nullable = false
38+
comment = "primary key id"
39+
},
40+
{
41+
name = name
42+
type = "string"
43+
comment = "name"
44+
},
45+
{
46+
name = age
47+
type = int
48+
comment = "age"
49+
}
50+
]
51+
}
52+
}
53+
}
54+
55+
transform {
56+
}
57+
58+
sink {
59+
Elasticsearch {
60+
hosts = ["https://elasticsearch:9200"]
61+
username = "elastic"
62+
password = "elasticsearch"
63+
tls_verify_certificate = false
64+
tls_verify_hostname = false
65+
66+
index = "st_${table_name}"
67+
index_type = "_doc"
68+
"schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
69+
"data_save_mode"="APPEND_DATA"
70+
}
71+
}

0 commit comments

Comments
 (0)