Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.tapdata.connector.postgres.error;

import io.tapdata.exception.TapExClass;
import io.tapdata.exception.TapExCode;
import io.tapdata.exception.TapExLevel;
import io.tapdata.exception.TapExType;

@TapExClass(
code = 41,
module = "postgres-connector",
describe = "Postgres Error Code",
prefix = "PG")
public interface PostgresErrorCode {
@TapExCode(
describe = "Failed to create publication, please analyze the specific error message. \n" +
"Common errors and reasons: \n" +
" ERROR: permission denied for database. \n" +
"The current user does not have permission to create publications.",
describeCN = "创建 publication 失败,需结合具体报错信息分析。\n" +
"常见的报错及原因:\n" +
"ERROR: permission denied for database. \n" +
"当前用户没有创建 publication 的权限。",
solution = "Solution (choose one): \n" +
"1. Use a superuser connection (such as postgres). \n" +
"2. Grant the current user the required permissions and execute: ALTER USER username REPLICATION;",
solutionCN = "解决方案(任选其一):\n" +
"1. 使用超级用户连接(例如 postgres)。\n" +
"2. 赋予当前用户所需权限,执行:ALTER USER username REPLICATION;",
dynamicDescription = "Execute sql failed: {}",
dynamicDescriptionCN = "执行语句失败:{}",
level = TapExLevel.CRITICAL,
type = TapExType.RUNTIME
)
String CREATE_PUBLICATION_FAILED = "410001";

@TapExCode(
describe = "Failed to select publication, please analyze the specific error message. \n" +
"Common errors and reasons: \n" +
"ERROR: must be superuser or replication role to use replication slots. \n" +
"The current user does not have permission to create publications.",
describeCN = "查询 publication 失败,需结合具体报错信息分析。\n" +
"常见的报错及原因:\n" +
"ERROR: must be superuser or replication role to use replication slots \n" +
"当前用户没有查询 publication 的权限。",
solution = "Solution (choose one): \n" +
"1. Use a superuser connection (such as postgres). \n" +
"2. Grant the current user the required permissions and execute: ALTER USER username REPLICATION;",
solutionCN = "解决方案(任选其一):\n" +
"1. 使用超级用户连接(例如 postgres)。\n" +
"2. 赋予当前用户所需权限,执行:ALTER USER username REPLICATION;",
dynamicDescription = "Execute sql failed: {}",
dynamicDescriptionCN = "执行语句失败:{}",
level = TapExLevel.CRITICAL,
type = TapExType.RUNTIME
)
String SELECT_PUBLICATION_FAILED = "410002";
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.tapdata.connector.postgres.config.PostgresConfig;
import io.tapdata.connector.postgres.ddl.PostgresDDLSqlGenerator;
import io.tapdata.connector.postgres.dml.PostgresRecordWriter;
import io.tapdata.connector.postgres.error.PostgresErrorCode;
import io.tapdata.connector.postgres.exception.PostgresExceptionCollector;
import io.tapdata.connector.postgres.partition.PostgresPartitionContext;
import io.tapdata.connector.postgres.partition.TableType;
Expand All @@ -30,6 +31,7 @@
import io.tapdata.entity.utils.cache.Entry;
import io.tapdata.entity.utils.cache.Iterator;
import io.tapdata.entity.utils.cache.KVReadOnlyMap;
import io.tapdata.exception.TapCodeException;
import io.tapdata.kit.DbKit;
import io.tapdata.kit.EmptyKit;
import io.tapdata.kit.ErrorKit;
Expand Down Expand Up @@ -269,7 +271,12 @@ private void clearSlot() throws Throwable {
private void buildSlot(TapConnectorContext connectorContext, Boolean needCheck) throws Throwable {
if (EmptyKit.isNull(slotName)) {
slotName = "tapdata_cdc_" + UUID.randomUUID().toString().replaceAll("-", "_");
postgresJdbcContext.execute("SELECT pg_create_logical_replication_slot('" + slotName + "','" + postgresConfig.getLogPluginName() + "')");
String sql = "SELECT pg_create_logical_replication_slot('" + slotName + "','" + postgresConfig.getLogPluginName() + "')";
try {
postgresJdbcContext.execute(sql);
} catch (SQLException e) {
throw new TapCodeException(PostgresErrorCode.SELECT_PUBLICATION_FAILED, "Select publication failed. Error message: " + e.getMessage()).dynamicDescriptionParameters(sql);
}
tapLogger.info("new logical replication slot created, slotName:{}", slotName);
connectorContext.getStateMap().put("tapdata_pg_slot", slotName);
} else if (needCheck) {
Expand Down Expand Up @@ -713,7 +720,12 @@ private void createPublicationIfNotExist() throws SQLException {
}
});
if (needCreate.get()) {
postgresJdbcContext.execute(String.format("CREATE PUBLICATION %s FOR ALL TABLES %s", publicationName, postgresConfig.getPartitionRoot() ? "WITH (publish_via_partition_root = true)" : ""));
String sql = String.format("CREATE PUBLICATION %s FOR ALL TABLES %s", publicationName, postgresConfig.getPartitionRoot() ? "WITH (publish_via_partition_root = true)" : "");
try {
postgresJdbcContext.execute(sql);
} catch (SQLException e) {
throw new TapCodeException(PostgresErrorCode.CREATE_PUBLICATION_FAILED, "create publication for all tables failed. Error message: " + e.getMessage()).dynamicDescriptionParameters(sql);
}
}
}

Expand Down
Loading