Skip to content

Commit d993c6a

Browse files
authored
[FLINK-38455][elasticsearch][fix] Fix Elasticsearch Missing required property 'BulkRequest.operations' error (#4270)
1 parent 2a23f71 commit d993c6a

4 files changed

Lines changed: 53 additions & 0 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,21 @@ protected void submitRequestEntries(
123123
LOG.debug("submitRequestEntries with {} items", requestEntries.size());
124124

125125
BulkRequest.Builder br = new BulkRequest.Builder();
126+
boolean hasOperations = false;
126127
for (Operation operation : requestEntries) {
127128
if (operation.getBulkOperationVariant() == null) {
128129
continue;
129130
}
130131
br.operations(new BulkOperation(operation.getBulkOperationVariant()));
132+
hasOperations = true;
133+
}
134+
135+
if (!hasOperations) {
136+
LOG.debug(
137+
"Skipping empty BulkRequest, all {} operation(s) have null BulkOperationVariant",
138+
requestEntries.size());
139+
requestResult.accept(Collections.emptyList());
140+
return;
131141
}
132142

133143
esClient.bulk(br.build())

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,14 @@ void testElasticsearchAddColumn() throws Exception {
146146
verifyInsertedDataWithNewColumn(tableId, "3", 3, 3.0, "value3", true);
147147
}
148148

149+
@Test
150+
void testElasticsearchSinkWithOnlySchemaChangeEvents() throws Exception {
151+
TableId tableId = TableId.tableId("default", "schema", "schema_only_table");
152+
List<Event> events = ElasticsearchTestUtils.createTestEventsWithOnlySchemaChange(tableId);
153+
154+
runJobWithEvents(events);
155+
}
156+
149157
private static ElasticsearchContainer createElasticsearchContainer() {
150158
ElasticsearchContainer esContainer = new ElasticsearchContainer(ELASTICSEARCH_VERSION);
151159
esContainer.withLogConsumer(new Slf4jLogConsumer(LOG));

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,21 @@ void testTableShardingWithSeparator() {
9090
assertThat(index).isEqualTo("test$2025-01-01");
9191
}
9292

93+
@Test
94+
void testSchemaChangeEventReturnsNull() {
95+
Schema tableSchema =
96+
Schema.newBuilder()
97+
.physicalColumn("id", DataTypes.INT().notNull())
98+
.physicalColumn("name", DataTypes.VARCHAR(255).notNull())
99+
.primaryKey("id")
100+
.build();
101+
102+
ElasticsearchEventSerializer serializer =
103+
new ElasticsearchEventSerializer(ZoneId.of("UTC"));
104+
CreateTableEvent createTableEvent = new CreateTableEvent(tableId, tableSchema);
105+
assertThat(serializer.apply(createTableEvent, new MockContext())).isNull();
106+
}
107+
93108
private String getShardingString(Map<TableId, String> shardingKey, String shardingSeparator) {
94109
RowType rowType =
95110
RowType.of(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchTestUtils.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,4 +186,24 @@ public static List<Event> createTestEventsWithAddColumn(TableId tableId) {
186186
3, 3.0, BinaryStringData.fromString("value3"), true
187187
})));
188188
}
189+
190+
/**
191+
* Creates a list of test events that only contain SchemaChangeEvents (CreateTableEvent) without
192+
* any DataChangeEvents. This simulates the scenario where a batch only has schema change
193+
* events, which previously caused "Missing required property 'BulkRequest.operations'" error.
194+
*
195+
* @param tableId the identifier of the table.
196+
* @return a list of events containing only CreateTableEvent.
197+
*/
198+
public static List<Event> createTestEventsWithOnlySchemaChange(TableId tableId) {
199+
Schema schema =
200+
Schema.newBuilder()
201+
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
202+
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
203+
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
204+
.primaryKey("id")
205+
.build();
206+
207+
return Collections.singletonList(new CreateTableEvent(tableId, schema));
208+
}
189209
}

0 commit comments

Comments
 (0)