Skip to content

Commit d40c6c6

Browse files
committed
fix: batch update failed switch to smaller
1 parent 9092e0d commit d40c6c6

File tree

1 file changed

+37
-27
lines changed

1 file changed

+37
-27
lines changed

connectors-common/sql-core/src/main/java/io/tapdata/common/dml/NormalRecordWriter.java

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.tapdata.entity.event.dml.TapUpdateRecordEvent;
1212
import io.tapdata.entity.logger.Log;
1313
import io.tapdata.entity.schema.TapTable;
14+
import io.tapdata.kit.DbKit;
1415
import io.tapdata.kit.EmptyKit;
1516
import io.tapdata.pdk.apis.entity.ConnectionOptions;
1617
import io.tapdata.pdk.apis.entity.WriteListResult;
@@ -86,6 +87,35 @@ public void write(List<TapRecordEvent> tapRecordEvents, Consumer<WriteListResult
8687
}
8788
}
8889
//insert,update,delete events must consecutive, so execute the other two first
90+
writePart(tapRecordEvents, listResult, isAlive);
91+
//release resource
92+
93+
} catch (SQLException e) {
94+
exceptionCollector.collectViolateUnique(toJson(tapTable.primaryKeys(true)), null, null, e);
95+
exceptionCollector.collectWritePrivileges("writeRecord", Collections.emptyList(), e);
96+
exceptionCollector.collectWriteType(null, null, null, e);
97+
exceptionCollector.collectWriteLength(null, null, null, e);
98+
exceptionCollector.revealException(e);
99+
throw e;
100+
} finally {
101+
insertRecorder.releaseResource();
102+
updateRecorder.releaseResource();
103+
deleteRecorder.releaseResource();
104+
if (!isTransaction) {
105+
if (needCloseIdentity) {
106+
openIdentity();
107+
}
108+
connection.close();
109+
}
110+
writeListResultConsumer.accept(listResult
111+
.insertedCount(insertRecorder.getAtomicLong().get())
112+
.modifiedCount(updateRecorder.getAtomicLong().get())
113+
.removedCount(deleteRecorder.getAtomicLong().get()));
114+
}
115+
}
116+
117+
protected void writePart(List<TapRecordEvent> tapRecordEvents, WriteListResult<TapRecordEvent> listResult, Supplier<Boolean> isAlive) {
118+
try {
89119
for (TapRecordEvent recordEvent : tapRecordEvents) {
90120
if (null != isAlive && !isAlive.get()) {
91121
break;
@@ -124,39 +154,19 @@ public void write(List<TapRecordEvent> tapRecordEvents, Consumer<WriteListResult
124154
if (!connection.getAutoCommit() && !isTransaction) {
125155
connection.commit();
126156
}
127-
//release resource
128-
129157
} catch (SQLException e) {
130158
try {
131159
connection.rollback();
132160
} catch (Exception ignore) {
133161
}
134-
exceptionCollector.collectTerminateByServer(e);
135-
exceptionCollector.collectViolateNull(null, e);
136-
TapRecordEvent errorEvent = null;
137-
if (EmptyKit.isNotNull(listResult.getErrorMap())) {
138-
errorEvent = listResult.getErrorMap().keySet().stream().findFirst().orElse(null);
162+
exceptionCollector.collectViolateUnique(toJson(tapTable.primaryKeys(true)), null, null, e);
163+
if (tapRecordEvents.size() == 1) {
164+
throw new RuntimeException(String.format("Error occurred when retrying write record: %s", tapRecordEvents.get(0)), e);
165+
} else {
166+
int eachPieceSize = Math.max(tapRecordEvents.size() / 10, 1);
167+
tapLogger.warn("writeRecord failed, dismantle them, size: {}", eachPieceSize);
168+
DbKit.splitToPieces(tapRecordEvents, eachPieceSize).forEach(pieces -> writePart(pieces, listResult, isAlive));
139169
}
140-
exceptionCollector.collectViolateUnique(toJson(tapTable.primaryKeys(true)), errorEvent, null, e);
141-
exceptionCollector.collectWritePrivileges("writeRecord", Collections.emptyList(), e);
142-
exceptionCollector.collectWriteType(null, null, errorEvent, e);
143-
exceptionCollector.collectWriteLength(null, null, errorEvent, e);
144-
exceptionCollector.revealException(e);
145-
throw e;
146-
} finally {
147-
insertRecorder.releaseResource();
148-
updateRecorder.releaseResource();
149-
deleteRecorder.releaseResource();
150-
if (!isTransaction) {
151-
if (needCloseIdentity) {
152-
openIdentity();
153-
}
154-
connection.close();
155-
}
156-
writeListResultConsumer.accept(listResult
157-
.insertedCount(insertRecorder.getAtomicLong().get())
158-
.modifiedCount(updateRecorder.getAtomicLong().get())
159-
.removedCount(deleteRecorder.getAtomicLong().get()));
160170
}
161171
}
162172

0 commit comments

Comments
 (0)