Skip to content

Commit 32ec9f5

Browse files
authored
[FLINK-38359][pipeline-connector][paimon] Avoid exception when writing update event to Append only table. (apache#4120)
1 parent f22b214 commit 32ec9f5

4 files changed

Lines changed: 52 additions & 8 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,12 @@ public PaimonEvent serialize(Event event) {
8080
return new PaimonEvent(tableId, null, true);
8181
} else if (event instanceof DataChangeEvent) {
8282
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
83+
TableSchemaInfo schemaInfo = schemaMaps.get(dataChangeEvent.tableId());
8384
List<GenericRow> genericRows =
8485
PaimonWriterHelper.convertEventToFullGenericRows(
8586
dataChangeEvent,
86-
schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
87+
schemaInfo.getFieldGetters(),
88+
schemaInfo.hasPrimaryKey());
8789
return new PaimonEvent(tableId, genericRows, false, bucket);
8890
} else {
8991
throw new IllegalArgumentException(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,9 @@ public static GenericRow convertEventToGenericRow(
222222

223223
/** create full {@link GenericRow}s from a {@link DataChangeEvent} for {@link PaimonWriter}. */
224224
public static List<GenericRow> convertEventToFullGenericRows(
225-
DataChangeEvent dataChangeEvent, List<RecordData.FieldGetter> fieldGetters) {
225+
DataChangeEvent dataChangeEvent,
226+
List<RecordData.FieldGetter> fieldGetters,
227+
boolean hasPrimaryKey) {
226228
List<GenericRow> fullGenericRows = new ArrayList<>();
227229
switch (dataChangeEvent.op()) {
228230
case INSERT:
@@ -235,19 +237,25 @@ public static List<GenericRow> convertEventToFullGenericRows(
235237
case UPDATE:
236238
case REPLACE:
237239
{
238-
fullGenericRows.add(
239-
convertRecordDataToGenericRow(
240-
dataChangeEvent.before(), fieldGetters, RowKind.UPDATE_BEFORE));
240+
if (hasPrimaryKey) {
241+
fullGenericRows.add(
242+
convertRecordDataToGenericRow(
243+
dataChangeEvent.before(),
244+
fieldGetters,
245+
RowKind.UPDATE_BEFORE));
246+
}
241247
fullGenericRows.add(
242248
convertRecordDataToGenericRow(
243249
dataChangeEvent.after(), fieldGetters, RowKind.UPDATE_AFTER));
244250
break;
245251
}
246252
case DELETE:
247253
{
248-
fullGenericRows.add(
249-
convertRecordDataToGenericRow(
250-
dataChangeEvent.before(), fieldGetters, RowKind.DELETE));
254+
if (hasPrimaryKey) {
255+
fullGenericRows.add(
256+
convertRecordDataToGenericRow(
257+
dataChangeEvent.before(), fieldGetters, RowKind.DELETE));
258+
}
251259
break;
252260
}
253261
default:

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TableSchemaInfo.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,12 @@ public class TableSchemaInfo {
3030

3131
private final List<RecordData.FieldGetter> fieldGetters;
3232

33+
private final boolean hasPrimaryKey;
34+
3335
public TableSchemaInfo(Schema schema, ZoneId zoneId) {
3436
this.schema = schema;
3537
this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId);
38+
this.hasPrimaryKey = !schema.primaryKeys().isEmpty();
3639
}
3740

3841
public Schema getSchema() {
@@ -42,4 +45,8 @@ public Schema getSchema() {
4245
public List<RecordData.FieldGetter> getFieldGetters() {
4346
return fieldGetters;
4447
}
48+
49+
public boolean hasPrimaryKey() {
50+
return hasPrimaryKey;
51+
}
4552
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,33 @@ public void testSinkWithDataChangeForAppendOnlyTable(String metastore, boolean e
356356
Row.ofKind(RowKind.INSERT, "1", "1"),
357357
Row.ofKind(RowKind.INSERT, "2", "2"),
358358
Row.ofKind(RowKind.INSERT, "3", "3"));
359+
360+
// Delete
361+
writeAndCommit(
362+
writer,
363+
committer,
364+
generateDelete(
365+
table1, Arrays.asList(Tuple2.of(STRING(), "3"), Tuple2.of(STRING(), "3"))));
366+
Assertions.assertThat(fetchResults(table1))
367+
.containsExactlyInAnyOrder(
368+
Row.ofKind(RowKind.INSERT, "1", "1"),
369+
Row.ofKind(RowKind.INSERT, "2", "2"),
370+
Row.ofKind(RowKind.INSERT, "3", "3"));
371+
372+
// Update
373+
writeAndCommit(
374+
writer,
375+
committer,
376+
generateUpdate(
377+
table1,
378+
Arrays.asList(Tuple2.of(STRING(), "3"), Tuple2.of(STRING(), "3")),
379+
Arrays.asList(Tuple2.of(STRING(), "3"), Tuple2.of(STRING(), "x"))));
380+
Assertions.assertThat(fetchResults(table1))
381+
.containsExactlyInAnyOrder(
382+
Row.ofKind(RowKind.INSERT, "1", "1"),
383+
Row.ofKind(RowKind.INSERT, "2", "2"),
384+
Row.ofKind(RowKind.INSERT, "3", "3"),
385+
Row.ofKind(RowKind.INSERT, "3", "x"));
359386
}
360387

361388
@ParameterizedTest

0 commit comments

Comments
 (0)