From e0c9d4342fcb6a3beb27cdf16b101d8e4bc77fe0 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Sat, 3 Feb 2018 10:29:43 +0100 Subject: [PATCH] stream: defer readable and flow when sync PR-URL: https://github.com/nodejs/node/pull/18515 Reviewed-By: Matteo Collina Reviewed-By: James M Snell --- lib/_stream_readable.js | 17 +++-- test/parallel/test-stream-pipe-flow.js | 67 +++++++++++++++++++ .../test-stream-readable-pause-and-resume.js | 40 +++++++++++ 3 files changed, 119 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-stream-pipe-flow.js create mode 100644 test/parallel/test-stream-readable-pause-and-resume.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index dc2c587aed64df..dd483adb410498 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -486,11 +486,18 @@ function onEofChunk(stream, state) { } state.ended = true; - // emit 'readable' now to make sure it gets picked up. - state.needReadable = false; - if (!state.emittedReadable) { - state.emittedReadable = true; - emitReadable_(stream); + if (state.sync && state.length) { + // if we are sync and have data in the buffer, wait until next tick + // to emit the data. otherwise we risk emitting data in the flow() + // the readable code triggers during a read() call + emitReadable(stream); + } else { + // emit 'readable' now to make sure it gets picked up. + state.needReadable = false; + if (!state.emittedReadable) { + state.emittedReadable = true; + emitReadable_(stream); + } } } diff --git a/test/parallel/test-stream-pipe-flow.js b/test/parallel/test-stream-pipe-flow.js new file mode 100644 index 00000000000000..1f8564182a3107 --- /dev/null +++ b/test/parallel/test-stream-pipe-flow.js @@ -0,0 +1,67 @@ +'use strict'; +const common = require('../common'); +const { Readable, Writable, PassThrough } = require('stream'); + +{ + let ticks = 17; + + const rs = new Readable({ + objectMode: true, + read: () => { + if (ticks-- > 0) + return process.nextTick(() => rs.push({})); + rs.push({}); + rs.push(null); + } + }); + + const ws = new Writable({ + highWaterMark: 0, + objectMode: true, + write: (data, end, cb) => setImmediate(cb) + }); + + rs.on('end', common.mustCall()); + ws.on('finish', common.mustCall()); + rs.pipe(ws); +} + +{ + let missing = 8; + + const rs = new Readable({ + objectMode: true, + read: () => { + if (missing--) rs.push({}); + else rs.push(null); + } + }); + + const pt = rs + .pipe(new PassThrough({ objectMode: true, highWaterMark: 2 })) + .pipe(new PassThrough({ objectMode: true, highWaterMark: 2 })); + + pt.on('end', function() { + wrapper.push(null); + }); + + const wrapper = new Readable({ + objectMode: true, + read: () => { + process.nextTick(function() { + let data = pt.read(); + if (data === null) { + pt.once('readable', function() { + data = pt.read(); + if (data !== null) wrapper.push(data); + }); + } else { + wrapper.push(data); + } + }); + } + }); + + wrapper.resume(); + wrapper.on('end', common.mustCall()); +} diff --git a/test/parallel/test-stream-readable-pause-and-resume.js b/test/parallel/test-stream-readable-pause-and-resume.js new file mode 100644 index 00000000000000..505327e247da38 --- /dev/null +++ b/test/parallel/test-stream-readable-pause-and-resume.js @@ -0,0 +1,40 @@ +'use strict'; + +const { Readable } = require('stream'); +const common = require('../common'); + +let ticks = 18; +let expectedData = 19; + +const rs = new Readable({ + objectMode: true, + read: () => { + if (ticks-- > 0) + return process.nextTick(() => rs.push({})); + rs.push({}); + rs.push(null); + } +}); + +rs.on('end', common.mustCall()); +readAndPause(); + +function readAndPause() { + // Does a on(data) -> pause -> wait -> resume -> on(data) ... loop. + // Expects on(data) to never fire if the stream is paused. + const ondata = common.mustCall((data) => { + rs.pause(); + + expectedData--; + if (expectedData <= 0) + return; + + setImmediate(function() { + rs.removeListener('data', ondata); + readAndPause(); + rs.resume(); + }); + }, 1); // only call ondata once + + rs.on('data', ondata); +}