Skip to content

Commit 3455316

Browse files
authored
[Bugfix][Elasticsearch] Fix add column event (#9069)
1 parent 01d31a6 commit 3455316

File tree

2 files changed

+39
-4
lines changed
  • seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink
  • seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch

2 files changed

+39
-4
lines changed

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@
2323
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
2424
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2525
import org.apache.seatunnel.api.table.catalog.Column;
26+
import org.apache.seatunnel.api.table.catalog.TableSchema;
2627
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
2728
import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
2829
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
2930
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
3031
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
32+
import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher;
33+
import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventHandler;
3134
import org.apache.seatunnel.api.table.type.RowKind;
3235
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
3336
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
@@ -66,12 +69,14 @@ public class ElasticsearchSinkWriter
6669

6770
private final int maxBatchSize;
6871

69-
private final SeaTunnelRowSerializer seaTunnelRowSerializer;
72+
private SeaTunnelRowSerializer seaTunnelRowSerializer;
7073
private final List<String> requestEsList;
7174
private EsRestClient esRestClient;
7275
private RetryMaterial retryMaterial;
7376
private static final long DEFAULT_SLEEP_TIME_MS = 200L;
7477
private final IndexInfo indexInfo;
78+
private TableSchema tableSchema;
79+
private final TableSchemaChangeEventHandler tableSchemaChangeEventHandler;
7580

7681
public ElasticsearchSinkWriter(
7782
Context context,
@@ -94,6 +99,8 @@ public ElasticsearchSinkWriter(
9499
this.requestEsList = new ArrayList<>(maxBatchSize);
95100
this.retryMaterial =
96101
new RetryMaterial(maxRetryCount, true, exception -> true, DEFAULT_SLEEP_TIME_MS);
102+
this.tableSchema = catalogTable.getTableSchema();
103+
this.tableSchemaChangeEventHandler = new TableSchemaChangeEventDispatcher();
97104
}
98105

99106
@Override
@@ -120,6 +127,13 @@ public void applySchemaChange(SchemaChangeEvent event) throws IOException {
120127
} else {
121128
throw new UnsupportedOperationException("Unsupported alter table event: " + event);
122129
}
130+
131+
this.tableSchema = tableSchemaChangeEventHandler.reset(tableSchema).apply(event);
132+
this.seaTunnelRowSerializer =
133+
new ElasticsearchRowSerializer(
134+
esRestClient.getClusterInfo(),
135+
indexInfo,
136+
tableSchema.toPhysicalRowDataType());
123137
}
124138

125139
private void applySingleSchemaChangeEvent(SchemaChangeEvent event) {

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java

+24-3
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,30 @@ public void testSchemaChange(TestContainer container) throws InterruptedExceptio
192192
this.container.execInContainer(
193193
"bash",
194194
"-c",
195-
"curl -k -u elastic:elasticsearch https://localhost:9200/schema_change_index/_count");
196-
Assertions.assertTrue(
197-
indexCountResult.getStdout().contains("\"count\":18"));
195+
"curl -k -u elastic:elasticsearch -H \"Content-Type:application/json\" -d '{ \"from\": 0, \"size\": 10000, \"query\": { \"match_all\": {}}}' https://localhost:9200/schema_change_index/_search");
196+
log.info("indexCountResult: {}", indexCountResult.getStdout());
197+
ObjectNode jsonNode =
198+
JsonUtils.parseObject(indexCountResult.getStdout());
199+
JsonNode hits = jsonNode.get("hits");
200+
long totalCount = hits.get("total").get("value").asLong();
201+
Assertions.assertEquals(18L, totalCount);
202+
203+
hits.get("hits")
204+
.forEach(
205+
hit -> {
206+
JsonNode source = hit.get("_source");
207+
int id = source.get("id").asInt();
208+
if (id >= 119 && id <= 127) {
209+
Assertions.assertTrue(
210+
source.has("add_column1"));
211+
Assertions.assertFalse(
212+
source.get("add_column1").isNull());
213+
Assertions.assertTrue(
214+
source.has("add_column2"));
215+
Assertions.assertFalse(
216+
source.get("add_column2").isNull());
217+
}
218+
});
198219
});
199220
}
200221

0 commit comments

Comments
 (0)