[Fix](batch) Prevent writer deadlock from currentCacheBytes drift #653
Pull request overview
This PR fixes a batch sink stall in DorisBatchStreamLoad by making cache-byte accounting symmetric with what the client actually buffered and by triggering per-buffer flushes before entering global backpressure waits. In the Flink Doris connector, these changes target the writer path that can otherwise freeze when currentCacheBytes drifts upward over time.
Changes:
- Flush full buffers before waiting on global cache pressure so newly full buffers are not stranded behind backpressure.
- Decrement `currentCacheBytes` using the buffered byte count instead of the Doris-reported `loadBytes`.
- Keep the fix local to the batch stream-load implementation without changing public APIs.
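
The ordering described in the first bullet can be sketched as a small decision function. This is a simplified model rather than the connector's code: the class `FlushBeforeWaitDemo`, the `Action` enum, and the `nextAction` helper are hypothetical, and the exact comparison against `maxBlockedBytes` is an assumption made for illustration.

```java
/** Hypothetical model of the write-path ordering; not the connector's actual class. */
class FlushBeforeWaitDemo {
    enum Action { FLUSH_BUFFER, WAIT_FOR_CACHE, CONTINUE }

    /**
     * Decides what the writer should do after appending a record.
     * The per-buffer threshold is checked first, so a full buffer is handed
     * to the flush path before the writer considers blocking on global pressure.
     */
    static Action nextAction(long bufferBytes, long bufferFlushMaxBytes,
                             long currentCacheBytes, long maxBlockedBytes) {
        if (bufferBytes >= bufferFlushMaxBytes) {
            return Action.FLUSH_BUFFER;   // flushing is what releases cache bytes
        }
        if (currentCacheBytes >= maxBlockedBytes) {
            return Action.WAIT_FOR_CACHE; // assumed backpressure condition
        }
        return Action.CONTINUE;
    }

    public static void main(String[] args) {
        // A full buffer under global pressure is flushed rather than stranded.
        System.out.println(nextAction(100, 100, 500, 400));
        // A partially filled buffer under global pressure waits.
        System.out.println(nextAction(10, 100, 500, 400));
    }
}
```

If the two checks were swapped, the first call would return `WAIT_FOR_CACHE` and the full buffer would never reach the flush path, which is the stranding the bullet refers to.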
    if (flushQueue.size() < executionOptions.getFlushQueueSize()
            && (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes()
                    || buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
        boolean flush = bufferFullFlush(bufferKey);
        LOG.info("trigger flush by buffer full, flush: {}", flush);
    } else if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES
            || buffer.getNumOfRecords() >= STREAM_LOAD_MAX_ROWS) {
        // The buffer capacity exceeds the stream load limit, flush
        boolean flush = bufferFullFlush(bufferKey);
        LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush);

      long cacheByteBeforeFlush =
    -         currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
    +         currentCacheBytes.getAndAdd(-buffer.getBufferSizeBytes());

JNSimba left a comment
LGTM, Thank you for your contribution.
@JNSimba Thanks for the review. When would the next release cutoff be?
Yes, a fix will be released quickly, and a vote is expected to be launched within the next two days.
## Versions

- [x] dev
- [x] 4.x
- [ ] 3.x
- [ ] 2.1 or older (not covered by version/language sync gate)

## Languages

- [x] Chinese
- [x] English
- [ ] Japanese candidate translation needed

## Docs Checklist

- [ ] Checked by AI
- [ ] Test Cases Built
- [x] Updated required version and language counterparts, or explained why not
- [x] If only one language changed, confirmed whether source/translation counterparts need sync

## Summary

Release Flink Doris Connector 26.1.1, superseding 26.1.0.

- Version table in `flink-doris-connector.md` (dev + 4.x, EN + zh-CN): replace `26.1.0` row with `26.1.1`.
- `release-notes.md` (dev + 4.x, EN + zh-CN): prepend a `26.1.1` section.
  - Bug fix: batch sink potentially freezing during prolonged operation when compression is enabled (apache/doris-flink-connector#653).
  - Credits: @addu390
- Download page (`src/constant/download.data.ts`): replace 26.1.0 entry (label/value/source/binary URLs and the `FLINK_SAME_SOURCE_2610` constant) with 26.1.1.

Release notes reference: apache/doris-flink-connector#654

Proposed changes

Issue Number: close #614

Problem Summary:

`DorisBatchStreamLoad` increments `currentCacheBytes` by the client-side record bytes on insert, but decrements it by `respContent.getLoadBytes()` on a successful load. Whenever the BE-reported value is smaller than what the client buffered (for example with `partial_columns=true` or `compress_type=gz`), each load leaves a few bytes behind in the counter.

Over time the leak accumulates above `maxBlockedBytes`, so `writeRecord` parks on `block.await()` forever even though `bufferMap` and `flushQueue` are empty. The job freezes with no exception, only repeating `Cache full, waiting for flush` and `bufferMap is empty, no need to flush null` logs.

Two changes:

- Decrement `currentCacheBytes` by `buffer.getBufferSizeBytes()` so the add and subtract are symmetric regardless of compression / projection.
- Flush full buffers before waiting on global backpressure, so a buffer that reaches `bufferFlushMaxBytes` actually gets flushed instead of being stranded behind backpressure.

Checklist(Required)
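
The drift can be reproduced with a toy model of the counter. This is a sketch under stated assumptions, not the connector's implementation: the class `CacheByteDriftDemo`, the helper `residualAfterLoads`, and all byte counts (1,000 buffered, 900 reported, a 1,000,000-byte limit) are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of the cache-byte counter; not the connector's actual class. */
class CacheByteDriftDemo {
    /**
     * Runs `loads` insert-then-flush cycles and returns the counter value left
     * behind once every buffer has been flushed (ideally zero).
     */
    static long residualAfterLoads(int loads, long bufferedBytes,
                                   long reportedBytes, boolean symmetric) {
        AtomicLong currentCacheBytes = new AtomicLong();
        for (int i = 0; i < loads; i++) {
            // Insert path: the counter grows by what the client buffered.
            currentCacheBytes.addAndGet(bufferedBytes);
            // Flush path: subtract the same amount (the fix) or the smaller
            // backend-reported amount (the old behaviour).
            long decrement = symmetric ? bufferedBytes : reportedBytes;
            currentCacheBytes.addAndGet(-decrement);
        }
        return currentCacheBytes.get();
    }

    public static void main(String[] args) {
        long maxBlockedBytes = 1_000_000L; // assumed limit for illustration
        long leaked = residualAfterLoads(20_000, 1_000, 900, false);
        long balanced = residualAfterLoads(20_000, 1_000, 900, true);
        System.out.println("asymmetric residual: " + leaked
                + ", over limit: " + (leaked > maxBlockedBytes));
        System.out.println("symmetric residual: " + balanced);
    }
}
```

With the asymmetric decrement the counter ends above the assumed limit even though nothing remains buffered, which matches the frozen-writer symptom described above. With the symmetric decrement it returns to zero.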
Further comments
The same root cause was independently reported in #614 (gz compression trigger). The fix is config-agnostic: it covers `partial_columns`, gz, and any future source of client-vs-BE byte asymmetry.