Skip to content

Commit 08f3fb8

Browse files
gongzexinhector.gong
andauthored
[FLINK-39289][paimon] Supports executing TRUNCATE schema change events on tables with deletion-vectors.enabled:true set (#4327)
Co-authored-by: hector.gong <hector.gong@feisu.com>
1 parent 723615b commit 08f3fb8

2 files changed

Lines changed: 2 additions & 18 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.flink.cdc.common.event.TruncateTableEvent;
3131
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
3232
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
33-
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
3433
import org.apache.flink.cdc.common.schema.Schema;
3534
import org.apache.flink.cdc.common.sink.MetadataApplier;
3635
import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;
@@ -354,10 +353,6 @@ private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolv
354353
private void applyTruncateTable(TruncateTableEvent event) throws SchemaEvolveException {
355354
try {
356355
Table table = catalog.getTable(tableIdToIdentifier(event));
357-
if (table.options().get("deletion-vectors.enabled").equals("true")) {
358-
throw new UnsupportedSchemaChangeEventException(
359-
event, "Unable to truncate a table with deletion vectors enabled.", null);
360-
}
361356
try (BatchTableCommit batchTableCommit = table.newBatchWriteBuilder().newCommit()) {
362357
batchTableCommit.truncateTable();
363358
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import org.apache.flink.cdc.common.event.TableId;
4646
import org.apache.flink.cdc.common.event.TruncateTableEvent;
4747
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
48-
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
4948
import org.apache.flink.cdc.common.factories.DataSinkFactory;
5049
import org.apache.flink.cdc.common.factories.FactoryHelper;
5150
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
@@ -489,18 +488,8 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto
489488
Row.ofKind(RowKind.INSERT, "6", "6"));
490489

491490
TruncateTableEvent truncateTableEvent = new TruncateTableEvent(table1);
492-
if (enableDeleteVector) {
493-
Assertions.assertThatThrownBy(
494-
() -> metadataApplier.applySchemaChange(truncateTableEvent))
495-
.isExactlyInstanceOf(SchemaEvolveException.class)
496-
.cause()
497-
.isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class)
498-
.extracting("exceptionMessage")
499-
.isEqualTo("Unable to truncate a table with deletion vectors enabled.");
500-
} else {
501-
metadataApplier.applySchemaChange(truncateTableEvent);
502-
Assertions.assertThat(fetchResults(table1)).isEmpty();
503-
}
491+
metadataApplier.applySchemaChange(truncateTableEvent);
492+
Assertions.assertThat(fetchResults(table1)).isEmpty();
504493

505494
DropTableEvent dropTableEvent = new DropTableEvent(table1);
506495
metadataApplier.applySchemaChange(dropTableEvent);

0 commit comments

Comments
 (0)