Skip to content

Commit cc169fb

Browse files
committed
[Fix](batch) prevent writer deadlock from currentCacheBytes drift
1 parent 06edd0c commit cc169fb

1 file changed

Lines changed: 13 additions & 15 deletions

File tree

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,18 @@ public void writeRecord(String database, String table, byte[] record) {
202202
currentCacheBytes.addAndGet(bytes);
203203
getLock(bufferKey).readLock().unlock();
204204

205+
if (flushQueue.size() < executionOptions.getFlushQueueSize()
206+
&& (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes()
207+
|| buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
208+
boolean flush = bufferFullFlush(bufferKey);
209+
LOG.info("trigger flush by buffer full, flush: {}", flush);
210+
} else if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES
211+
|| buffer.getNumOfRecords() >= STREAM_LOAD_MAX_ROWS) {
212+
// The buffer capacity exceeds the stream load limit, flush
213+
boolean flush = bufferFullFlush(bufferKey);
214+
LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush);
215+
}
216+
205217
if (currentCacheBytes.get() > maxBlockedBytes) {
206218
lock.lock();
207219
try {
@@ -220,20 +232,6 @@ public void writeRecord(String database, String table, byte[] record) {
220232
lock.unlock();
221233
}
222234
}
223-
224-
// queue has space, flush according to the bufferMaxRows/bufferMaxBytes
225-
if (flushQueue.size() < executionOptions.getFlushQueueSize()
226-
&& (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes()
227-
|| buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
228-
boolean flush = bufferFullFlush(bufferKey);
229-
LOG.info("trigger flush by buffer full, flush: {}", flush);
230-
231-
} else if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES
232-
|| buffer.getNumOfRecords() >= STREAM_LOAD_MAX_ROWS) {
233-
// The buffer capacity exceeds the stream load limit, flush
234-
boolean flush = bufferFullFlush(bufferKey);
235-
LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush);
236-
}
237235
}
238236

239237
public boolean bufferFullFlush(String bufferKey) {
@@ -499,7 +497,7 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
499497
OBJECT_MAPPER.readValue(loadResult, RespContent.class);
500498
if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
501499
long cacheByteBeforeFlush =
502-
currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
500+
currentCacheBytes.getAndAdd(-buffer.getBufferSizeBytes());
503501
LOG.info(
504502
"load success, cacheBeforeFlushBytes: {}, currentCacheBytes : {}",
505503
cacheByteBeforeFlush,

0 commit comments

Comments
 (0)