Skip to content

Commit 6f74663

Browse files
authored
[Improve][mysql-cdc] Fallback to desc table when show create table failed (#6701)
* [Improve][mysql-cdc] Fallback to desc table when show create table failed * Update MySqlSchema.java
1 parent 1b269e8 commit 6f74663

File tree

3 files changed

+339
-32
lines changed

3 files changed

+339
-32
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
19+
20+
import io.debezium.relational.TableId;
21+
import lombok.Builder;
22+
import lombok.Getter;
23+
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
import java.util.stream.Collectors;
27+
28+
public class MySqlDdlBuilder {
29+
private final TableId tableId;
30+
private final List<Column> columns;
31+
private List<String> primaryKeys;
32+
33+
public MySqlDdlBuilder(TableId tableId) {
34+
this.tableId = tableId;
35+
this.columns = new ArrayList<>();
36+
this.primaryKeys = new ArrayList<>();
37+
}
38+
39+
public MySqlDdlBuilder addColumn(Column column) {
40+
columns.add(column);
41+
if (column.isPrimaryKey()) {
42+
primaryKeys.add(column.getColumnName());
43+
}
44+
return this;
45+
}
46+
47+
public String generateDdl() {
48+
String columnDefinitions =
49+
columns.stream().map(Column::generateDdl).collect(Collectors.joining(", "));
50+
String keyDefinitions =
51+
primaryKeys.stream()
52+
.map(MySqlUtils::quote)
53+
.collect(Collectors.joining(", ", "PRIMARY KEY (", ")"));
54+
return String.format(
55+
"CREATE TABLE %s (%s, %s)", tableId.table(), columnDefinitions, keyDefinitions);
56+
}
57+
58+
@Getter
59+
@Builder
60+
public static class Column {
61+
private String columnName;
62+
private String columnType;
63+
private boolean nullable;
64+
private boolean primaryKey;
65+
private boolean uniqueKey;
66+
private String defaultValue;
67+
private String extra;
68+
69+
public String generateDdl() {
70+
return MySqlUtils.quote(columnName)
71+
+ " "
72+
+ columnType
73+
+ " "
74+
+ (nullable ? "" : "NOT NULL");
75+
}
76+
}
77+
}

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlSchema.java

+77-32
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
1919

2020
import org.apache.seatunnel.api.table.catalog.CatalogTable;
21+
import org.apache.seatunnel.common.utils.SeaTunnelException;
2122
import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
2223
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
2324

@@ -30,14 +31,17 @@
3031
import io.debezium.relational.history.TableChanges;
3132
import io.debezium.relational.history.TableChanges.TableChange;
3233
import io.debezium.schema.SchemaChangeEvent;
34+
import lombok.extern.slf4j.Slf4j;
3335

3436
import java.sql.SQLException;
3537
import java.time.Instant;
3638
import java.util.HashMap;
3739
import java.util.List;
3840
import java.util.Map;
41+
import java.util.concurrent.atomic.AtomicReference;
3942

4043
/** A component used to get schema by table path. */
44+
@Slf4j
4145
public class MySqlSchema {
4246
private static final String SHOW_CREATE_TABLE = "SHOW CREATE TABLE ";
4347
private static final String DESC_TABLE = "DESC ";
@@ -74,43 +78,84 @@ public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {
7478
}
7579

7680
private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
77-
final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
78-
final String sql = SHOW_CREATE_TABLE + MySqlUtils.quote(tableId);
81+
Map<TableId, TableChange> tableChangeMap = new HashMap<>();
7982
try {
80-
jdbc.query(
81-
sql,
82-
rs -> {
83-
if (rs.next()) {
84-
final String ddl = rs.getString(2);
85-
final MySqlOffsetContext offsetContext =
86-
MySqlOffsetContext.initial(connectorConfig);
87-
List<SchemaChangeEvent> schemaChangeEvents =
88-
databaseSchema.parseSnapshotDdl(
89-
ddl, tableId.catalog(), offsetContext, Instant.now());
90-
for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
91-
for (TableChange tableChange :
92-
schemaChangeEvent.getTableChanges()) {
93-
Table table =
94-
CatalogTableUtils.mergeCatalogTableConfig(
95-
tableChange.getTable(), tableMap.get(tableId));
96-
TableChange newTableChange =
97-
new TableChange(
98-
TableChanges.TableChangeType.CREATE, table);
99-
tableChangeMap.put(tableId, newTableChange);
100-
}
101-
}
102-
}
103-
});
104-
} catch (SQLException e) {
105-
throw new RuntimeException(
106-
String.format("Failed to read schema for table %s by running %s", tableId, sql),
107-
e);
83+
tableChangeMap = getTableSchemaByShowCreateTable(jdbc, tableId);
84+
if (tableChangeMap.isEmpty()) {
85+
log.debug("Load schema is empty for table {}", tableId);
86+
}
87+
} catch (Exception e) {
88+
log.debug("Ignore exception when execute `SHOW CREATE TABLE {}` failed", tableId, e);
89+
}
90+
if (tableChangeMap.isEmpty()) {
91+
try {
92+
log.info("Fallback to use `DESC {}` load schema", tableId);
93+
tableChangeMap = getTableSchemaByDescTable(jdbc, tableId);
94+
} catch (SQLException ex) {
95+
throw new SeaTunnelException(
96+
String.format("Failed to read schema for table %s", tableId), ex);
97+
}
10898
}
10999
if (!tableChangeMap.containsKey(tableId)) {
110-
throw new RuntimeException(
111-
String.format("Can't obtain schema for table %s by running %s", tableId, sql));
100+
throw new RuntimeException(String.format("Can't obtain schema for table %s", tableId));
112101
}
113102

114103
return tableChangeMap.get(tableId);
115104
}
105+
106+
private Map<TableId, TableChange> getTableSchemaByShowCreateTable(
107+
JdbcConnection jdbc, TableId tableId) throws SQLException {
108+
AtomicReference<String> ddl = new AtomicReference<>();
109+
String sql = SHOW_CREATE_TABLE + MySqlUtils.quote(tableId);
110+
jdbc.query(
111+
sql,
112+
rs -> {
113+
rs.next();
114+
ddl.set(rs.getString(2));
115+
});
116+
return parseSnapshotDdl(tableId, ddl.get());
117+
}
118+
119+
private Map<TableId, TableChange> getTableSchemaByDescTable(
120+
JdbcConnection jdbc, TableId tableId) throws SQLException {
121+
MySqlDdlBuilder ddlBuilder = new MySqlDdlBuilder(tableId);
122+
String sql = DESC_TABLE + MySqlUtils.quote(tableId);
123+
jdbc.query(
124+
sql,
125+
rs -> {
126+
while (rs.next()) {
127+
ddlBuilder.addColumn(
128+
MySqlDdlBuilder.Column.builder()
129+
.columnName(rs.getString("Field"))
130+
.columnType(rs.getString("Type"))
131+
.nullable(rs.getString("Null").equalsIgnoreCase("YES"))
132+
.primaryKey("PRI".equals(rs.getString("Key")))
133+
.uniqueKey("UNI".equals(rs.getString("Key")))
134+
.defaultValue(rs.getString("Default"))
135+
.extra(rs.getString("Extra"))
136+
.build());
137+
}
138+
});
139+
140+
return parseSnapshotDdl(tableId, ddlBuilder.generateDdl());
141+
}
142+
143+
private Map<TableId, TableChange> parseSnapshotDdl(TableId tableId, String ddl) {
144+
Map<TableId, TableChange> tableChangeMap = new HashMap<>();
145+
final MySqlOffsetContext offsetContext = MySqlOffsetContext.initial(connectorConfig);
146+
List<SchemaChangeEvent> schemaChangeEvents =
147+
databaseSchema.parseSnapshotDdl(
148+
ddl, tableId.catalog(), offsetContext, Instant.now());
149+
for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
150+
for (TableChange tableChange : schemaChangeEvent.getTableChanges()) {
151+
Table table =
152+
CatalogTableUtils.mergeCatalogTableConfig(
153+
tableChange.getTable(), tableMap.get(tableId));
154+
TableChange newTableChange =
155+
new TableChange(TableChanges.TableChangeType.CREATE, table);
156+
tableChangeMap.put(tableId, newTableChange);
157+
}
158+
}
159+
return tableChangeMap;
160+
}
116161
}

0 commit comments

Comments
 (0)