Skip to content

Commit eb54fdd

Browse files
authored
[Fix][Paimon] nullable and comment attribute was lost during automatic table creation (#9020)
1 parent a5aafa7 commit eb54fdd

File tree

12 files changed

+244
-22
lines changed

12 files changed

+244
-22
lines changed

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ public CatalogTable getTable(TablePath tablePath)
211211
Connection conn = getConnection(dbUrl);
212212
try {
213213
DatabaseMetaData metaData = conn.getMetaData();
214+
Optional<String> comment = getTableComment(metaData, tablePath);
214215
Optional<PrimaryKey> primaryKey = getPrimaryKey(metaData, tablePath);
215216
List<ConstraintKey> constraintKeys = getConstraintKeys(metaData, tablePath);
216217
TableSchema.Builder tableSchemaBuilder =
@@ -225,7 +226,7 @@ public CatalogTable getTable(TablePath tablePath)
225226
tableSchemaBuilder.build(),
226227
buildConnectorOptions(tablePath),
227228
Collections.emptyList(),
228-
"",
229+
comment.orElse(""),
229230
catalogName);
230231

231232
} catch (SeaTunnelRuntimeException e) {
@@ -283,6 +284,21 @@ protected Optional<PrimaryKey> getPrimaryKey(
283284
return CatalogUtils.getPrimaryKey(metaData, TablePath.of(database, schema, table));
284285
}
285286

287+
protected Optional<String> getTableComment(DatabaseMetaData metaData, TablePath tablePath)
288+
throws SQLException {
289+
return getTableComment(
290+
metaData,
291+
tablePath.getDatabaseName(),
292+
tablePath.getSchemaName(),
293+
tablePath.getTableName());
294+
}
295+
296+
protected Optional<String> getTableComment(
297+
DatabaseMetaData metaData, String database, String schema, String table)
298+
throws SQLException {
299+
return CatalogUtils.getTableComment(metaData, TablePath.of(database, schema, table));
300+
}
301+
286302
protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, TablePath tablePath)
287303
throws SQLException {
288304
return getConstraintKeys(

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java

+15
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,21 @@ public static String quoteTableIdentifier(String identifier, String fieldIde) {
101101
return getFieldIde(identifier, fieldIde);
102102
}
103103

104+
public static Optional<String> getTableComment(DatabaseMetaData metaData, TablePath tablePath)
105+
throws SQLException {
106+
try (ResultSet rs =
107+
metaData.getTables(
108+
tablePath.getDatabaseName(),
109+
tablePath.getSchemaName(),
110+
tablePath.getTableName(),
111+
new String[] {"TABLE"})) {
112+
if (rs.next()) {
113+
return Optional.ofNullable(rs.getString("REMARKS"));
114+
}
115+
}
116+
return Optional.empty();
117+
}
118+
104119
public static Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData metaData, TablePath tablePath)
105120
throws SQLException {
106121
// According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,

Diff for: seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ void testConstraintKeysNameWithOutSpecialChar() throws SQLException {
5252
}
5353

5454
@Test
55-
void testGetCommentWithJdbcDialectTypeMapper() throws SQLException {
55+
void testGetTableCommentWithJdbcDialectTypeMapper() throws SQLException {
5656
TableSchema tableSchema =
5757
CatalogUtils.getTableSchema(
5858
new TestDatabaseMetaData(),

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,9 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
163163
try {
164164
Schema paimonSchema =
165165
SchemaUtil.toPaimonSchema(
166-
table.getTableSchema(), new PaimonSinkConfig(readonlyConfig));
166+
table.getTableSchema(),
167+
new PaimonSinkConfig(readonlyConfig),
168+
table.getComment());
167169
catalog.createTable(toIdentifier(tablePath), paimonSchema, ignoreIfExists);
168170
} catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException e) {
169171
throw new TableAlreadyExistException(this.catalogName, tablePath);
@@ -276,7 +278,7 @@ private CatalogTable toCatalogTable(
276278
builder.build(),
277279
paimonFileStoreTableTable.options(),
278280
partitionKeys,
279-
null,
281+
paimonFileStoreTableTable.comment().orElse(null),
280282
catalogName);
281283
}
282284

@@ -343,4 +345,17 @@ public void alterTable(
343345
throw new CatalogException("ColumnNotExistException: {}", e);
344346
}
345347
}
348+
349+
public void alterTable(
350+
Identifier identifier, List<SchemaChange> schemaChanges, boolean ignoreIfNotExists) {
351+
try {
352+
catalog.alterTable(identifier, schemaChanges, true);
353+
} catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
354+
throw new CatalogException("TableNotExistException: {}", e);
355+
} catch (org.apache.paimon.catalog.Catalog.ColumnAlreadyExistException e) {
356+
throw new CatalogException("ColumnAlreadyExistException: {}", e);
357+
} catch (org.apache.paimon.catalog.Catalog.ColumnNotExistException e) {
358+
throw new CatalogException("ColumnNotExistException: {}", e);
359+
}
360+
}
346361
}

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@
4141

4242
import lombok.extern.slf4j.Slf4j;
4343

44+
import java.util.ArrayList;
45+
import java.util.List;
46+
4447
import static org.apache.seatunnel.connectors.seatunnel.paimon.sink.schema.UpdatedDataFields.canConvert;
4548

4649
@Slf4j
@@ -95,13 +98,16 @@ private void applySingleSchemaChangeEvent(SchemaChangeEvent event) {
9598
? null
9699
: SchemaChange.Move.after(column.getName(), afterColumnName);
97100
BasicTypeDefine<DataType> reconvertColumn = PaimonTypeMapper.INSTANCE.reconvert(column);
98-
SchemaChange schemaChange =
101+
DataType nativeType = reconvertColumn.getNativeType();
102+
List<SchemaChange> schemaChanges = new ArrayList<>();
103+
schemaChanges.add(
99104
SchemaChange.addColumn(
100-
column.getName(),
101-
reconvertColumn.getNativeType(),
102-
column.getComment(),
103-
move);
104-
paimonCatalog.alterTable(identifier, schemaChange, false);
105+
column.getName(), nativeType.copy(true), column.getComment(), move));
106+
if (!nativeType.isNullable()) {
107+
schemaChanges.add(
108+
SchemaChange.updateColumnType(column.getName(), nativeType.copy(false)));
109+
}
110+
paimonCatalog.alterTable(identifier, schemaChanges, false);
105111
} else if (event instanceof AlterTableDropColumnEvent) {
106112
String columnName = ((AlterTableDropColumnEvent) event).getColumn();
107113
paimonCatalog.alterTable(identifier, SchemaChange.dropColumn(columnName), true);

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ public BasicTypeDefine<DataType> visit(Column column) {
284284
int timestampScale =
285285
Objects.isNull(scale) ? TimestampType.DEFAULT_PRECISION : scale;
286286
TimestampType timestampType = DataTypes.TIMESTAMP(timestampScale);
287-
builder.nativeType(timestampType);
287+
builder.nativeType(timestampType.copy(column.isNullable()));
288288
builder.dataType(timestampType.getTypeRoot().name());
289289
builder.columnType(timestampType.toString());
290290
builder.scale(timestampScale);
@@ -293,7 +293,7 @@ public BasicTypeDefine<DataType> visit(Column column) {
293293
case TIME:
294294
int timeScale = Objects.isNull(scale) ? TimeType.DEFAULT_PRECISION : scale;
295295
TimeType timeType = DataTypes.TIME(timeScale);
296-
builder.nativeType(timeType);
296+
builder.nativeType(timeType.copy(column.isNullable()));
297297
builder.columnType(timeType.toString());
298298
builder.dataType(timeType.getTypeRoot().name());
299299
builder.scale(timeScale);
@@ -356,15 +356,15 @@ public BasicTypeDefine<DataType> visit(Column column) {
356356
}
357357

358358
DecimalType paimonDecimalType = DataTypes.DECIMAL(precision, scale);
359-
builder.nativeType(paimonDecimalType);
359+
builder.nativeType(paimonDecimalType.copy(column.isNullable()));
360360
builder.columnType(paimonDecimalType.toString());
361361
builder.dataType(paimonDecimalType.getTypeRoot().name());
362362
builder.scale(scale);
363363
builder.precision((long) precision);
364364
builder.length(column.getColumnLength());
365365
return builder.build();
366366
default:
367-
builder.nativeType(visit(column.getName(), dataType));
367+
builder.nativeType(visit(column.getName(), dataType).copy(column.isNullable()));
368368
builder.columnType(dataType.toString());
369369
builder.length(column.getColumnLength());
370370
builder.dataType(dataType.getSqlType().name());

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import org.apache.paimon.CoreOptions;
2929
import org.apache.paimon.schema.Schema;
30+
import org.apache.paimon.shade.org.apache.commons.lang.StringUtils;
3031
import org.apache.paimon.types.DataField;
3132
import org.apache.paimon.types.DataType;
3233

@@ -44,11 +45,16 @@ public static DataType toPaimonType(Column column) {
4445
}
4546

4647
public static Schema toPaimonSchema(
47-
TableSchema tableSchema, PaimonSinkConfig paimonSinkConfig) {
48+
TableSchema tableSchema, PaimonSinkConfig paimonSinkConfig, String comment) {
4849
Schema.Builder paiSchemaBuilder = Schema.newBuilder();
4950
for (int i = 0; i < tableSchema.getColumns().size(); i++) {
5051
Column column = tableSchema.getColumns().get(i);
51-
paiSchemaBuilder.column(column.getName(), toPaimonType(column));
52+
if (StringUtils.isNotBlank(column.getComment())) {
53+
paiSchemaBuilder.column(
54+
column.getName(), toPaimonType(column), column.getComment());
55+
} else {
56+
paiSchemaBuilder.column(column.getName(), toPaimonType(column));
57+
}
5258
}
5359
List<String> primaryKeys = paimonSinkConfig.getPrimaryKeys();
5460
if (primaryKeys.isEmpty() && Objects.nonNull(tableSchema.getPrimaryKey())) {
@@ -69,6 +75,9 @@ public static Schema toPaimonSchema(
6975
if (!writeProps.isEmpty()) {
7076
paiSchemaBuilder.options(writeProps);
7177
}
78+
if (StringUtils.isNotBlank(comment)) {
79+
paiSchemaBuilder.comment(comment);
80+
}
7281
return paiSchemaBuilder.build();
7382
}
7483

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
19+
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
22+
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
23+
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
24+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
25+
import org.apache.seatunnel.api.table.catalog.TablePath;
26+
import org.apache.seatunnel.api.table.catalog.TableSchema;
27+
import org.apache.seatunnel.api.table.type.BasicType;
28+
29+
import org.apache.paimon.catalog.Catalog;
30+
import org.apache.paimon.catalog.CatalogContext;
31+
import org.apache.paimon.catalog.CatalogFactory;
32+
import org.apache.paimon.catalog.Identifier;
33+
import org.apache.paimon.fs.Path;
34+
import org.apache.paimon.table.FileStoreTable;
35+
36+
import org.junit.jupiter.api.AfterEach;
37+
import org.junit.jupiter.api.Assertions;
38+
import org.junit.jupiter.api.BeforeEach;
39+
import org.junit.jupiter.api.Test;
40+
41+
import lombok.extern.slf4j.Slf4j;
42+
43+
import java.util.ArrayList;
44+
import java.util.Collections;
45+
import java.util.HashMap;
46+
import java.util.Map;
47+
48+
@Slf4j
49+
public class PaimonWithCommentTest {
50+
51+
private PaimonCatalog paimonCatalog;
52+
private TableSchema.Builder schemaBuilder;
53+
private final String CATALOG_NAME = "paimon_catalog";
54+
private final String DATABASE_NAME = "default";
55+
private final String TABLE_NAME = "test_with_comment";
56+
private final String warehousePath = "/tmp/paimon";
57+
private Catalog catalog;
58+
59+
@BeforeEach
60+
public void before() {
61+
Map<String, Object> properties = new HashMap<>();
62+
properties.put("warehouse", warehousePath);
63+
properties.put("plugin_name", "Paimon");
64+
properties.put("database", DATABASE_NAME);
65+
properties.put("table", TABLE_NAME);
66+
Map<String, String> writeProps = new HashMap<>();
67+
writeProps.put("bucket", "1");
68+
properties.put("paimon.table.write-props", writeProps);
69+
ReadonlyConfig config = ReadonlyConfig.fromMap(properties);
70+
CatalogContext catalogContext = CatalogContext.create(new Path(warehousePath));
71+
catalog = CatalogFactory.createCatalog(catalogContext);
72+
paimonCatalog = new PaimonCatalog(CATALOG_NAME, config);
73+
paimonCatalog.open();
74+
paimonCatalog.createDatabase(TablePath.of(DATABASE_NAME, TABLE_NAME), true);
75+
this.schemaBuilder =
76+
TableSchema.builder()
77+
.column(
78+
PhysicalColumn.of(
79+
"c_string",
80+
BasicType.STRING_TYPE,
81+
(Long) null,
82+
true,
83+
null,
84+
"c_string"))
85+
.column(
86+
PhysicalColumn.of(
87+
"c_int",
88+
BasicType.INT_TYPE,
89+
(Long) null,
90+
false,
91+
null,
92+
"c_int"))
93+
.column(
94+
PhysicalColumn.of(
95+
"c_bigint",
96+
BasicType.LONG_TYPE,
97+
(Long) null,
98+
false,
99+
null,
100+
"c_bigint"));
101+
}
102+
103+
@Test
104+
public void testCreateTableWithCommentAndNullable() throws Catalog.TableNotExistException {
105+
TableSchema tableSchema =
106+
schemaBuilder
107+
.primaryKey(PrimaryKey.of("pk", Collections.singletonList("c_int")))
108+
.build();
109+
CatalogTable catalogTable =
110+
CatalogTable.of(
111+
TableIdentifier.of(CATALOG_NAME, DATABASE_NAME, TABLE_NAME),
112+
tableSchema,
113+
new HashMap<>(),
114+
new ArrayList<>(),
115+
"test table");
116+
paimonCatalog.createTable(
117+
TablePath.of(DATABASE_NAME, null, TABLE_NAME), catalogTable, true);
118+
119+
FileStoreTable table =
120+
(FileStoreTable) catalog.getTable(Identifier.create(DATABASE_NAME, TABLE_NAME));
121+
Assertions.assertEquals("test table", table.comment().get());
122+
table.schema()
123+
.fields()
124+
.forEach(
125+
field -> {
126+
Assertions.assertEquals(field.name(), field.description());
127+
if (field.name().equals("c_string")) {
128+
Assertions.assertTrue(field.type().isNullable());
129+
} else {
130+
Assertions.assertFalse(field.type().isNullable());
131+
}
132+
});
133+
}
134+
135+
@AfterEach
136+
public void after() {
137+
paimonCatalog.dropDatabase(TablePath.of(DATABASE_NAME, TABLE_NAME), false);
138+
paimonCatalog.close();
139+
}
140+
}

0 commit comments

Comments
 (0)