Skip to content

Commit 02d3fc6

Browse files
authored
Merge pull request #748 from tapdata/develop
Develop
2 parents 65bd5c1 + 06d4486 commit 02d3fc6

File tree

56 files changed

+1870
-835
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1870
-835
lines changed

.github/workflows/mr-ci.yaml

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ jobs:
4444
TAG_NAME: ${{ steps.set-output.outputs.TAG_NAME }}
4545
steps:
4646
- name: Checkout Tapdata Opensource
47-
uses: actions/checkout@v3
47+
uses: actions/checkout@v4
4848
with:
4949
repository: 'tapdata/tapdata'
5050
token: ${{ secrets.TAPDATA_ENT_CICD_TOKEN }}
@@ -73,33 +73,12 @@ jobs:
7373
echo "::set-output name=TAG_NAME::${TAG_NAME}"
7474
7575
Sync-Code-to-Office:
76-
runs-on: ubuntu-latest
77-
needs:
78-
- Get-Stable-Branch
79-
steps:
80-
- name: Checkout Tapdata Connectors Code
81-
uses: actions/checkout@v3
82-
with:
83-
repository: 'tapdata/tapdata-connectors'
84-
ref: ${{ needs.Get-Stable-Branch.outputs.CONNECTORS_BRANCH }}
85-
token: ${{ secrets.TAPDATA_ENT_CICD_TOKEN }}
86-
path: tapdata-connectors
87-
fetch-depth: 0
88-
- name: Checkout Tapdata Application
89-
uses: actions/checkout@v2
90-
with:
91-
repository: 'tapdata/tapdata-application'
92-
ref: main
93-
token: ${{ secrets.TAPDATA_ENT_CICD_TOKEN }}
94-
path: tapdata-application
95-
fetch-depth: 0
96-
- name: Push Tapdata Connectors to Gogs
97-
uses: nick-fields/retry@v2
98-
with:
99-
timeout_minutes: 10
100-
max_attempts: 3
101-
command: |
102-
bash tapdata-application/build/upgrade.sh --upgrade-code=true --upgrade-code-path=tapdata-connectors --gogs-project-name=tapdata-connectors
76+
needs: Get-Stable-Branch
77+
uses: tapdata/tapdata-application/.github/workflows/sync-code-to-office.yaml@main
78+
secrets: inherit
79+
with:
80+
tapdata-connectors: ${{ needs.Get-Current-Branch.outputs.branch }}
81+
gitee-token-user: "${{ vars.GITEE_TOKEN_USER }}"
10382

10483
Scan-Connectors:
10584
runs-on: ubuntu-latest
@@ -124,7 +103,7 @@ jobs:
124103
wait_workflow: true
125104
- name: Checkout Tapdata-Application
126105
if: ${{ always() && steps.sonar.outcome == 'failure' }}
127-
uses: actions/checkout@v2
106+
uses: actions/checkout@v4
128107
with:
129108
repository: 'tapdata/tapdata-application'
130109
ref: "main"

connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/PostgresTest.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44
import io.tapdata.common.CommonDbTest;
55
import io.tapdata.common.util.FileUtil;
66
import io.tapdata.connector.postgres.config.PostgresConfig;
7+
import io.tapdata.connector.postgres.error.PostgresErrorCode;
78
import io.tapdata.entity.simplify.TapSimplify;
9+
import io.tapdata.exception.TapCodeException;
810
import io.tapdata.kit.EmptyKit;
911
import io.tapdata.kit.StringKit;
1012
import io.tapdata.pdk.apis.entity.ConnectionOptions;
1113
import io.tapdata.pdk.apis.entity.TestItem;
14+
import io.tapdata.pdk.apis.exception.TapTestItemException;
1215
import io.tapdata.pdk.apis.exception.testItem.TapTestCurrentTimeConsistentEx;
1316
import io.tapdata.pdk.apis.exception.testItem.TapTestReadPrivilegeEx;
1417
import io.tapdata.pdk.apis.exception.testItem.TapTestStreamReadEx;
@@ -139,19 +142,24 @@ public Boolean testStreamRead() {
139142
properties.put("password", commonDbConfig.getPassword());
140143
properties.put("replication", "database");
141144
properties.put("assumeMinServerVersion", "9.4");
145+
List<String> testSqls = TapSimplify.list();
146+
long begin = System.currentTimeMillis();
142147
try {
143148
Connection connection = new Driver().connect(commonDbConfig.getDatabaseUrl(), properties);
144149
assert connection != null;
145150
connection.close();
146-
List<String> testSqls = TapSimplify.list();
147151
String testSlotName = "test_tapdata_" + UUID.randomUUID().toString().replaceAll("-", "_");
148152
testSqls.add(String.format(PG_LOG_PLUGIN_CREATE_TEST, testSlotName, ((PostgresConfig) commonDbConfig).getLogPluginName()));
149153
testSqls.add(PG_LOG_PLUGIN_DROP_TEST);
150-
jdbcContext.batchExecute(testSqls);
154+
jdbcContext.batchExecute(testSqls, 20);
151155
consumer.accept(testItem(TestItem.ITEM_READ_LOG, TestItem.RESULT_SUCCESSFULLY, "Cdc can work normally"));
152156
return true;
153157
} catch (Throwable e) {
154-
consumer.accept(new TestItem(TestItem.ITEM_READ_LOG, new TapTestStreamReadEx(e), TestItem.RESULT_SUCCESSFULLY_WITH_WARN));
158+
if (System.currentTimeMillis() - begin > 18000) {
159+
consumer.accept(new TestItem(TestItem.ITEM_READ_LOG, new TapTestItemException(new TapCodeException(PostgresErrorCode.CREATE_SLOT_TIMEOUT).dynamicDescriptionParameters(String.join(",", testSqls))), TestItem.RESULT_SUCCESSFULLY_WITH_WARN));
160+
} else {
161+
consumer.accept(new TestItem(TestItem.ITEM_READ_LOG, new TapTestItemException(new TapCodeException(PostgresErrorCode.CREATE_SLOT_FAILED).dynamicDescriptionParameters(String.join(",", testSqls))), TestItem.RESULT_SUCCESSFULLY_WITH_WARN));
162+
}
155163
return null;
156164
}
157165
}

connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/PostgresCdcRunner.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import io.tapdata.connector.postgres.PostgresJdbcContext;
99
import io.tapdata.connector.postgres.cdc.config.PostgresDebeziumConfig;
1010
import io.tapdata.connector.postgres.cdc.offset.PostgresOffset;
11-
import io.tapdata.connector.postgres.cdc.offset.PostgresOffsetStorage;
1211
import io.tapdata.connector.postgres.config.PostgresConfig;
1312
import io.tapdata.entity.event.TapEvent;
1413
import io.tapdata.entity.event.control.HeartbeatEvent;

connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/offset/PostgresOffsetStorage.java

Lines changed: 0 additions & 10 deletions
This file was deleted.

connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/error/PostgresErrorCode.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,58 @@ public interface PostgresErrorCode {
5454
type = TapExType.RUNTIME
5555
)
5656
String SELECT_PUBLICATION_FAILED = "410002";
57+
58+
@TapExCode(
59+
describe = "Failed to create slot, please analyze the specific error message. \n" +
60+
"Common errors and reasons: \n" +
61+
"ERROR: must be superuser or replication role to use replication slots. \n" +
62+
"The current user does not have permission to create slots.",
63+
describeCN = "创建 slot 失败,需结合具体报错信息分析。\n" +
64+
"常见的报错及原因:\n" +
65+
"ERROR: must be superuser or replication role to use replication slots \n" +
66+
"当前用户没有创建 slot 的权限。",
67+
solution = "Solution (choose one): \n" +
68+
"1. Use a superuser connection (such as postgres). \n" +
69+
"2. Grant the current user the required permissions and execute: ALTER USER username REPLICATION;",
70+
solutionCN = "解决方案(任选其一):\n" +
71+
"1. 使用超级用户连接(例如 postgres)。\n" +
72+
"2. 赋予当前用户所需权限,执行:ALTER USER username REPLICATION;",
73+
dynamicDescription = "Execute sql failed: {}",
74+
dynamicDescriptionCN = "执行语句失败:{}",
75+
level = TapExLevel.CRITICAL,
76+
type = TapExType.RUNTIME
77+
)
78+
String CREATE_SLOT_FAILED = "410003";
79+
80+
@TapExCode(
81+
describe = "Failed to create slot, please analyze the specific error message. \n" +
82+
"Common errors and reasons: \n" +
83+
"ERROR: timeout expired. \n" +
84+
"The creation of slot timed out, which is probably affected by a long time of uncommitted transaction or other database processes blocking wait.",
85+
describeCN = "创建 slot 超时,大概率是受长时间未提交事务影响或其它数据库进程阻塞等待",
86+
solution = "Solution (cautious handling): \n" +
87+
"1. Please check if there are any long uncommitted transactions in the database. If so, please commit them first. \n" +
88+
"2. Please check if there are any other database processes blocking wait. If so, please handle them first.\n" +
89+
"SELECT pid, datname, usename, state, xact_start, now() - xact_start AS duration, query\n" +
90+
"FROM pg_stat_activity\n" +
91+
"WHERE state != 'idle'\n" +
92+
" AND xact_start IS NOT NULL\n" +
93+
" AND now() - xact_start > interval '300 seconds' -- for example, find transactions that have been running for more than 5 minutes\n" +
94+
"ORDER BY xact_start;",
95+
solutionCN = "解决方案(需谨慎处理):\n" +
96+
"1. 请检查数据库是否存在长时间未提交事务,如存在请先提交事务。\n" +
97+
"SELECT pid, datname, usename, state, xact_start, now() - xact_start AS duration, query\n" +
98+
"FROM pg_stat_activity\n" +
99+
"WHERE state != 'idle'\n" +
100+
" AND xact_start IS NOT NULL\n" +
101+
" AND now() - xact_start > interval '300 seconds' -- 例如,查找超过5分钟的事务\n" +
102+
"ORDER BY xact_start;\n" +
103+
"2. 请检查数据库是否存在其它数据库进程阻塞等待,如存在请先处理阻塞。",
104+
dynamicDescription = "Execute sql failed: {}",
105+
dynamicDescriptionCN = "执行语句失败:{}",
106+
level = TapExLevel.CRITICAL,
107+
type = TapExType.RUNTIME,
108+
recoverable = true
109+
)
110+
String CREATE_SLOT_TIMEOUT = "410004";
57111
}

connectors-common/sql-core/src/main/java/io/tapdata/common/JdbcContext.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,17 @@ public void execute(String sql) throws SQLException {
227227
}
228228
}
229229

230+
public void execute(String sql, int timeout) throws SQLException {
231+
try (
232+
Connection connection = getConnection();
233+
Statement statement = connection.createStatement()
234+
) {
235+
statement.setQueryTimeout(timeout);
236+
statement.execute(sql);
237+
connection.commit();
238+
}
239+
}
240+
230241
public void batchExecute(List<String> sqlList) throws SQLException {
231242
try (
232243
Connection connection = getConnection();
@@ -239,6 +250,19 @@ public void batchExecute(List<String> sqlList) throws SQLException {
239250
}
240251
}
241252

253+
public void batchExecute(List<String> sqlList, int timeout) throws SQLException {
254+
try (
255+
Connection connection = getConnection();
256+
Statement statement = connection.createStatement()
257+
) {
258+
statement.setQueryTimeout(timeout);
259+
for (String sql : sqlList) {
260+
statement.execute(sql);
261+
}
262+
connection.commit();
263+
}
264+
}
265+
242266
public void queryAllTables(List<String> tableNames, int batchSize, Consumer<List<String>> consumer) throws SQLException {
243267
List<String> temp = list();
244268
query(queryAllTablesSql(getConfig().getSchema(), tableNames),

connectors/doris-connector/src/main/java/io/tapdata/connector/doris/bean/DorisConfig.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class DorisConfig extends CommonDbConfig {
2424
private List<LinkedHashMap<String, String>> tableProperties = new ArrayList<>();
2525
private Boolean jdbcCompletion = false;
2626

27-
private Boolean useHTTPS =false;
27+
private Boolean useHTTPS = false;
2828

2929
private Integer backendNum;
3030

@@ -38,9 +38,9 @@ public DorisConfig() {
3838
public DorisConfig load(Map<String, Object> map) {
3939
DorisConfig config = (DorisConfig) super.load(map);
4040
config.setSchema(config.getDatabase());
41-
if(Boolean.TRUE.equals(useHTTPS)){
41+
if (Boolean.TRUE.equals(useHTTPS)) {
4242
config.setDorisHttp(getDorisHttp().replace("https://", ""));
43-
}else{
43+
} else {
4444
config.setDorisHttp(getDorisHttp().replace("http://", ""));
4545
}
4646
return config;
@@ -109,10 +109,6 @@ public String getWriteFormat() {
109109
return writeFormat;
110110
}
111111

112-
public String getWriteFormat(String key) {
113-
return getTableConfigValue(key, "writeFormat", writeFormat);
114-
}
115-
116112
public void setWriteFormat(String writeFormat) {
117113
this.writeFormat = writeFormat;
118114
}
@@ -185,8 +181,4 @@ public enum WriteFormat {
185181
public WriteFormat getWriteFormatEnum() {
186182
return WriteFormat.valueOf(writeFormat);
187183
}
188-
189-
public WriteFormat getWriteFormatEnum(String key) {
190-
return WriteFormat.valueOf(getTableConfigValue(key, "writeFormat", writeFormat));
191-
}
192184
}

connectors/doris-connector/src/main/java/io/tapdata/connector/doris/streamload/DorisStreamLoader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public DorisStreamLoader(DorisJdbcContext dorisJdbcContext, CloseableHttpClient
7171
}
7272

7373
private void initMessageSerializer() {
74-
DorisConfig.WriteFormat writeFormat = dorisConfig.getWriteFormatEnum(tapTable.getId());
74+
DorisConfig.WriteFormat writeFormat = dorisConfig.getWriteFormatEnum();
7575
TapLogger.info(TAG, "Doris stream load run with {} format", writeFormat);
7676
switch (writeFormat) {
7777
case csv:
@@ -137,7 +137,7 @@ private Set<String> getDataColumns(TapRecordEvent recordEvent) {
137137
}
138138

139139
public RespContent put(final TapTable table) throws StreamLoadException, DorisRetryableException {
140-
DorisConfig.WriteFormat writeFormat = dorisConfig.getWriteFormatEnum(table.getId());
140+
DorisConfig.WriteFormat writeFormat = dorisConfig.getWriteFormatEnum();
141141
try {
142142
final String loadUrl = buildLoadUrl(dorisConfig.getDorisHttp(), dorisConfig.getDatabase(), table.getId());
143143
final String prefix = buildPrefix(URLEncoder.encode(table.getId()).replace("%", ""));

connectors/doris-connector/src/main/resources/spec_doris.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,6 @@
487487
"title": "${writeFormat}",
488488
"default": "json",
489489
"x-index": 7,
490-
"x-perTable": true,
491490
"x-decorator": "FormItem",
492491
"x-decorator-props": {
493492
"feedbackLayout": "none"
Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,6 @@
11
package io.tapdata.connector.config;
22

33
import io.tapdata.connector.IConfigWithContext;
4-
import io.tapdata.connector.tester.IStep;
5-
import io.tapdata.connector.tester.items.ClusterURITesterItem;
6-
import io.tapdata.pdk.apis.entity.ConnectionOptions;
7-
import io.tapdata.pdk.apis.entity.TestItem;
8-
9-
import java.util.function.Consumer;
104

115
/**
126
* 测试项-集群地址
@@ -21,15 +15,4 @@ default String getConnectionClusterURI() {
2115
return connectionConfigGet(KEY_CLUSTER_URI, null);
2216
}
2317

24-
default boolean testClusterURI(TestItem item, Consumer<TestItem> consumer, ConnectionOptions options) {
25-
String clusterURI = getConnectionClusterURI();
26-
if (null == clusterURI) {
27-
item.setResult(TestItem.RESULT_FAILED);
28-
item.setInformation(String.format("not configured '%s' yet", KEY_CLUSTER_URI));
29-
} else {
30-
options.connectionString(clusterURI);
31-
ClusterURITesterItem.test(clusterURI, item, consumer);
32-
}
33-
return IStep.CHECK_ITEM_APPLY;
34-
}
3518
}

0 commit comments

Comments
 (0)