Skip to content

Commit 82673fc

Browse files
committed
fix: postgres money null bug
fix: starrocks update partial columns bug
1 parent e78236e commit 82673fc

File tree

3 files changed

+17
-8
lines changed

3 files changed

+17
-8
lines changed

connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1005,7 +1005,7 @@ private Map<String, Object> filterTimeForPG(ResultSet resultSet, Map<String, Str
10051005

10061006
} else if (dataType.equals("money")) {
10071007
String money = resultSet.getString(colName);
1008-
if ("null".equals(money)) {
1008+
if (EmptyKit.isBlank(money) || "null".equals(money)) {
10091009
dataMap.put(colName, null);
10101010
} else {
10111011
dataMap.put(colName, new BigDecimal(money.replaceAll("[^\\d.-]", "")));

connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/StarrocksConnector.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public class StarrocksConnector extends CommonDbConnector {
5858
private StarrocksConfig starrocksConfig;
5959
private final Map<String, StarrocksStreamLoader> starrocksStreamLoaderMap = new ConcurrentHashMap<>();
6060
private Consumer<Object> flushOffsetCallback;
61+
private boolean hasMethodError = false;
6162

6263

6364
@Override
@@ -181,6 +182,7 @@ protected RetryOptions errorHandle(TapConnectionContext tapConnectionContext, PD
181182
retryOptions.needRetry(true);
182183
return retryOptions;
183184
}
185+
hasMethodError = true;
184186
return retryOptions;
185187
}
186188

@@ -196,6 +198,19 @@ public StarrocksStreamLoader getStarrocksStreamLoader() {
196198
}
197199

198200
private void writeRecord(TapConnectorContext connectorContext, List<TapRecordEvent> tapRecordEvents, TapTable tapTable, Consumer<WriteListResult<TapRecordEvent>> writeListResultConsumer) throws Throwable {
201+
if (hasMethodError) {
202+
for (StarrocksStreamLoader starrocksStreamLoader : starrocksStreamLoaderMap.values()) {
203+
if (EmptyKit.isNotNull(starrocksStreamLoader)) {
204+
try {
205+
starrocksStreamLoader.flushOnStop();
206+
tapLogger.info("StarrocksConnector", "Flushed remaining data before stopping StarrocksStreamLoader");
207+
} catch (Exception e) {
208+
tapLogger.warn("StarrocksConnector", "Failed to flush data before stopping: {}", e.getMessage());
209+
}
210+
}
211+
}
212+
hasMethodError = false;
213+
}
199214
try {
200215
if (checkStreamLoad()) {
201216
getStarrocksStreamLoader().writeRecord(tapRecordEvents, tapTable, writeListResultConsumer);

connectors/starrocks-connector/src/main/java/io/tapdata/connector/starrocks/streamload/StarrocksStreamLoader.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -963,19 +963,13 @@ public RespContent flushTable(String tableName, TapTable table) throws Starrocks
963963
taplogger.warn("Table {} flush failed: flushed_size={}, waiting_time={} ms, " +
964964
"flush_duration={} ms, error={}",
965965
tableName, formatBytes(tableDataSize), waitTime, flushDuration, e.getMessage());
966-
967-
// 失败:保留缓存文件(不删除),仅关闭文件流
968-
cleanupCacheFileForTable(tableName, false);
969966
throw e;
970967
} catch (Exception e) {
971968
long flushEndTime = System.currentTimeMillis();
972969
long flushDuration = flushEndTime - flushStartTime;
973970
taplogger.warn("Table {} flush failed: flushed_size={}, waiting_time={} ms, " +
974971
"flush_duration={} ms, error={}",
975972
tableName, formatBytes(tableDataSize), waitTime, flushDuration, e.getMessage());
976-
977-
// 失败:保留缓存文件(不删除),仅关闭文件流
978-
cleanupCacheFileForTable(tableName, false);
979973
throw new StarrocksRuntimeException(e);
980974
} finally {
981975

@@ -1275,7 +1269,7 @@ private void cleanupCacheFileForTable(String tableName, boolean deleteFile) {
12751269

12761270
// 清理相关状态
12771271
isFirstRecordByTable.remove(tableName);
1278-
// dataColumnsByTable.remove(tableName);
1272+
dataColumnsByTable.remove(tableName);
12791273
currentBatchSizeByTable.remove(tableName);
12801274
// 注意:lastFlushTimeByTable 不清理,需要保持刷新时间记录
12811275
// 注意:tableNameToTapTableMap 不清理,因为表结构信息需要持久保存

0 commit comments

Comments
 (0)