Skip to content

Commit b96fa35

Browse files
ChaomingZhangCNlvyanquan
authored andcommitted
[FLINK-37203] Alter table comment or column comment (apache#3893)
Co-authored-by: lvyanquan <lvyanquan.lyq@alibaba-inc.com>
1 parent 10296cc commit b96fa35

41 files changed

Lines changed: 827 additions & 147 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545

4646
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
4747
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
48+
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_TABLE_COMMENT;
4849
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
4950
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
5051
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
@@ -222,6 +223,7 @@ void testSchemaEvolutionTypesConfiguration() throws Exception {
222223
null,
223224
null,
224225
ImmutableSet.of(
226+
ALTER_TABLE_COMMENT,
225227
ADD_COLUMN,
226228
ALTER_COLUMN_TYPE,
227229
CREATE_TABLE,
@@ -234,6 +236,7 @@ void testSchemaEvolutionTypesConfiguration() throws Exception {
234236
null,
235237
null,
236238
ImmutableSet.of(
239+
ALTER_TABLE_COMMENT,
237240
ADD_COLUMN,
238241
ALTER_COLUMN_TYPE,
239242
CREATE_TABLE,
@@ -246,6 +249,7 @@ void testSchemaEvolutionTypesConfiguration() throws Exception {
246249
"[column, table]",
247250
"[drop]",
248251
ImmutableSet.of(
252+
ALTER_TABLE_COMMENT,
249253
ADD_COLUMN,
250254
ALTER_COLUMN_TYPE,
251255
CREATE_TABLE,
@@ -256,12 +260,18 @@ void testSchemaEvolutionTypesConfiguration() throws Exception {
256260
null,
257261
null,
258262
ImmutableSet.of(
259-
ADD_COLUMN, ALTER_COLUMN_TYPE, CREATE_TABLE, DROP_COLUMN, RENAME_COLUMN));
263+
ALTER_TABLE_COMMENT,
264+
ADD_COLUMN,
265+
ALTER_COLUMN_TYPE,
266+
CREATE_TABLE,
267+
DROP_COLUMN,
268+
RENAME_COLUMN));
260269
testSchemaEvolutionTypesParsing(
261270
"lenient",
262271
null,
263272
"[]",
264273
ImmutableSet.of(
274+
ALTER_TABLE_COMMENT,
265275
ADD_COLUMN,
266276
ALTER_COLUMN_TYPE,
267277
CREATE_TABLE,
@@ -594,6 +604,7 @@ void testParsingFullDefinitionFromString() throws Exception {
594604
.put("bootstrap-servers", "localhost:9092")
595605
.build()),
596606
ImmutableSet.of(
607+
ALTER_TABLE_COMMENT,
597608
DROP_COLUMN,
598609
ALTER_COLUMN_TYPE,
599610
ADD_COLUMN,
@@ -620,6 +631,7 @@ void testParsingFullDefinitionFromString() throws Exception {
620631
null,
621632
new Configuration(),
622633
ImmutableSet.of(
634+
ALTER_TABLE_COMMENT,
623635
DROP_COLUMN,
624636
ALTER_COLUMN_TYPE,
625637
ADD_COLUMN,
@@ -706,6 +718,7 @@ void testParsingFullDefinitionFromString() throws Exception {
706718
null,
707719
new Configuration(),
708720
ImmutableSet.of(
721+
ALTER_TABLE_COMMENT,
709722
DROP_COLUMN,
710723
ALTER_COLUMN_TYPE,
711724
ADD_COLUMN,
@@ -742,6 +755,7 @@ void testParsingFullDefinitionFromString() throws Exception {
742755
null,
743756
new Configuration(),
744757
ImmutableSet.of(
758+
ALTER_TABLE_COMMENT,
745759
DROP_COLUMN,
746760
ALTER_COLUMN_TYPE,
747761
ADD_COLUMN,
@@ -797,6 +811,7 @@ void testParsingFullDefinitionFromString() throws Exception {
797811
.put("password", "")
798812
.build()),
799813
ImmutableSet.of(
814+
ALTER_TABLE_COMMENT,
800815
DROP_COLUMN,
801816
ALTER_COLUMN_TYPE,
802817
ADD_COLUMN,

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.flink.cdc.common.schema.Schema;
2323
import org.apache.flink.cdc.common.types.DataType;
2424

25+
import javax.annotation.Nullable;
26+
2527
import java.util.HashMap;
2628
import java.util.Map;
2729
import java.util.Objects;
@@ -44,26 +46,46 @@ public class AlterColumnTypeEvent implements SchemaChangeEventWithPreSchema, Sch
4446

4547
private final Map<String, DataType> oldTypeMapping;
4648

47-
public AlterColumnTypeEvent(TableId tableId, Map<String, DataType> typeMapping) {
49+
/** key => column name, value => column comment after changing. */
50+
private final Map<String, String> comments;
51+
52+
public AlterColumnTypeEvent(
53+
TableId tableId,
54+
Map<String, DataType> typeMapping,
55+
Map<String, DataType> oldTypeMapping,
56+
Map<String, String> comments) {
4857
this.tableId = tableId;
4958
this.typeMapping = typeMapping;
50-
this.oldTypeMapping = new HashMap<>();
59+
this.oldTypeMapping = oldTypeMapping;
60+
this.comments = comments;
61+
}
62+
63+
public AlterColumnTypeEvent(TableId tableId, Map<String, DataType> typeMapping) {
64+
this(tableId, typeMapping, new HashMap<>());
5165
}
5266

5367
public AlterColumnTypeEvent(
5468
TableId tableId,
5569
Map<String, DataType> typeMapping,
5670
Map<String, DataType> oldTypeMapping) {
57-
this.tableId = tableId;
58-
this.typeMapping = typeMapping;
59-
this.oldTypeMapping = oldTypeMapping;
71+
this(tableId, typeMapping, oldTypeMapping, new HashMap<>());
72+
}
73+
74+
public void addColumnComment(String columnName, @Nullable String comment) {
75+
if (comment != null) {
76+
this.comments.put(columnName, comment);
77+
}
6078
}
6179

6280
/** Returns the type mapping. */
6381
public Map<String, DataType> getTypeMapping() {
6482
return typeMapping;
6583
}
6684

85+
public Map<String, String> getComments() {
86+
return comments;
87+
}
88+
6789
@Override
6890
public boolean equals(Object o) {
6991
if (this == o) {
@@ -75,12 +97,13 @@ public boolean equals(Object o) {
7597
AlterColumnTypeEvent that = (AlterColumnTypeEvent) o;
7698
return Objects.equals(tableId, that.tableId)
7799
&& Objects.equals(typeMapping, that.typeMapping)
78-
&& Objects.equals(oldTypeMapping, that.oldTypeMapping);
100+
&& Objects.equals(oldTypeMapping, that.oldTypeMapping)
101+
&& Objects.equals(comments, that.comments);
79102
}
80103

81104
@Override
82105
public int hashCode() {
83-
return Objects.hash(tableId, typeMapping, oldTypeMapping);
106+
return Objects.hash(tableId, typeMapping, oldTypeMapping, comments);
84107
}
85108

86109
@Override
@@ -93,13 +116,17 @@ public String toString() {
93116
+ typeMapping
94117
+ ", oldTypeMapping="
95118
+ oldTypeMapping
119+
+ ", comments="
120+
+ comments
96121
+ '}';
97122
} else {
98123
return "AlterColumnTypeEvent{"
99124
+ "tableId="
100125
+ tableId
101126
+ ", typeMapping="
102127
+ typeMapping
128+
+ ", comments="
129+
+ comments
103130
+ '}';
104131
}
105132
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.event;
19+
20+
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
import org.apache.flink.cdc.common.utils.Preconditions;
22+
23+
import java.util.Objects;
24+
25+
/**
26+
* A {@link SchemaChangeEvent} that represents an {@code ALTER TABLE COMMENT} or {@code ALTER TABLE
27+
* SET COMMENT} DDL.
28+
*/
29+
@PublicEvolving
30+
public class AlterTableCommentEvent implements SchemaChangeEvent {
31+
32+
private static final long serialVersionUID = 1L;
33+
34+
private final TableId tableId;
35+
private final String comment;
36+
37+
public AlterTableCommentEvent(TableId tableId, String comment) {
38+
Preconditions.checkArgument(comment != null, "comment should not be null.");
39+
this.tableId = tableId;
40+
this.comment = comment;
41+
}
42+
43+
public String getComment() {
44+
return comment;
45+
}
46+
47+
@Override
48+
public SchemaChangeEventType getType() {
49+
return SchemaChangeEventType.ALTER_TABLE_COMMENT;
50+
}
51+
52+
@Override
53+
public SchemaChangeEvent copy(TableId newTableId) {
54+
return new AlterTableCommentEvent(newTableId, comment);
55+
}
56+
57+
@Override
58+
public TableId tableId() {
59+
return tableId;
60+
}
61+
62+
@Override
63+
public boolean equals(Object o) {
64+
if (this == o) {
65+
return true;
66+
}
67+
if (!(o instanceof AlterTableCommentEvent)) {
68+
return false;
69+
}
70+
AlterTableCommentEvent that = (AlterTableCommentEvent) o;
71+
return Objects.equals(tableId, that.tableId) && Objects.equals(comment, that.comment);
72+
}
73+
74+
@Override
75+
public int hashCode() {
76+
return Objects.hash(tableId, comment);
77+
}
78+
79+
@Override
80+
public String toString() {
81+
return "AlterTableCommentEvent{"
82+
+ "tableId="
83+
+ tableId
84+
+ ", comment='"
85+
+ comment
86+
+ '\''
87+
+ '}';
88+
}
89+
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ public enum SchemaChangeEventType {
2828
DROP_COLUMN("drop.column"),
2929
DROP_TABLE("drop.table"),
3030
RENAME_COLUMN("rename.column"),
31-
TRUNCATE_TABLE("truncate.table");
31+
TRUNCATE_TABLE("truncate.table"),
32+
ALTER_TABLE_COMMENT("alter.table.comment");
3233

3334
private final String tag;
3435

@@ -55,6 +56,8 @@ public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) {
5556
return RENAME_COLUMN;
5657
} else if (event instanceof TruncateTableEvent) {
5758
return TRUNCATE_TABLE;
59+
} else if (event instanceof AlterTableCommentEvent) {
60+
return ALTER_TABLE_COMMENT;
5861
} else {
5962
throw new RuntimeException("Unknown schema change event type: " + event.getClass());
6063
}
@@ -76,6 +79,8 @@ public static SchemaChangeEventType ofTag(String tag) {
7679
return RENAME_COLUMN;
7780
case "truncate.table":
7881
return TRUNCATE_TABLE;
82+
case "alter.table.comment":
83+
return ALTER_TABLE_COMMENT;
7984
default:
8085
throw new RuntimeException("Unknown schema change event type: " + tag);
8186
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventTypeFamily.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
2323
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
24+
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_TABLE_COMMENT;
2425
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
2526
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
2627
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
@@ -44,7 +45,9 @@ public class SchemaChangeEventTypeFamily {
4445

4546
public static final SchemaChangeEventType[] RENAME = {RENAME_COLUMN};
4647

47-
public static final SchemaChangeEventType[] TABLE = {CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE};
48+
public static final SchemaChangeEventType[] TABLE = {
49+
CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE, ALTER_TABLE_COMMENT
50+
};
4851

4952
public static final SchemaChangeEventType[] COLUMN = {
5053
ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN
@@ -57,7 +60,8 @@ public class SchemaChangeEventTypeFamily {
5760
DROP_COLUMN,
5861
DROP_TABLE,
5962
RENAME_COLUMN,
60-
TRUNCATE_TABLE
63+
TRUNCATE_TABLE,
64+
ALTER_TABLE_COMMENT
6165
};
6266

6367
public static final SchemaChangeEventType[] NONE = {};
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.event.visitor;
19+
20+
import org.apache.flink.cdc.common.annotation.Internal;
21+
import org.apache.flink.cdc.common.event.AlterTableCommentEvent;
22+
23+
/** Visitor for {@link AlterTableCommentEvent}s. */
24+
@Internal
25+
@FunctionalInterface
26+
public interface AlterTableCommentEventVisitor<T, E extends Throwable> {
27+
T visit(AlterTableCommentEvent event) throws E;
28+
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.cdc.common.annotation.Internal;
2121
import org.apache.flink.cdc.common.event.AddColumnEvent;
2222
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
23+
import org.apache.flink.cdc.common.event.AlterTableCommentEvent;
2324
import org.apache.flink.cdc.common.event.CreateTableEvent;
2425
import org.apache.flink.cdc.common.event.DropColumnEvent;
2526
import org.apache.flink.cdc.common.event.DropTableEvent;
@@ -38,7 +39,8 @@ public static <T, E extends Throwable> T visit(
3839
DropColumnEventVisitor<T, E> dropColumnEventVisitor,
3940
DropTableEventVisitor<T, E> dropTableEventVisitor,
4041
RenameColumnEventVisitor<T, E> renameColumnEventVisitor,
41-
TruncateTableEventVisitor<T, E> truncateTableEventVisitor)
42+
TruncateTableEventVisitor<T, E> truncateTableEventVisitor,
43+
AlterTableCommentEventVisitor<T, E> alterTableCommentEventVisitor)
4244
throws E {
4345
if (event instanceof AddColumnEvent) {
4446
if (addColumnVisitor == null) {
@@ -75,6 +77,11 @@ public static <T, E extends Throwable> T visit(
7577
return null;
7678
}
7779
return truncateTableEventVisitor.visit((TruncateTableEvent) event);
80+
} else if (event instanceof AlterTableCommentEvent) {
81+
if (alterTableCommentEventVisitor == null) {
82+
return null;
83+
}
84+
return alterTableCommentEventVisitor.visit((AlterTableCommentEvent) event);
7885
} else {
7986
throw new IllegalArgumentException(
8087
"Unknown schema change event type " + event.getType());

0 commit comments

Comments
 (0)