diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index ac37930d122d91..fc8c2b95c1c9c0 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -320,6 +320,15 @@ function addChunk(stream, state, chunk, addToFront) { else state.buffer.push(chunk); + if (!state.objectMode && state.length < state.highWaterMark) { + // Compact buffer if it contains lots of small elements. + // We estimate that each buffer element is 16 bytes + // (i.e. 2 properties * 8 bytes each). + const bufferOverhead = state.buffer.length * 16; + if (state.length + bufferOverhead > state.highWaterMark) + compact(state); + } + if (state.needReadable) emitReadable(stream); } @@ -1131,6 +1140,14 @@ function fromList(n, state) { return ret; } +function compact({ buffer, decoder, objectMode, length }) { + if (!objectMode && buffer.length > 1) { + const v = decoder ? buffer.join('') : buffer.concat(length); + buffer.clear(); + buffer.push(v); + } +} + function endReadable(stream) { const state = stream._readableState; diff --git a/test/parallel/test-stream-readable-hwm-0-small.js b/test/parallel/test-stream-readable-hwm-0-small.js new file mode 100644 index 00000000000000..515dad2d480f6a --- /dev/null +++ b/test/parallel/test-stream-readable-hwm-0-small.js @@ -0,0 +1,31 @@ +'use strict'; + +const assert = require('assert'); +const { Readable } = require('stream'); + +{ + const r = new Readable({ + read: ()=> {}, + highWaterMark: 32, + }); + + r.push('a'); + r.push('b'); + r.push('c'); + + // buffer should be compacted. + assert.strictEqual(r._readableState.buffer.length, 1); +} + +{ + const r = new Readable({ + read: ()=> {}, + highWaterMark: 50, + }); + + r.push('a'); + r.push('b'); + + // buffer should not compacted. + assert.strictEqual(r._readableState.buffer.length, 2); +}