From cc169fb38717caa342ab6c54ff283697427f8705 Mon Sep 17 00:00:00 2001 From: Adesh Nalpet Adimurthy <390.adesh@gmail.com> Date: Sat, 2 May 2026 16:24:38 -0400 Subject: [PATCH] [Fix](batch) prevent writer deadlock from currentCacheBytes drift --- .../sink/batch/DorisBatchStreamLoad.java | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 4bc0c19dc..fc21bb979 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -202,6 +202,18 @@ public void writeRecord(String database, String table, byte[] record) { currentCacheBytes.addAndGet(bytes); getLock(bufferKey).readLock().unlock(); + if (flushQueue.size() < executionOptions.getFlushQueueSize() + && (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes() + || buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) { + boolean flush = bufferFullFlush(bufferKey); + LOG.info("trigger flush by buffer full, flush: {}", flush); + } else if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES + || buffer.getNumOfRecords() >= STREAM_LOAD_MAX_ROWS) { + // The buffer capacity exceeds the stream load limit, flush + boolean flush = bufferFullFlush(bufferKey); + LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush); + } + if (currentCacheBytes.get() > maxBlockedBytes) { lock.lock(); try { @@ -220,20 +232,6 @@ public void writeRecord(String database, String table, byte[] record) { lock.unlock(); } } - - // queue has space, flush according to the bufferMaxRows/bufferMaxBytes - if (flushQueue.size() < executionOptions.getFlushQueueSize() - && (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes() - || buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) { - boolean flush = bufferFullFlush(bufferKey); - LOG.info("trigger flush by buffer full, flush: {}", flush); - - } else if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES - || buffer.getNumOfRecords() >= STREAM_LOAD_MAX_ROWS) { - // The buffer capacity exceeds the stream load limit, flush - boolean flush = bufferFullFlush(bufferKey); - LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush); - } } public boolean bufferFullFlush(String bufferKey) { @@ -499,7 +497,7 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException { OBJECT_MAPPER.readValue(loadResult, RespContent.class); if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { long cacheByteBeforeFlush = - currentCacheBytes.getAndAdd(-respContent.getLoadBytes()); + currentCacheBytes.getAndAdd(-buffer.getBufferSizeBytes()); LOG.info( "load success, cacheBeforeFlushBytes: {}, currentCacheBytes : {}", cacheByteBeforeFlush,