Skip to content

Commit 1796eb6

Browse files
authored
[FLINK-38596][pipeline-connector][maxcompute] Fix the column comment of CreateTableEvent and AddColumnEvent (#4215)
1 parent a40b870 commit 1796eb6

3 files changed

Lines changed: 55 additions & 4 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,15 @@ public static void addColumns(
122122
sqlBuilder
123123
.append(addColumn.getAddColumn().getName())
124124
.append(" ")
125-
.append(string(addColumn.getAddColumn().getType()))
126-
.append(" comment '")
127-
.append(addColumn.getAddColumn().getType().asSummaryString())
128-
.append("',");
125+
.append(string(addColumn.getAddColumn().getType()));
126+
// Add comment if available
127+
if (addColumn.getAddColumn().getComment() != null) {
128+
sqlBuilder
129+
.append(" comment '")
130+
.append(addColumn.getAddColumn().getComment())
131+
.append("'");
132+
}
133+
sqlBuilder.append(",");
129134
} else {
130135
throw new UnsupportedOperationException(
131136
"Not support position: "

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ public static Column toMaxCompute(
117117
if (!type.isNullable() || notNull) {
118118
columnBuilder.notNull();
119119
}
120+
// Set column comment if available
121+
if (flinkColumn.getComment() != null) {
122+
columnBuilder.withComment(flinkColumn.getComment());
123+
}
120124
return columnBuilder.build();
121125
}
122126

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtilsTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,4 +184,46 @@ void testRecordConvert() {
184184
"char,varchar,string,false,=01=02=03=04=05,=01=02=03=04=05=06=07=08=09=0A,0.00,1,2,12345,12345,123.456,123456.789,00:20:34.567,2003-10-20,1970-01-01T00:00,1970-01-01T00:00:00Z,1970-01-01T00:00:00Z";
185185
assertThat(arrayRecord).hasToString(expect);
186186
}
187+
188+
@Test
189+
void testColumnCommentConversion() {
190+
// Test column with comment
191+
org.apache.flink.cdc.common.schema.Column columnWithComment =
192+
org.apache.flink.cdc.common.schema.Column.physicalColumn(
193+
"user_id", DataTypes.BIGINT(), "Primary key for user ID");
194+
com.aliyun.odps.Column maxComputeColumn =
195+
TypeConvertUtils.toMaxCompute(columnWithComment, false);
196+
197+
assertThat(maxComputeColumn.getName()).isEqualTo("user_id");
198+
assertThat(maxComputeColumn.getTypeInfo().getTypeName().toLowerCase()).isEqualTo("bigint");
199+
assertThat(maxComputeColumn.getComment()).isEqualTo("Primary key for user ID");
200+
201+
// Test column without comment
202+
org.apache.flink.cdc.common.schema.Column columnWithoutComment =
203+
org.apache.flink.cdc.common.schema.Column.physicalColumn(
204+
"name", DataTypes.STRING());
205+
com.aliyun.odps.Column maxComputeColumnNoComment =
206+
TypeConvertUtils.toMaxCompute(columnWithoutComment, false);
207+
208+
assertThat(maxComputeColumnNoComment.getName()).isEqualTo("name");
209+
assertThat(maxComputeColumnNoComment.getTypeInfo().getTypeName().toLowerCase())
210+
.isEqualTo("string");
211+
assertThat(maxComputeColumnNoComment.getComment()).isNull();
212+
213+
// Test schema conversion with comments
214+
Schema schemaWithComments =
215+
Schema.newBuilder()
216+
.physicalColumn("id", DataTypes.BIGINT(), "Primary key ID")
217+
.physicalColumn("username", DataTypes.STRING(), "User name")
218+
.physicalColumn("email", DataTypes.STRING(), "Email address")
219+
.build();
220+
221+
TableSchema maxComputeSchema = TypeConvertUtils.toMaxCompute(schemaWithComments);
222+
List<com.aliyun.odps.Column> columns = maxComputeSchema.getAllColumns();
223+
224+
assertThat(columns).hasSize(3);
225+
assertThat(columns.get(0).getComment()).isEqualTo("Primary key ID");
226+
assertThat(columns.get(1).getComment()).isEqualTo("User name");
227+
assertThat(columns.get(2).getComment()).isEqualTo("Email address");
228+
}
187229
}

0 commit comments

Comments
 (0)