|
27 | 27 | import org.apache.flink.cdc.common.event.RenameColumnEvent; |
28 | 28 | import org.apache.flink.cdc.common.event.SchemaChangeEvent; |
29 | 29 | import org.apache.flink.cdc.common.event.TruncateTableEvent; |
| 30 | +import org.apache.flink.cdc.common.utils.Preconditions; |
| 31 | + |
| 32 | +import javax.annotation.Nonnull; |
30 | 33 |
|
31 | 34 | /** Visitor clas for all {@link SchemaChangeEvent}s and returns a specific typed object. */ |
32 | 35 | @Internal |
33 | 36 | public class SchemaChangeEventVisitor { |
34 | 37 | public static <T, E extends Throwable> T visit( |
35 | 38 | SchemaChangeEvent event, |
36 | | - AddColumnEventVisitor<T, E> addColumnVisitor, |
37 | | - AlterColumnTypeEventVisitor<T, E> alterColumnTypeEventVisitor, |
38 | | - CreateTableEventVisitor<T, E> createTableEventVisitor, |
39 | | - DropColumnEventVisitor<T, E> dropColumnEventVisitor, |
40 | | - DropTableEventVisitor<T, E> dropTableEventVisitor, |
41 | | - RenameColumnEventVisitor<T, E> renameColumnEventVisitor, |
42 | | - TruncateTableEventVisitor<T, E> truncateTableEventVisitor, |
43 | | - AlterTableCommentEventVisitor<T, E> alterTableCommentEventVisitor) |
| 39 | + @Nonnull VisitorHandler<AddColumnEvent, T, E> addColumnVisitor, |
| 40 | + @Nonnull VisitorHandler<AlterColumnTypeEvent, T, E> alterColumnTypeEventVisitor, |
| 41 | + @Nonnull VisitorHandler<CreateTableEvent, T, E> createTableEventVisitor, |
| 42 | + @Nonnull VisitorHandler<DropColumnEvent, T, E> dropColumnEventVisitor, |
| 43 | + @Nonnull VisitorHandler<DropTableEvent, T, E> dropTableEventVisitor, |
| 44 | + @Nonnull VisitorHandler<RenameColumnEvent, T, E> renameColumnEventVisitor, |
| 45 | + @Nonnull VisitorHandler<TruncateTableEvent, T, E> truncateTableEventVisitor, |
| 46 | + @Nonnull VisitorHandler<AlterTableCommentEvent, T, E> alterTableCommentEventVisitor) |
44 | 47 | throws E { |
45 | 48 | if (event instanceof AddColumnEvent) { |
46 | | - if (addColumnVisitor == null) { |
47 | | - return null; |
48 | | - } |
| 49 | + Preconditions.checkNotNull(addColumnVisitor); |
49 | 50 | return addColumnVisitor.visit((AddColumnEvent) event); |
50 | 51 | } else if (event instanceof AlterColumnTypeEvent) { |
51 | | - if (alterColumnTypeEventVisitor == null) { |
52 | | - return null; |
53 | | - } |
| 52 | + Preconditions.checkNotNull(alterColumnTypeEventVisitor); |
54 | 53 | return alterColumnTypeEventVisitor.visit((AlterColumnTypeEvent) event); |
55 | 54 | } else if (event instanceof CreateTableEvent) { |
56 | | - if (createTableEventVisitor == null) { |
57 | | - return null; |
58 | | - } |
| 55 | + Preconditions.checkNotNull(createTableEventVisitor); |
59 | 56 | return createTableEventVisitor.visit((CreateTableEvent) event); |
60 | 57 | } else if (event instanceof DropColumnEvent) { |
61 | | - if (dropColumnEventVisitor == null) { |
62 | | - return null; |
63 | | - } |
| 58 | + Preconditions.checkNotNull(dropColumnEventVisitor); |
64 | 59 | return dropColumnEventVisitor.visit((DropColumnEvent) event); |
65 | 60 | } else if (event instanceof DropTableEvent) { |
66 | | - if (dropTableEventVisitor == null) { |
67 | | - return null; |
68 | | - } |
| 61 | + Preconditions.checkNotNull(dropTableEventVisitor); |
69 | 62 | return dropTableEventVisitor.visit((DropTableEvent) event); |
70 | 63 | } else if (event instanceof RenameColumnEvent) { |
71 | | - if (renameColumnEventVisitor == null) { |
72 | | - return null; |
73 | | - } |
| 64 | + Preconditions.checkNotNull(renameColumnEventVisitor); |
74 | 65 | return renameColumnEventVisitor.visit((RenameColumnEvent) event); |
75 | 66 | } else if (event instanceof TruncateTableEvent) { |
76 | | - if (truncateTableEventVisitor == null) { |
77 | | - return null; |
78 | | - } |
| 67 | + Preconditions.checkNotNull(truncateTableEventVisitor); |
79 | 68 | return truncateTableEventVisitor.visit((TruncateTableEvent) event); |
80 | 69 | } else if (event instanceof AlterTableCommentEvent) { |
81 | | - if (alterTableCommentEventVisitor == null) { |
82 | | - return null; |
83 | | - } |
| 70 | + Preconditions.checkNotNull(alterTableCommentEventVisitor); |
84 | 71 | return alterTableCommentEventVisitor.visit((AlterTableCommentEvent) event); |
85 | 72 | } else { |
86 | 73 | throw new IllegalArgumentException( |
87 | 74 | "Unknown schema change event type " + event.getType()); |
88 | 75 | } |
89 | 76 | } |
| 77 | + |
| 78 | + public static <E extends Throwable> void voidVisit( |
| 79 | + SchemaChangeEvent event, |
| 80 | + @Nonnull VoidVisitorHandler<AddColumnEvent, E> addColumnVisitor, |
| 81 | + @Nonnull VoidVisitorHandler<AlterColumnTypeEvent, E> alterColumnTypeEventVisitor, |
| 82 | + @Nonnull VoidVisitorHandler<CreateTableEvent, E> createTableEventVisitor, |
| 83 | + @Nonnull VoidVisitorHandler<DropColumnEvent, E> dropColumnEventVisitor, |
| 84 | + @Nonnull VoidVisitorHandler<DropTableEvent, E> dropTableEventVisitor, |
| 85 | + @Nonnull VoidVisitorHandler<RenameColumnEvent, E> renameColumnEventVisitor, |
| 86 | + @Nonnull VoidVisitorHandler<TruncateTableEvent, E> truncateTableEventVisitor, |
| 87 | + @Nonnull VoidVisitorHandler<AlterTableCommentEvent, E> alterTableCommentEventHandler) |
| 88 | + throws E { |
| 89 | + visit( |
| 90 | + event, |
| 91 | + wrapVoidVisitor(addColumnVisitor), |
| 92 | + wrapVoidVisitor(alterColumnTypeEventVisitor), |
| 93 | + wrapVoidVisitor(createTableEventVisitor), |
| 94 | + wrapVoidVisitor(dropColumnEventVisitor), |
| 95 | + wrapVoidVisitor(dropTableEventVisitor), |
| 96 | + wrapVoidVisitor(renameColumnEventVisitor), |
| 97 | + wrapVoidVisitor(truncateTableEventVisitor), |
| 98 | + wrapVoidVisitor(alterTableCommentEventHandler)); |
| 99 | + } |
| 100 | + |
| 101 | + private static <EVT extends SchemaChangeEvent, E extends Throwable> |
| 102 | + VisitorHandler<EVT, Void, E> wrapVoidVisitor(VoidVisitorHandler<EVT, E> handler) { |
| 103 | + Preconditions.checkNotNull(handler); |
| 104 | + return event -> { |
| 105 | + handler.visit(event); |
| 106 | + return null; |
| 107 | + }; |
| 108 | + } |
90 | 109 | } |
0 commit comments