From 889ac08ed294d4d45da507fb50976f749d8259a5 Mon Sep 17 00:00:00 2001 From: YangYu Date: Fri, 22 Aug 2025 12:02:32 +0800 Subject: [PATCH] [FLINK-38276][cdc-common] PaimonWriter does not invalidate cache when schema changes --- .../apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java index bf660454e69..5e6ec230588 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java @@ -133,6 +133,7 @@ public void write(InputT event, Context context) throws IOException { // remove the table temporarily, then add the table with latest schema when received // DataChangeEvent. tables.remove(tableId); + catalog.invalidateTable(tableId); try { if (writes.containsKey(tableId)) { writes.get(tableId).replace(getTable(tableId));