Skip to content

Commit fb8695c

Browse files
authored
Merge pull request #528 from tapdata/TAP-6244-connector
fix: TAP-6244 optimize pg error code when create publication failed
2 parents 8fbc011 + 0f5c997 commit fb8695c

File tree

2 files changed

+71
-2
lines changed

2 files changed

+71
-2
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.tapdata.connector.postgres.error;
2+
3+
import io.tapdata.exception.TapExClass;
4+
import io.tapdata.exception.TapExCode;
5+
import io.tapdata.exception.TapExLevel;
6+
import io.tapdata.exception.TapExType;
7+
8+
@TapExClass(
9+
code = 41,
10+
module = "postgres-connector",
11+
describe = "Postgres Error Code",
12+
prefix = "PG")
13+
public interface PostgresErrorCode {
14+
@TapExCode(
15+
describe = "Failed to create publication, please analyze the specific error message. \n" +
16+
"Common errors and reasons: \n" +
17+
" ERROR: permission denied for database. \n" +
18+
"The current user does not have permission to create publications.",
19+
describeCN = "创建 publication 失败,需结合具体报错信息分析。\n" +
20+
"常见的报错及原因:\n" +
21+
"ERROR: permission denied for database. \n" +
22+
"当前用户没有创建 publication 的权限。",
23+
solution = "Solution (choose one): \n" +
24+
"1. Use a superuser connection (such as postgres). \n" +
25+
"2. Grant the current user the required permissions and execute: ALTER USER username REPLICATION;",
26+
solutionCN = "解决方案(任选其一):\n" +
27+
"1. 使用超级用户连接(例如 postgres)。\n" +
28+
"2. 赋予当前用户所需权限,执行:ALTER USER username REPLICATION;",
29+
dynamicDescription = "Execute sql failed: {}",
30+
dynamicDescriptionCN = "执行语句失败:{}",
31+
level = TapExLevel.CRITICAL,
32+
type = TapExType.RUNTIME
33+
)
34+
String CREATE_PUBLICATION_FAILED = "410001";
35+
36+
@TapExCode(
37+
describe = "Failed to select publication, please analyze the specific error message. \n" +
38+
"Common errors and reasons: \n" +
39+
"ERROR: must be superuser or replication role to use replication slots. \n" +
40+
"The current user does not have permission to create publications.",
41+
describeCN = "查询 publication 失败,需结合具体报错信息分析。\n" +
42+
"常见的报错及原因:\n" +
43+
"ERROR: must be superuser or replication role to use replication slots \n" +
44+
"当前用户没有查询 publication 的权限。",
45+
solution = "Solution (choose one): \n" +
46+
"1. Use a superuser connection (such as postgres). \n" +
47+
"2. Grant the current user the required permissions and execute: ALTER USER username REPLICATION;",
48+
solutionCN = "解决方案(任选其一):\n" +
49+
"1. 使用超级用户连接(例如 postgres)。\n" +
50+
"2. 赋予当前用户所需权限,执行:ALTER USER username REPLICATION;",
51+
dynamicDescription = "Execute sql failed: {}",
52+
dynamicDescriptionCN = "执行语句失败:{}",
53+
level = TapExLevel.CRITICAL,
54+
type = TapExType.RUNTIME
55+
)
56+
String SELECT_PUBLICATION_FAILED = "410002";
57+
}

connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.tapdata.connector.postgres.config.PostgresConfig;
1111
import io.tapdata.connector.postgres.ddl.PostgresDDLSqlGenerator;
1212
import io.tapdata.connector.postgres.dml.PostgresRecordWriter;
13+
import io.tapdata.connector.postgres.error.PostgresErrorCode;
1314
import io.tapdata.connector.postgres.exception.PostgresExceptionCollector;
1415
import io.tapdata.connector.postgres.partition.PostgresPartitionContext;
1516
import io.tapdata.connector.postgres.partition.TableType;
@@ -30,6 +31,7 @@
3031
import io.tapdata.entity.utils.cache.Entry;
3132
import io.tapdata.entity.utils.cache.Iterator;
3233
import io.tapdata.entity.utils.cache.KVReadOnlyMap;
34+
import io.tapdata.exception.TapCodeException;
3335
import io.tapdata.kit.DbKit;
3436
import io.tapdata.kit.EmptyKit;
3537
import io.tapdata.kit.ErrorKit;
@@ -269,7 +271,12 @@ private void clearSlot() throws Throwable {
269271
private void buildSlot(TapConnectorContext connectorContext, Boolean needCheck) throws Throwable {
270272
if (EmptyKit.isNull(slotName)) {
271273
slotName = "tapdata_cdc_" + UUID.randomUUID().toString().replaceAll("-", "_");
272-
postgresJdbcContext.execute("SELECT pg_create_logical_replication_slot('" + slotName + "','" + postgresConfig.getLogPluginName() + "')");
274+
String sql = "SELECT pg_create_logical_replication_slot('" + slotName + "','" + postgresConfig.getLogPluginName() + "')";
275+
try {
276+
postgresJdbcContext.execute(sql);
277+
} catch (SQLException e) {
278+
throw new TapCodeException(PostgresErrorCode.SELECT_PUBLICATION_FAILED, "Select publication failed. Error message: " + e.getMessage()).dynamicDescriptionParameters(sql);
279+
}
273280
tapLogger.info("new logical replication slot created, slotName:{}", slotName);
274281
connectorContext.getStateMap().put("tapdata_pg_slot", slotName);
275282
} else if (needCheck) {
@@ -713,7 +720,12 @@ private void createPublicationIfNotExist() throws SQLException {
713720
}
714721
});
715722
if (needCreate.get()) {
716-
postgresJdbcContext.execute(String.format("CREATE PUBLICATION %s FOR ALL TABLES %s", publicationName, postgresConfig.getPartitionRoot() ? "WITH (publish_via_partition_root = true)" : ""));
723+
String sql = String.format("CREATE PUBLICATION %s FOR ALL TABLES %s", publicationName, postgresConfig.getPartitionRoot() ? "WITH (publish_via_partition_root = true)" : "");
724+
try {
725+
postgresJdbcContext.execute(sql);
726+
} catch (SQLException e) {
727+
throw new TapCodeException(PostgresErrorCode.CREATE_PUBLICATION_FAILED, "create publication for all tables failed. Error message: " + e.getMessage()).dynamicDescriptionParameters(sql);
728+
}
717729
}
718730
}
719731

0 commit comments

Comments
 (0)