diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 917c26f2c7659c1..e37792486b4a2f5 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -331,24 +331,24 @@ function pipe(src, dst) { src .on('end', end) .on('readable', pump) - .on('error', cleanup); + .on('error', done); dst .on('drain', pump) - .on('error', cleanup); + .on('error', done); - function cleanup() { + function done() { src .off('end', end) .off('readable', pump) - .off('error', cleanup); + .off('error', done); dst .off('drain', pump) - .off('error', cleanup); + .off('error', done); } function end() { dst.end(); - cleanup(); + done(); } const objectMode = ( diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 7e609e02b1a5ae4..fa5402c2c8669ea 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -215,10 +215,9 @@ const tsp = require('timers/promises'); let sent = 0; const rs = new Readable({ read() { - if (sent++ > 10) { - return; - } - rs.push('hello'); + setImmediate(() => { + rs.push('hello'); + }); }, destroy: common.mustCall((err, cb) => { cb();