Skip to content

Commit 0ce0450

Browse files
committed
feat: hash read retry for each
(cherry picked from commit 5ae1f54)
1 parent 934c7be commit 0ce0450

File tree

1 file changed

+29
-16
lines changed

1 file changed

+29
-16
lines changed

connectors-common/sql-core/src/main/java/io/tapdata/common/CommonDbConnector.java

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@
2626
import io.tapdata.pdk.apis.entity.TapFilter;
2727
import io.tapdata.pdk.apis.functions.connector.target.CreateTableOptions;
2828

29+
import java.io.IOException;
2930
import java.sql.Connection;
3031
import java.sql.ResultSet;
3132
import java.sql.SQLException;
33+
import java.sql.SQLRecoverableException;
3234
import java.util.*;
3335
import java.util.concurrent.ConcurrentHashMap;
3436
import java.util.concurrent.CountDownLatch;
@@ -646,24 +648,35 @@ protected void batchReadWithHashSplit(TapConnectorContext tapConnectorContext, T
646648
for (int ii = threadIndex; ii < commonDbConfig.getMaxSplit(); ii += commonDbConfig.getBatchReadThreadSize()) {
647649
String splitSql = sql + " WHERE " + getHashSplitModConditions(tapTable, commonDbConfig.getMaxSplit(), ii);
648650
tapLogger.info("batchRead, splitSql[{}]: {}", ii + 1, splitSql);
649-
jdbcContext.query(splitSql, resultSet -> {
650-
List<TapEvent> tapEvents = list();
651-
//get all column names
652-
List<String> columnNames = DbKit.getColumnsFromResultSet(resultSet);
653-
while (isAlive() && resultSet.next()) {
654-
DataMap dataMap = DbKit.getRowFromResultSet(resultSet, columnNames);
655-
processDataMap(dataMap, tapTable);
656-
tapEvents.add(insertRecordEvent(dataMap, tapTable.getId()));
657-
if (tapEvents.size() == eventBatchSize) {
658-
syncEventSubmit(tapEvents, eventsOffsetConsumer);
659-
tapEvents = list();
651+
int retry = 20;
652+
while (retry-- > 0 && isAlive()) {
653+
try {
654+
jdbcContext.query(splitSql, resultSet -> {
655+
List<TapEvent> tapEvents = list();
656+
//get all column names
657+
List<String> columnNames = DbKit.getColumnsFromResultSet(resultSet);
658+
while (isAlive() && resultSet.next()) {
659+
DataMap dataMap = DbKit.getRowFromResultSet(resultSet, columnNames);
660+
processDataMap(dataMap, tapTable);
661+
tapEvents.add(insertRecordEvent(dataMap, tapTable.getId()));
662+
if (tapEvents.size() == eventBatchSize) {
663+
syncEventSubmit(tapEvents, eventsOffsetConsumer);
664+
tapEvents = list();
665+
}
666+
}
667+
//last events those less than eventBatchSize
668+
if (EmptyKit.isNotEmpty(tapEvents)) {
669+
syncEventSubmit(tapEvents, eventsOffsetConsumer);
670+
}
671+
});
672+
break;
673+
} catch (Exception e) {
674+
if (retry == 0 || !(e instanceof SQLRecoverableException || e instanceof IOException)) {
675+
throw e;
660676
}
677+
tapLogger.warn("batchRead, splitSql[{}]: {} failed, retrying...", ii + 1, splitSql);
661678
}
662-
//last events those less than eventBatchSize
663-
if (EmptyKit.isNotEmpty(tapEvents)) {
664-
syncEventSubmit(tapEvents, eventsOffsetConsumer);
665-
}
666-
});
679+
}
667680
}
668681
} catch (Exception e) {
669682
throwable.set(e);

0 commit comments

Comments
 (0)