Skip to content

Commit f52fc6b

Browse files
committed
[hotfix] Optimize SchemaChangeEventVisitor: use generic visitor handler class & introduce voidVisitors
1 parent b569d64 commit f52fc6b

13 files changed

Lines changed: 92 additions & 288 deletions

File tree

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AddColumnEventVisitor.java

Lines changed: 0 additions & 28 deletions
This file was deleted.

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterTableCommentEventVisitor.java

Lines changed: 0 additions & 28 deletions
This file was deleted.

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/CreateTableEventVisitor.java

Lines changed: 0 additions & 28 deletions
This file was deleted.

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropColumnEventVisitor.java

Lines changed: 0 additions & 28 deletions
This file was deleted.

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/DropTableEventVisitor.java

Lines changed: 0 additions & 28 deletions
This file was deleted.

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/RenameColumnEventVisitor.java

Lines changed: 0 additions & 28 deletions
This file was deleted.

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@
3333
public class SchemaChangeEventVisitor {
3434
public static <T, E extends Throwable> T visit(
3535
SchemaChangeEvent event,
36-
AddColumnEventVisitor<T, E> addColumnVisitor,
37-
AlterColumnTypeEventVisitor<T, E> alterColumnTypeEventVisitor,
38-
CreateTableEventVisitor<T, E> createTableEventVisitor,
39-
DropColumnEventVisitor<T, E> dropColumnEventVisitor,
40-
DropTableEventVisitor<T, E> dropTableEventVisitor,
41-
RenameColumnEventVisitor<T, E> renameColumnEventVisitor,
42-
TruncateTableEventVisitor<T, E> truncateTableEventVisitor,
43-
AlterTableCommentEventVisitor<T, E> alterTableCommentEventVisitor)
36+
VisitorHandler<AddColumnEvent, T, E> addColumnVisitor,
37+
VisitorHandler<AlterColumnTypeEvent, T, E> alterColumnTypeEventVisitor,
38+
VisitorHandler<CreateTableEvent, T, E> createTableEventVisitor,
39+
VisitorHandler<DropColumnEvent, T, E> dropColumnEventVisitor,
40+
VisitorHandler<DropTableEvent, T, E> dropTableEventVisitor,
41+
VisitorHandler<RenameColumnEvent, T, E> renameColumnEventVisitor,
42+
VisitorHandler<TruncateTableEvent, T, E> truncateTableEventVisitor,
43+
VisitorHandler<AlterTableCommentEvent, T, E> alterTableCommentEventVisitor)
4444
throws E {
4545
if (event instanceof AddColumnEvent) {
4646
if (addColumnVisitor == null) {
@@ -87,4 +87,53 @@ public static <T, E extends Throwable> T visit(
8787
"Unknown schema change event type " + event.getType());
8888
}
8989
}
90+
91+
public static <E extends Throwable> void voidVisit(
92+
SchemaChangeEvent event,
93+
VoidVisitorHandler<AddColumnEvent, E> addColumnVisitor,
94+
VoidVisitorHandler<AlterColumnTypeEvent, E> alterColumnTypeEventVisitor,
95+
VoidVisitorHandler<CreateTableEvent, E> createTableEventVisitor,
96+
VoidVisitorHandler<DropColumnEvent, E> dropColumnEventVisitor,
97+
VoidVisitorHandler<DropTableEvent, E> dropTableEventVisitor,
98+
VoidVisitorHandler<RenameColumnEvent, E> renameColumnEventVisitor,
99+
VoidVisitorHandler<TruncateTableEvent, E> truncateTableEventVisitor,
100+
VoidVisitorHandler<AlterTableCommentEvent, E> alterTableCommentEventHandler)
101+
throws E {
102+
if (event instanceof AddColumnEvent) {
103+
if (addColumnVisitor != null) {
104+
addColumnVisitor.visit((AddColumnEvent) event);
105+
}
106+
} else if (event instanceof AlterColumnTypeEvent) {
107+
if (alterColumnTypeEventVisitor != null) {
108+
alterColumnTypeEventVisitor.visit((AlterColumnTypeEvent) event);
109+
}
110+
} else if (event instanceof CreateTableEvent) {
111+
if (createTableEventVisitor != null) {
112+
createTableEventVisitor.visit((CreateTableEvent) event);
113+
}
114+
} else if (event instanceof DropColumnEvent) {
115+
if (dropColumnEventVisitor != null) {
116+
dropColumnEventVisitor.visit((DropColumnEvent) event);
117+
}
118+
} else if (event instanceof DropTableEvent) {
119+
if (dropTableEventVisitor != null) {
120+
dropTableEventVisitor.visit((DropTableEvent) event);
121+
}
122+
} else if (event instanceof RenameColumnEvent) {
123+
if (renameColumnEventVisitor != null) {
124+
renameColumnEventVisitor.visit((RenameColumnEvent) event);
125+
}
126+
} else if (event instanceof TruncateTableEvent) {
127+
if (truncateTableEventVisitor != null) {
128+
truncateTableEventVisitor.visit((TruncateTableEvent) event);
129+
}
130+
} else if (event instanceof AlterTableCommentEvent) {
131+
if (alterTableCommentEventHandler != null) {
132+
alterTableCommentEventHandler.visit((AlterTableCommentEvent) event);
133+
}
134+
} else {
135+
throw new IllegalArgumentException(
136+
"Unknown schema change event type " + event.getType());
137+
}
138+
}
90139
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/TruncateTableEventVisitor.java renamed to flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/VisitorHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
package org.apache.flink.cdc.common.event.visitor;
1919

2020
import org.apache.flink.cdc.common.annotation.Internal;
21+
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
2122
import org.apache.flink.cdc.common.event.TruncateTableEvent;
2223

2324
/** Visitor for {@link TruncateTableEvent}s. */
2425
@Internal
2526
@FunctionalInterface
26-
public interface TruncateTableEventVisitor<T, E extends Throwable> {
27-
T visit(TruncateTableEvent event) throws E;
27+
public interface VisitorHandler<EVT extends SchemaChangeEvent, T, E extends Throwable> {
28+
T visit(EVT event) throws E;
2829
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterColumnTypeEventVisitor.java renamed to flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/VoidVisitorHandler.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
package org.apache.flink.cdc.common.event.visitor;
1919

2020
import org.apache.flink.cdc.common.annotation.Internal;
21-
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
21+
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
22+
import org.apache.flink.cdc.common.event.TruncateTableEvent;
2223

23-
/** Visitor for {@link AlterColumnTypeEvent}s. */
24+
/** Visitor for {@link TruncateTableEvent}s. */
2425
@Internal
2526
@FunctionalInterface
26-
public interface AlterColumnTypeEventVisitor<T, E extends Throwable> {
27-
T visit(AlterColumnTypeEvent event) throws E;
27+
public interface VoidVisitorHandler<EVT extends SchemaChangeEvent, E extends Throwable> {
28+
void visit(EVT event) throws E;
2829
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java

Lines changed: 9 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -112,40 +112,16 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
112112

113113
@Override
114114
public void applySchemaChange(SchemaChangeEvent event) {
115-
SchemaChangeEventVisitor.<Void, SchemaEvolveException>visit(
115+
SchemaChangeEventVisitor.voidVisit(
116116
event,
117-
addColumnEvent -> {
118-
applyAddColumnEvent(addColumnEvent);
119-
return null;
120-
},
121-
alterColumnTypeEvent -> {
122-
applyAlterColumnTypeEvent(alterColumnTypeEvent);
123-
return null;
124-
},
125-
createTableEvent -> {
126-
applyCreateTableEvent(createTableEvent);
127-
return null;
128-
},
129-
dropColumnEvent -> {
130-
applyDropColumnEvent(dropColumnEvent);
131-
return null;
132-
},
133-
dropTableEvent -> {
134-
applyDropTableEvent(dropTableEvent);
135-
return null;
136-
},
137-
renameColumnEvent -> {
138-
applyRenameColumnEvent(renameColumnEvent);
139-
return null;
140-
},
141-
truncateTableEvent -> {
142-
applyTruncateTableEvent(truncateTableEvent);
143-
return null;
144-
},
145-
alterTableCommentEvent -> {
146-
applyAlterTableCommentEvent(alterTableCommentEvent);
147-
return null;
148-
});
117+
this::applyAddColumnEvent,
118+
this::applyAlterColumnTypeEvent,
119+
this::applyCreateTableEvent,
120+
this::applyDropColumnEvent,
121+
this::applyDropTableEvent,
122+
this::applyRenameColumnEvent,
123+
this::applyTruncateTableEvent,
124+
this::applyAlterTableCommentEvent);
149125
}
150126

151127
private void applyCreateTableEvent(CreateTableEvent event) throws SchemaEvolveException {

0 commit comments

Comments
 (0)