Skip to content

Commit dd0cdf5

Browse files
committed
Merge branch 'develop'
2 parents 1566e33 + a3889fe commit dd0cdf5

File tree

3 files changed

+23
-6
lines changed

3 files changed

+23
-6
lines changed

connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/PostgresTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ private boolean deployPgto() {
245245
process = processBuilder.start();
246246
process.waitFor(60, TimeUnit.SECONDS);
247247
new File(FileUtil.paths(toolPath, ctlDir)).mkdir();
248-
processBuilder.command("/bin/sh", "-c", String.format("walminer pgto -i -c %s -s '%s' -e %s -t 4 --source-connstr1='host=%s port=%s username=%s dbanme=%s password=%s'", toolDir.getAbsolutePath() + "/" + ctlDir, slotName, ((PostgresConfig) commonDbConfig).getPgtoPort(), commonDbConfig.getHost(), commonDbConfig.getPort(), commonDbConfig.getUser(), commonDbConfig.getDatabase(), commonDbConfig.getPassword()));
248+
processBuilder.command("/bin/sh", "-c", String.format("walminer pgto init -c %s -s '%s' -e %s -t 4 --source-connstr1='host=%s port=%s username=%s dbanme=%s password=%s'", toolDir.getAbsolutePath() + "/" + ctlDir, slotName, ((PostgresConfig) commonDbConfig).getPgtoPort(), commonDbConfig.getHost(), commonDbConfig.getPort(), commonDbConfig.getUser(), commonDbConfig.getDatabase(), commonDbConfig.getPassword()));
249249
process = processBuilder.start();
250250
process.waitFor(60, TimeUnit.SECONDS);
251251
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
@@ -256,7 +256,7 @@ private boolean deployPgto() {
256256
}
257257
int exitCode = process.waitFor();
258258
System.out.println("exit code: " + exitCode);
259-
processBuilder.command("/bin/sh", "-c", String.format("walminer pgto -m -c %s", toolDir.getAbsolutePath() + "/" + ctlDir));
259+
processBuilder.command("/bin/sh", "-c", String.format("walminer pgto run -m -c %s", toolDir.getAbsolutePath() + "/" + ctlDir));
260260
processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
261261
process = processBuilder.start();
262262
process.waitFor(60, TimeUnit.SECONDS);

connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ protected void parseKeyAndValue(String tableName, Map.Entry<String, Object> stri
176176
case "integer":
177177
case "bigint":
178178
case "numeric":
179+
case "money":
179180
case "real":
180181
case "double precision":
181182
stringObjectEntry.setValue(new BigDecimal((String) value));

connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/StarrocksConnector.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -333,16 +333,16 @@ protected TapField makeTapField(DataMap dataMap) {
333333
@Override
334334
public void onStop(TapConnectionContext connectionContext) {
335335
ErrorKit.ignoreAnyError(() -> {
336-
for (StarrocksStreamLoader StarrocksStreamLoader : starrocksStreamLoaderMap.values()) {
337-
if (EmptyKit.isNotNull(StarrocksStreamLoader)) {
336+
for (StarrocksStreamLoader starrocksStreamLoader : starrocksStreamLoaderMap.values()) {
337+
if (EmptyKit.isNotNull(starrocksStreamLoader)) {
338338
// 在停止前先刷新剩余数据
339339
try {
340-
StarrocksStreamLoader.flushOnStop();
340+
starrocksStreamLoader.flushOnStop();
341341
tapLogger.info("StarrocksConnector", "Flushed remaining data before stopping StarrocksStreamLoader");
342342
} catch (Exception e) {
343343
tapLogger.warn("StarrocksConnector", "Failed to flush data before stopping: {}", e.getMessage());
344344
}
345-
StarrocksStreamLoader.shutdown();
345+
starrocksStreamLoader.shutdown();
346346
}
347347
}
348348
});
@@ -367,6 +367,22 @@ protected void fieldDDLHandler(TapConnectorContext tapConnectorContext, TapField
367367
if (null == sqlList) {
368368
return;
369369
}
370+
//执行ddl前需要将缓存的数据先flush,清空StarrocksStreamLoaderMap对象
371+
ErrorKit.ignoreAnyError(() -> {
372+
for (StarrocksStreamLoader starrocksStreamLoader : starrocksStreamLoaderMap.values()) {
373+
if (EmptyKit.isNotNull(starrocksStreamLoader)) {
374+
// 在停止前先刷新剩余数据
375+
try {
376+
starrocksStreamLoader.flushOnStop();
377+
tapLogger.info("StarrocksConnector", "Flushed remaining data before stopping StarrocksStreamLoader");
378+
} catch (Exception e) {
379+
tapLogger.warn("StarrocksConnector", "Failed to flush data before stopping: {}", e.getMessage());
380+
}
381+
starrocksStreamLoader.shutdown();
382+
}
383+
}
384+
starrocksStreamLoaderMap.clear();
385+
});
370386
try {
371387
tapLogger.info("Field ddl sql: {}", sqlList);
372388
jdbcContext.batchExecute(sqlList);

0 commit comments

Comments
 (0)