Skip to content

Commit 0044826

Browse files
authored
[Fix](batch) prevent writer deadlock from currentCacheBytes drift (#653)
DorisBatchStreamLoad increments currentCacheBytes by the client-side record bytes on insert, but decrements it by respContent.getLoadBytes() on a successful load. Whenever the BE-reported value is smaller than what the client buffered (for example with partial_columns=true, compress_type=gz, etc.), each load leaves a few bytes stranded in the counter. Over time this drift accumulates until the counter stays above maxBlockedBytes, so writeRecord parks on block.await() forever even though bufferMap and flushQueue are empty. The job freezes with no exception; the only symptom is the repeated log messages "Cache full, waiting for flush" and "bufferMap is empty, no need to flush null".
1 parent 06edd0c commit 0044826

1 file changed

Lines changed: 13 additions & 15 deletions

File tree

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,18 @@ public void writeRecord(String database, String table, byte[] record) {
202202
currentCacheBytes.addAndGet(bytes);
203203
getLock(bufferKey).readLock().unlock();
204204

205+
if (flushQueue.size() < executionOptions.getFlushQueueSize()
206+
&& (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes()
207+
|| buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
208+
boolean flush = bufferFullFlush(bufferKey);
209+
LOG.info("trigger flush by buffer full, flush: {}", flush);
210+
} else if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES
211+
|| buffer.getNumOfRecords() >= STREAM_LOAD_MAX_ROWS) {
212+
// The buffer capacity exceeds the stream load limit, flush
213+
boolean flush = bufferFullFlush(bufferKey);
214+
LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush);
215+
}
216+
205217
if (currentCacheBytes.get() > maxBlockedBytes) {
206218
lock.lock();
207219
try {
@@ -220,20 +232,6 @@ public void writeRecord(String database, String table, byte[] record) {
220232
lock.unlock();
221233
}
222234
}
223-
224-
// queue has space, flush according to the bufferMaxRows/bufferMaxBytes
225-
if (flushQueue.size() < executionOptions.getFlushQueueSize()
226-
&& (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes()
227-
|| buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
228-
boolean flush = bufferFullFlush(bufferKey);
229-
LOG.info("trigger flush by buffer full, flush: {}", flush);
230-
231-
} else if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES
232-
|| buffer.getNumOfRecords() >= STREAM_LOAD_MAX_ROWS) {
233-
// The buffer capacity exceeds the stream load limit, flush
234-
boolean flush = bufferFullFlush(bufferKey);
235-
LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush);
236-
}
237235
}
238236

239237
public boolean bufferFullFlush(String bufferKey) {
@@ -499,7 +497,7 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
499497
OBJECT_MAPPER.readValue(loadResult, RespContent.class);
500498
if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
501499
long cacheByteBeforeFlush =
502-
currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
500+
currentCacheBytes.getAndAdd(-buffer.getBufferSizeBytes());
503501
LOG.info(
504502
"load success, cacheBeforeFlushBytes: {}, currentCacheBytes : {}",
505503
cacheByteBeforeFlush,

0 commit comments

Comments
 (0)