From 42ada26123d510b6456bbb648071f7816ca03952 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Aug 2019 20:15:40 +0200 Subject: [PATCH] stream: compact readable small element buffering When buffering lots of small elements the actual memory usage can unexpectedly and significantly exceed the configured highWaterMark. Try to compact the buffer when we estimate this is about to happen. Refs: #29310 --- lib/_stream_readable.js | 17 ++++++++++ .../test-stream-readable-hwm-0-small.js | 31 +++++++++++++++++++ 2 files changed, 48 insertions(+) create mode 100644 test/parallel/test-stream-readable-hwm-0-small.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index ac37930d122d91..fc8c2b95c1c9c0 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -320,6 +320,15 @@ function addChunk(stream, state, chunk, addToFront) { else state.buffer.push(chunk); + if (!state.objectMode && state.length < state.highWaterMark) { + // Compact buffer if it contains lots of small elements. + // We estimate that each buffer element is 16 bytes + // (i.e. 2 properties * 8 bytes each). + const bufferOverhead = state.buffer.length * 16; + if (state.length + bufferOverhead > state.highWaterMark) + compact(state); + } + if (state.needReadable) emitReadable(stream); } @@ -1131,6 +1140,14 @@ function fromList(n, state) { return ret; } +function compact({ buffer, decoder, objectMode, length }) { + if (!objectMode && buffer.length > 1) { + const v = decoder ? buffer.join('') : buffer.concat(length); + buffer.clear(); + buffer.push(v); + } +} + function endReadable(stream) { const state = stream._readableState; diff --git a/test/parallel/test-stream-readable-hwm-0-small.js b/test/parallel/test-stream-readable-hwm-0-small.js new file mode 100644 index 00000000000000..515dad2d480f6a --- /dev/null +++ b/test/parallel/test-stream-readable-hwm-0-small.js @@ -0,0 +1,31 @@ +'use strict'; + +const assert = require('assert'); +const { Readable } = require('stream'); + +{ + const r = new Readable({ + read: ()=> {}, + highWaterMark: 32, + }); + + r.push('a'); + r.push('b'); + r.push('c'); + + // buffer should be compacted. + assert.strictEqual(r._readableState.buffer.length, 1); +} + +{ + const r = new Readable({ + read: ()=> {}, + highWaterMark: 50, + }); + + r.push('a'); + r.push('b'); + + // buffer should not compacted. + assert.strictEqual(r._readableState.buffer.length, 2); +}